[HUDI-3716] OOM occurred when use bulk_insert cow table with flink BUCKET index (#5135)
This commit is contained in:
@@ -459,17 +459,17 @@ public class FlinkOptions extends HoodieConfig {
|
||||
.withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
|
||||
+ "waits for the instant commit success, only for internal use");
|
||||
|
||||
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION = ConfigOptions
|
||||
.key("write.bulk_insert.shuffle_by_partition")
|
||||
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_INPUT = ConfigOptions
|
||||
.key("write.bulk_insert.shuffle_input")
|
||||
.booleanType()
|
||||
.defaultValue(true)
|
||||
.withDescription("Whether to shuffle the inputs by partition path for bulk insert tasks, default true");
|
||||
.withDescription("Whether to shuffle the inputs by specific fields for bulk insert tasks, default true");
|
||||
|
||||
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_BY_PARTITION = ConfigOptions
|
||||
.key("write.bulk_insert.sort_by_partition")
|
||||
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_INPUT = ConfigOptions
|
||||
.key("write.bulk_insert.sort_input")
|
||||
.booleanType()
|
||||
.defaultValue(true)
|
||||
.withDescription("Whether to sort the inputs by partition path for bulk insert tasks, default true");
|
||||
.withDescription("Whether to sort the inputs by specific fields for bulk insert tasks, default true");
|
||||
|
||||
public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
|
||||
.key("write.sort.memory")
|
||||
|
||||
@@ -19,14 +19,19 @@
|
||||
package org.apache.hudi.sink.bucket;
|
||||
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.index.bucket.BucketIdentifier;
|
||||
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
|
||||
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
|
||||
import org.apache.hudi.sink.bulk.RowDataKeyGen;
|
||||
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.table.api.DataTypes;
|
||||
import org.apache.flink.table.data.GenericRowData;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.data.StringData;
|
||||
import org.apache.flink.table.types.logical.LogicalType;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -38,36 +43,67 @@ import java.io.IOException;
|
||||
*/
|
||||
public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);
|
||||
public static final String FILE_GROUP_META_FIELD = "_fg";
|
||||
|
||||
private final int bucketNum;
|
||||
private final String indexKeyFields;
|
||||
private final int recordArity;
|
||||
|
||||
private String lastFileId; // for efficient code path
|
||||
|
||||
public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
|
||||
String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
|
||||
super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
|
||||
this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
|
||||
this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
|
||||
this.recordArity = rowType.getFieldCount();
|
||||
}
|
||||
|
||||
public void write(RowData record) throws IOException {
|
||||
public void write(RowData tuple) throws IOException {
|
||||
try {
|
||||
RowData record = tuple.getRow(1, this.recordArity);
|
||||
String recordKey = keyGen.getRecordKey(record);
|
||||
String partitionPath = keyGen.getPartitionPath(record);
|
||||
final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeyFields, this.bucketNum);
|
||||
String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
|
||||
getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
|
||||
String fileId = tuple.getString(0).toString();
|
||||
if ((lastFileId == null) || !lastFileId.equals(fileId)) {
|
||||
LOG.info("Creating new file for partition path " + partitionPath);
|
||||
handle = getRowCreateHandle(partitionPath, fileId);
|
||||
lastFileId = fileId;
|
||||
}
|
||||
handle.write(recordKey, partitionPath, record);
|
||||
} catch (Throwable throwable) {
|
||||
LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
|
||||
throw throwable;
|
||||
}
|
||||
}
|
||||
|
||||
private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
|
||||
private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) throws IOException {
|
||||
if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
|
||||
if (this.isInputSorted) {
|
||||
// if records are sorted, we can close all existing handles
|
||||
close();
|
||||
}
|
||||
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
|
||||
instantTime, taskPartitionId, taskId, taskEpochId, rowType);
|
||||
handles.put(fileId, rowCreateHandle);
|
||||
}
|
||||
return handles.get(fileId);
|
||||
}
|
||||
|
||||
public static SortOperatorGen getFileIdSorterGen(RowType rowType) {
|
||||
return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
|
||||
}
|
||||
|
||||
private static String getFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
|
||||
String recordKey = keyGen.getRecordKey(record);
|
||||
final int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum);
|
||||
return BucketIdentifier.newBucketFileIdPrefix(bucketId);
|
||||
}
|
||||
|
||||
public static RowData rowWithFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
|
||||
final String fileId = getFileId(keyGen, record, indexKeyFields, bucketNum);
|
||||
return GenericRowData.of(StringData.fromString(fileId), record);
|
||||
}
|
||||
|
||||
public static RowType rowTypeWithFileId(RowType rowType) {
|
||||
LogicalType[] types = new LogicalType[] {DataTypes.STRING().getLogicalType(), rowType};
|
||||
String[] names = new String[] {FILE_GROUP_META_FIELD, "record"};
|
||||
return RowType.of(types, names);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,9 +57,9 @@ public class BulkInsertWriterHelper {
|
||||
protected final HoodieTable hoodieTable;
|
||||
protected final HoodieWriteConfig writeConfig;
|
||||
protected final RowType rowType;
|
||||
private final Boolean arePartitionRecordsSorted;
|
||||
protected final Boolean isInputSorted;
|
||||
private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
|
||||
private HoodieRowDataCreateHandle handle;
|
||||
protected HoodieRowDataCreateHandle handle;
|
||||
private String lastKnownPartitionPath = null;
|
||||
private final String fileIdPrefix;
|
||||
private int numFilesWritten = 0;
|
||||
@@ -75,7 +75,7 @@ public class BulkInsertWriterHelper {
|
||||
this.taskId = taskId;
|
||||
this.taskEpochId = taskEpochId;
|
||||
this.rowType = addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
|
||||
this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
|
||||
this.isInputSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
|
||||
this.fileIdPrefix = UUID.randomUUID().toString();
|
||||
this.keyGen = RowDataKeyGen.instance(conf, rowType);
|
||||
}
|
||||
@@ -112,7 +112,7 @@ public class BulkInsertWriterHelper {
|
||||
private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
|
||||
if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path
|
||||
// if records are sorted, we can close all existing handles
|
||||
if (arePartitionRecordsSorted) {
|
||||
if (isInputSorted) {
|
||||
close();
|
||||
}
|
||||
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
|
||||
|
||||
@@ -26,6 +26,7 @@ import org.apache.hudi.sink.StreamWriteOperator;
|
||||
import org.apache.hudi.sink.append.AppendWriteOperator;
|
||||
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
|
||||
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
|
||||
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
|
||||
import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
|
||||
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
|
||||
import org.apache.hudi.sink.bulk.RowDataKeyGen;
|
||||
@@ -53,6 +54,7 @@ import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
|
||||
import org.apache.flink.streaming.api.operators.ProcessOperator;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
|
||||
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
/**
|
||||
@@ -92,7 +94,18 @@ public class Pipelines {
|
||||
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
|
||||
BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
|
||||
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
|
||||
return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
|
||||
RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
|
||||
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
|
||||
dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
|
||||
.map(record -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, indexKeyFields, bucketNum),
|
||||
typeInfo);
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
|
||||
SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
|
||||
dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator());
|
||||
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
|
||||
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
|
||||
}
|
||||
return dataStream
|
||||
.transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
|
||||
.uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
|
||||
@@ -103,7 +116,7 @@ public class Pipelines {
|
||||
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
|
||||
if (partitionFields.length > 0) {
|
||||
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) {
|
||||
|
||||
// shuffle by partition keys
|
||||
// use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
|
||||
@@ -112,7 +125,7 @@ public class Pipelines {
|
||||
KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);
|
||||
dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
|
||||
}
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
|
||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
|
||||
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
|
||||
// sort by partition keys
|
||||
dataStream = dataStream
|
||||
|
||||
@@ -916,7 +916,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
|
||||
String hoodieTableDDL = sql("hoodie_sink")
|
||||
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
|
||||
.option(FlinkOptions.OPERATION, "bulk_insert")
|
||||
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
|
||||
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
|
||||
.option(FlinkOptions.INDEX_TYPE, indexType)
|
||||
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
|
||||
.end();
|
||||
|
||||
Reference in New Issue
Block a user