1
0

[HUDI-4039] Make sure all builtin KeyGenerators properly implement Spark specific APIs (#5523)

This set of changes makes sure that all builtin KeyGenerators properly implement Spark-specific APIs in a performant way (minimizing key-generators overhead)
This commit is contained in:
Alexey Kudinkin
2022-07-22 08:35:07 -07:00
committed by GitHub
parent d5c904e10e
commit eea4a692c0
52 changed files with 1507 additions and 1363 deletions

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.client.model;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.types.DataType;
@@ -34,7 +35,7 @@ import java.util.Arrays;
* Hudi internal implementation of the {@link InternalRow} allowing to extend arbitrary
* {@link InternalRow} overlaying Hudi-internal meta-fields on top of it.
*
* Capable of overlaying meta-fields in both cases: whether original {@link #row} contains
* Capable of overlaying meta-fields in both cases: whether original {@link #sourceRow} contains
* meta columns or not. This allows to handle following use-cases allowing to avoid any
* manipulation (reshuffling) of the source row, by simply creating new instance
* of {@link HoodieInternalRow} with all the meta-values provided
@@ -50,22 +51,27 @@ public class HoodieInternalRow extends InternalRow {
/**
* Collection of meta-fields as defined by {@link HoodieRecord#HOODIE_META_COLUMNS}
*
* NOTE: {@code HoodieInternalRow} *always* overlays its own meta-fields even in case
* when source row also contains them, to make sure these fields are mutable and
* can be updated (for ex, {@link UnsafeRow} doesn't support mutations due to
* its memory layout, as it persists field offsets)
*/
private final UTF8String[] metaFields;
private final InternalRow row;
private final InternalRow sourceRow;
/**
* Specifies whether source {@link #row} contains meta-fields
* Specifies whether source {@link #sourceRow} contains meta-fields
*/
private final boolean containsMetaFields;
private final boolean sourceContainsMetaFields;
public HoodieInternalRow(UTF8String commitTime,
UTF8String commitSeqNumber,
UTF8String recordKey,
UTF8String partitionPath,
UTF8String fileName,
InternalRow row,
boolean containsMetaFields) {
InternalRow sourceRow,
boolean sourceContainsMetaFields) {
this.metaFields = new UTF8String[] {
commitTime,
commitSeqNumber,
@@ -74,21 +80,21 @@ public class HoodieInternalRow extends InternalRow {
fileName
};
this.row = row;
this.containsMetaFields = containsMetaFields;
this.sourceRow = sourceRow;
this.sourceContainsMetaFields = sourceContainsMetaFields;
}
private HoodieInternalRow(UTF8String[] metaFields,
InternalRow row,
boolean containsMetaFields) {
InternalRow sourceRow,
boolean sourceContainsMetaFields) {
this.metaFields = metaFields;
this.row = row;
this.containsMetaFields = containsMetaFields;
this.sourceRow = sourceRow;
this.sourceContainsMetaFields = sourceContainsMetaFields;
}
@Override
public int numFields() {
return row.numFields();
return sourceRow.numFields();
}
@Override
@@ -96,7 +102,7 @@ public class HoodieInternalRow extends InternalRow {
if (ordinal < metaFields.length) {
metaFields[ordinal] = null;
} else {
row.setNullAt(rebaseOrdinal(ordinal));
sourceRow.setNullAt(rebaseOrdinal(ordinal));
}
}
@@ -112,7 +118,7 @@ public class HoodieInternalRow extends InternalRow {
String.format("Could not update the row at (%d) with value of type (%s), either UTF8String or String are expected", ordinal, value.getClass().getSimpleName()));
}
} else {
row.update(rebaseOrdinal(ordinal), value);
sourceRow.update(rebaseOrdinal(ordinal), value);
}
}
@@ -121,113 +127,113 @@ public class HoodieInternalRow extends InternalRow {
if (ordinal < metaFields.length) {
return metaFields[ordinal] == null;
}
return row.isNullAt(rebaseOrdinal(ordinal));
return sourceRow.isNullAt(rebaseOrdinal(ordinal));
}
@Override
public UTF8String getUTF8String(int ordinal) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
if (ordinal < metaFields.length) {
return metaFields[ordinal];
}
return row.getUTF8String(rebaseOrdinal(ordinal));
return sourceRow.getUTF8String(rebaseOrdinal(ordinal));
}
@Override
public Object get(int ordinal, DataType dataType) {
if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) {
if (ordinal < metaFields.length) {
validateMetaFieldDataType(dataType);
return metaFields[ordinal];
}
return row.get(rebaseOrdinal(ordinal), dataType);
return sourceRow.get(rebaseOrdinal(ordinal), dataType);
}
@Override
public boolean getBoolean(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, Boolean.class);
return row.getBoolean(rebaseOrdinal(ordinal));
return sourceRow.getBoolean(rebaseOrdinal(ordinal));
}
@Override
public byte getByte(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, Byte.class);
return row.getByte(rebaseOrdinal(ordinal));
return sourceRow.getByte(rebaseOrdinal(ordinal));
}
@Override
public short getShort(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, Short.class);
return row.getShort(rebaseOrdinal(ordinal));
return sourceRow.getShort(rebaseOrdinal(ordinal));
}
@Override
public int getInt(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, Integer.class);
return row.getInt(rebaseOrdinal(ordinal));
return sourceRow.getInt(rebaseOrdinal(ordinal));
}
@Override
public long getLong(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, Long.class);
return row.getLong(rebaseOrdinal(ordinal));
return sourceRow.getLong(rebaseOrdinal(ordinal));
}
@Override
public float getFloat(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, Float.class);
return row.getFloat(rebaseOrdinal(ordinal));
return sourceRow.getFloat(rebaseOrdinal(ordinal));
}
@Override
public double getDouble(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, Double.class);
return row.getDouble(rebaseOrdinal(ordinal));
return sourceRow.getDouble(rebaseOrdinal(ordinal));
}
@Override
public Decimal getDecimal(int ordinal, int precision, int scale) {
ruleOutMetaFieldsAccess(ordinal, Decimal.class);
return row.getDecimal(rebaseOrdinal(ordinal), precision, scale);
return sourceRow.getDecimal(rebaseOrdinal(ordinal), precision, scale);
}
@Override
public byte[] getBinary(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, Byte[].class);
return row.getBinary(rebaseOrdinal(ordinal));
return sourceRow.getBinary(rebaseOrdinal(ordinal));
}
@Override
public CalendarInterval getInterval(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, CalendarInterval.class);
return row.getInterval(rebaseOrdinal(ordinal));
return sourceRow.getInterval(rebaseOrdinal(ordinal));
}
@Override
public InternalRow getStruct(int ordinal, int numFields) {
ruleOutMetaFieldsAccess(ordinal, InternalRow.class);
return row.getStruct(rebaseOrdinal(ordinal), numFields);
return sourceRow.getStruct(rebaseOrdinal(ordinal), numFields);
}
@Override
public ArrayData getArray(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, ArrayData.class);
return row.getArray(rebaseOrdinal(ordinal));
return sourceRow.getArray(rebaseOrdinal(ordinal));
}
@Override
public MapData getMap(int ordinal) {
ruleOutMetaFieldsAccess(ordinal, MapData.class);
return row.getMap(rebaseOrdinal(ordinal));
return sourceRow.getMap(rebaseOrdinal(ordinal));
}
@Override
public InternalRow copy() {
return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), row.copy(), containsMetaFields);
return new HoodieInternalRow(Arrays.copyOf(metaFields, metaFields.length), sourceRow.copy(), sourceContainsMetaFields);
}
private int rebaseOrdinal(int ordinal) {
// NOTE: In cases when source row does not contain meta fields, we will have to
// rebase ordinal onto its indexes
return containsMetaFields ? ordinal : ordinal - metaFields.length;
return sourceContainsMetaFields ? ordinal : ordinal - metaFields.length;
}
private void validateMetaFieldDataType(DataType dataType) {

View File

@@ -18,15 +18,14 @@
package org.apache.hudi.io.storage.row;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
@@ -42,32 +41,34 @@ public class HoodieInternalRowFileWriterFactory {
* Factory method to assist in instantiating an instance of {@link HoodieInternalRowFileWriter}.
* @param path path of the RowFileWriter.
* @param hoodieTable instance of {@link HoodieTable} in use.
* @param config instance of {@link HoodieWriteConfig} to use.
* @param writeConfig instance of {@link HoodieWriteConfig} to use.
* @param schema schema of the dataset in use.
* @return the instantiated {@link HoodieInternalRowFileWriter}.
* @throws IOException if format is not supported or if any exception during instantiating the RowFileWriter.
*
*/
public static HoodieInternalRowFileWriter getInternalRowFileWriter(
Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
public static HoodieInternalRowFileWriter getInternalRowFileWriter(Path path,
HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
StructType schema)
throws IOException {
final String extension = FSUtils.getFileExtension(path.getName());
if (PARQUET.getFileExtension().equals(extension)) {
return newParquetInternalRowFileWriter(path, config, schema, hoodieTable);
return newParquetInternalRowFileWriter(path, hoodieTable, writeConfig, schema, tryInstantiateBloomFilter(writeConfig));
}
throw new UnsupportedOperationException(extension + " format not supported yet.");
}
private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(
Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table)
private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(Path path,
HoodieTable table,
HoodieWriteConfig writeConfig,
StructType structType,
Option<BloomFilter> bloomFilterOpt
)
throws IOException {
BloomFilter filter = BloomFilterFactory.createBloomFilter(
writeConfig.getBloomFilterNumEntries(),
writeConfig.getBloomFilterFPP(),
writeConfig.getDynamicBloomFilterMaxNumEntries(),
writeConfig.getBloomFilterType());
HoodieRowParquetWriteSupport writeSupport =
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, filter, writeConfig);
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, bloomFilterOpt, writeConfig);
return new HoodieInternalRowParquetWriter(
path,
new HoodieParquetConfig<>(
@@ -82,30 +83,18 @@ public class HoodieInternalRowFileWriterFactory {
));
}
public static HoodieInternalRowFileWriter getInternalRowFileWriterWithoutMetaFields(
Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
throws IOException {
if (PARQUET.getFileExtension().equals(hoodieTable.getBaseFileExtension())) {
return newParquetInternalRowFileWriterWithoutMetaFields(path, config, schema, hoodieTable);
}
throw new HoodieIOException(hoodieTable.getBaseFileExtension() + " format not supported yet in row writer path");
}
private static Option<BloomFilter> tryInstantiateBloomFilter(HoodieWriteConfig writeConfig) {
// NOTE: Currently Bloom Filter is only going to be populated if meta-fields are populated
if (writeConfig.populateMetaFields()) {
BloomFilter bloomFilter = BloomFilterFactory.createBloomFilter(
writeConfig.getBloomFilterNumEntries(),
writeConfig.getBloomFilterFPP(),
writeConfig.getDynamicBloomFilterMaxNumEntries(),
writeConfig.getBloomFilterType());
private static HoodieInternalRowFileWriter newParquetInternalRowFileWriterWithoutMetaFields(
Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table)
throws IOException {
HoodieRowParquetWriteSupport writeSupport =
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null, writeConfig);
return new HoodieInternalRowParquetWriter(
path, new HoodieParquetConfig<>(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
writeSupport.getHadoopConf(),
writeConfig.getParquetCompressionRatio(),
writeConfig.parquetDictionaryEnabled())
);
return Option.of(bloomFilter);
}
return Option.empty();
}
}

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.hadoop.CachingPath;
@@ -56,11 +57,6 @@ public class HoodieRowCreateHandle implements Serializable {
private static final Logger LOG = LogManager.getLogger(HoodieRowCreateHandle.class);
private static final AtomicLong GLOBAL_SEQ_NO = new AtomicLong(1);
private static final Integer RECORD_KEY_META_FIELD_ORD =
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.RECORD_KEY_METADATA_FIELD);
private static final Integer PARTITION_PATH_META_FIELD_ORD =
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
private final HoodieTable table;
private final HoodieWriteConfig writeConfig;
@@ -87,14 +83,13 @@ public class HoodieRowCreateHandle implements Serializable {
int taskPartitionId,
long taskId,
long taskEpochId,
StructType structType,
boolean populateMetaFields) {
StructType structType) {
this.partitionPath = partitionPath;
this.table = table;
this.writeConfig = writeConfig;
this.fileId = fileId;
this.currTimer = new HoodieTimer(true);
this.currTimer = HoodieTimer.start();
FileSystem fs = table.getMetaClient().getFs();
@@ -102,7 +97,7 @@ public class HoodieRowCreateHandle implements Serializable {
String fileName = FSUtils.makeBaseFileName(instantTime, writeToken, this.fileId, table.getBaseFileExtension());
this.path = makeNewPath(fs, partitionPath, fileName, writeConfig);
this.populateMetaFields = populateMetaFields;
this.populateMetaFields = writeConfig.populateMetaFields();
this.fileName = UTF8String.fromString(path.getName());
this.commitTime = UTF8String.fromString(instantTime);
this.seqIdGenerator = (id) -> HoodieRecord.generateSequenceId(instantTime, taskPartitionId, id);
@@ -121,12 +116,15 @@ public class HoodieRowCreateHandle implements Serializable {
FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath),
table.getPartitionMetafileFormat());
partitionMetadata.trySave(taskPartitionId);
createMarkerFile(partitionPath, fileName, instantTime, table, writeConfig);
this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
this.fileWriter = HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(path, table, writeConfig, structType);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
}
LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
LOG.info("New handle created for partition: " + partitionPath + " with fileId " + fileId);
}
/**
@@ -137,47 +135,59 @@ public class HoodieRowCreateHandle implements Serializable {
* @throws IOException
*/
public void write(InternalRow row) throws IOException {
if (populateMetaFields) {
writeRow(row);
} else {
writeRowNoMetaFields(row);
}
}
private void writeRow(InternalRow row) {
try {
// NOTE: PLEASE READ THIS CAREFULLY BEFORE MODIFYING
// This code lays in the hot-path, and substantial caution should be
// exercised making changes to it to minimize amount of excessive:
// - Conversions b/w Spark internal (low-level) types and JVM native ones (like
// [[UTF8String]] and [[String]])
// - Conversions b/w Spark internal types and JVM native ones (like [[UTF8String]]
// and [[String]])
// - Repeated computations (for ex, converting file-path to [[UTF8String]] over and
// over again)
UTF8String recordKey = row.getUTF8String(RECORD_KEY_META_FIELD_ORD);
UTF8String recordKey = row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
UTF8String partitionPath = row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
// This is the only meta-field that is generated dynamically, hence conversion b/w
// [[String]] and [[UTF8String]] is unavoidable
UTF8String seqId = UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
InternalRow updatedRow;
// In cases when no meta-fields need to be added we simply relay provided row to
// the writer as is
if (!populateMetaFields) {
updatedRow = row;
} else {
UTF8String partitionPath = row.getUTF8String(PARTITION_PATH_META_FIELD_ORD);
// This is the only meta-field that is generated dynamically, hence conversion b/w
// [[String]] and [[UTF8String]] is unavoidable
UTF8String seqId = UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement()));
updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
partitionPath, fileName, row, true);
}
InternalRow updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey,
partitionPath, fileName, row, true);
try {
fileWriter.writeRow(recordKey, updatedRow);
// NOTE: To avoid conversion on the hot-path we only convert [[UTF8String]] into [[String]]
// in cases when successful records' writes are being tracked
writeStatus.markSuccess(writeStatus.isTrackingSuccessfulWrites() ? recordKey.toString() : null);
} catch (Throwable t) {
} catch (Exception t) {
writeStatus.markFailure(recordKey.toString(), t);
}
} catch (Throwable ge) {
writeStatus.setGlobalError(ge);
throw ge;
} catch (Exception e) {
writeStatus.setGlobalError(e);
throw e;
}
}
private void writeRowNoMetaFields(InternalRow row) {
try {
// TODO make sure writing w/ and w/o meta fields is consistent (currently writing w/o
// meta-fields would fail if any record will, while when writing w/ meta-fields it won't)
fileWriter.writeRow(row);
writeStatus.markSuccess();
} catch (Exception e) {
writeStatus.setGlobalError(e);
throw new HoodieException("Exception thrown while writing spark InternalRows to file ", e);
}
}
/**
* @returns {@code true} if this handle can take in more writes. else {@code false}.
* Returns {@code true} if this handle can take in more writes. else {@code false}.
*/
public boolean canWrite() {
return fileWriter.canWrite();
@@ -188,7 +198,6 @@ public class HoodieRowCreateHandle implements Serializable {
* status of the writes to this handle.
*
* @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle.
* @throws IOException
*/
public HoodieInternalWriteStatus close() throws IOException {
fileWriter.close();
@@ -245,10 +254,4 @@ public class HoodieRowCreateHandle implements Serializable {
return taskPartitionId + "-" + taskId + "-" + taskEpochId;
}
protected HoodieInternalRowFileWriter createNewFileWriter(
Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
throws IOException {
return HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(
path, hoodieTable, config, schema);
}
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage.row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -45,14 +46,13 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
private UTF8String minRecordKey;
private UTF8String maxRecordKey;
public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter, HoodieWriteConfig writeConfig) {
super();
public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieWriteConfig writeConfig) {
Configuration hadoopConf = new Configuration(conf);
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
this.hadoopConf = hadoopConf;
setSchema(structType, hadoopConf);
this.bloomFilter = bloomFilter;
this.bloomFilter = bloomFilterOpt.orElse(null);
}
public Configuration getHadoopConf() {

View File

@@ -19,132 +19,560 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.unsafe.types.UTF8String;
import scala.Function1;
import java.util.HashMap;
import javax.annotation.concurrent.ThreadSafe;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.apache.hudi.common.util.CollectionUtils.tail;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
* Base class for the built-in key generators. Contains methods structured for
* code reuse amongst them.
* Base class for all built-in key generators.
*
* NOTE: By default it implements all the methods of {@link SparkKeyGeneratorInterface}, which
* by default however fallback to Avro implementation. For maximum performance (to avoid
* conversion from Spark's internal data-types to Avro) you should override these methods
* in your implementation.
*
* TODO rename to AvroFallbackBaseKeyGenerator
*/
@ThreadSafe
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
private static final String STRUCT_NAME = "hoodieRowTopLevelField";
private static final String NAMESPACE = "hoodieRow";
private Function1<Row, GenericRecord> converterFn = null;
private final AtomicBoolean validatePartitionFields = new AtomicBoolean(false);
protected StructType structType;
private static final Logger LOG = LogManager.getLogger(BuiltinKeyGenerator.class);
protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = new HashMap<>();
protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo = new HashMap<>();
private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
protected transient volatile SparkRowConverter rowConverter;
protected transient volatile SparkRowAccessor rowAccessor;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
}
/**
* Fetch record key from {@link Row}.
*
* @param row instance of {@link Row} from which record key is requested.
* @return the record key of interest from {@link Row}.
*/
@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
// TODO avoid conversion to avro
// since converterFn is transient this will be repeatedly initialized over and over again
if (null == converterFn) {
converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
}
return getKey(converterFn.apply(row)).getRecordKey();
tryInitRowConverter(row.schema());
// NOTE: This implementation has considerable computational overhead and has to be overridden
// to provide for optimal performance on Spark. This implementation provided exclusively
// for compatibility reasons.
return getRecordKey(rowConverter.convertToAvro(row));
}
@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(InternalRow internalRow, StructType schema) {
try {
// TODO fix
buildFieldSchemaInfoIfNeeded(schema);
return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, false);
} catch (Exception e) {
throw new HoodieException("Conversion of InternalRow to Row failed with exception", e);
}
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
tryInitRowConverter(schema);
// NOTE: This implementation has considerable computational overhead and has to be overridden
// to provide for optimal performance on Spark. This implementation provided exclusively
// for compatibility reasons.
return UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
}
/**
* Fetch partition path from {@link Row}.
*
* @param row instance of {@link Row} from which partition path is requested
* @return the partition path of interest from {@link Row}.
*/
@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getPartitionPath(Row row) {
if (null == converterFn) {
converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
}
return getKey(converterFn.apply(row)).getPartitionPath();
tryInitRowConverter(row.schema());
// NOTE: This implementation has considerable computational overhead and has to be overridden
// to provide for optimal performance on Spark. This implementation provided exclusively
// for compatibility reasons.
return getPartitionPath(rowConverter.convertToAvro(row));
}
/**
* Fetch partition path from {@link InternalRow}.
*
* @param internalRow {@link InternalRow} instance from which partition path needs to be fetched from.
* @param structType schema of the internalRow.
* @return the partition path.
*/
@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getPartitionPath(InternalRow internalRow, StructType structType) {
try {
buildFieldSchemaInfoIfNeeded(structType);
return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(internalRow, getPartitionPathFields(),
hiveStylePartitioning, partitionPathSchemaInfo);
} catch (Exception e) {
throw new HoodieException("Conversion of InternalRow to Row failed with exception", e);
}
public UTF8String getPartitionPath(InternalRow internalRow, StructType schema) {
tryInitRowConverter(schema);
// NOTE: This implementation has considerable computational overhead and has to be overridden
// to provide for optimal performance on Spark. This implementation provided exclusively
// for compatibility reasons.
GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
return UTF8String.fromString(getPartitionPath(avroRecord));
}
void buildFieldSchemaInfoIfNeeded(StructType structType) {
if (this.structType == null) {
this.structType = structType;
getRecordKeyFields()
.stream().filter(f -> !f.isEmpty())
.forEach(f -> recordKeySchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, true)));
if (getPartitionPathFields() != null) {
getPartitionPathFields().stream().filter(f -> !f.isEmpty())
.forEach(f -> partitionPathSchemaInfo.put(f, RowKeyGeneratorHelper.getFieldSchemaInfo(structType, f, false)));
protected void tryInitRowAccessor(StructType schema) {
if (this.rowAccessor == null) {
synchronized (this) {
if (this.rowAccessor == null) {
this.rowAccessor = new SparkRowAccessor(schema);
}
}
}
}
protected String getPartitionPathInternal(InternalRow row, StructType structType) {
buildFieldSchemaInfoIfNeeded(structType);
validatePartitionFieldsForInternalRow();
return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(row, getPartitionPathFields(),
hiveStylePartitioning, partitionPathSchemaInfo);
/**
* NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
* optimizations, like inlining)
*/
protected final String combinePartitionPath(Object... partitionPathParts) {
return combinePartitionPathInternal(
JavaStringBuilder::new,
BuiltinKeyGenerator::toString,
this::tryEncodePartitionPath,
BuiltinKeyGenerator::handleNullOrEmptyPartitionPathPart,
partitionPathParts
);
}
protected void validatePartitionFieldsForInternalRow() {
if (!validatePartitionFields.getAndSet(true)) {
partitionPathSchemaInfo.values().forEach(entry -> {
if (entry.getKey().size() > 1) {
throw new IllegalArgumentException("Nested column for partitioning is not supported with disabling meta columns");
/**
* NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
* optimizations, like inlining)
*/
protected final UTF8String combinePartitionPathUnsafe(Object... partitionPathParts) {
return combinePartitionPathInternal(
UTF8StringBuilder::new,
BuiltinKeyGenerator::toUTF8String,
this::tryEncodePartitionPathUTF8,
BuiltinKeyGenerator::handleNullOrEmptyPartitionPathPartUTF8,
partitionPathParts
);
}
/**
* NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
* optimizations, like inlining)
*/
protected final String combineRecordKey(Object... recordKeyParts) {
return combineRecordKeyInternal(
JavaStringBuilder::new,
BuiltinKeyGenerator::toString,
BuiltinKeyGenerator::handleNullRecordKey,
recordKeyParts
);
}
/**
* NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
* optimizations, like inlining)
*/
protected final UTF8String combineRecordKeyUnsafe(Object... recordKeyParts) {
return combineRecordKeyInternal(
UTF8StringBuilder::new,
BuiltinKeyGenerator::toUTF8String,
BuiltinKeyGenerator::handleNullRecordKey,
recordKeyParts
);
}
/**
* NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
* optimizations, like inlining)
*/
protected final String combineCompositeRecordKey(Object... recordKeyParts) {
return combineCompositeRecordKeyInternal(
JavaStringBuilder::new,
BuiltinKeyGenerator::toString,
BuiltinKeyGenerator::handleNullOrEmptyCompositeKeyPart,
BuiltinKeyGenerator::isNullOrEmptyCompositeKeyPart,
recordKeyParts
);
}
/**
* NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
* optimizations, like inlining)
*/
protected final UTF8String combineCompositeRecordKeyUnsafe(Object... recordKeyParts) {
return combineCompositeRecordKeyInternal(
UTF8StringBuilder::new,
BuiltinKeyGenerator::toUTF8String,
BuiltinKeyGenerator::handleNullOrEmptyCompositeKeyPartUTF8,
BuiltinKeyGenerator::isNullOrEmptyCompositeKeyPartUTF8,
recordKeyParts
);
}
/**
 * Joins converted record-key parts with {@code DEFAULT_RECORD_KEY_PARTS_SEPARATOR}.
 *
 * @param builderFactory       supplies a builder of the target string type {@code S}
 * @param converter            converts a raw key part into {@code S} (no-op for strings)
 * @param emptyKeyPartHandler  validates/substitutes null or empty parts (may throw)
 * @param recordKeyParts       raw values extracted from the row
 */
private <S> S combineRecordKeyInternal(
    Supplier<StringBuilder<S>> builderFactory,
    Function<Object, S> converter,
    Function<S, S> emptyKeyPartHandler,
    Object... recordKeyParts
) {
  // Fast path: a single-part key needs no builder at all
  if (recordKeyParts.length == 1) {
    return emptyKeyPartHandler.apply(converter.apply(recordKeyParts[0]));
  }

  StringBuilder<S> builder = builderFactory.get();
  for (int idx = 0; idx < recordKeyParts.length; ++idx) {
    // Separator goes in front of every part but the first one
    if (idx > 0) {
      builder.appendJava(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
    }
    // NOTE: If record-key part has already been a string, conversion will be a no-op
    builder.append(emptyKeyPartHandler.apply(converter.apply(recordKeyParts[idx])));
  }
  return builder.build();
}
/**
 * Builds a composite record-key of the shape {@code field1:value1,field2:value2,...},
 * requiring at least one non-null, non-empty segment.
 *
 * @param builderFactory             supplies a builder of the target string type {@code S}
 * @param converter                  converts a raw key part into {@code S} (no-op for strings)
 * @param emptyKeyPartHandler        substitutes null/empty parts with placeholder stubs
 * @param isNullOrEmptyKeyPartPredicate detects the placeholder stubs produced above
 * @param recordKeyParts             raw values, positionally aligned with {@code recordKeyFields}
 * @throws HoodieKeyException if every segment turned out null or empty
 */
private <S> S combineCompositeRecordKeyInternal(
    Supplier<StringBuilder<S>> builderFactory,
    Function<Object, S> converter,
    Function<S, S> emptyKeyPartHandler,
    Predicate<S> isNullOrEmptyKeyPartPredicate,
    Object... recordKeyParts
) {
  StringBuilder<S> builder = builderFactory.get();
  boolean sawNonEmptyPart = false;

  for (int idx = 0; idx < recordKeyParts.length; ++idx) {
    // Separator goes in front of every segment but the first one
    if (idx > 0) {
      builder.appendJava(DEFAULT_RECORD_KEY_PARTS_SEPARATOR);
    }
    // NOTE: If record-key part has already been a string, conversion will be a no-op
    S keyPart = emptyKeyPartHandler.apply(converter.apply(recordKeyParts[idx]));
    builder.appendJava(recordKeyFields.get(idx));
    builder.appendJava(COMPOSITE_KEY_FIELD_VALUE_INFIX);
    builder.append(keyPart);
    // Overall composite-key is valid only if at least one segment is non-null, non-empty
    sawNonEmptyPart |= !isNullOrEmptyKeyPartPredicate.test(keyPart);
  }

  if (!sawNonEmptyPart) {
    throw new HoodieKeyException(String.format("All of the values for (%s) were either null or empty", recordKeyFields));
  }
  return builder.build();
}
/**
 * Joins converted partition-path parts with {@code DEFAULT_PARTITION_PATH_SEPARATOR},
 * optionally prefixing each with {@code <field>=} when Hive-style partitioning is enabled.
 *
 * @param builderFactory supplies a builder of the target string type {@code S}
 * @param converter      converts a raw partition-path part into {@code S}
 * @param encoder        optionally URL-escapes the part (when encoding is enabled)
 * @param emptyHandler   substitutes null/empty parts with the default partition stub
 * @param partitionPathParts raw values, positionally aligned with {@code partitionPathFields}
 */
private <S> S combinePartitionPathInternal(Supplier<StringBuilder<S>> builderFactory,
                                           Function<Object, S> converter,
                                           Function<S, S> encoder,
                                           Function<S, S> emptyHandler,
                                           Object... partitionPathParts) {
  checkState(partitionPathParts.length == partitionPathFields.size());
  // Fast path: a single, non-Hive-style partition-path part side-steps the builder entirely
  if (!hiveStylePartitioning && partitionPathParts.length == 1) {
    return emptyHandler.apply(converter.apply(partitionPathParts[0]));
  }

  StringBuilder<S> builder = builderFactory.get();
  for (int idx = 0; idx < partitionPathParts.length; ++idx) {
    // Separator goes in front of every part but the first one
    if (idx > 0) {
      builder.appendJava(DEFAULT_PARTITION_PATH_SEPARATOR);
    }
    S encodedPart = encoder.apply(emptyHandler.apply(converter.apply(partitionPathParts[idx])));
    if (hiveStylePartitioning) {
      // Hive-style segments are rendered as "<field>=<value>"
      builder.appendJava(partitionPathFields.get(idx))
          .appendJava("=")
          .append(encodedPart);
    } else {
      builder.append(encodedPart);
    }
  }
  return builder.build();
}
private String tryEncodePartitionPath(String partitionPathPart) {
  // URL-style escaping is applied only when explicitly enabled via configuration
  if (!encodePartitionPath) {
    return partitionPathPart;
  }
  return PartitionPathEncodeUtils.escapePathName(partitionPathPart);
}
private UTF8String tryEncodePartitionPathUTF8(UTF8String partitionPathPart) {
  // NOTE: The [[UTF8String]] -> [[String]] conversion (and back) is deliberately
  //       deferred until we know partition-path encoding is actually enabled
  if (!encodePartitionPath) {
    return partitionPathPart;
  }
  String escaped = PartitionPathEncodeUtils.escapePathName(partitionPathPart.toString());
  return UTF8String.fromString(escaped);
}
private void tryInitRowConverter(StructType structType) {
if (rowConverter == null) {
synchronized (this) {
if (rowConverter == null) {
rowConverter = new SparkRowConverter(structType);
}
});
}
}
}
/**
 * Validates that provided record key is neither null nor empty, returning it unchanged.
 *
 * @throws HoodieKeyException if the key is null or empty
 */
protected static String requireNonNullNonEmptyKey(String key) {
  // Guard-clause form: reject invalid keys up-front
  if (key == null || key.length() == 0) {
    throw new HoodieKeyException("Record key has to be non-empty string!");
  }
  return key;
}
/**
 * {@link UTF8String} counterpart of {@code requireNonNullNonEmptyKey(String)}.
 *
 * @throws HoodieKeyException if the key is null or empty
 */
protected static UTF8String requireNonNullNonEmptyKey(UTF8String key) {
  if (key == null || key.numChars() == 0) {
    throw new HoodieKeyException("Record key has to be non-empty string!");
  }
  return key;
}
/**
 * Validates that a (converted) record-key part is neither null nor stringifies to
 * an empty string, returning it unchanged otherwise.
 *
 * Fix: the original message claimed the key merely "has to be non-null", even though
 * the check also rejects empty values — misleading when diagnosing empty-key failures.
 *
 * @throws HoodieKeyException if the part is null or empty
 */
protected static <S> S handleNullRecordKey(S s) {
  if (s == null || s.toString().isEmpty()) {
    throw new HoodieKeyException("Record key has to be non-null, non-empty string!");
  }
  return s;
}
/**
 * Converts an arbitrary (nullable) value into a {@link UTF8String}; values that are
 * already {@link UTF8String} are passed through untouched.
 */
private static UTF8String toUTF8String(Object o) {
  if (o == null) {
    return null;
  }
  if (o instanceof UTF8String) {
    // Pass-through: already in engine-native representation
    return (UTF8String) o;
  }
  // NOTE: If object is a [[String]], [[toString]] would be a no-op
  return UTF8String.fromString(o.toString());
}
/**
 * Null-safe {@code toString}: returns {@code null} for a null input, otherwise
 * the value's string form (a no-op if it is already a {@link String}).
 */
private static String toString(Object o) {
  if (o == null) {
    return null;
  }
  return o.toString();
}
/**
 * Substitutes null/empty composite-key parts with the interned placeholder constants,
 * so that downstream null/empty checks can rely on reference equality.
 */
private static String handleNullOrEmptyCompositeKeyPart(Object keyPart) {
  if (keyPart == null) {
    return NULL_RECORDKEY_PLACEHOLDER;
  }
  // NOTE: [[toString]] is a no-op if key-part was already a [[String]]
  String stringified = keyPart.toString();
  return stringified.isEmpty() ? EMPTY_RECORDKEY_PLACEHOLDER : stringified;
}
/**
 * {@link UTF8String} counterpart of {@code handleNullOrEmptyCompositeKeyPart}:
 * substitutes interned placeholder constants for null/empty parts, enabling
 * reference-equality checks downstream.
 */
private static UTF8String handleNullOrEmptyCompositeKeyPartUTF8(UTF8String keyPart) {
  if (keyPart == null) {
    return NULL_RECORD_KEY_PLACEHOLDER_UTF8;
  }
  return keyPart.numChars() == 0 ? EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 : keyPart;
}
/**
 * Detects the null/empty placeholder stubs substituted by
 * {@code handleNullOrEmptyCompositeKeyPart}; valid only for parts that went
 * through that handler (which returns the interned constants).
 */
@SuppressWarnings("StringEquality")
private static boolean isNullOrEmptyCompositeKeyPart(String keyPart) {
// NOTE: Converted key-part is compared against null/empty stub using ref-equality
// for performance reasons (it relies on the fact that we're using internalized
// constants)
return keyPart == NULL_RECORDKEY_PLACEHOLDER || keyPart == EMPTY_RECORDKEY_PLACEHOLDER;
}
/**
 * {@link UTF8String} counterpart of {@code isNullOrEmptyCompositeKeyPart}; valid only
 * for parts that went through {@code handleNullOrEmptyCompositeKeyPartUTF8} (which
 * returns the interned constants compared against here).
 */
private static boolean isNullOrEmptyCompositeKeyPartUTF8(UTF8String keyPart) {
// NOTE: Converted key-part is compared against null/empty stub using ref-equality
// for performance reasons (it relies on the fact that we're using internalized
// constants)
return keyPart == NULL_RECORD_KEY_PLACEHOLDER_UTF8 || keyPart == EMPTY_RECORD_KEY_PLACEHOLDER_UTF8;
}
/**
 * Substitutes null/empty partition-path parts with Hudi's default partition-path stub.
 */
private static String handleNullOrEmptyPartitionPathPart(Object partitionPathPart) {
  if (partitionPathPart == null) {
    return HUDI_DEFAULT_PARTITION_PATH;
  }
  // NOTE: [[toString]] is a no-op if key-part was already a [[String]]
  String stringified = partitionPathPart.toString();
  return stringified.isEmpty() ? HUDI_DEFAULT_PARTITION_PATH : stringified;
}
/**
 * {@link UTF8String} counterpart of {@code handleNullOrEmptyPartitionPathPart}:
 * substitutes Hudi's default partition-path stub for null/empty parts.
 */
private static UTF8String handleNullOrEmptyPartitionPathPartUTF8(UTF8String keyPart) {
  boolean missing = keyPart == null || keyPart.numChars() == 0;
  return missing ? HUDI_DEFAULT_PARTITION_PATH_UTF8 : keyPart;
}
/**
* Converts provided (raw) value extracted from the {@link InternalRow} object into a deserialized,
* JVM native format (for ex, converting {@code Long} into {@link Instant},
* {@code Integer} to {@link LocalDate}, etc)
*
* This method allows to avoid costly full-row deserialization sequence. Note, that this method
* should be maintained in sync w/
*
* <ol>
* <li>{@code RowEncoder#deserializerFor}, as well as</li>
* <li>{@code HoodieAvroUtils#convertValueForAvroLogicalTypes}</li>
* </ol>
*
* @param dataType target data-type of the given value
* @param value target value to be converted
*/
private static Object convertToLogicalDataType(DataType dataType, Object value) {
  if (value == null) {
    return null;
  }
  if (dataType instanceof TimestampType) {
    // Raw value is micros since epoch (Long); Timestamp ctor expects millis
    return new Timestamp((Long) value / 1000);
  }
  if (dataType instanceof DateType) {
    // Raw value is days since epoch (Integer)
    return LocalDate.ofEpochDay((Integer) value);
  }
  // All other types are passed through untouched
  return value;
}
/**
 * Converts Spark rows ({@link Row} or {@link InternalRow}) into Avro {@link GenericRecord}s
 * for the given schema.
 */
protected static class SparkRowConverter {
  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
  private static final String NAMESPACE = "hoodieRow";

  // Converts a (deserialized) Row into an Avro record
  private final Function1<Row, GenericRecord> avroConverter;
  // Deserializes engine-native InternalRow back into a Row
  private final SparkRowSerDe rowSerDe;

  SparkRowConverter(StructType structType) {
    this.avroConverter = AvroConversionUtils.createConverterToAvro(structType, STRUCT_NAME, NAMESPACE);
    this.rowSerDe = HoodieSparkUtils.getDeserializer(structType);
  }

  GenericRecord convertToAvro(Row row) {
    return avroConverter.apply(row);
  }

  GenericRecord convertToAvro(InternalRow row) {
    // InternalRow has to be deserialized into a Row before Avro conversion
    return avroConverter.apply(rowSerDe.deserializeRow(row));
  }
}
/**
 * Extracts record-key and partition-path field values from Spark rows using
 * field paths resolved once per schema (supports nested fields via dot-paths).
 *
 * NOTE(review): the {@link InternalRow} path converts raw values to logical JVM
 * types (Timestamp/LocalDate) via {@code convertToLogicalDataType}, while the
 * {@link Row} path returns values as-is — rows are assumed to already carry
 * deserialized values there.
 */
protected class SparkRowAccessor {
  // Pre-resolved nested paths of the record-key fields within the schema
  private final HoodieUnsafeRowUtils.NestedFieldPath[] recordKeyFieldsPaths;
  // Pre-resolved nested paths of the partition-path fields within the schema
  private final HoodieUnsafeRowUtils.NestedFieldPath[] partitionPathFieldsPaths;

  SparkRowAccessor(StructType schema) {
    this.recordKeyFieldsPaths = resolveNestedFieldPaths(getRecordKeyFieldNames(), schema);
    this.partitionPathFieldsPaths = resolveNestedFieldPaths(getPartitionPathFields(), schema);
  }

  public Object[] getRecordKeyParts(Row row) {
    return getNestedFieldValues(row, recordKeyFieldsPaths);
  }

  public Object[] getRecordPartitionPathValues(Row row) {
    return getNestedFieldValues(row, partitionPathFieldsPaths);
  }

  public Object[] getRecordKeyParts(InternalRow row) {
    return getNestedFieldValues(row, recordKeyFieldsPaths);
  }

  public Object[] getRecordPartitionPathValues(InternalRow row) {
    return getNestedFieldValues(row, partitionPathFieldsPaths);
  }

  // Fetches one value per resolved path, in path order (Row flavor: no type conversion)
  private Object[] getNestedFieldValues(Row row, HoodieUnsafeRowUtils.NestedFieldPath[] nestedFieldsPaths) {
    Object[] nestedFieldValues = new Object[nestedFieldsPaths.length];
    for (int i = 0; i < nestedFieldsPaths.length; ++i) {
      nestedFieldValues[i] = HoodieUnsafeRowUtils$.MODULE$.getNestedRowValue(row, nestedFieldsPaths[i]);
    }
    return nestedFieldValues;
  }

  // Fetches one value per resolved path, converting raw Catalyst values
  // (e.g. micros-since-epoch longs) into logical JVM types
  private Object[] getNestedFieldValues(InternalRow row, HoodieUnsafeRowUtils.NestedFieldPath[] nestedFieldsPaths) {
    Object[] nestedFieldValues = new Object[nestedFieldsPaths.length];
    for (int i = 0; i < nestedFieldsPaths.length; ++i) {
      Object rawValue = HoodieUnsafeRowUtils$.MODULE$.getNestedInternalRowValue(row, nestedFieldsPaths[i]);
      // Data-type of the leaf field determines the logical conversion applied
      DataType dataType = tail(nestedFieldsPaths[i].parts())._2.dataType();
      nestedFieldValues[i] = convertToLogicalDataType(dataType, rawValue);
    }
    return nestedFieldValues;
  }

  // Resolves dot-separated field paths against the schema; wraps resolution
  // failures (e.g. missing fields) into HoodieException after logging context
  private HoodieUnsafeRowUtils.NestedFieldPath[] resolveNestedFieldPaths(List<String> fieldPaths, StructType schema) {
    try {
      return fieldPaths.stream()
          .map(fieldPath -> HoodieUnsafeRowUtils$.MODULE$.composeNestedFieldPath(schema, fieldPath))
          .toArray(HoodieUnsafeRowUtils.NestedFieldPath[]::new);
    } catch (Exception e) {
      LOG.error(String.format("Failed to resolve nested field-paths (%s) in schema (%s)", fieldPaths, schema), e);
      throw new HoodieException("Failed to resolve nested field-paths", e);
    }
  }
}
/**
* This is a generic interface closing the gap and unifying the {@link java.lang.StringBuilder} with
* {@link org.apache.hudi.unsafe.UTF8StringBuilder} implementations, allowing us to avoid code-duplication by performing
* most of the key-generation in a generic and unified way
*
* @param <S> target string type this builder is producing (could either be native {@link String}
* or alternatively {@link UTF8String}
*/
private interface StringBuilder<S> {
  // Appends a value already in the builder's native string type {@code S};
  // default implementation falls back to the Java-string path
  default StringBuilder<S> append(S s) {
    return appendJava(s.toString());
  }

  // Appends a plain Java {@link String}
  StringBuilder<S> appendJava(String s);

  // Produces the final combined string of type {@code S}
  S build();
}
/**
 * {@link String}-producing implementation backed by {@link java.lang.StringBuilder}.
 */
private static class JavaStringBuilder implements StringBuilder<String> {
  private final java.lang.StringBuilder delegate = new java.lang.StringBuilder();

  @Override
  public StringBuilder<String> appendJava(String s) {
    delegate.append(s);
    return this;
  }

  @Override
  public String build() {
    return delegate.toString();
  }
}
/**
 * {@link UTF8String}-producing implementation backed by Hudi's unsafe builder,
 * avoiding intermediate Java-string materialization.
 */
private static class UTF8StringBuilder implements StringBuilder<UTF8String> {
  private final org.apache.hudi.unsafe.UTF8StringBuilder delegate = new org.apache.hudi.unsafe.UTF8StringBuilder();

  @Override
  public StringBuilder<UTF8String> append(UTF8String s) {
    // Native append: no conversion through java.lang.String
    delegate.append(s);
    return this;
  }

  @Override
  public StringBuilder<UTF8String> appendJava(String s) {
    delegate.append(s);
    return this;
  }

  @Override
  public UTF8String build() {
    return delegate.build();
  }
}
}

View File

@@ -17,19 +17,23 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
* Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
* Key generator prefixing field names before corresponding record-key parts.
*
* <p/>
* For example, for the schema of {@code { "key": string, "value": bytes }}, and corresponding record
* {@code { "key": "foo" }}, record-key "key:foo" will be produced.
*/
public class ComplexKeyGenerator extends BuiltinKeyGenerator {
@@ -45,7 +49,7 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
this.complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
}
@Override
@@ -60,26 +64,25 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
buildFieldSchemaInfoIfNeeded(row.schema());
return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
tryInitRowAccessor(row.schema());
return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
}
@Override
public String getRecordKey(InternalRow internalRow, StructType schema) {
buildFieldSchemaInfoIfNeeded(schema);
return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, true);
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
tryInitRowAccessor(schema);
return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
}
@Override
public String getPartitionPath(Row row) {
buildFieldSchemaInfoIfNeeded(row.schema());
return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
hiveStylePartitioning, partitionPathSchemaInfo);
tryInitRowAccessor(row.schema());
return combinePartitionPath(rowAccessor.getRecordPartitionPathValues(row));
}
@Override
public String getPartitionPath(InternalRow row, StructType structType) {
return getPartitionPathInternal(row, structType);
public UTF8String getPartitionPath(InternalRow row, StructType schema) {
tryInitRowAccessor(schema);
return combinePartitionPathUnsafe(rowAccessor.getRecordPartitionPathValues(row));
}
}

View File

@@ -29,9 +29,11 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;
/**
@@ -46,16 +48,29 @@ import java.util.stream.Collectors;
* field in the partition path, use field1:simple 3. If you want your table to be non partitioned, simply leave it as blank.
*
* RecordKey is internally generated using either SimpleKeyGenerator or ComplexKeyGenerator.
*
* @deprecated
*/
@Deprecated
public class CustomKeyGenerator extends BuiltinKeyGenerator {
private final CustomAvroKeyGenerator customAvroKeyGenerator;
public CustomKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
// NOTE: We have to strip partition-path configuration, since it could only be interpreted by
// this key-gen
super(stripPartitionPathConfig(props));
this.recordKeyFields =
Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
.map(String::trim)
.collect(Collectors.toList());
String partitionPathFields = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
this.partitionPathFields = partitionPathFields == null
? Collections.emptyList()
: Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList());
this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
validateRecordKeyFields();
}
@Override
@@ -70,9 +85,8 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
validateRecordKeyFields();
return getRecordKeyFields().size() == 1
? new SimpleKeyGenerator(config).getRecordKey(row)
return getRecordKeyFieldNames().size() == 1
? new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row)
: new ComplexKeyGenerator(config).getRecordKey(row);
}
@@ -82,8 +96,8 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
}
@Override
public String getPartitionPath(InternalRow row, StructType structType) {
return getPartitionPath(Option.empty(), Option.empty(), Option.of(Pair.of(row, structType)));
public UTF8String getPartitionPath(InternalRow row, StructType schema) {
return UTF8String.fromString(getPartitionPath(Option.empty(), Option.empty(), Option.of(Pair.of(row, schema))));
}
private String getPartitionPath(Option<GenericRecord> record, Option<Row> row, Option<Pair<InternalRow, StructType>> internalRowStructTypePair) {
@@ -99,7 +113,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
return "";
}
for (String field : getPartitionPathFields()) {
String[] fieldWithType = field.split(customAvroKeyGenerator.SPLIT_REGEX);
String[] fieldWithType = field.split(CustomAvroKeyGenerator.SPLIT_REGEX);
if (fieldWithType.length != 2) {
throw new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format");
}
@@ -142,9 +156,18 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
}
private void validateRecordKeyFields() {
if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
if (getRecordKeyFieldNames() == null || getRecordKeyFieldNames().isEmpty()) {
throw new HoodieKeyException("Unable to find field names for record key in cfg");
}
}
private static TypedProperties stripPartitionPathConfig(TypedProperties props) {
TypedProperties filtered = new TypedProperties(props);
// NOTE: We have to stub it out w/ empty string, since the properties are:
// - Expected to bear this config
// - Can't be stubbed out w/ null
filtered.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "");
return filtered;
}
}

View File

@@ -25,6 +25,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.util.ArrayList;
import java.util.Arrays;
@@ -40,7 +41,12 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
public GlobalDeleteKeyGenerator(TypedProperties config) {
super(config);
this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config);
this.globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config);
}
@Override
public List<String> getPartitionPathFields() {
return new ArrayList<>();
}
@Override
@@ -53,21 +59,16 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
return globalAvroDeleteKeyGenerator.getPartitionPath(record);
}
@Override
public List<String> getPartitionPathFields() {
return new ArrayList<>();
}
@Override
public String getRecordKey(Row row) {
buildFieldSchemaInfoIfNeeded(row.schema());
return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, true);
tryInitRowAccessor(row.schema());
return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
}
@Override
public String getRecordKey(InternalRow internalRow, StructType schema) {
buildFieldSchemaInfoIfNeeded(schema);
return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, true);
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
tryInitRowAccessor(schema);
return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
}
@Override
@@ -76,8 +77,8 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
}
@Override
public String getPartitionPath(InternalRow row, StructType structType) {
return globalAvroDeleteKeyGenerator.getEmptyPartition();
public UTF8String getPartitionPath(InternalRow row, StructType schema) {
return UTF8String.EMPTY_UTF8;
}
}

View File

@@ -18,13 +18,13 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.util.Arrays;
import java.util.Collections;
@@ -41,19 +41,11 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
public NonpartitionedKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
.split(",")).map(String::trim).collect(Collectors.toList());
.split(","))
.map(String::trim)
.collect(Collectors.toList());
this.partitionPathFields = Collections.emptyList();
nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(props);
}
@Override
public String getRecordKey(GenericRecord record) {
return nonpartitionedAvroKeyGenerator.getRecordKey(record);
}
@Override
public String getPartitionPath(GenericRecord record) {
return nonpartitionedAvroKeyGenerator.getPartitionPath(record);
this.nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(props);
}
@Override
@@ -61,10 +53,26 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
return nonpartitionedAvroKeyGenerator.getPartitionPathFields();
}
@Override
public String getRecordKey(GenericRecord record) {
return nonpartitionedAvroKeyGenerator.getRecordKey(record);
}
@Override
public String getRecordKey(Row row) {
buildFieldSchemaInfoIfNeeded(row.schema());
return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false);
tryInitRowAccessor(row.schema());
return combineRecordKey(rowAccessor.getRecordKeyParts(row));
}
@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
tryInitRowAccessor(schema);
return combineRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
}
@Override
public String getPartitionPath(GenericRecord record) {
return nonpartitionedAvroKeyGenerator.getPartitionPath(record);
}
@Override
@@ -73,8 +81,8 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
}
@Override
public String getPartitionPath(InternalRow internalRow, StructType structType) {
return nonpartitionedAvroKeyGenerator.getEmptyPartition();
public UTF8String getPartitionPath(InternalRow row, StructType schema) {
return UTF8String.EMPTY_UTF8;
}
}

View File

@@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.TimestampType;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
public class RowKeyGenUtils {

  /**
   * Converts provided (raw) value extracted from the {@link InternalRow} object into a deserialized,
   * JVM native format (for ex, converting {@code Long} into {@link Instant},
   * {@code Integer} to {@link LocalDate}, etc)
   *
   * This method allows to avoid costly full-row deserialization sequence. Note, that this method
   * should be maintained in sync w/
   *
   * <ol>
   *   <li>{@code RowEncoder#deserializerFor}, as well as</li>
   *   <li>{@code HoodieAvroUtils#convertValueForAvroLogicalTypes}</li>
   * </ol>
   *
   * @param dataType target data-type of the given value
   * @param value target value to be converted (may be null; null is passed through)
   */
  public static Object convertToLogicalDataType(DataType dataType, Object value) {
    // Fix: guard against unboxing NPE — a null raw value previously hit
    // "(Long) value / 1000" for TimestampType and threw NullPointerException
    if (value == null) {
      return null;
    }
    if (dataType instanceof TimestampType) {
      // Provided value have to be [[Long]] in this case, representing micros since epoch
      return new Timestamp((Long) value / 1000);
    } else if (dataType instanceof DateType) {
      // Provided value have to be [[Int]] in this case
      return LocalDate.ofEpochDay((Integer) value);
    }
    return value;
  }
}

View File

@@ -1,355 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.spark.sql.types.StructType$;
import scala.Option;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
import static org.apache.hudi.keygen.RowKeyGenUtils.convertToLogicalDataType;
/**
* Helper class to fetch fields from Row.
*
* TODO cleanup
*/
@Deprecated
public class RowKeyGeneratorHelper {
public static String getRecordKeyFromInternalRow(InternalRow internalRow, List<String> recordKeyFields,
Map<String, Pair<List<Integer>, DataType>> recordKeyPositions, boolean prefixFieldName) {
AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
String toReturn = recordKeyFields.stream().map(field -> {
String val = null;
List<Integer> fieldPositions = recordKeyPositions.get(field).getKey();
if (fieldPositions.size() == 1) { // simple field
Integer fieldPos = fieldPositions.get(0);
if (internalRow.isNullAt(fieldPos)) {
val = NULL_RECORDKEY_PLACEHOLDER;
} else {
DataType dataType = recordKeyPositions.get(field).getValue();
val = convertToLogicalDataType(dataType, internalRow.get(fieldPos, dataType)).toString();
if (val.isEmpty()) {
val = EMPTY_RECORDKEY_PLACEHOLDER;
} else {
keyIsNullOrEmpty.set(false);
}
}
} else { // nested fields
val = getNestedFieldVal(internalRow, recordKeyPositions.get(field)).toString();
if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
keyIsNullOrEmpty.set(false);
}
}
return prefixFieldName ? (field + ":" + val) : val;
}).collect(Collectors.joining(","));
if (keyIsNullOrEmpty.get()) {
throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
}
return toReturn;
}
/**
* Generates record key for the corresponding {@link Row}.
*
* @param row instance of {@link Row} of interest
* @param recordKeyFields record key fields as a list
* @param recordKeyPositions record key positions for the corresponding record keys in {@code recordKeyFields}
* @param prefixFieldName {@code true} if field name need to be prefixed in the returned result. {@code false} otherwise.
* @return the record key thus generated
*/
public static String getRecordKeyFromRow(Row row, List<String> recordKeyFields, Map<String, Pair<List<Integer>, DataType>> recordKeyPositions, boolean prefixFieldName) {
AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
String toReturn = recordKeyFields.stream().map(field -> {
String val = null;
List<Integer> fieldPositions = recordKeyPositions.get(field).getKey();
if (fieldPositions.size() == 1) { // simple field
Integer fieldPos = fieldPositions.get(0);
if (row.isNullAt(fieldPos)) {
val = NULL_RECORDKEY_PLACEHOLDER;
} else {
val = row.getAs(field).toString();
if (val.isEmpty()) {
val = EMPTY_RECORDKEY_PLACEHOLDER;
} else {
keyIsNullOrEmpty.set(false);
}
}
} else { // nested fields
val = getNestedFieldVal(row, recordKeyPositions.get(field).getKey()).toString();
if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && !val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
keyIsNullOrEmpty.set(false);
}
}
return prefixFieldName ? (field + ":" + val) : val;
}).collect(Collectors.joining(","));
if (keyIsNullOrEmpty.get()) {
throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null or empty.");
}
return toReturn;
}
/**
* Generates partition path for the corresponding {@link Row}.
*
* @param row instance of {@link Row} of interest
* @param partitionPathFields partition path fields as a list
* @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise
* @param partitionPathPositions partition path positions for the corresponding fields in {@code partitionPathFields}
* @return the generated partition path for the row
*/
public static String getPartitionPathFromRow(Row row, List<String> partitionPathFields, boolean hiveStylePartitioning, Map<String, Pair<List<Integer>, DataType>> partitionPathPositions) {
return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
String field = partitionPathFields.get(idx);
String val = null;
List<Integer> fieldPositions = partitionPathPositions.get(field).getKey();
if (fieldPositions.size() == 1) { // simple
Integer fieldPos = fieldPositions.get(0);
// for partition path, if field is not found, index will be set to -1
if (fieldPos == -1 || row.isNullAt(fieldPos)) {
val = HUDI_DEFAULT_PARTITION_PATH;
} else {
Object data = row.get(fieldPos);
val = convertToTimestampIfInstant(data).toString();
if (val.isEmpty()) {
val = HUDI_DEFAULT_PARTITION_PATH;
}
}
if (hiveStylePartitioning) {
val = field + "=" + val;
}
} else { // nested
Object data = getNestedFieldVal(row, partitionPathPositions.get(field).getKey());
data = convertToTimestampIfInstant(data);
if (data.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || data.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
val = hiveStylePartitioning ? field + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH;
} else {
val = hiveStylePartitioning ? field + "=" + data.toString() : data.toString();
}
}
return val;
}).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
}
/**
 * Generates partition path for the corresponding {@link InternalRow}.
 *
 * NOTE: Only simple (top-level) partition-path fields are supported; nested fields
 *       trigger an {@link IllegalArgumentException}
 *
 * @param internalRow instance of {@link InternalRow} of interest
 * @param partitionPathFields partition path fields as a list
 * @param hiveStylePartitioning {@code true} if hive style partitioning is set. {@code false} otherwise
 * @param partitionPathPositions partition path positions (and data-types) for the corresponding fields in {@code partitionPathFields}
 * @return the generated partition path for the row
 */
public static String getPartitionPathFromInternalRow(InternalRow internalRow, List<String> partitionPathFields, boolean hiveStylePartitioning,
                                                     Map<String, Pair<List<Integer>, DataType>> partitionPathPositions) {
  List<String> partitionPathParts = new ArrayList<>(partitionPathFields.size());
  for (String field : partitionPathFields) {
    Pair<List<Integer>, DataType> positionsAndType = partitionPathPositions.get(field);
    List<Integer> fieldPositions = positionsAndType.getKey();
    if (fieldPositions.size() != 1) { // nested
      throw new IllegalArgumentException("Nested partitioning is not supported with disabling meta columns.");
    }
    Integer fieldPos = fieldPositions.get(0);
    String val;
    // for partition path, if field is not found, index will be set to -1
    if (fieldPos == -1 || internalRow.isNullAt(fieldPos)) {
      val = HUDI_DEFAULT_PARTITION_PATH;
    } else {
      DataType dataType = positionsAndType.getValue();
      Object value = convertToLogicalDataType(dataType, internalRow.get(fieldPos, dataType));
      val = (value == null || value.toString().isEmpty()) ? HUDI_DEFAULT_PARTITION_PATH : value.toString();
    }
    partitionPathParts.add(hiveStylePartitioning ? field + "=" + val : val);
  }
  return String.join(DEFAULT_PARTITION_PATH_SEPARATOR, partitionPathParts);
}
/**
 * Fetches the value of the field at the provided (top-level) position from the {@link InternalRow},
 * falling back to {@code HUDI_DEFAULT_PARTITION_PATH} whenever the value is null or renders
 * to an empty string.
 *
 * @param internalRow instance of {@link InternalRow} of interest
 * @param partitionPathPosition top-level position of the partition-path field
 * @param partitionPathDataType data-type of the partition-path field
 * @return the field's value, or {@code HUDI_DEFAULT_PARTITION_PATH} when null/empty
 */
public static Object getFieldValFromInternalRow(InternalRow internalRow,
                                                Integer partitionPathPosition,
                                                DataType partitionPathDataType) {
  if (internalRow.isNullAt(partitionPathPosition)) {
    return HUDI_DEFAULT_PARTITION_PATH;
  }
  // NOTE: DataTypes.StringType is a singleton, hence reference comparison is deliberate here
  Object fieldVal = partitionPathDataType == DataTypes.StringType
      ? internalRow.getString(partitionPathPosition)
      : internalRow.get(partitionPathPosition, partitionPathDataType);
  return (fieldVal == null || fieldVal.toString().isEmpty()) ? HUDI_DEFAULT_PARTITION_PATH : fieldVal;
}
/**
 * Fetches the field value located at the positions requested for.
 * <p>
 * The fetching logic descends into the nested field based on the positions list to get the
 * leaf field's value. For example, given the row [4357686,key1,2020-03-21,pi,[val1,10]] with
 * the following schema, which has the fifth field as a nested field, and positions list [4,0],
 * <p>
 * 0 = "StructField(timestamp,LongType,false)"
 * 1 = "StructField(_row_key,StringType,false)"
 * 2 = "StructField(ts_ms,StringType,false)"
 * 3 = "StructField(pii_col,StringType,false)"
 * 4 = "StructField(nested_col,StructType(StructField(prop1,StringType,false), StructField(prop2,LongType,false)),false)"
 * <p>
 * the logic fetches the value from field nested_col.prop1.
 * If any level of the nested field is null, {@link KeyGenUtils#NULL_RECORDKEY_PLACEHOLDER} is returned.
 * If the field value is an empty String, {@link KeyGenUtils#EMPTY_RECORDKEY_PLACEHOLDER} is returned.
 *
 * @param row instance of {@link Row} of interest
 * @param positions tree style positions where the leaf node need to be fetched and returned
 * @return the field value as per the positions requested for.
 */
public static Object getNestedFieldVal(Row row, List<Integer> positions) {
  if (positions.size() == 1 && positions.get(0) == -1) {
    // Sentinel position: field could not be resolved against the schema
    return HUDI_DEFAULT_PARTITION_PATH;
  } else if (positions.isEmpty()) {
    // Degenerate case: nothing to traverse
    return null;
  }
  Row currentRow = row;
  int lastIdx = positions.size() - 1;
  // Drill down through the intermediate (nested) rows
  for (int i = 0; i < lastIdx; i++) {
    int pos = positions.get(i);
    if (currentRow.isNullAt(pos)) {
      return NULL_RECORDKEY_PLACEHOLDER;
    }
    currentRow = (Row) currentRow.get(pos);
  }
  int leafPos = positions.get(lastIdx);
  if (currentRow.isNullAt(leafPos)) {
    return NULL_RECORDKEY_PLACEHOLDER;
  }
  Object leafVal = currentRow.getAs(leafPos);
  return leafVal.toString().isEmpty() ? EMPTY_RECORDKEY_PLACEHOLDER : leafVal;
}
/**
 * Fetches the (potentially nested) field's value from the provided {@link InternalRow},
 * traversing the row hierarchy based on the previously resolved tree-style positions.
 *
 * If any level of the nested field is null, {@code NULL_RECORDKEY_PLACEHOLDER} is returned;
 * if the leaf value renders to an empty string, {@code EMPTY_RECORDKEY_PLACEHOLDER} is returned.
 *
 * @param internalRow instance of {@link InternalRow} of interest
 * @param positionsAndType pair of the tree-style positions and the leaf field's data-type
 * @return the field value as per the positions requested for
 */
public static Object getNestedFieldVal(InternalRow internalRow, Pair<List<Integer>, DataType> positionsAndType) {
  List<Integer> positions = positionsAndType.getKey();
  DataType leafDataType = positionsAndType.getValue();
  if (positions.size() == 1 && positions.get(0) == -1) {
    // Sentinel position: field could not be resolved against the schema
    return HUDI_DEFAULT_PARTITION_PATH;
  } else if (positions.isEmpty()) {
    // Degenerate case: nothing to traverse
    return null;
  }
  InternalRow currentRow = internalRow;
  int lastIdx = positions.size() - 1;
  for (int i = 0; i < lastIdx; i++) {
    int pos = positions.get(i);
    if (currentRow.isNullAt(pos)) {
      return NULL_RECORDKEY_PLACEHOLDER;
    }
    // Intermediate levels are fetched as (generic) structs
    currentRow = (InternalRow) currentRow.get(pos, StructType$.MODULE$.defaultConcreteType());
  }
  int leafPos = positions.get(lastIdx);
  if (currentRow.isNullAt(leafPos)) {
    return NULL_RECORDKEY_PLACEHOLDER;
  }
  Object leafVal = currentRow.get(leafPos, leafDataType);
  return leafVal.toString().isEmpty() ? EMPTY_RECORDKEY_PLACEHOLDER : leafVal;
}
/**
 * Generate the tree style positions for the field requested for as per the defined struct type.
 *
 * @param structType schema of interest
 * @param field field of interest for which the positions are requested for (could be nested, dot-separated)
 * @param isRecordKey {@code true} if the field requested for is a record key. {@code false} in case of a partition path.
 * @return the positions of the field as per the struct type and the leaf field's datatype.
 * @throws HoodieKeyException if {@code isRecordKey} is true and the field can't be (fully) resolved;
 *         for partition-path fields, resolution failures instead yield a sentinel positions list of [-1]
 */
public static Pair<List<Integer>, DataType> getFieldSchemaInfo(StructType structType, String field, boolean isRecordKey) {
  // Nested fields are referenced in the "a.b.c" (dot-separated) format
  String[] slices = field.split("\\.");
  List<Integer> positions = new ArrayList<>();
  int index = 0;
  int totalCount = slices.length;
  DataType leafFieldDataType = null;
  while (index < totalCount) {
    String slice = slices[index];
    // NOTE: This is Scala's Option returned by Spark's StructType.getFieldIndex,
    //       holding the field's ordinal when present
    Option<Object> curIndexOpt = structType.getFieldIndex(slice);
    if (curIndexOpt.isDefined()) {
      int curIndex = (int) curIndexOpt.get();
      positions.add(curIndex);
      final StructField nestedField = structType.fields()[curIndex];
      if (index < totalCount - 1) {
        if (!(nestedField.dataType() instanceof StructType)) {
          // An intermediate path segment that is not a struct can't be descended into
          if (isRecordKey) {
            throw new HoodieKeyException("Nested field should be of type StructType " + nestedField);
          } else {
            // Partition-path resolution failures are signaled w/ a sentinel position of -1
            positions = Collections.singletonList(-1);
            break;
          }
        }
        // Descend into the nested struct for the next path segment
        structType = (StructType) nestedField.dataType();
      } else {
        // leaf node.
        leafFieldDataType = nestedField.dataType();
      }
    } else {
      if (isRecordKey) {
        throw new HoodieKeyException("Can't find " + slice + " in StructType for the field " + field);
      } else {
        // Partition-path resolution failures are signaled w/ a sentinel position of -1
        positions = Collections.singletonList(-1);
        break;
      }
    }
    index++;
  }
  return Pair.of(positions, leafFieldDataType);
}
/**
 * Converts {@link Instant} values to {@link Timestamp} (leaving values of all other types
 * untouched), so that both render to the same (Timestamp) string representation
 */
private static Object convertToTimestampIfInstant(Object data) {
  return data instanceof Instant ? Timestamp.from((Instant) data) : data;
}
}

View File

@@ -18,13 +18,13 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.util.Collections;
@@ -46,11 +46,9 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
super(props);
this.recordKeyFields = recordKeyField == null
? Collections.emptyList() : Collections.singletonList(recordKeyField);
this.partitionPathFields = partitionPathField == null
? Collections.emptyList() : Collections.singletonList(partitionPathField);
simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : Collections.singletonList(recordKeyField);
this.partitionPathFields = partitionPathField == null ? Collections.emptyList() : Collections.singletonList(partitionPathField);
this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
}
@Override
@@ -65,19 +63,43 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
buildFieldSchemaInfoIfNeeded(row.schema());
return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false);
tryInitRowAccessor(row.schema());
Object[] recordKeys = rowAccessor.getRecordKeyParts(row);
// NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
// record-key field
if (recordKeys[0] == null) {
return handleNullRecordKey(null);
} else {
return requireNonNullNonEmptyKey(recordKeys[0].toString());
}
}
@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
tryInitRowAccessor(schema);
Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow);
// NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
// record-key field
if (recordKeyValues[0] == null) {
return handleNullRecordKey(null);
} else if (recordKeyValues[0] instanceof UTF8String) {
return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]);
} else {
return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString()));
}
}
@Override
public String getPartitionPath(Row row) {
buildFieldSchemaInfoIfNeeded(row.schema());
return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
hiveStylePartitioning, partitionPathSchemaInfo);
tryInitRowAccessor(row.schema());
return combinePartitionPath(rowAccessor.getRecordPartitionPathValues(row));
}
@Override
public String getPartitionPath(InternalRow row, StructType structType) {
return getPartitionPathInternal(row, structType);
public UTF8String getPartitionPath(InternalRow row, StructType schema) {
tryInitRowAccessor(schema);
return combinePartitionPathUnsafe(rowAccessor.getRecordPartitionPathValues(row));
}
}

View File

@@ -18,20 +18,65 @@
package org.apache.hudi.keygen;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIMethod;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
/**
* Spark key generator interface.
* Spark-specific {@link KeyGenerator} interface extension allowing implementation to
* specifically implement record-key, partition-path generation w/o the need for (expensive)
* conversion from Spark internal representation (for ex, to Avro)
*/
public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
/**
* Extracts record key from Spark's {@link Row}
*
* @param row instance of {@link Row} from which record-key is extracted
* @return record's (primary) key
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
String getRecordKey(Row row);
String getRecordKey(InternalRow row, StructType schema);
/**
* Extracts record key from Spark's {@link InternalRow}
*
* NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could
* internally hold just a binary representation of the data, while {@link Row} has it
* deserialized into JVM-native representation (like {@code Integer}, {@code Long},
* {@code String}, etc)
*
* @param row instance of {@link InternalRow} from which record-key is extracted
* @param schema schema {@link InternalRow} is adhering to
* @return record-key as instance of {@link UTF8String}
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
UTF8String getRecordKey(InternalRow row, StructType schema);
/**
* Extracts partition-path from {@link Row}
*
* @param row instance of {@link Row} from which partition-path is extracted
* @return record's partition-path
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
String getPartitionPath(Row row);
String getPartitionPath(InternalRow internalRow, StructType structType);
/**
* Extracts partition-path from Spark's {@link InternalRow}
*
* NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could
* internally hold just a binary representation of the data, while {@link Row} has it
* deserialized into JVM-native representation (like {@code Integer}, {@code Long},
* {@code String}, etc)
*
* @param row instance of {@link InternalRow} from which record-key is extracted
* @param schema schema {@link InternalRow} is adhering to
* @return partition-path as instance of {@link UTF8String}
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
UTF8String getPartitionPath(InternalRow row, StructType schema);
}

View File

@@ -18,20 +18,19 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
import java.util.Objects;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
* Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
@@ -61,39 +60,44 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
@Override
public String getRecordKey(Row row) {
buildFieldSchemaInfoIfNeeded(row.schema());
return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeySchemaInfo, false);
tryInitRowAccessor(row.schema());
return combineRecordKey(rowAccessor.getRecordKeyParts(row));
}
@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
tryInitRowAccessor(schema);
return combineRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
}
@Override
public String getPartitionPath(Row row) {
buildFieldSchemaInfoIfNeeded(row.schema());
Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathSchemaInfo.get(getPartitionPathFields().get(0)).getKey());
return getTimestampBasedPartitionPath(partitionPathFieldVal);
tryInitRowAccessor(row.schema());
Object[] partitionPathValues = rowAccessor.getRecordPartitionPathValues(row);
return getFormattedPartitionPath(partitionPathValues[0]);
}
@Override
public String getPartitionPath(InternalRow internalRow, StructType structType) {
buildFieldSchemaInfoIfNeeded(structType);
validatePartitionFieldsForInternalRow();
Object partitionPathFieldVal = RowKeyGeneratorHelper.getFieldValFromInternalRow(internalRow,
partitionPathSchemaInfo.get(getPartitionPathFields().get(0)).getKey().get(0),
partitionPathSchemaInfo.get(getPartitionPathFields().get(0)).getValue());
return getTimestampBasedPartitionPath(partitionPathFieldVal);
public UTF8String getPartitionPath(InternalRow row, StructType schema) {
tryInitRowAccessor(schema);
Object[] partitionPathValues = rowAccessor.getRecordPartitionPathValues(row);
return UTF8String.fromString(getFormattedPartitionPath(partitionPathValues[0]));
}
private String getTimestampBasedPartitionPath(Object partitionPathFieldVal) {
Object fieldVal = null;
private String getFormattedPartitionPath(Object partitionPathPart) {
Object fieldVal;
if (partitionPathPart == null || Objects.equals(partitionPathPart, HUDI_DEFAULT_PARTITION_PATH)) {
fieldVal = timestampBasedAvroKeyGenerator.getDefaultPartitionVal();
} else if (partitionPathPart instanceof UTF8String) {
fieldVal = partitionPathPart.toString();
} else {
fieldVal = partitionPathPart;
}
try {
if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(HUDI_DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
|| partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
fieldVal = timestampBasedAvroKeyGenerator.getDefaultPartitionVal();
} else {
fieldVal = partitionPathFieldVal;
}
return timestampBasedAvroKeyGenerator.getPartitionPath(fieldVal);
} catch (Exception e) {
throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + fieldVal, e);
throw new HoodieKeyGeneratorException(String.format("Failed to properly format partition-path (%s)", fieldVal), e);
}
}
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.table.action.bootstrap;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
@@ -30,14 +32,13 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
public abstract class BaseBootstrapMetadataHandler implements BootstrapMetadataHandler {
private static final Logger LOG = LogManager.getLogger(ParquetBootstrapMetadataHandler.class);
@@ -57,8 +58,10 @@ public abstract class BaseBootstrapMetadataHandler implements BootstrapMetadataH
table, partitionPath, FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());
try {
Schema avroSchema = getAvroSchema(sourceFilePath);
Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema,
keyGenerator.getRecordKeyFieldNames());
List<String> recordKeyColumns = keyGenerator.getRecordKeyFieldNames().stream()
.map(HoodieAvroUtils::getRootLevelFieldName)
.collect(Collectors.toList());
Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);
AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);
AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);

View File

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.unsafe;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.types.UTF8String;
/**
 * A helper class to write {@link UTF8String}s to an internal buffer and build the concatenated
 * {@link UTF8String} at the end.
 *
 * NOTE(review): instances are not thread-safe — a single writer is expected at a time; confirm
 * against callers.
 */
public class UTF8StringBuilder {
  // Upper bound on the internal buffer size (max array length safe to allocate on the JVM)
  private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
  private byte[] buffer;
  // Write position as an absolute offset (ie including Platform.BYTE_ARRAY_OFFSET), as
  // expected by the Platform.* memory-copy APIs
  private int cursor = Platform.BYTE_ARRAY_OFFSET;
  public UTF8StringBuilder() {
    // Since initial buffer size is 16 in `StringBuilder`, we set the same size here
    this(16);
  }
  /**
   * @param initialSize initial capacity of the internal buffer (in bytes); must be w/in [0, ARRAY_MAX]
   */
  public UTF8StringBuilder(int initialSize) {
    if (initialSize < 0) {
      throw new IllegalArgumentException("Size must be non-negative");
    }
    if (initialSize > ARRAY_MAX) {
      throw new IllegalArgumentException(
        "Size " + initialSize + " exceeded maximum size of " + ARRAY_MAX);
    }
    this.buffer = new byte[initialSize];
  }
  // Grows the buffer by at least `neededSize`
  private void grow(int neededSize) {
    if (neededSize > ARRAY_MAX - totalSize()) {
      throw new UnsupportedOperationException(
        "Cannot grow internal buffer by size " + neededSize + " because the size after growing " +
          "exceeds size limitation " + ARRAY_MAX);
    }
    final int length = totalSize() + neededSize;
    if (buffer.length < length) {
      // Double the required length while it stays under ARRAY_MAX / 2; otherwise cap at ARRAY_MAX
      int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
      final byte[] tmp = new byte[newLength];
      Platform.copyMemory(
        buffer,
        Platform.BYTE_ARRAY_OFFSET,
        tmp,
        Platform.BYTE_ARRAY_OFFSET,
        totalSize());
      buffer = tmp;
    }
  }
  // Number of payload bytes written so far (cursor is offset by BYTE_ARRAY_OFFSET)
  private int totalSize() {
    return cursor - Platform.BYTE_ARRAY_OFFSET;
  }
  /**
   * Appends the raw bytes of the provided {@link UTF8String} to the internal buffer
   */
  public void append(UTF8String value) {
    grow(value.numBytes());
    value.writeToMemory(buffer, cursor);
    cursor += value.numBytes();
  }
  public void append(String value) {
    append(UTF8String.fromString(value));
  }
  /**
   * Appends {@code length} bytes read from arbitrary memory ({@code base} + {@code offset}),
   * as addressed by the Platform memory APIs
   */
  public void appendBytes(Object base, long offset, int length) {
    grow(length);
    Platform.copyMemory(
      base,
      offset,
      buffer,
      cursor,
      length);
    cursor += length;
  }
  /**
   * Builds the resulting {@link UTF8String} wrapping all bytes appended so far
   */
  public UTF8String build() {
    return UTF8String.fromBytes(buffer, 0, totalSize());
  }
}

View File

@@ -29,16 +29,16 @@ object HoodieUnsafeRowUtils {
* Fetches (nested) value w/in provided [[Row]] uniquely identified by the provided nested-field path
* previously composed by [[composeNestedFieldPath]]
*/
def getNestedRowValue(row: Row, nestedFieldPath: Array[(Int, StructField)]): Any = {
def getNestedRowValue(row: Row, nestedFieldPath: NestedFieldPath): Any = {
var curRow = row
for (idx <- nestedFieldPath.indices) {
val (ord, f) = nestedFieldPath(idx)
for (idx <- nestedFieldPath.parts.indices) {
val (ord, f) = nestedFieldPath.parts(idx)
if (curRow.isNullAt(ord)) {
// scalastyle:off return
if (f.nullable) return null
else throw new IllegalArgumentException(s"Found null value for the field that is declared as non-nullable: $f")
// scalastyle:on return
} else if (idx == nestedFieldPath.length - 1) {
} else if (idx == nestedFieldPath.parts.length - 1) {
// scalastyle:off return
return curRow.get(ord)
// scalastyle:on return
@@ -57,21 +57,21 @@ object HoodieUnsafeRowUtils {
* Fetches (nested) value w/in provided [[InternalRow]] uniquely identified by the provided nested-field path
* previously composed by [[composeNestedFieldPath]]
*/
def getNestedInternalRowValue(row: InternalRow, nestedFieldPath: Array[(Int, StructField)]): Any = {
if (nestedFieldPath.length == 0) {
def getNestedInternalRowValue(row: InternalRow, nestedFieldPath: NestedFieldPath): Any = {
if (nestedFieldPath.parts.length == 0) {
throw new IllegalArgumentException("Nested field-path could not be empty")
}
var curRow = row
var idx = 0
while (idx < nestedFieldPath.length) {
val (ord, f) = nestedFieldPath(idx)
while (idx < nestedFieldPath.parts.length) {
val (ord, f) = nestedFieldPath.parts(idx)
if (curRow.isNullAt(ord)) {
// scalastyle:off return
if (f.nullable) return null
else throw new IllegalArgumentException(s"Found null value for the field that is declared as non-nullable: $f")
// scalastyle:on return
} else if (idx == nestedFieldPath.length - 1) {
} else if (idx == nestedFieldPath.parts.length - 1) {
// scalastyle:off return
return curRow.get(ord, f.dataType)
// scalastyle:on return
@@ -93,7 +93,7 @@ object HoodieUnsafeRowUtils {
*
* This method produces nested-field path, that is subsequently used by [[getNestedInternalRowValue]], [[getNestedRowValue]]
*/
def composeNestedFieldPath(schema: StructType, nestedFieldRef: String): Array[(Int, StructField)] = {
def composeNestedFieldPath(schema: StructType, nestedFieldRef: String): NestedFieldPath = {
val fieldRefParts = nestedFieldRef.split('.')
val ordSeq = ArrayBuffer[(Int, StructField)]()
var curSchema = schema
@@ -115,6 +115,8 @@ object HoodieUnsafeRowUtils {
idx += 1
}
ordSeq.toArray
NestedFieldPath(ordSeq.toArray)
}
case class NestedFieldPath(parts: Array[(Int, StructField)])
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage.row;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieParquetConfig;
@@ -115,6 +116,6 @@ public class TestHoodieInternalRowParquetWriter extends HoodieClientTestHarness
writeConfig.getBloomFilterFPP(),
writeConfig.getDynamicBloomFilterMaxNumEntries(),
writeConfig.getBloomFilterType());
return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, filter, writeConfig);
return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, Option.of(filter), writeConfig);
}
}

View File

@@ -80,9 +80,11 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
@ValueSource(booleans = { true, false })
public void testRowCreateHandle(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg =
SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort).build();
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieWriteConfig config = SparkDatasetTestUtils.getConfigBuilder(basePath, timelineServicePort)
.withPopulateMetaFields(populateMetaFields)
.build();
HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
List<String> fileNames = new ArrayList<>();
List<String> fileAbsPaths = new ArrayList<>();
@@ -95,8 +97,8 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
String fileId = UUID.randomUUID().toString();
String instantTime = "000";
HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime,
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, populateMetaFields);
HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, config, partitionPath, fileId, instantTime,
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
int size = 10 + RANDOM.nextInt(1000);
// Generate inputs
Dataset<Row> inputRows = SparkDatasetTestUtils.getRandomRows(sqlContext, size, partitionPath, false);
@@ -133,7 +135,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
String instantTime = "000";
HoodieRowCreateHandle handle =
new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, true);
new HoodieRowCreateHandle(table, cfg, partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
int size = 10 + RANDOM.nextInt(1000);
int totalFailures = 5;
// Generate first batch of valid rows
@@ -186,7 +188,7 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
try {
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE, true);
new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE);
fail("Should have thrown exception");
} catch (HoodieInsertException ioe) {
// expected without metadata table

View File

@@ -179,6 +179,7 @@ public class SparkDatasetTestUtils {
public static HoodieWriteConfig.Builder getConfigBuilder(String basePath, int timelineServicePort) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withPopulateMetaFields(true)
.withParallelism(2, 2)
.withDeleteParallelism(2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())

View File

@@ -1,102 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen
import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.hudi.keygen.RowKeyGeneratorHelper._
import org.apache.spark.sql.types.{DataType, DataTypes}
import org.junit.jupiter.api.{Assertions, Test}
import scala.collection.JavaConverters._
class TestRowGeneratorHelper {

  // Builds the (field-positions, leaf data-type) pair expected by [[getPartitionPathFromRow]].
  // NOTE: Uses [[Integer.valueOf]] instead of the deprecated `new Integer(...)` boxing ctor
  private def posAndType(dataType: DataType, positions: Int*) =
    org.apache.hudi.common.util.collection.Pair.of(positions.map(Integer.valueOf(_)).toList.asJava, dataType)

  @Test
  def testGetPartitionPathFromRow(): Unit = {
    /** single plain partition */
    val row1 = Row.fromSeq(Seq(1, "z3", 10.0, "20220108"))
    val ptField1 = List("dt").asJava
    val ptPos1 = Map("dt" -> posAndType(DataTypes.LongType, 3)).asJava

    Assertions.assertEquals("20220108",
      getPartitionPathFromRow(row1, ptField1, false, ptPos1))
    Assertions.assertEquals("dt=20220108",
      getPartitionPathFromRow(row1, ptField1, true, ptPos1))

    /** multiple plain partitions */
    val row2 = Row.fromSeq(Seq(1, "z3", 10.0, "2022", "01", "08"))
    val ptField2 = List("year", "month", "day").asJava
    val ptPos2 = Map(
      "year" -> posAndType(DataTypes.StringType, 3),
      "month" -> posAndType(DataTypes.StringType, 4),
      "day" -> posAndType(DataTypes.StringType, 5)).asJava

    Assertions.assertEquals("2022/01/08",
      getPartitionPathFromRow(row2, ptField2, false, ptPos2))
    Assertions.assertEquals("year=2022/month=01/day=08",
      getPartitionPathFromRow(row2, ptField2, true, ptPos2))

    /** multiple partitions which contains TimeStamp type or Instant type */
    val timestamp = Timestamp.valueOf("2020-01-08 10:00:00")
    val instant = timestamp.toInstant
    val ptField3 = List("event", "event_time").asJava
    val ptPos3 = Map(
      "event" -> posAndType(DataTypes.StringType, 3),
      "event_time" -> posAndType(DataTypes.TimestampType, 4)).asJava

    // with timeStamp type
    val row2_ts = Row.fromSeq(Seq(1, "z3", 10.0, "click", timestamp))
    Assertions.assertEquals("click/2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row2_ts, ptField3, false, ptPos3))
    Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row2_ts, ptField3, true, ptPos3))

    // with instant type
    val row2_instant = Row.fromSeq(Seq(1, "z3", 10.0, "click", instant))
    Assertions.assertEquals("click/2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row2_instant, ptField3, false, ptPos3))
    Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row2_instant, ptField3, true, ptPos3))

    /** mixed case with plain and nested partitions */
    val nestedRow4 = Row.fromSeq(Seq(instant, "ad"))
    val ptField4 = List("event_time").asJava
    // positions [3, 0] address the first field of the nested struct at ordinal 3
    val ptPos4 = Map("event_time" -> posAndType(DataTypes.TimestampType, 3, 0)).asJava

    // with instant type
    val row4 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow4, "click"))
    Assertions.assertEquals("2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row4, ptField4, false, ptPos4))
    Assertions.assertEquals("event_time=2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row4, ptField4, true, ptPos4))

    val nestedRow5 = Row.fromSeq(Seq(timestamp, "ad"))
    val ptField5 = List("event", "event_time").asJava
    val ptPos5 = Map(
      "event_time" -> posAndType(DataTypes.TimestampType, 3, 0),
      "event" -> posAndType(DataTypes.StringType, 4)).asJava

    val row5 = Row.fromSeq(Seq(1, "z3", 10.0, nestedRow5, "click"))
    Assertions.assertEquals("click/2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row5, ptField5, false, ptPos5))
    Assertions.assertEquals("event=click/event_time=2020-01-08 10:00:00.0",
      getPartitionPathFromRow(row5, ptField5, true, ptPos5))
  }
}

View File

@@ -41,7 +41,7 @@ class TestHoodieUnsafeRowUtils {
assertEquals(
Seq((1, schema(1)), (0, schema(1).dataType.asInstanceOf[StructType](0))),
composeNestedFieldPath(schema, "bar.baz").toSeq)
composeNestedFieldPath(schema, "bar.baz").parts.toSeq)
assertThrows(classOf[IllegalArgumentException]) { () =>
composeNestedFieldPath(schema, "foo.baz")
@@ -148,6 +148,7 @@ class TestHoodieUnsafeRowUtils {
}
}
// TODO rebase on ScalaAssertionSupport
private def assertThrows[T <: Throwable](expectedExceptionClass: Class[T])(f: () => Unit): T = {
try {
f.apply()