[HUDI-2872][HUDI-2646] Refactoring layout optimization (clustering) flow to support linear ordering (#4606)
Refactoring layout optimization (clustering) flow to:
- Enable support for linear (lexicographic) ordering as one of the ordering strategies (along w/ Z-order, Hilbert)
- Reconcile Layout Optimization and Clustering configuration to be more congruent
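For readers trying the feature out: after this change the ordering behavior is selected purely through the clustering / layout-optimization write configs. Below is a rough, illustrative sketch of the relevant options; the key names follow HoodieClusteringConfig around the time of this change and should be verified against the Hudi release actually in use.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: hypothetical write-options sketch for inline clustering with a
    // chosen ordering strategy. Option keys/values are assumptions to double-check.
    public class ClusteringOptionsSketch {
      public static void main(String[] args) {
        Map<String, String> opts = new HashMap<>();
        // Run clustering inline as part of the write.
        opts.put("hoodie.clustering.inline", "true");
        // Columns the clustering execution strategy orders records by.
        opts.put("hoodie.clustering.plan.strategy.sort.columns", "city,ts");
        // Ordering strategy: "linear" (lexicographic), "z-order", or "hilbert".
        opts.put("hoodie.layout.optimize.strategy", "z-order");
        // Curve composition for z-order/hilbert: "direct" or "sample".
        opts.put("hoodie.layout.optimize.curve.build.method", "direct");

        // These options would be handed to the Hudi Spark datasource writer
        // (df.write().format("hudi").options(opts)...) or a HoodieWriteConfig builder.
        opts.forEach((k, v) -> System.out.println(k + " = " + v));
      }
    }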
@@ -305,7 +305,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
writeTableMetadataForTableServices(table, metadata, compactionInstant);
updateTableMetadata(table, metadata, compactionInstant);
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally {
@@ -378,17 +378,20 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
throw new HoodieClusteringException("Clustering failed to write to files:"
    + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
}

final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
try {
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());

finalizeWrite(table, clusteringCommitTime, writeStats);
writeTableMetadataForTableServices(table, metadata, clusteringInstant);
// Update outstanding metadata indexes
if (config.isLayoutOptimizationEnabled()
    && !config.getClusteringSortColumns().isEmpty()) {
  table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);
}
// Update table's metadata (table)
updateTableMetadata(table, metadata, clusteringInstant);
// Update tables' metadata indexes
// NOTE: This overlaps w/ metadata table (above) and will be reconciled in the future
table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);

LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);

table.getActiveTimeline().transitionReplaceInflightToComplete(
    HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
    Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -412,13 +415,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}

private void writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
    HoodieInstant hoodieInstant) {
private void updateTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
    HoodieInstant hoodieInstant) {
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter(hoodieInstant.getTimestamp()).ifPresent(
    w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
table.getMetadataWriter(hoodieInstant.getTimestamp())
    .ifPresent(writer -> writer.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
}

@Override
@@ -18,6 +18,10 @@

package org.apache.hudi.client.clustering.run.strategy;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -39,11 +43,12 @@ import org.apache.hudi.common.util.FutureUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveOptimizationSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -54,11 +59,6 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -134,16 +134,28 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
 * @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
 */
protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (getWriteConfig().isLayoutOptimizationEnabled()) {
  // sort input records by z-order/hilbert
  return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext) getEngineContext(),
      getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema)));
} else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
  return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
      HoodieAvroUtils.addMetadataFields(schema), getWriteConfig().isConsistentLogicalTimestampEnabled()));
} else {
  return Option.empty();
}
Option<String[]> orderByColumnsOpt =
    Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
        .map(listStr -> listStr.split(","));

return orderByColumnsOpt.map(orderByColumns -> {
  HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
  switch (layoutOptStrategy) {
    case ZORDER:
    case HILBERT:
      return new RDDSpatialCurveSortPartitioner(
          (HoodieSparkEngineContext) getEngineContext(),
          orderByColumns,
          layoutOptStrategy,
          getWriteConfig().getLayoutOptimizationCurveBuildMethod(),
          HoodieAvroUtils.addMetadataFields(schema));
    case LINEAR:
      return new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
          getWriteConfig().isConsistentLogicalTimestampEnabled());
    default:
      throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
  }
});
}

/**
@@ -124,8 +124,8 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
Iterator<List<WriteStatus>> writeStatuses = performClusteringWithRecordsIterator(inputRecords, clusteringOps.getNumOutputGroups(), instantTime,
    strategyParams, schema.get(), inputFileIds, preserveHoodieMetadata, taskContextSupplier);

Iterable<List<WriteStatus>> writestatusIterable = () -> writeStatuses;
return StreamSupport.stream(writestatusIterable.spliterator(), false)
Iterable<List<WriteStatus>> writeStatusIterable = () -> writeStatuses;
return StreamSupport.stream(writeStatusIterable.spliterator(), false)
    .flatMap(writeStatusList -> writeStatusList.stream());
}
@@ -28,8 +28,8 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sort.SpaceCurveSortingHelper;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
@@ -38,80 +38,74 @@ import org.apache.spark.sql.Row;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;

/**
 * A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition.
 * support z-curve optimization, hilbert will come soon.
 * @param <T> HoodieRecordPayload type
 */
public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPayload>
public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
private final HoodieSparkEngineContext sparkEngineContext;
private final SerializableSchema serializableSchema;
private final HoodieWriteConfig config;

public RDDSpatialCurveOptimizationSortPartitioner(HoodieSparkEngineContext sparkEngineContext, HoodieWriteConfig config, Schema schema) {
private final HoodieSparkEngineContext sparkEngineContext;
private final String[] orderByColumns;
private final Schema schema;
private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;

public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext,
    String[] orderByColumns,
    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
    HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType,
    Schema schema) {
this.sparkEngineContext = sparkEngineContext;
this.config = config;
this.serializableSchema = new SerializableSchema(schema);
this.orderByColumns = orderByColumns;
this.layoutOptStrategy = layoutOptStrategy;
this.curveCompositionStrategyType = curveCompositionStrategyType;
this.schema = schema;
}

@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
JavaRDD<GenericRecord> preparedRecord = prepareGenericRecord(records, outputSparkPartitions, serializableSchema.get());
return preparedRecord.map(record -> {
  String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
  String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
  HoodieKey hoodieKey = new HoodieKey(key, partition);
  HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, new RewriteAvroPayload(record));
  return hoodieRecord;
});
}

private JavaRDD<GenericRecord> prepareGenericRecord(JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final Schema schema) {
SerializableSchema serializableSchema = new SerializableSchema(schema);
JavaRDD<GenericRecord> genericRecordJavaRDD = inputRecords.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
Dataset<Row> originDF =
JavaRDD<GenericRecord> genericRecordsRDD =
    records.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());

Dataset<Row> sourceDataset =
    AvroConversionUtils.createDataFrame(
        genericRecordJavaRDD.rdd(),
        genericRecordsRDD.rdd(),
        schema.toString(),
        sparkEngineContext.getSqlContext().sparkSession()
    );

Dataset<Row> sortedDF = reorder(originDF, numOutputGroups);
Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);

return HoodieSparkUtils.createRdd(sortedDF, schema.getName(),
    schema.getNamespace(), false, org.apache.hudi.common.util.Option.empty()).toJavaRDD();
return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace(), false, Option.empty())
    .toJavaRDD()
    .map(record -> {
      String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
      String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
      HoodieKey hoodieKey = new HoodieKey(key, partition);
      HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, new RewriteAvroPayload(record));
      return hoodieRecord;
    });
}

private Dataset<Row> reorder(Dataset<Row> originDF, int numOutputGroups) {
String orderedColumnsListConfig = config.getClusteringSortColumns();

if (isNullOrEmpty(orderedColumnsListConfig) || numOutputGroups <= 0) {
private Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) {
if (orderByColumns.length == 0) {
  // No-op
  return originDF;
  return dataset;
}

List<String> orderedCols =
    Arrays.stream(orderedColumnsListConfig.split(","))
        .map(String::trim)
        .collect(Collectors.toList());
List<String> orderedCols = Arrays.asList(orderByColumns);

HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy =
    HoodieClusteringConfig.LayoutOptimizationStrategy.fromValue(config.getLayoutOptimizationStrategy());

HoodieClusteringConfig.BuildCurveStrategyType curveBuildStrategyType = config.getLayoutOptimizationCurveBuildMethod();

switch (curveBuildStrategyType) {
switch (curveCompositionStrategyType) {
  case DIRECT:
    return SpaceCurveSortingHelper.orderDataFrameByMappingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups);
    return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
  case SAMPLE:
    return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups);
    return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
  default:
    throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveBuildStrategyType));
    throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));
}
}
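For intuition on the two ordering families handled above: a linear sort orders rows lexicographically by the chosen columns, while Z-order (and, analogously, Hilbert) orders rows by a space-filling-curve value derived from all of the columns, keeping rows that are close in every dimension near each other. The standalone sketch below contrasts the two on a toy two-column dataset; it is plain Java for illustration only and uses no Hudi or Spark APIs.

    import java.util.Arrays;
    import java.util.Comparator;

    // Illustrative sketch: lexicographic ("linear") ordering vs Z-order (bit-interleaved)
    // ordering of two small integer columns. Not Hudi code.
    public class OrderingSketch {

      // Interleave the lower 16 bits of x and y into a single Z-value (Morton code).
      static long zValue(int x, int y) {
        long z = 0;
        for (int i = 0; i < 16; i++) {
          z |= ((long) (x >> i) & 1L) << (2 * i);
          z |= ((long) (y >> i) & 1L) << (2 * i + 1);
        }
        return z;
      }

      public static void main(String[] args) {
        int[][] rows = {{0, 3}, {1, 1}, {2, 2}, {3, 0}};

        // Linear ordering: sort by x, then y (conceptually what the custom-columns sort does).
        int[][] linear = rows.clone();
        Comparator<int[]> lex = Comparator.<int[]>comparingInt(r -> r[0]).thenComparingInt(r -> r[1]);
        Arrays.sort(linear, lex);

        // Z-ordering: sort by the interleaved Morton code of (x, y).
        int[][] zOrdered = rows.clone();
        Comparator<int[]> byZ = Comparator.comparingLong(r -> zValue(r[0], r[1]));
        Arrays.sort(zOrdered, byZ);

        System.out.println("linear:  " + Arrays.deepToString(linear));
        System.out.println("z-order: " + Arrays.deepToString(zOrdered));
      }
    }

Sorting on the curve value is what lets multi-column range predicates prune files effectively after clustering, whereas the linear sort mainly benefits predicates on the leading sort columns.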
@@ -262,10 +262,10 @@ public class ColumnStatsIndexHelper {
// │ │ ├── <part-...>.parquet
// │ │ └── ...
//
// If index is currently empty (no persisted tables), we simply create one
// using clustering operation's commit instance as it's name
Path newIndexTablePath = new Path(indexFolderPath, commitTime);

// If index is currently empty (no persisted tables), we simply create one
// using clustering operation's commit instance as it's name
if (!fs.exists(new Path(indexFolderPath))) {
newColStatsIndexDf.repartition(1)
    .write()
@@ -326,6 +326,9 @@ public class ColumnStatsIndexHelper {
.repartition(1)
.write()
.format("parquet")
// NOTE: We intend to potentially overwrite index-table from the previous Clustering
// operation that has failed to commit
.mode("overwrite")
.save(newIndexTablePath.toString());

// Clean up residual col-stats-index tables that have might have been dangling since

@@ -184,13 +184,6 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
String basePath = metaClient.getBasePath();
String indexPath = metaClient.getColumnStatsIndexPath();

List<String> completedCommits =
    metaClient.getCommitsTimeline()
        .filterCompletedInstants()
        .getInstants()
        .map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());

List<String> touchedFiles =
    updatedFilesStats.stream()
        .map(s -> new Path(basePath, s.getPath()).toString())
@@ -214,6 +207,13 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields()
);

List<String> completedCommits =
    metaClient.getCommitsTimeline()
        .filterCompletedInstants()
        .getInstants()
        .map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());

ColumnStatsIndexHelper.updateColumnStatsIndexFor(
    sparkEngineContext.getSqlContext().sparkSession(),
    AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema),