From 7fb436d3cf66748f32776cf3db2d943f97b52160 Mon Sep 17 00:00:00 2001 From: Bo Cui Date: Fri, 13 May 2022 14:32:48 +0800 Subject: [PATCH] =?UTF-8?q?[HUDI-4078][HUDI-FLINK]BootstrapOperator=20cont?= =?UTF-8?q?ains=20the=20pending=20compact=E2=80=A6=20(#5545)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [HUDI-4078][HUDI-FLINK]BootstrapOperator contains the pending compaction files --- .../table/view/TableFileSystemView.java | 2 +- .../apache/hudi/common/util/Functions.java | 8 +++---- .../sink/bootstrap/BootstrapOperator.java | 2 +- .../hudi/sink/TestWriteCopyOnWrite.java | 6 ++++- .../hudi/sink/TestWriteMergeOnRead.java | 23 +++++++++++++++++++ .../java/org/apache/hudi/utils/TestData.java | 10 ++++---- 6 files changed, 40 insertions(+), 11 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java index 733028673..c32e2cabb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java @@ -124,7 +124,7 @@ public interface TableFileSystemView { * @param maxInstantTime Max Instant Time * @return */ - public Stream getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime); + Stream getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime); /** * Stream all the latest file slices, in the given range. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java index 0b82f0914..728ac717e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Functions.java @@ -33,28 +33,28 @@ public interface Functions { /** * A function which has not any parameter. */ - public interface Function0 extends Serializable { + interface Function0 extends Serializable { R apply(); } /** * A function which contains only one parameter. */ - public interface Function1 extends Serializable { + interface Function1 extends Serializable { R apply(T1 val1); } /** * A function which contains two parameters. */ - public interface Function2 extends Serializable { + interface Function2 extends Serializable { R apply(T1 val1, T2 val2); } /** * A function which contains three parameters. */ - public interface Function3 extends Serializable { + interface Function3 extends Serializable { R apply(T1 val1, T2 val2, T3 val3); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 7c3f5a932..f0ef3bccb 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -199,7 +199,7 @@ public class BootstrapOperator> Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema(); List fileSlices = this.hoodieTable.getSliceView() - .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true) + .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp()) .collect(toList()); for (FileSlice fileSlice : fileSlices) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 4771a7a34..403d0272b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -352,6 +352,10 @@ public class TestWriteCopyOnWrite extends TestWriteBase { // reset the config option conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); + validateIndexLoaded(); + } + + protected void validateIndexLoaded() throws Exception { preparePipeline(conf) .consume(TestData.DATA_SET_UPDATE_INSERT) .checkIndexLoaded( @@ -418,7 +422,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase { return TestHarness.instance().preparePipeline(tempFile, conf); } - private TestHarness preparePipeline(Configuration conf) throws Exception { + protected TestHarness preparePipeline(Configuration conf) throws Exception { return TestHarness.instance().preparePipeline(tempFile, conf); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index a35a0ac8d..f2c0500f9 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -20,8 +20,10 @@ package org.apache.hudi.sink; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.utils.TestData; import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; @@ -36,6 +38,27 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite { conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); } + @Test + public void testIndexStateBootstrapWithCompactionScheduled() throws Exception { + // sets up the delta commits as 1 to generate a new compaction plan. + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); + // open the function and ingest data + preparePipeline(conf) + .consume(TestData.DATA_SET_INSERT) + .assertEmptyDataFiles() + .checkpoint(1) + .assertNextEvent() + .checkpointComplete(1) + .checkWrittenData(EXPECTED1, 4) + .end(); + + // reset config options + conf.removeConfig(FlinkOptions.COMPACTION_DELTA_COMMITS); + // sets up index bootstrap + conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); + validateIndexLoaded(); + } + @Override public void testInsertClustering() { // insert clustering is only valid for cow table. diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 61f1657c2..c31c2bbad 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -21,6 +21,7 @@ package org.apache.hudi.utils; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.testutils.HoodieTestUtils; @@ -641,10 +642,11 @@ public class TestData { File[] dataFiles = partitionDir.listFiles(file -> file.getName().contains(".log.") && !file.getName().startsWith("..")); assertNotNull(dataFiles); - HoodieMergedLogRecordScanner scanner = getScanner( - fs, baseFile.getPath(), Arrays.stream(dataFiles).map(File::getAbsolutePath) - .sorted(Comparator.naturalOrder()).collect(Collectors.toList()), - schema, latestInstant); + List logPaths = Arrays.stream(dataFiles) + .sorted((f1, f2) -> HoodieLogFile.getLogFileComparator() + .compare(new HoodieLogFile(f1.getPath()), new HoodieLogFile(f2.getPath()))) + .map(File::getAbsolutePath).collect(Collectors.toList()); + HoodieMergedLogRecordScanner scanner = getScanner(fs, baseFile.getPath(), logPaths, schema, latestInstant); List readBuffer = scanner.getRecords().values().stream() .map(hoodieRecord -> { try {