From e33a8f733c4a9a94479c166ad13ae9d53142cd3f Mon Sep 17 00:00:00 2001 From: Balajee Nagasubramaniam <47542891+nbalajee@users.noreply.github.com> Date: Tue, 29 Dec 2020 13:33:19 -0800 Subject: [PATCH] =?UTF-8?q?[HUDI-1147]=20Modify=20GenericRecordFullPayload?= =?UTF-8?q?Generator=20to=20generate=20vali=E2=80=A6=20(#2045)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [HUDI-1147] Modify GenericRecordFullPayloadGenerator to generate valid timestamps Co-authored-by: Sivabalan Narayanan --- .../testsuite/generator/DeltaGenerator.java | 4 +- ...lexibleSchemaRecordGenerationIterator.java | 23 ++++++---- .../GenericRecordFullPayloadGenerator.java | 43 ++++++++++++++----- .../TestGenericRecordPayloadGenerator.java | 35 +++++++++++++++ 4 files changed, 83 insertions(+), 22 deletions(-) diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index 6242cbfc7..30b2d6ce0 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -129,7 +129,7 @@ public class DeltaGenerator implements Serializable { public JavaRDD generateInserts(Config operation) { int numPartitions = operation.getNumInsertPartitions(); - long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions; + long recordsPerPartition = operation.getNumRecordsInsert(); int minPayloadSize = operation.getRecordSize(); int startPartition = operation.getStartPartition(); @@ -140,7 +140,7 @@ public class DeltaGenerator implements Serializable { JavaRDD inputBatch = jsc.parallelize(partitionIndexes, numPartitions) .mapPartitionsWithIndex((index, p) -> { return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition, - minPayloadSize, schemaStr, partitionPathFieldNames, (Integer)index)); + minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions)); }, true); if (deltaOutputConfig.getInputParallelism() < numPartitions) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java index 5477371a1..787ec844e 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java @@ -20,11 +20,6 @@ package org.apache.hudi.integ.testsuite.generator; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.List; import java.util.HashSet; import java.util.Iterator; @@ -46,17 +41,21 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator partitionPathFieldNames; + private String firstPartitionPathField; public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) { this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0); } public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr, - List partitionPathFieldNames, int partitionIndex) { + List partitionPathFieldNames, int numPartitions) { this.counter = maxEntriesToProduce; this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames); + if(partitionPathFieldNames != null && partitionPathFieldNames.size() > 0) { + this.firstPartitionPathField = partitionPathFieldNames.get(0); + } Schema schema = new Schema.Parser().parse(schemaStr); - this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, partitionIndex); + this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions); } @Override @@ -67,12 +66,18 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator 0; if (lastRecord == null) { - GenericRecord record = this.generator.getNewPayload(partitionPathFieldNames); + GenericRecord record = partitionPathsNonEmpty + ? this.generator.getNewPayloadWithTimestamp(this.firstPartitionPathField) + : this.generator.getNewPayload(); lastRecord = record; return record; } else { - return this.generator.randomize(lastRecord, this.partitionPathFieldNames); + return partitionPathsNonEmpty + ? this.generator.getUpdatePayloadWithTimestamp(lastRecord, + this.partitionPathFieldNames, firstPartitionPathField) + : this.generator.getUpdatePayload(lastRecord, this.partitionPathFieldNames); } } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java index 49a5f312e..510fc499b 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java @@ -46,9 +46,9 @@ import java.util.concurrent.TimeUnit; */ public class GenericRecordFullPayloadGenerator implements Serializable { - private static Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class); - + private static final Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class); public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB + public static final int DEFAULT_NUM_DATE_PARTITIONS = 50; public static final String DEFAULT_HOODIE_IS_DELETED_COL = "_hoodie_is_deleted"; protected final Random random = new Random(); // The source schema used to generate a payload @@ -58,10 +58,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable { // The index of partition for which records are being generated private int partitionIndex = 0; // The size of a full record where every field of a generic record created contains 1 random value - private final int estimatedFullPayloadSize; + private int estimatedFullPayloadSize; // Number of extra entries to add in a complex/collection field to achieve the desired record size Map extraEntriesMap = new HashMap<>(); + // The number of unique dates to create + private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS; // LogicalTypes in Avro 1.8.2 private static final String DECIMAL = "decimal"; private static final String UUID_NAME = "uuid"; @@ -75,6 +77,11 @@ public class GenericRecordFullPayloadGenerator implements Serializable { this(schema, DEFAULT_PAYLOAD_SIZE); } + public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) { + this(schema, minPayloadSize); + this.numDatePartitions = numDatePartitions; + } + public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize) { Pair sizeInfo = new GenericRecordFullPayloadSizeEstimator(schema) .typeEstimateAndNumComplexFields(); @@ -83,19 +90,13 @@ public class GenericRecordFullPayloadGenerator implements Serializable { if (estimatedFullPayloadSize < minPayloadSize) { int numberOfComplexFields = sizeInfo.getRight(); if (numberOfComplexFields < 1) { - LOG.warn("The schema does not have any collections/complex fields. " - + "Cannot achieve minPayloadSize => " + minPayloadSize); + LOG.warn("The schema does not have any collections/complex fields. Cannot achieve minPayloadSize : {}", + minPayloadSize); } - determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize); } } - public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int partitionIndex) { - this(schema, minPayloadSize); - this.partitionIndex = partitionIndex; - } - protected static boolean isPrimitive(Schema localSchema) { if (localSchema.getType() != Type.ARRAY && localSchema.getType() != Type.MAP @@ -131,6 +132,15 @@ public class GenericRecordFullPayloadGenerator implements Serializable { return create(baseSchema, partitionPathFieldNames); } + public GenericRecord getNewPayloadWithTimestamp(String tsFieldName) { + return updateTimestamp(create(baseSchema, null), tsFieldName); + } + + public GenericRecord getUpdatePayloadWithTimestamp(GenericRecord record, Set blacklistFields, + String tsFieldName) { + return updateTimestamp(randomize(record, blacklistFields), tsFieldName); + } + protected GenericRecord create(Schema schema, Set partitionPathFieldNames) { GenericRecord result = new GenericData.Record(schema); for (Schema.Field f : schema.getFields()) { @@ -314,6 +324,17 @@ public class GenericRecordFullPayloadGenerator implements Serializable { return genericData.validate(baseSchema, record); } + /* + * Generates a sequential timestamp (daily increment), and updates the timestamp field of the record. + * Note: When generating records, number of records to be generated must be more than numDatePartitions * parallelism, + * to guarantee that at least numDatePartitions are created. + */ + public GenericRecord updateTimestamp(GenericRecord record, String fieldName) { + long delta = TimeUnit.MILLISECONDS.convert(++partitionIndex % numDatePartitions, TimeUnit.DAYS); + record.put(fieldName, System.currentTimeMillis() - delta); + return record; + } + /** * Check whether a schema is option. return true if it match the follows: 1. Its type is Type.UNION 2. Has two types 3. Has a NULL type. */ diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java index 94515959d..2b3a65c71 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -133,4 +134,38 @@ public class TestGenericRecordPayloadGenerator { assertTrue(HoodieAvroUtils.avroToBytes(record).length < minPayloadSize + 0.1 * minPayloadSize); } + @Test + public void testUpdatePayloadGeneratorWithTimestamp() throws IOException { + Schema schema = new Schema.Parser().parse(UtilitiesTestBase.Helpers + .readFileFromAbsolutePath(System.getProperty("user.dir") + "/.." + SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH)); + GenericRecordFullPayloadGenerator payloadGenerator = new GenericRecordFullPayloadGenerator(schema); + List insertRowKeys = new ArrayList<>(); + List updateRowKeys = new ArrayList<>(); + List insertTimeStamps = new ArrayList<>(); + List updateTimeStamps = new ArrayList<>(); + List records = new ArrayList<>(); + Long startMillis = System.currentTimeMillis() - TimeUnit.MILLISECONDS + .convert(GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS, TimeUnit.DAYS); + + // Generate 10 new records + IntStream.range(0, 10).forEach(a -> { + GenericRecord record = payloadGenerator.getNewPayloadWithTimestamp("timestamp"); + records.add(record); + insertRowKeys.add(record.get("_row_key").toString()); + insertTimeStamps.add((Long) record.get("timestamp")); + }); + Set blacklistFields = new HashSet<>(Arrays.asList("_row_key")); + records.stream().forEach(a -> { + // Generate 10 updated records + GenericRecord record = payloadGenerator.getUpdatePayloadWithTimestamp(a, blacklistFields, "timestamp"); + updateRowKeys.add(record.get("_row_key").toString()); + updateTimeStamps.add((Long) record.get("timestamp")); + }); + // The row keys from insert payloads should match all the row keys from the update payloads + assertTrue(insertRowKeys.containsAll(updateRowKeys)); + // The timestamp field for the insert payloads should not all match with the update payloads + assertFalse(insertTimeStamps.containsAll(updateTimeStamps)); + Long currentMillis = System.currentTimeMillis(); + assertTrue(insertTimeStamps.stream().allMatch(t -> t >= startMillis && t <= currentMillis)); + } }