[HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate valid timestamps (#2045)
* [HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate valid timestamps
Co-authored-by: Sivabalan Narayanan <sivabala@uber.com>
This commit is contained in:
committed by
GitHub
parent
da51aa64fc
commit
e33a8f733c
@@ -129,7 +129,7 @@ public class DeltaGenerator implements Serializable {
|
|||||||
|
|
||||||
public JavaRDD<GenericRecord> generateInserts(Config operation) {
|
public JavaRDD<GenericRecord> generateInserts(Config operation) {
|
||||||
int numPartitions = operation.getNumInsertPartitions();
|
int numPartitions = operation.getNumInsertPartitions();
|
||||||
long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
|
long recordsPerPartition = operation.getNumRecordsInsert();
|
||||||
int minPayloadSize = operation.getRecordSize();
|
int minPayloadSize = operation.getRecordSize();
|
||||||
int startPartition = operation.getStartPartition();
|
int startPartition = operation.getStartPartition();
|
||||||
|
|
||||||
@@ -140,7 +140,7 @@ public class DeltaGenerator implements Serializable {
|
|||||||
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, numPartitions)
|
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, numPartitions)
|
||||||
.mapPartitionsWithIndex((index, p) -> {
|
.mapPartitionsWithIndex((index, p) -> {
|
||||||
return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
|
return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
|
||||||
minPayloadSize, schemaStr, partitionPathFieldNames, (Integer)index));
|
minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions));
|
||||||
}, true);
|
}, true);
|
||||||
|
|
||||||
if (deltaOutputConfig.getInputParallelism() < numPartitions) {
|
if (deltaOutputConfig.getInputParallelism() < numPartitions) {
|
||||||
|
|||||||
@@ -20,11 +20,6 @@ package org.apache.hudi.integ.testsuite.generator;
|
|||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
@@ -46,17 +41,21 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
|
|||||||
private GenericRecord lastRecord;
|
private GenericRecord lastRecord;
|
||||||
// Partition path field name
|
// Partition path field name
|
||||||
private Set<String> partitionPathFieldNames;
|
private Set<String> partitionPathFieldNames;
|
||||||
|
private String firstPartitionPathField;
|
||||||
|
|
||||||
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) {
|
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) {
|
||||||
this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
|
this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr,
|
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr,
|
||||||
List<String> partitionPathFieldNames, int partitionIndex) {
|
List<String> partitionPathFieldNames, int numPartitions) {
|
||||||
this.counter = maxEntriesToProduce;
|
this.counter = maxEntriesToProduce;
|
||||||
this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames);
|
this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames);
|
||||||
|
if(partitionPathFieldNames != null && partitionPathFieldNames.size() > 0) {
|
||||||
|
this.firstPartitionPathField = partitionPathFieldNames.get(0);
|
||||||
|
}
|
||||||
Schema schema = new Schema.Parser().parse(schemaStr);
|
Schema schema = new Schema.Parser().parse(schemaStr);
|
||||||
this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, partitionIndex);
|
this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -67,12 +66,18 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
|
|||||||
@Override
|
@Override
|
||||||
public GenericRecord next() {
|
public GenericRecord next() {
|
||||||
this.counter--;
|
this.counter--;
|
||||||
|
boolean partitionPathsNonEmpty = partitionPathFieldNames != null && partitionPathFieldNames.size() > 0;
|
||||||
if (lastRecord == null) {
|
if (lastRecord == null) {
|
||||||
GenericRecord record = this.generator.getNewPayload(partitionPathFieldNames);
|
GenericRecord record = partitionPathsNonEmpty
|
||||||
|
? this.generator.getNewPayloadWithTimestamp(this.firstPartitionPathField)
|
||||||
|
: this.generator.getNewPayload();
|
||||||
lastRecord = record;
|
lastRecord = record;
|
||||||
return record;
|
return record;
|
||||||
} else {
|
} else {
|
||||||
return this.generator.randomize(lastRecord, this.partitionPathFieldNames);
|
return partitionPathsNonEmpty
|
||||||
|
? this.generator.getUpdatePayloadWithTimestamp(lastRecord,
|
||||||
|
this.partitionPathFieldNames, firstPartitionPathField)
|
||||||
|
: this.generator.getUpdatePayload(lastRecord, this.partitionPathFieldNames);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,9 +46,9 @@ import java.util.concurrent.TimeUnit;
|
|||||||
*/
|
*/
|
||||||
public class GenericRecordFullPayloadGenerator implements Serializable {
|
public class GenericRecordFullPayloadGenerator implements Serializable {
|
||||||
|
|
||||||
private static Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
|
private static final Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
|
||||||
|
|
||||||
public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
|
public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
|
||||||
|
public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
|
||||||
public static final String DEFAULT_HOODIE_IS_DELETED_COL = "_hoodie_is_deleted";
|
public static final String DEFAULT_HOODIE_IS_DELETED_COL = "_hoodie_is_deleted";
|
||||||
protected final Random random = new Random();
|
protected final Random random = new Random();
|
||||||
// The source schema used to generate a payload
|
// The source schema used to generate a payload
|
||||||
@@ -58,10 +58,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
|||||||
// The index of partition for which records are being generated
|
// The index of partition for which records are being generated
|
||||||
private int partitionIndex = 0;
|
private int partitionIndex = 0;
|
||||||
// The size of a full record where every field of a generic record created contains 1 random value
|
// The size of a full record where every field of a generic record created contains 1 random value
|
||||||
private final int estimatedFullPayloadSize;
|
private int estimatedFullPayloadSize;
|
||||||
// Number of extra entries to add in a complex/collection field to achieve the desired record size
|
// Number of extra entries to add in a complex/collection field to achieve the desired record size
|
||||||
Map<String, Integer> extraEntriesMap = new HashMap<>();
|
Map<String, Integer> extraEntriesMap = new HashMap<>();
|
||||||
|
|
||||||
|
// The number of unique dates to create
|
||||||
|
private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
|
||||||
// LogicalTypes in Avro 1.8.2
|
// LogicalTypes in Avro 1.8.2
|
||||||
private static final String DECIMAL = "decimal";
|
private static final String DECIMAL = "decimal";
|
||||||
private static final String UUID_NAME = "uuid";
|
private static final String UUID_NAME = "uuid";
|
||||||
@@ -75,6 +77,11 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
|||||||
this(schema, DEFAULT_PAYLOAD_SIZE);
|
this(schema, DEFAULT_PAYLOAD_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) {
|
||||||
|
this(schema, minPayloadSize);
|
||||||
|
this.numDatePartitions = numDatePartitions;
|
||||||
|
}
|
||||||
|
|
||||||
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize) {
|
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize) {
|
||||||
Pair<Integer, Integer> sizeInfo = new GenericRecordFullPayloadSizeEstimator(schema)
|
Pair<Integer, Integer> sizeInfo = new GenericRecordFullPayloadSizeEstimator(schema)
|
||||||
.typeEstimateAndNumComplexFields();
|
.typeEstimateAndNumComplexFields();
|
||||||
@@ -83,19 +90,13 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
|||||||
if (estimatedFullPayloadSize < minPayloadSize) {
|
if (estimatedFullPayloadSize < minPayloadSize) {
|
||||||
int numberOfComplexFields = sizeInfo.getRight();
|
int numberOfComplexFields = sizeInfo.getRight();
|
||||||
if (numberOfComplexFields < 1) {
|
if (numberOfComplexFields < 1) {
|
||||||
LOG.warn("The schema does not have any collections/complex fields. "
|
LOG.warn("The schema does not have any collections/complex fields. Cannot achieve minPayloadSize : {}",
|
||||||
+ "Cannot achieve minPayloadSize => " + minPayloadSize);
|
minPayloadSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize);
|
determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int partitionIndex) {
|
|
||||||
this(schema, minPayloadSize);
|
|
||||||
this.partitionIndex = partitionIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static boolean isPrimitive(Schema localSchema) {
|
protected static boolean isPrimitive(Schema localSchema) {
|
||||||
if (localSchema.getType() != Type.ARRAY
|
if (localSchema.getType() != Type.ARRAY
|
||||||
&& localSchema.getType() != Type.MAP
|
&& localSchema.getType() != Type.MAP
|
||||||
@@ -131,6 +132,15 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
|||||||
return create(baseSchema, partitionPathFieldNames);
|
return create(baseSchema, partitionPathFieldNames);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public GenericRecord getNewPayloadWithTimestamp(String tsFieldName) {
|
||||||
|
return updateTimestamp(create(baseSchema, null), tsFieldName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public GenericRecord getUpdatePayloadWithTimestamp(GenericRecord record, Set<String> blacklistFields,
|
||||||
|
String tsFieldName) {
|
||||||
|
return updateTimestamp(randomize(record, blacklistFields), tsFieldName);
|
||||||
|
}
|
||||||
|
|
||||||
protected GenericRecord create(Schema schema, Set<String> partitionPathFieldNames) {
|
protected GenericRecord create(Schema schema, Set<String> partitionPathFieldNames) {
|
||||||
GenericRecord result = new GenericData.Record(schema);
|
GenericRecord result = new GenericData.Record(schema);
|
||||||
for (Schema.Field f : schema.getFields()) {
|
for (Schema.Field f : schema.getFields()) {
|
||||||
@@ -314,6 +324,17 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
|||||||
return genericData.validate(baseSchema, record);
|
return genericData.validate(baseSchema, record);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Generates a sequential timestamp (daily increment), and updates the timestamp field of the record.
|
||||||
|
* Note: When generating records, number of records to be generated must be more than numDatePartitions * parallelism,
|
||||||
|
* to guarantee that at least numDatePartitions are created.
|
||||||
|
*/
|
||||||
|
public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
|
||||||
|
long delta = TimeUnit.MILLISECONDS.convert(++partitionIndex % numDatePartitions, TimeUnit.DAYS);
|
||||||
|
record.put(fieldName, System.currentTimeMillis() - delta);
|
||||||
|
return record;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether a schema is optional. Returns true if it matches all of the following: 1. Its type is Type.UNION 2. It has two types 3. One of its types is NULL.
|
* Check whether a schema is optional. Returns true if it matches all of the following: 1. Its type is Type.UNION 2. It has two types 3. One of its types is NULL.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import java.util.Arrays;
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.GenericRecord;
|
import org.apache.avro.generic.GenericRecord;
|
||||||
@@ -133,4 +134,38 @@ public class TestGenericRecordPayloadGenerator {
|
|||||||
assertTrue(HoodieAvroUtils.avroToBytes(record).length < minPayloadSize + 0.1 * minPayloadSize);
|
assertTrue(HoodieAvroUtils.avroToBytes(record).length < minPayloadSize + 0.1 * minPayloadSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdatePayloadGeneratorWithTimestamp() throws IOException {
|
||||||
|
Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers
|
||||||
|
.readFileFromAbsolutePath(System.getProperty("user.dir") + "/.." + SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH));
|
||||||
|
GenericRecordFullPayloadGenerator payloadGenerator = new GenericRecordFullPayloadGenerator(schema);
|
||||||
|
List<String> insertRowKeys = new ArrayList<>();
|
||||||
|
List<String> updateRowKeys = new ArrayList<>();
|
||||||
|
List<Long> insertTimeStamps = new ArrayList<>();
|
||||||
|
List<Long> updateTimeStamps = new ArrayList<>();
|
||||||
|
List<GenericRecord> records = new ArrayList<>();
|
||||||
|
Long startMillis = System.currentTimeMillis() - TimeUnit.MILLISECONDS
|
||||||
|
.convert(GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS, TimeUnit.DAYS);
|
||||||
|
|
||||||
|
// Generate 10 new records
|
||||||
|
IntStream.range(0, 10).forEach(a -> {
|
||||||
|
GenericRecord record = payloadGenerator.getNewPayloadWithTimestamp("timestamp");
|
||||||
|
records.add(record);
|
||||||
|
insertRowKeys.add(record.get("_row_key").toString());
|
||||||
|
insertTimeStamps.add((Long) record.get("timestamp"));
|
||||||
|
});
|
||||||
|
Set<String> blacklistFields = new HashSet<>(Arrays.asList("_row_key"));
|
||||||
|
records.stream().forEach(a -> {
|
||||||
|
// Generate 10 updated records
|
||||||
|
GenericRecord record = payloadGenerator.getUpdatePayloadWithTimestamp(a, blacklistFields, "timestamp");
|
||||||
|
updateRowKeys.add(record.get("_row_key").toString());
|
||||||
|
updateTimeStamps.add((Long) record.get("timestamp"));
|
||||||
|
});
|
||||||
|
// The row keys from insert payloads should match all the row keys from the update payloads
|
||||||
|
assertTrue(insertRowKeys.containsAll(updateRowKeys));
|
||||||
|
// The timestamp field for the insert payloads should not all match with the update payloads
|
||||||
|
assertFalse(insertTimeStamps.containsAll(updateTimeStamps));
|
||||||
|
Long currentMillis = System.currentTimeMillis();
|
||||||
|
assertTrue(insertTimeStamps.stream().allMatch(t -> t >= startMillis && t <= currentMillis));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user