
[HUDI-2176, 2178, 2179] Adding virtual key support to COW table (#3306)
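
"Virtual keys" here means the record key and partition path are no longer materialized in the _hoodie_record_key / _hoodie_partition_path meta columns but are re-derived from the data columns through the configured key generator whenever they are needed. A hedged usage sketch of writing such a table (the hoodie.populate.meta.fields option key and all paths/field names below are illustrative assumptions, not taken from this commit):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class VirtualKeyCowExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("virtual-key-cow").master("local[2]").getOrCreate();
        Dataset<Row> df = spark.read().parquet("/tmp/source"); // any keyed dataset

        df.write().format("hudi")
            // Assumed table-config key that disables meta columns (virtual keys).
            .option("hoodie.populate.meta.fields", "false")
            // With meta fields off, only key generators extending BaseKeyGenerator work.
            .option("hoodie.datasource.write.recordkey.field", "id")
            .option("hoodie.datasource.write.partitionpath.field", "dt")
            .option("hoodie.datasource.write.precombine.field", "ts")
            .option("hoodie.table.name", "cow_virtual_keys")
            .mode(SaveMode.Overwrite)
            .save("/tmp/hudi/cow_virtual_keys");
        spark.stop();
      }
    }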

Sivabalan Narayanan authored on 2021-07-26 17:21:04 -04:00, committed by GitHub
parent 5353243449
commit 61148c1c43
57 changed files with 969 additions and 413 deletions

File: SparkHoodieSimpleIndex.java

@@ -21,6 +21,7 @@ package org.apache.hudi.index.simple;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.SparkMemoryUtils;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
@@ -30,15 +31,19 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.index.SparkHoodieIndex;
 import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.io.IOException;
 import java.util.List;
 
 import scala.Tuple2;
@@ -146,8 +151,15 @@ public class SparkHoodieSimpleIndex<T extends HoodieRecordPayload> extends SparkHoodieIndex<T> {
                                                      List<Pair<String, HoodieBaseFile>> baseFiles) {
     JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
     int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism));
-    return jsc.parallelize(baseFiles, fetchParallelism)
-        .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile)
-            .locations().map(x -> Tuple2.apply(((Pair)x).getLeft(), ((Pair)x).getRight())).iterator());
+    try {
+      Option<BaseKeyGenerator> keyGeneratorOpt = config.populateMetaFields() ? Option.empty()
+          : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+      return jsc.parallelize(baseFiles, fetchParallelism)
+          .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile, keyGeneratorOpt)
+              .locations().map(x -> Tuple2.apply(((Pair)x).getLeft(), ((Pair)x).getRight())).iterator());
+    } catch (IOException e) {
+      throw new HoodieIOException("KeyGenerator instantiation failed ", e);
+    }
   }
 }
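
Note in the hunk above that the key generator is resolved once on the driver, outside the flatMapToPair lambda, and captured by the closure, so the reflection-based factory call is not repeated per base file on the executors. A stripped-down sketch of that pattern; the new HoodieKeyLocationFetchHandle constructor is exactly the one shown above, while the surrounding variables are assumed to be in scope as in the method:

    // Resolve once on the driver; Option.empty() means "read keys from meta columns".
    Option<BaseKeyGenerator> keyGeneratorOpt = config.populateMetaFields()
        ? Option.empty()
        : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory
            .createKeyGenerator(new TypedProperties(config.getProps())));

    // Spark serializes keyGeneratorOpt into the closure; each task reuses it for
    // every record of its base file instead of re-instantiating the generator.
    jsc.parallelize(baseFiles, fetchParallelism)
       .flatMapToPair(pf -> new HoodieKeyLocationFetchHandle(config, hoodieTable, pf, keyGeneratorOpt)
           .locations()
           .map(x -> Tuple2.apply(((Pair) x).getLeft(), ((Pair) x).getRight()))
           .iterator());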

File: BuiltinKeyGenerator.java

@@ -97,6 +97,8 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
    * @param structType schema of the internalRow.
    * @return the partition path.
    */
+  @Override
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getPartitionPath(InternalRow internalRow, StructType structType) {
     try {
       initDeserializer(structType);

File: SparkKeyGeneratorInterface.java

@@ -19,6 +19,8 @@
 package org.apache.hudi.keygen;
 
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * Spark key generator interface.
@@ -28,4 +30,6 @@ public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
   String getRecordKey(Row row);
 
   String getPartitionPath(Row row);
+
+  String getPartitionPath(InternalRow internalRow, StructType structType);
 }
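
Because the write path casts the configured generator to BaseKeyGenerator, virtual keys only work with generators in that hierarchy. A toy sketch of a conforming generator, assuming BuiltinKeyGenerator exposes a TypedProperties constructor and that recordKeyFields / partitionPathFields are protected fields of the base class (all of this is illustrative, not part of the commit):

    import java.util.Collections;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.keygen.BuiltinKeyGenerator;

    // Hypothetical generator: record key from "id", partition path from "dt".
    public class IdDtKeyGenerator extends BuiltinKeyGenerator {
      public IdDtKeyGenerator(TypedProperties props) {
        super(props);
        this.recordKeyFields = Collections.singletonList("id");     // assumed protected field
        this.partitionPathFields = Collections.singletonList("dt"); // assumed protected field
      }

      @Override
      public String getRecordKey(GenericRecord record) {
        return record.get("id").toString();
      }

      @Override
      public String getPartitionPath(GenericRecord record) {
        return record.get("dt").toString();
      }
    }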

File: HoodieSparkCopyOnWriteTable.java

@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
@@ -37,11 +38,14 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
@@ -210,12 +214,21 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieSparkTable<T> {
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
                                               Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
+    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
+    if (!config.populateMetaFields()) {
+      try {
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+      } catch (IOException e) {
+        throw new HoodieIOException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) are supported when meta "
+            + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
+      }
+    }
     if (requireSortedRecords()) {
       return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged, taskContextSupplier);
+          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
     } else {
-      return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
-          dataFileToBeMerged,taskContextSupplier);
+      return new HoodieMergeHandle(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
     }
   }

File: SparkExecuteClusteringCommitActionExecutor.java

@@ -50,6 +50,7 @@ import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
@@ -247,8 +248,8 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload>
    */
   private HoodieRecord<? extends HoodieRecordPayload> transform(IndexedRecord indexedRecord) {
     GenericRecord record = (GenericRecord) indexedRecord;
-    String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-    String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+    String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt);
+    String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt);
     HoodieKey hoodieKey = new HoodieKey(key, partition);
     HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(table.getMetaClient().getTableConfig().getPayloadClass(),
File: BaseSparkCommitActionExecutor.java

@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.SparkLazyInsertIterable;
@@ -44,6 +46,8 @@ import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.io.storage.HoodieConcatHandle;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -78,6 +82,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
     BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
 
   private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class);
+  protected Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
 
   public BaseSparkCommitActionExecutor(HoodieEngineContext context,
                                        HoodieWriteConfig config,
@@ -85,6 +90,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
                                        String instantTime,
                                        WriteOperationType operationType) {
     super(context, config, table, instantTime, operationType, Option.empty());
+    initKeyGenIfNeeded(config.populateMetaFields());
   }
 
   public BaseSparkCommitActionExecutor(HoodieEngineContext context,
@@ -94,6 +100,17 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
                                        WriteOperationType operationType,
                                        Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, operationType, extraMetadata);
+    initKeyGenIfNeeded(config.populateMetaFields());
   }
+
+  private void initKeyGenIfNeeded(boolean populateMetaFields) {
+    if (!populateMetaFields) {
+      try {
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
+      } catch (IOException e) {
+        throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
+      }
+    }
+  }
 
   private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
@@ -327,11 +344,12 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
   protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
     if (table.requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier);
+      return new HoodieSortedMergeHandle<>(config, instantTime, (HoodieSparkTable) table, recordItr, partitionPath, fileId, taskContextSupplier,
+          keyGeneratorOpt);
     } else if (!WriteOperationType.isChangingRecords(operationType) && config.allowDuplicateInserts()) {
-      return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
+      return new HoodieConcatHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
     } else {
-      return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
+      return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier, keyGeneratorOpt);
    }
  }
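
For completeness, a hedged read-back check against the writer sketch near the top of this page: with meta fields disabled the meta columns are left unpopulated, so the savings show up in storage and write-time compute, while ordinary column queries behave as before (paths and field names are the same assumptions as above):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class VirtualKeyReadExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("virtual-key-read").master("local[2]").getOrCreate();
        // Normal queries touch only data columns and are unaffected by virtual keys.
        Dataset<Row> read = spark.read().format("hudi").load("/tmp/hudi/cow_virtual_keys");
        read.select("id", "dt").show();
        spark.stop();
      }
    }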