1
0

[HUDI-3085] Improve bulk insert partitioner abstraction (#4441)

This commit is contained in:
Yuwei XIAO
2022-04-25 18:42:17 +08:00
committed by GitHub
parent 9054b85961
commit f2ba0fead2
9 changed files with 65 additions and 45 deletions

View File

@@ -18,12 +18,18 @@
package org.apache.hudi.table;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.WriteHandleFactory;
import java.io.Serializable;
/**
* Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. - Average records per output spark
* partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
*/
public interface BulkInsertPartitioner<I> {
public interface BulkInsertPartitioner<I> extends Serializable {
/**
* Repartitions the input records into at least expected number of output spark partitions.
@@ -38,4 +44,24 @@ public interface BulkInsertPartitioner<I> {
* @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
*/
boolean arePartitionRecordsSorted();
/**
* Return file group id prefix for the given data partition.
* By defauult, return a new file group id prefix, so that incoming records will route to a fresh new file group
* @param partitionId data partition
* @return
*/
default String getFileIdPfx(int partitionId) {
return FSUtils.createNewFileIdPfx();
}
/**
* Return write handle factory for the given partition.
* @param partitionId data partition
* @return
*/
default Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
return Option.empty();
}
}

View File

@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
public abstract O bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
BulkInsertPartitioner partitioner,
boolean addMetadataFields,
int parallelism,
WriteHandleFactory writeHandleFactory);

View File

@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -121,16 +122,16 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
*
* @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
* @param schema Schema of the data including metadata fields.
* @return empty for now.
* @return partitioner for the java engine
*/
protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new JavaCustomColumnsSortPartitioner(
return new JavaCustomColumnsSortPartitioner(
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema),
getWriteConfig().isConsistentLogicalTimestampEnabled()));
getWriteConfig().isConsistentLogicalTimestampEnabled());
} else {
return Option.empty();
return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
}
}

View File

@@ -77,8 +77,11 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
config.shouldAllowMultiWriteOnSameInstant());
}
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
// write new files
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -90,7 +93,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
BulkInsertPartitioner partitioner,
boolean useWriterSchema,
int parallelism,
WriteHandleFactory writeHandleFactory) {
@@ -103,12 +106,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
parallelism, table);
}
final List<HoodieRecord<T>> repartitionedRecords;
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
? userDefinedBulkInsertPartitioner.get()
: JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
// only List is supported for Java partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
config.getFileIdPrefixProviderClassName(),
@@ -119,7 +117,8 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true,
config, instantTime, table,
fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
// Always get the first WriteHandleFactory, as there is only a single data partition for hudi java engine.
(WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll);
return writeStatuses;
}

View File

@@ -46,6 +46,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
@@ -137,7 +138,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
* @param schema Schema of the data including metadata fields.
* @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
*/
protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));
@@ -159,7 +160,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
default:
throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
}
});
}).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
}
/**

View File

@@ -72,6 +72,7 @@ public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload<
.withProps(getWriteConfig().getProps()).build();
// Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
}

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.function.Function2;
@@ -41,27 +42,27 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
private boolean areRecordsSorted;
private HoodieWriteConfig config;
private HoodieTable hoodieTable;
private List<String> fileIDPrefixes;
private boolean useWriterSchema;
private BulkInsertPartitioner partitioner;
private WriteHandleFactory writeHandleFactory;
public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
HoodieWriteConfig config, HoodieTable hoodieTable,
List<String> fileIDPrefixes, boolean useWriterSchema,
boolean useWriterSchema, BulkInsertPartitioner partitioner,
WriteHandleFactory writeHandleFactory) {
this.instantTime = instantTime;
this.areRecordsSorted = areRecordsSorted;
this.config = config;
this.hoodieTable = hoodieTable;
this.fileIDPrefixes = fileIDPrefixes;
this.useWriterSchema = useWriterSchema;
this.writeHandleFactory = writeHandleFactory;
this.partitioner = partitioner;
}
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) {
return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
writeHandleFactory);
partitioner.getFileIdPfx(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
(WriteHandleFactory) partitioner.getWriteHandleFactory(partition).orElse(this.writeHandleFactory));
}
}

View File

@@ -49,9 +49,9 @@ import java.util.List;
public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
private final HoodieSparkEngineContext sparkEngineContext;
private final transient HoodieSparkEngineContext sparkEngineContext;
private final String[] orderByColumns;
private final Schema schema;
private final SerializableSchema schema;
private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
@@ -64,14 +64,13 @@ public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
this.orderByColumns = orderByColumns;
this.layoutOptStrategy = layoutOptStrategy;
this.curveCompositionStrategyType = curveCompositionStrategyType;
this.schema = schema;
this.schema = new SerializableSchema(schema);
}
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
SerializableSchema serializableSchema = new SerializableSchema(schema);
JavaRDD<GenericRecord> genericRecordsRDD =
records.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
records.map(f -> (GenericRecord) f.getData().getInsertValue(schema.get()).get());
Dataset<Row> sourceDataset =
AvroConversionUtils.createDataFrame(
@@ -82,7 +81,7 @@ public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace(), false, Option.empty())
return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
.toJavaRDD()
.map(record -> {
String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,8 +38,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* A spark implementation of {@link BaseBulkInsertHelper}.
@@ -76,9 +73,12 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
executor.getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
// write new files
HoodieData<WriteStatus> writeStatuses =
bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -90,7 +90,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
BulkInsertPartitioner partitioner,
boolean useWriterSchema,
int parallelism,
WriteHandleFactory writeHandleFactory) {
@@ -103,20 +103,12 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
parallelism, table);
}
final HoodieData<HoodieRecord<T>> repartitionedRecords;
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
? userDefinedBulkInsertPartitioner.get()
: BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
// only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
// generate new file ID prefixes for each output partition
final List<String> fileIDPrefixes =
IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
final HoodieData<HoodieRecord<T>> repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords)
.mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime,
partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true)
partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner, writeHandleFactory), true)
.flatMap(List::iterator);
return HoodieJavaRDD.of(writeStatusRDD);