diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java index 5f81559ec..503a5bf06 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java @@ -18,22 +18,6 @@ package org.apache.hudi.sink.bootstrap; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.state.CheckpointListener; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.util.Collector; -import org.apache.hadoop.fs.Path; import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; @@ -47,11 +31,26 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndexUtils; -import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunc; +import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction; import org.apache.hudi.table.HoodieFlinkTable; import 
org.apache.hudi.table.HoodieTable; import org.apache.hudi.util.StreamerUtil; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.scala.typeutils.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,18 +60,16 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; /** - * The function to load index from exists hoodieTable. + * The function to load index from existing hoodieTable. * - *

Each subtask in bootstrapFunction triggers the bootstrap index with the first element, - * Received record cannot be sent until the index is all sent. + *

Each subtask of the function triggers the index bootstrap when the first element comes in, + * the record cannot be sent until all the index records have been sent. * *

The output records should then shuffle by the recordKey and thus do scalable write. - * - * @see BootstrapFunction */ public class BootstrapFunction - extends ProcessFunction - implements CheckpointedFunction, CheckpointListener { + extends ProcessFunction + implements CheckpointedFunction, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(BootstrapFunction.class); @@ -95,12 +92,8 @@ public class BootstrapFunction @Override public void initializeState(FunctionInitializationContext context) throws Exception { - this.bootstrapState = context.getOperatorStateStore().getListState( - new ListStateDescriptor<>( - "bootstrap-state", - TypeInformation.of(new TypeHint() {}) - ) - ); + this.bootstrapState = context.getOperatorStateStore() + .getListState(new ListStateDescriptor<>("bootstrap-state", Types.BOOLEAN())); if (context.isRestored()) { LOG.info("Restoring state for the {}.", getClass().getSimpleName()); @@ -123,8 +116,9 @@ public class BootstrapFunction @SuppressWarnings("unchecked") public void processElement(I value, Context ctx, Collector out) throws IOException { if (!alreadyBootstrap) { - LOG.info("Start loading records in table {} into the index state, taskId = {}", conf.getString(FlinkOptions.PATH), getRuntimeContext().getIndexOfThisSubtask()); String basePath = hoodieTable.getMetaClient().getBasePath(); + int taskID = getRuntimeContext().getIndexOfThisSubtask(); + LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID); for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) { if (pattern.matcher(partitionPath).matches()) { loadRecords(partitionPath, out); @@ -132,30 +126,30 @@ public class BootstrapFunction } // wait for others bootstrap task send bootstrap complete. 
- updateAndWaiting(); + waitForBootstrapReady(taskID); alreadyBootstrap = true; - LOG.info("Finish send index to BucketAssign, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask()); + LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask()); } - // send data to next operator + // send the trigger record out.collect((O) value); } /** - * Wait for other bootstrap task send bootstrap complete. + * Wait for other bootstrap tasks to finish the index bootstrap. */ - private void updateAndWaiting() { + private void waitForBootstrapReady(int taskID) { int taskNum = getRuntimeContext().getNumberOfParallelSubtasks(); int readyTaskNum = 1; while (taskNum != readyTaskNum) { try { - readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunc.NAME, getRuntimeContext().getIndexOfThisSubtask(), new BootstrapAggFunc()); - LOG.info("Waiting for others bootstrap task complete, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask()); + readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction()); + LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID); TimeUnit.SECONDS.sleep(5); } catch (Exception e) { - LOG.warn("update global aggregate error", e); + LOG.warn("Update global task bootstrap summary error", e); } } } @@ -178,7 +172,7 @@ public class BootstrapFunction long start = System.currentTimeMillis(); BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat()); List latestBaseFiles = - HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable); + HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, this.hoodieTable); LOG.info("All baseFile in partition {} size = {}", partitionPath, latestBaseFiles.size()); final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); @@ -193,20 +187,20 @@ public class BootstrapFunction final List hoodieKeys; try { hoodieKeys = - 
fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath())); + fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath())); } catch (Exception e) { throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e); } for (HoodieKey hoodieKey : hoodieKeys) { - out.collect((O) new BootstrapRecord(generateHoodieRecord(hoodieKey, baseFile))); + out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, baseFile))); } } } long cost = System.currentTimeMillis() - start; LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.", - this.getClass().getSimpleName(), taskID, partitionPath, cost); + this.getClass().getSimpleName(), taskID, partitionPath, cost); } @SuppressWarnings("unchecked") diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/IndexRecord.java similarity index 84% rename from hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java rename to hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/IndexRecord.java index 025d844b2..2fe83b71c 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapRecord.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/IndexRecord.java @@ -22,12 +22,12 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; /** - * An record to mark HoodieRecord or IndexRecord. + * The index record. 
*/ -public class BootstrapRecord extends HoodieRecord { +public class IndexRecord extends HoodieRecord { private static final long serialVersionUID = 1L; - public BootstrapRecord(HoodieRecord record) { + public IndexRecord(HoodieRecord record) { super(record); } } \ No newline at end of file diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java index 80067f067..14630a1f8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAccumulator.java @@ -23,7 +23,7 @@ import java.util.HashSet; import java.util.Set; /** - * Aggregate accumulator. + * Bootstrap ready task id accumulator. */ public class BootstrapAccumulator implements Serializable { private static final long serialVersionUID = 1L; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java similarity index 86% rename from hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java rename to hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java index 2233e8422..075de6dc8 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunc.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/aggregate/BootstrapAggFunction.java @@ -21,10 +21,11 @@ package org.apache.hudi.sink.bootstrap.aggregate; import org.apache.flink.api.common.functions.AggregateFunction; /** - * Aggregate Function that accumulates the loaded task number of function {@link org.apache.hudi.sink.bootstrap.BootstrapFunction}. 
+ * Aggregate Function that accumulates the loaded task number of + * function {@link org.apache.hudi.sink.bootstrap.BootstrapFunction}. */ -public class BootstrapAggFunc implements AggregateFunction { - public static final String NAME = BootstrapAggFunc.class.getSimpleName(); +public class BootstrapAggFunction implements AggregateFunction { + public static final String NAME = BootstrapAggFunction.class.getSimpleName(); @Override public BootstrapAccumulator createAccumulator() { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 00a82747f..75a345430 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -30,7 +30,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.bootstrap.BootstrapRecord; +import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.util.StreamerUtil; @@ -151,10 +151,10 @@ public class BucketAssignFunction> @Override public void processElement(I value, Context ctx, Collector out) throws Exception { - if (value instanceof BootstrapRecord) { - BootstrapRecord bootstrapRecord = (BootstrapRecord) value; - this.context.setCurrentKey(bootstrapRecord.getRecordKey()); - this.indexState.update((HoodieRecordGlobalLocation) bootstrapRecord.getCurrentLocation()); + if (value instanceof IndexRecord) { + IndexRecord indexRecord = (IndexRecord) value; + this.context.setCurrentKey(indexRecord.getRecordKey()); + this.indexState.update((HoodieRecordGlobalLocation) 
indexRecord.getCurrentLocation()); } else { processRecord((HoodieRecord) value, out); } diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 0d8f7f293..4fbcbb584 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -24,7 +24,6 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; import org.apache.hudi.sink.bootstrap.BootstrapFunction; -import org.apache.hudi.sink.bootstrap.BootstrapRecord; import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; @@ -88,11 +87,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, } DataStream pipeline = hoodieDataStream - .transform("index_bootstrap", - TypeInformation.of(BootstrapRecord.class), - new ProcessOperator<>(new BootstrapFunction<>(conf))) // Key-by record key, to avoid multiple subtasks write to a bucket at the same time - .keyBy(BootstrapRecord::getRecordKey) + .keyBy(HoodieRecord::getRecordKey) .transform( "bucket_assigner", TypeInformation.of(HoodieRecord.class), diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java index 705780cdc..9ba8b8efe 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.bootstrap.BootstrapFunction; 
-import org.apache.hudi.sink.bootstrap.BootstrapRecord; +import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.StreamWriteFunction; import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; @@ -157,7 +157,7 @@ public class StreamWriteFunctionWrapper { Collector> bootstrapCollector = new Collector>() { @Override public void collect(HoodieRecord record) { - if (record instanceof BootstrapRecord) { + if (record instanceof IndexRecord) { bootstrapRecords.add(record); } }