1
0

[HUDI-4078][HUDI-FLINK]BootstrapOperator contains the pending compaction files (#5545)

* [HUDI-4078][HUDI-FLINK]BootstrapOperator contains the pending compaction files
This commit is contained in:
Bo Cui
2022-05-13 14:32:48 +08:00
committed by GitHub
parent 8ad0bb9745
commit 7fb436d3cf
6 changed files with 40 additions and 11 deletions

View File

@@ -124,7 +124,7 @@ public interface TableFileSystemView {
* @param maxInstantTime Max Instant Time
* @return
*/
public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
/**
* Stream all the latest file slices, in the given range.

View File

@@ -33,28 +33,28 @@ public interface Functions {
/**
* A function which has not any parameter.
*/
public interface Function0<R> extends Serializable {
interface Function0<R> extends Serializable {
R apply();
}
/**
* A function which contains only one parameter.
*/
public interface Function1<T1, R> extends Serializable {
interface Function1<T1, R> extends Serializable {
R apply(T1 val1);
}
/**
* A function which contains two parameters.
*/
public interface Function2<T1, T2, R> extends Serializable {
interface Function2<T1, T2, R> extends Serializable {
R apply(T1 val1, T2 val2);
}
/**
* A function which contains three parameters.
*/
public interface Function3<T1, T2, T3, R> extends Serializable {
interface Function3<T1, T2, T3, R> extends Serializable {
R apply(T1 val1, T2 val2, T3 val3);
}
}

View File

@@ -199,7 +199,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true)
.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
.collect(toList());
for (FileSlice fileSlice : fileSlices) {

View File

@@ -352,6 +352,10 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
// reset the config option
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
validateIndexLoaded();
}
protected void validateIndexLoaded() throws Exception {
preparePipeline(conf)
.consume(TestData.DATA_SET_UPDATE_INSERT)
.checkIndexLoaded(
@@ -418,7 +422,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
return TestHarness.instance().preparePipeline(tempFile, conf);
}
private TestHarness preparePipeline(Configuration conf) throws Exception {
protected TestHarness preparePipeline(Configuration conf) throws Exception {
return TestHarness.instance().preparePipeline(tempFile, conf);
}

View File

@@ -20,8 +20,10 @@ package org.apache.hudi.sink;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
@@ -36,6 +38,27 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
}
@Test
public void testIndexStateBootstrapWithCompactionScheduled() throws Exception {
// sets up the delta commits as 1 to generate a new compaction plan.
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
// open the function and ingest data
preparePipeline(conf)
.consume(TestData.DATA_SET_INSERT)
.assertEmptyDataFiles()
.checkpoint(1)
.assertNextEvent()
.checkpointComplete(1)
.checkWrittenData(EXPECTED1, 4)
.end();
// reset config options
conf.removeConfig(FlinkOptions.COMPACTION_DELTA_COMMITS);
// sets up index bootstrap
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
validateIndexLoaded();
}
@Override
public void testInsertClustering() {
// insert clustering is only valid for cow table.

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.utils;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -641,10 +642,11 @@ public class TestData {
File[] dataFiles = partitionDir.listFiles(file ->
file.getName().contains(".log.") && !file.getName().startsWith(".."));
assertNotNull(dataFiles);
HoodieMergedLogRecordScanner scanner = getScanner(
fs, baseFile.getPath(), Arrays.stream(dataFiles).map(File::getAbsolutePath)
.sorted(Comparator.naturalOrder()).collect(Collectors.toList()),
schema, latestInstant);
List<String> logPaths = Arrays.stream(dataFiles)
.sorted((f1, f2) -> HoodieLogFile.getLogFileComparator()
.compare(new HoodieLogFile(f1.getPath()), new HoodieLogFile(f2.getPath())))
.map(File::getAbsolutePath).collect(Collectors.toList());
HoodieMergedLogRecordScanner scanner = getScanner(fs, baseFile.getPath(), logPaths, schema, latestInstant);
List<String> readBuffer = scanner.getRecords().values().stream()
.map(hoodieRecord -> {
try {