1
0

[HUDI-2909] Handle logical type in TimestampBasedKeyGenerator (#4203)

* [HUDI-2909] Handle logical type in TimestampBasedKeyGenerator

Timestampbased key generator was returning diff values for row writer and non row writer path. this patch fixes it and is guarded by a config flag (`hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled`)
This commit is contained in:
Sagar Sumit
2022-01-08 20:52:44 +05:30
committed by GitHub
parent 03a83ffeb5
commit 827549949c
36 changed files with 364 additions and 101 deletions

View File

@@ -49,6 +49,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
@@ -889,6 +890,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(KEYGENERATOR_CLASS_NAME);
}
public boolean isConsistentLogicalTimestampEnabled() {
return getBooleanOrDefault(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
}
public Boolean shouldAutoCommit() {
return getBoolean(AUTO_COMMIT_ENABLE);
}

View File

@@ -40,11 +40,11 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
}
@Override
public String getPartitionPath(GenericRecord record) {
return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath);
return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath, isConsistentLogicalTimestampEnabled());
}
}

View File

@@ -40,7 +40,7 @@ public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
}
@Override

View File

@@ -65,7 +65,7 @@ public class KeyGenUtils {
/**
* Extracts the record key fields in strings out of the given record key,
* this is the reverse operation of {@link #getRecordKey(GenericRecord, String)}.
* this is the reverse operation of {@link #getRecordKey(GenericRecord, String, boolean)}.
*
* @see SimpleAvroKeyGenerator
* @see org.apache.hudi.keygen.ComplexAvroKeyGenerator
@@ -89,11 +89,11 @@ public class KeyGenUtils {
}
}
public static String getRecordKey(GenericRecord record, List<String> recordKeyFields) {
public static String getRecordKey(GenericRecord record, List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled) {
boolean keyIsNullEmpty = true;
StringBuilder recordKey = new StringBuilder();
for (String recordKeyField : recordKeyFields) {
String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true);
String recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled);
if (recordKeyValue == null) {
recordKey.append(recordKeyField + ":" + NULL_RECORDKEY_PLACEHOLDER + ",");
} else if (recordKeyValue.isEmpty()) {
@@ -112,14 +112,14 @@ public class KeyGenUtils {
}
public static String getRecordPartitionPath(GenericRecord record, List<String> partitionPathFields,
boolean hiveStylePartitioning, boolean encodePartitionPath) {
boolean hiveStylePartitioning, boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled) {
if (partitionPathFields.isEmpty()) {
return "";
}
StringBuilder partitionPath = new StringBuilder();
for (String partitionPathField : partitionPathFields) {
String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true, consistentLogicalTimestampEnabled);
if (fieldVal == null || fieldVal.isEmpty()) {
partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + HUDI_DEFAULT_PARTITION_PATH
: HUDI_DEFAULT_PARTITION_PATH);
@@ -135,8 +135,8 @@ public class KeyGenUtils {
return partitionPath.toString();
}
public static String getRecordKey(GenericRecord record, String recordKeyField) {
String recordKey = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true);
public static String getRecordKey(GenericRecord record, String recordKeyField, boolean consistentLogicalTimestampEnabled) {
String recordKey = HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, consistentLogicalTimestampEnabled);
if (recordKey == null || recordKey.isEmpty()) {
throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
}
@@ -144,8 +144,8 @@ public class KeyGenUtils {
}
public static String getPartitionPath(GenericRecord record, String partitionPathField,
boolean hiveStylePartitioning, boolean encodePartitionPath) {
String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
boolean hiveStylePartitioning, boolean encodePartitionPath, boolean consistentLogicalTimestampEnabled) {
String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true, consistentLogicalTimestampEnabled);
if (partitionPath == null || partitionPath.isEmpty()) {
partitionPath = HUDI_DEFAULT_PARTITION_PATH;
}

View File

@@ -57,9 +57,9 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
// 1. if there is only one record key field, the format of record key is just "<value>"
// 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
if (getRecordKeyFieldNames().size() == 1) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
}
return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
return KeyGenUtils.getRecordKey(record, getRecordKeyFields(), isConsistentLogicalTimestampEnabled());
}
public String getEmptyPartition() {

View File

@@ -47,11 +47,11 @@ public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0), isConsistentLogicalTimestampEnabled());
}
@Override
public String getPartitionPath(GenericRecord record) {
return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath);
return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath, isConsistentLogicalTimestampEnabled());
}
}

View File

@@ -36,6 +36,7 @@ import org.joda.time.format.DateTimeFormatter;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
@@ -125,7 +126,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
@Override
public String getPartitionPath(GenericRecord record) {
Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true);
Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true, isConsistentLogicalTimestampEnabled());
if (partitionVal == null) {
partitionVal = getDefaultPartitionVal();
}
@@ -191,6 +192,8 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
} else if (partitionVal instanceof Long) {
timeMs = convertLongTimeToMillis((Long) partitionVal);
} else if (partitionVal instanceof Timestamp && isConsistentLogicalTimestampEnabled()) {
timeMs = ((Timestamp) partitionVal).getTime();
} else if (partitionVal instanceof Integer) {
timeMs = convertLongTimeToMillis(((Integer) partitionVal).longValue());
} else if (partitionVal instanceof BigDecimal) {
@@ -225,5 +228,4 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
}
return MILLISECONDS.convert(partitionVal, timeUnit);
}
}

View File

@@ -42,7 +42,7 @@ public class HoodieTableMetadataKeyGenerator extends BaseKeyGenerator {
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY);
return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY, isConsistentLogicalTimestampEnabled());
}
@Override

View File

@@ -124,7 +124,8 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new JavaCustomColumnsSortPartitioner(
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema)));
HoodieAvroUtils.addMetadataFields(schema),
getWriteConfig().isConsistentLogicalTimestampEnabled()));
} else {
return Option.empty();
}

View File

@@ -39,18 +39,20 @@ public class JavaCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
private final String[] sortColumnNames;
private final Schema schema;
private final boolean consistentLogicalTimestampEnabled;
public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boolean consistentLogicalTimestampEnabled) {
this.sortColumnNames = columnNames;
this.schema = schema;
this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
}
@Override
public List<HoodieRecord<T>> repartitionRecords(
List<HoodieRecord<T>> records, int outputSparkPartitions) {
return records.stream().sorted((o1, o2) -> {
Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema);
Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema);
Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled);
Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled);
return values1.toString().compareTo(values2.toString());
}).collect(Collectors.toList());
}

View File

@@ -64,13 +64,13 @@ public class TestJavaBulkInsertInternalPartitioner extends HoodieJavaClientTestB
List<HoodieRecord> records = generateTestRecordsForBulkInsert(1000);
testBulkInsertInternalPartitioner(
new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
records, true, generatePartitionNumRecords(records), Option.of(columnComparator));
}
private Comparator<HoodieRecord> getCustomColumnComparator(Schema schema, String[] sortColumns) {
return Comparator.comparing(
record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema).toString());
record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema, false).toString());
}
private void verifyRecordAscendingOrder(List<HoodieRecord> records,

View File

@@ -108,7 +108,6 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
return writeMetadata;
}
/**
* Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
* The number of new file groups created is bounded by numOutputGroups.
@@ -141,7 +140,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema)));
} else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema)));
HoodieAvroUtils.addMetadataFields(schema), getWriteConfig().isConsistentLogicalTimestampEnabled()));
} else {
return Option.empty();
}

View File

@@ -39,15 +39,18 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
private final String[] sortColumnNames;
private final SerializableSchema serializableSchema;
private final boolean consistentLogicalTimestampEnabled;
public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
this.sortColumnNames = getSortColumnName(config);
this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
}
public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boolean consistentLogicalTimestampEnabled) {
this.sortColumnNames = columnNames;
this.serializableSchema = new SerializableSchema(schema);
this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
}
@Override
@@ -55,9 +58,10 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
int outputSparkPartitions) {
final String[] sortColumns = this.sortColumnNames;
final SerializableSchema schema = this.serializableSchema;
final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled;
return records.sortBy(
record -> {
Object recordValue = HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema);
Object recordValue = HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema, consistentLogicalTimestampEnabled);
// null values are replaced with empty string for null_first order
if (recordValue == null) {
return StringUtils.EMPTY_STRING;
@@ -66,7 +70,6 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
}
},
true, outputSparkPartitions);
}
@Override

View File

@@ -147,9 +147,9 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
HoodieWriteConfig config = HoodieWriteConfig

View File

@@ -45,7 +45,7 @@ public class TestBucketIdentifier {
String indexKeyField = "_row_key";
GenericRecord record = KeyGeneratorTestUtilities.getRecord();
HoodieRecord hoodieRecord = new HoodieRecord(
new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField), ""), null);
new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null);
int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8);
assert bucketId == BucketIdentifier.getBucketId(
Arrays.asList(record.get(indexKeyField).toString()), 8);
@@ -57,7 +57,7 @@ public class TestBucketIdentifier {
String indexKeyField = "_row_key";
GenericRecord record = KeyGeneratorTestUtilities.getRecord();
HoodieRecord hoodieRecord = new HoodieRecord(
new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField), ""), null);
new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null);
int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8);
assert bucketId == BucketIdentifier.getBucketId(
Arrays.asList(record.get(indexKeyField).toString()), 8);

View File

@@ -238,6 +238,40 @@ public class TestTimestampBasedKeyGenerator {
assertEquals("2021-04-19", keyGen.getPartitionPath(baseRow));
}
@Test
public void testScalarWithLogicalType() throws IOException {
schema = SchemaTestUtil.getTimestampWithLogicalTypeSchema();
structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
baseRecord = SchemaTestUtil.generateAvroRecordFromJson(schema, 1, "001", "f1");
baseRecord.put("createTime", 1638513806000000L);
properties = getBaseKeyConfig("SCALAR", "yyyy/MM/dd", "GMT", "MICROSECONDS");
properties.setProperty(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), "true");
TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
assertEquals("2021/12/03", hk1.getPartitionPath());
// test w/ Row
baseRow = genericRecordToRow(baseRecord);
assertEquals("2021/12/03", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals("2021/12/03", keyGen.getPartitionPath(internalRow, baseRow.schema()));
// timezone is GMT, createTime is null
baseRecord.put("createTime", null);
properties = getBaseKeyConfig("SCALAR", "yyyy/MM/dd", "GMT", "MICROSECONDS");
properties.setProperty(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), "true");
keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk2 = keyGen.getKey(baseRecord);
assertEquals("1970/01/01", hk2.getPartitionPath());
// test w/ Row
baseRow = genericRecordToRow(baseRecord);
assertEquals("1970/01/01", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals("1970/01/01", keyGen.getPartitionPath(internalRow, baseRow.schema()));
}
@Test
public void test_ExpectsMatch_SingleInputFormat_ISO8601WithMsZ_OutputTimezoneAsUTC() throws IOException {
baseRecord.put("createTime", "2020-04-01T13:01:33.428Z");