
delete duplicate bootstrap function (#3052)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
yuzhaojing authored on 2021-06-09 19:29:57 +08:00, committed by GitHub
parent e8fcf04b57
commit 728089a888
7 changed files with 53 additions and 62 deletions

BootstrapFunction.java

@@ -18,22 +18,6 @@
 package org.apache.hudi.sink.bootstrap;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
@@ -47,11 +31,26 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndexUtils;
-import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunc;
+import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.scala.typeutils.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,14 +60,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 /**
- * The function to load index from exists hoodieTable.
+ * The function to load index from the existing hoodieTable.
  *
- * <p>Each subtask in bootstrapFunction triggers the bootstrap index with the first element,
- * Received record cannot be sent until the index is all sent.
+ * <p>Each subtask of the function triggers the index bootstrap when the first element comes in;
+ * the record cannot be sent until all the index records have been sent.
  *
  * <p>The output records should then shuffle by the recordKey and thus do scalable write.
- *
- * @see BootstrapFunction
  */
 public class BootstrapFunction<I, O extends HoodieRecord>
     extends ProcessFunction<I, O>
@@ -95,12 +92,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
   @Override
   public void initializeState(FunctionInitializationContext context) throws Exception {
-    this.bootstrapState = context.getOperatorStateStore().getListState(
-        new ListStateDescriptor<>(
-            "bootstrap-state",
-            TypeInformation.of(new TypeHint<Boolean>() {})
-        )
-    );
+    this.bootstrapState = context.getOperatorStateStore()
+        .getListState(new ListStateDescriptor<>("bootstrap-state", Types.BOOLEAN()));
 
     if (context.isRestored()) {
       LOG.info("Restoring state for the {}.", getClass().getSimpleName());
@@ -123,8 +116,9 @@ public class BootstrapFunction<I, O extends HoodieRecord>
   @SuppressWarnings("unchecked")
   public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
     if (!alreadyBootstrap) {
-      LOG.info("Start loading records in table {} into the index state, taskId = {}", conf.getString(FlinkOptions.PATH), getRuntimeContext().getIndexOfThisSubtask());
       String basePath = hoodieTable.getMetaClient().getBasePath();
+      int taskID = getRuntimeContext().getIndexOfThisSubtask();
+      LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
       for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
         if (pattern.matcher(partitionPath).matches()) {
           loadRecords(partitionPath, out);
@@ -132,30 +126,30 @@ public class BootstrapFunction<I, O extends HoodieRecord>
       }
 
       // wait for others bootstrap task send bootstrap complete.
-      updateAndWaiting();
+      waitForBootstrapReady(taskID);
       alreadyBootstrap = true;
-      LOG.info("Finish send index to BucketAssign, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
+      LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
     }
 
-    // send data to next operator
+    // send the trigger record
    out.collect((O) value);
   }
 
   /**
-   * Wait for other bootstrap task send bootstrap complete.
+   * Wait for other bootstrap tasks to finish the index bootstrap.
    */
-  private void updateAndWaiting() {
+  private void waitForBootstrapReady(int taskID) {
     int taskNum = getRuntimeContext().getNumberOfParallelSubtasks();
     int readyTaskNum = 1;
     while (taskNum != readyTaskNum) {
       try {
-        readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunc.NAME, getRuntimeContext().getIndexOfThisSubtask(), new BootstrapAggFunc());
-        LOG.info("Waiting for others bootstrap task complete, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
+        readyTaskNum = aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME, taskID, new BootstrapAggFunction());
+        LOG.info("Waiting for other bootstrap tasks to complete, taskId = {}.", taskID);
         TimeUnit.SECONDS.sleep(5);
       } catch (Exception e) {
-        LOG.warn("update global aggregate error", e);
+        LOG.warn("Update global task bootstrap summary error", e);
       }
     }
   }
@@ -199,7 +193,7 @@ public class BootstrapFunction<I, O extends HoodieRecord>
       }
 
      for (HoodieKey hoodieKey : hoodieKeys) {
-        out.collect((O) new BootstrapRecord(generateHoodieRecord(hoodieKey, baseFile)));
+        out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, baseFile)));
      }
    }
  }
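
The waitForBootstrapReady loop introduced above is a rendezvous barrier: each subtask repeatedly reports its task id into a shared aggregate and blocks until the distinct count reaches the parallelism. Below is a minimal, Flink-free sketch of that idiom; the synchronized set stands in for Flink's GlobalAggregateManager, and every name in it is illustrative rather than part of this commit.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class BootstrapBarrierSketch {
  // Stand-in for the cluster-wide aggregate state; illustrative only.
  private static final Set<Integer> READY_TASKS = new HashSet<>();

  // Stand-in for aggregateManager.updateGlobalAggregate(NAME, taskID, aggFunction):
  // registers the reporting task and returns the current ready-task count.
  private static synchronized int updateGlobalAggregate(int taskId) {
    READY_TASKS.add(taskId);
    return READY_TASKS.size();
  }

  static void waitForBootstrapReady(int taskId, int parallelism) throws InterruptedException {
    int readyTaskNum = updateGlobalAggregate(taskId);
    while (readyTaskNum != parallelism) {
      TimeUnit.MILLISECONDS.sleep(100); // the real code polls every 5 seconds
      readyTaskNum = updateGlobalAggregate(taskId);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    int parallelism = 4;
    Thread[] tasks = new Thread[parallelism];
    for (int i = 0; i < parallelism; i++) {
      final int taskId = i;
      tasks[i] = new Thread(() -> {
        try {
          // Simulate staggered bootstrap completion across subtasks.
          TimeUnit.MILLISECONDS.sleep(taskId * 200L);
          waitForBootstrapReady(taskId, parallelism);
          System.out.println("task " + taskId + " passed the barrier");
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      tasks[i].start();
    }
    for (Thread t : tasks) {
      t.join();
    }
  }
}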

BootstrapRecord.java → IndexRecord.java

@@ -22,12 +22,12 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 
 /**
- * An record to mark HoodieRecord or IndexRecord.
+ * The index record.
  */
-public class BootstrapRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {
+public class IndexRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> {
   private static final long serialVersionUID = 1L;
 
-  public BootstrapRecord(HoodieRecord<T> record) {
+  public IndexRecord(HoodieRecord<T> record) {
     super(record);
   }
 }

BootstrapAccumulator.java

@@ -23,7 +23,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 /**
- * Aggregate accumulator.
+ * Bootstrap ready task id accumulator.
  */
 public class BootstrapAccumulator implements Serializable {
   private static final long serialVersionUID = 1L;

BootstrapAggFunc.java → BootstrapAggFunction.java

@@ -21,10 +21,11 @@ package org.apache.hudi.sink.bootstrap.aggregate;
 import org.apache.flink.api.common.functions.AggregateFunction;
 
 /**
- * Aggregate Function that accumulates the loaded task number of function {@link org.apache.hudi.sink.bootstrap.BootstrapFunction}.
+ * Aggregate Function that accumulates the loaded task number of
+ * function {@link org.apache.hudi.sink.bootstrap.BootstrapFunction}.
  */
-public class BootstrapAggFunc implements AggregateFunction<Integer, BootstrapAccumulator, Integer> {
-  public static final String NAME = BootstrapAggFunc.class.getSimpleName();
+public class BootstrapAggFunction implements AggregateFunction<Integer, BootstrapAccumulator, Integer> {
+  public static final String NAME = BootstrapAggFunction.class.getSimpleName();
 
   @Override
   public BootstrapAccumulator createAccumulator() {
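
Only the class header, the NAME constant and createAccumulator() of the renamed BootstrapAggFunction appear in this diff. For orientation, here is a sketch of how such an aggregate can satisfy Flink's AggregateFunction<IN, ACC, OUT> contract, assuming the set-based accumulator suggested by the "Bootstrap ready task id accumulator" Javadoc; the nested Accumulator class and the method bodies below are assumptions, not the committed code.

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.AggregateFunction;

public class BootstrapAggFunctionSketch
    implements AggregateFunction<Integer, BootstrapAggFunctionSketch.Accumulator, Integer> {

  public static final String NAME = BootstrapAggFunctionSketch.class.getSimpleName();

  // Stand-in for BootstrapAccumulator: records the ids of tasks that finished bootstrap.
  public static class Accumulator implements Serializable {
    private static final long serialVersionUID = 1L;
    final Set<Integer> readyTaskIds = new HashSet<>();
  }

  @Override
  public Accumulator createAccumulator() {
    return new Accumulator();
  }

  @Override
  public Accumulator add(Integer taskId, Accumulator acc) {
    acc.readyTaskIds.add(taskId); // reporting the same task twice is idempotent
    return acc;
  }

  @Override
  public Integer getResult(Accumulator acc) {
    return acc.readyTaskIds.size(); // number of subtasks that finished the bootstrap
  }

  @Override
  public Accumulator merge(Accumulator a, Accumulator b) {
    a.readyTaskIds.addAll(b.readyTaskIds);
    return a;
  }
}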

BucketAssignFunction.java

@@ -30,7 +30,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.bootstrap.BootstrapRecord;
+import org.apache.hudi.sink.bootstrap.IndexRecord;
 import org.apache.hudi.sink.utils.PayloadCreation;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.util.StreamerUtil;

@@ -151,10 +151,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
   @Override
   public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
-    if (value instanceof BootstrapRecord) {
-      BootstrapRecord bootstrapRecord = (BootstrapRecord) value;
-      this.context.setCurrentKey(bootstrapRecord.getRecordKey());
-      this.indexState.update((HoodieRecordGlobalLocation) bootstrapRecord.getCurrentLocation());
+    if (value instanceof IndexRecord) {
+      IndexRecord<?> indexRecord = (IndexRecord<?>) value;
+      this.context.setCurrentKey(indexRecord.getRecordKey());
+      this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
     } else {
       processRecord((HoodieRecord<?>) value, out);
     }

HoodieTableSink.java

@@ -24,7 +24,6 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperatorFactory;
 import org.apache.hudi.sink.bootstrap.BootstrapFunction;
-import org.apache.hudi.sink.bootstrap.BootstrapRecord;
 import org.apache.hudi.sink.compact.CompactFunction;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;

@@ -88,11 +87,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
         }

         DataStream<Object> pipeline = hoodieDataStream
-            .transform("index_bootstrap",
-                TypeInformation.of(BootstrapRecord.class),
-                new ProcessOperator<>(new BootstrapFunction<>(conf)))
             // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
-            .keyBy(BootstrapRecord::getRecordKey)
+            .keyBy(HoodieRecord::getRecordKey)
             .transform(
                 "bucket_assigner",
                 TypeInformation.of(HoodieRecord.class),
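
With the duplicate index_bootstrap transform removed, the sink keys the stream by record key straight into the bucket_assigner transform. Below is a self-contained sketch of that keyBy + transform wiring, with a toy String stream and ProcessFunction standing in for the HoodieRecord stream and BucketAssignFunction; every name except bucket_assigner is illustrative.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.util.Collector;

public class KeyedTransformSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Toy records of the form "recordKey:payload".
    DataStream<String> records = env.fromElements("key1:a", "key2:b", "key1:c");

    DataStream<String> pipeline = records
        // Key-by record key, so a single subtask owns each bucket.
        .keyBy(value -> value.split(":")[0])
        .transform(
            "bucket_assigner",
            TypeInformation.of(String.class),
            new ProcessOperator<>(new ProcessFunction<String, String>() {
              @Override
              public void processElement(String value, Context ctx, Collector<String> out) {
                out.collect(value); // bucket assignment logic would go here
              }
            }));

    pipeline.print();
    env.execute("keyed-transform-sketch");
  }
}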

StreamWriteFunctionWrapper.java

@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.bootstrap.BootstrapFunction;
-import org.apache.hudi.sink.bootstrap.BootstrapRecord;
+import org.apache.hudi.sink.bootstrap.IndexRecord;
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;

@@ -157,7 +157,7 @@ public class StreamWriteFunctionWrapper<I> {
     Collector<HoodieRecord<?>> bootstrapCollector = new Collector<HoodieRecord<?>>() {
       @Override
       public void collect(HoodieRecord<?> record) {
-        if (record instanceof BootstrapRecord) {
+        if (record instanceof IndexRecord) {
           bootstrapRecords.add(record);
         }
       }