
[HUDI-2052] Support load logFile in BootstrapFunction (#3134)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
yuzhaojing
2021-06-30 20:37:00 +08:00
committed by GitHub
parent 94f0f40fec
commit 07e93de8b4
4 changed files with 104 additions and 36 deletions

hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java

@@ -22,20 +22,24 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
@@ -54,11 +58,12 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+
+import static java.util.stream.Collectors.toList;
 
 /**
  * The function to load index from existing hoodieTable.
  *
@@ -78,6 +83,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
   private final Configuration conf;
 
   private transient org.apache.hadoop.conf.Configuration hadoopConf;
+  private transient HoodieWriteConfig writeConfig;
 
   private GlobalAggregateManager aggregateManager;
 
   private ListState<Boolean> bootstrapState;
@@ -108,13 +114,14 @@ public class BootstrapFunction<I, O extends HoodieRecord>
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
     this.hoodieTable = getTable();
     this.aggregateManager = ((StreamingRuntimeContext) getRuntimeContext()).getGlobalAggregateManager();
   }
 
   @Override
   @SuppressWarnings("unchecked")
-  public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
+  public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
     if (!alreadyBootstrap) {
       String basePath = hoodieTable.getMetaClient().getBasePath();
       int taskID = getRuntimeContext().getIndexOfThisSubtask();
@@ -155,11 +162,10 @@ public class BootstrapFunction<I, O extends HoodieRecord>
   }
 
   private HoodieFlinkTable getTable() {
-    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
         new SerializableConfiguration(this.hadoopConf),
         new FlinkTaskContextSupplier(getRuntimeContext()));
-    return HoodieFlinkTable.create(writeConfig, context);
+    return HoodieFlinkTable.create(this.writeConfig, context);
   }
 
   /**
@@ -168,32 +174,64 @@ public class BootstrapFunction<I, O extends HoodieRecord>
    * @param partitionPath The partition path
    */
   @SuppressWarnings("unchecked")
-  private void loadRecords(String partitionPath, Collector<O> out) {
+  private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
     long start = System.currentTimeMillis();
     BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
-    List<HoodieBaseFile> latestBaseFiles =
-        HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable);
-    LOG.info("All baseFile in partition {} size = {}", partitionPath, latestBaseFiles.size());
+    Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
 
     final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
     final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
     final int taskID = getRuntimeContext().getIndexOfThisSubtask();
-    for (HoodieBaseFile baseFile : latestBaseFiles) {
-      boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
-          baseFile.getFileId(), maxParallelism, parallelism) == taskID;
 
-      if (shouldLoad) {
-        LOG.info("Load records from file {}.", baseFile);
-        final List<HoodieKey> hoodieKeys;
-        try {
-          hoodieKeys =
-              fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
-        } catch (Exception e) {
-          throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
-        }
+    Option<HoodieInstant> latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline()
+        .filterCompletedInstants().lastInstant();
 
-        for (HoodieKey hoodieKey : hoodieKeys) {
-          out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, baseFile)));
+    if (latestCommitTime.isPresent()) {
+      List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
+          .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true)
+          .collect(toList());
+
+      for (FileSlice fileSlice : fileSlices) {
+        if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
+          continue;
+        }
+        LOG.info("Load records from {}.", fileSlice);
+
+        // load parquet records
+        fileSlice.getBaseFile().ifPresent(baseFile -> {
+          // filter out crushed files
+          if (baseFile.getFileSize() <= 0) {
+            return;
+          }
+
+          final List<HoodieKey> hoodieKeys;
+          try {
+            hoodieKeys =
+                fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
+          } catch (Exception e) {
+            throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
+          }
+
+          for (HoodieKey hoodieKey : hoodieKeys) {
+            out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)));
+          }
+        });
+
+        // load avro log records
+        List<String> logPaths = fileSlice.getLogFiles()
+            // filter out crushed files
+            .filter(logFile -> logFile.getFileSize() > 0)
+            .map(logFile -> logFile.getPath().toString())
+            .collect(toList());
+        HoodieMergedLogRecordScanner scanner = scanLog(logPaths, schema, latestCommitTime.get().getTimestamp());
+
+        try {
+          for (String recordKey : scanner.getRecords().keySet()) {
+            out.collect((O) new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice)));
+          }
+        } catch (Exception e) {
+          throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
         }
       }
     }
@@ -203,17 +241,44 @@ public class BootstrapFunction<I, O extends HoodieRecord>
         this.getClass().getSimpleName(), taskID, partitionPath, cost);
   }
 
+  private HoodieMergedLogRecordScanner scanLog(
+      List<String> logPaths,
+      Schema logSchema,
+      String latestInstantTime) {
+    String basePath = this.hoodieTable.getMetaClient().getBasePath();
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(basePath, this.hadoopConf))
+        .withBasePath(basePath)
+        .withLogFilePaths(logPaths)
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(latestInstantTime)
+        .withReadBlocksLazily(this.writeConfig.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(false)
+        .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
+        .withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge())
+        .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
+        .build();
+  }
+
   @SuppressWarnings("unchecked")
-  public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, HoodieBaseFile baseFile) {
+  public static HoodieRecord generateHoodieRecord(HoodieKey hoodieKey, FileSlice fileSlice) {
     HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, null);
-    hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), baseFile.getCommitTime(), baseFile.getFileId()));
+    hoodieRecord.setCurrentLocation(new HoodieRecordGlobalLocation(hoodieKey.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId()));
     hoodieRecord.seal();
     return hoodieRecord;
   }
 
+  private static boolean shouldLoadFile(String fileId,
+                                        int maxParallelism,
+                                        int parallelism,
+                                        int taskID) {
+    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
+        fileId, maxParallelism, parallelism) == taskID;
+  }
+
   @Override
-  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+  public void notifyCheckpointComplete(long checkpointId) {
     // no operation
   }
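
Note on the routing above: shouldLoadFile reuses Flink's key-group assignment so that each fileId is owned by exactly one subtask, which is also how the emitted index records are partitioned downstream. A minimal standalone sketch of that contract follows; the class name and fileIds are hypothetical, only KeyGroupRangeAssignment is the real Flink API used by the commit.

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

// Hypothetical demo, not part of this commit: for a fixed (maxParallelism,
// parallelism) pair, a given fileId always maps to the same subtask, so each
// file slice (base file plus its log files) is loaded exactly once.
public class FileIdRoutingSketch {
  public static void main(String[] args) {
    final int parallelism = 4;       // getNumberOfParallelSubtasks()
    final int maxParallelism = 128;  // getMaxNumberOfParallelSubtasks()
    for (String fileId : new String[] {"file-0001", "file-0002", "file-0003"}) {
      int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
          fileId, maxParallelism, parallelism);
      System.out.printf("fileId %s -> subtask %d of %d%n", fileId, subtask, parallelism);
    }
  }
}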

hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java

@@ -571,7 +571,7 @@ public class TestWriteCopyOnWrite {
     checkWrittenData(tempFile, expected, 1);
   }
 
-  Map<String, String> getMiniBatchExpected() {
+  protected Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
     // the last 2 lines are merged
     expected.put("par1", "["
@@ -581,6 +581,10 @@ public class TestWriteCopyOnWrite {
     return expected;
   }
 
+  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
+    return EXPECTED2;
+  }
+
   @Test
   public void testIndexStateBootstrap() throws Exception {
     // open the function and ingest data
@@ -637,7 +641,9 @@ public class TestWriteCopyOnWrite {
     nextEvent = funcWrapper.getNextEvent();
     assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
 
-    checkWrittenData(tempFile, EXPECTED2);
+    Map<String, String> expected = getExpectedBeforeCheckpointComplete();
+    checkWrittenData(tempFile, expected);
 
     funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
     assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
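
The test refactor is a template-method hook: the copy-on-write base test now reads its pre-checkpoint expectation from getExpectedBeforeCheckpointComplete() (EXPECTED2), and the merge-on-read subclass overrides it to EXPECTED1 (see the next file), presumably because MOR records sit in log files and are not readable until the delta commit completes. A condensed, hypothetical illustration of the pattern; the class bodies, keys, and values below are invented and the real constants and assertions are omitted.

import java.util.HashMap;
import java.util.Map;

// Hypothetical condensed sketch of the hook pattern used by the tests.
class WriteCopyOnWriteSketch {
  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
    // COW: new data is already visible in base files before the checkpoint completes.
    return singleton("par1", "new-data-visible");
  }

  static Map<String, String> singleton(String k, String v) {
    Map<String, String> m = new HashMap<>();
    m.put(k, v);
    return m;
  }
}

class WriteMergeOnReadSketch extends WriteCopyOnWriteSketch {
  @Override
  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
    // MOR: the delta commit has not completed yet, so readers still see the old data.
    return singleton("par1", "previous-data-only");
  }
}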

hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java

@@ -37,8 +37,6 @@ import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.util.Comparator;
@@ -83,13 +81,12 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
     TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
   }
 
-  @Disabled
-  @Test
-  public void testIndexStateBootstrap() {
-    // Ignore the index bootstrap because we only support parquet load now.
+  @Override
+  protected Map<String, String> getExpectedBeforeCheckpointComplete() {
+    return EXPECTED1;
   }
 
-  Map<String, String> getMiniBatchExpected() {
+  protected Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
     // MOR mode merges the messages with the same key.
     expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");

hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java

@@ -45,7 +45,7 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     // Ignore the index bootstrap because we only support parquet load now.
   }
 
-  Map<String, String> getMiniBatchExpected() {
+  protected Map<String, String> getMiniBatchExpected() {
     Map<String, String> expected = new HashMap<>();
     // MOR mode merges the messages with the same key.
     expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");