1
0

[MINOR] Make constant fields final in HoodieTestDataGenerator (#1234)

This commit is contained in:
Y Ethan Guo
2020-01-15 20:42:30 -08:00
committed by leesf
parent 1daba24065
commit b39458b008
9 changed files with 38 additions and 37 deletions

View File

@@ -74,7 +74,7 @@ public class HoodieTestDataGenerator {
public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static final int DEFAULT_PARTITION_DEPTH = 3;
public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
@@ -82,13 +82,14 @@ public class HoodieTestDataGenerator {
+ "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
+ "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
+ "struct<amount:double,currency:string>,boolean";
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);
public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
private static Random rand = new Random(46474747);
private static final Random RAND = new Random(46474747);
private final Map<Integer, KeyPartition> existingKeys;
private final String[] partitionPaths;
@@ -145,18 +146,18 @@ public class HoodieTestDataGenerator {
public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
double timestamp, boolean isDeleteRecord) {
GenericRecord rec = new GenericData.Record(avroSchema);
GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp);
rec.put("rider", riderName);
rec.put("driver", driverName);
rec.put("begin_lat", rand.nextDouble());
rec.put("begin_lon", rand.nextDouble());
rec.put("end_lat", rand.nextDouble());
rec.put("end_lon", rand.nextDouble());
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
GenericRecord fareRecord = new GenericData.Record(avroSchema.getField("fare").schema());
fareRecord.put("amount", rand.nextDouble() * 100);
GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
fareRecord.put("amount", RAND.nextDouble() * 100);
fareRecord.put("currency", "USD");
rec.put("fare", fareRecord);
@@ -248,7 +249,7 @@ public class HoodieTestDataGenerator {
int currSize = getNumExistingKeys();
return IntStream.range(0, n).boxed().map(i -> {
String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
KeyPartition kp = new KeyPartition();
kp.key = key;
@@ -277,7 +278,7 @@ public class HoodieTestDataGenerator {
List<HoodieRecord> inserts = new ArrayList<>();
int currSize = getNumExistingKeys();
for (int i = 0; i < limit; i++) {
String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
HoodieRecord record = new HoodieRecord(key, generateAvroPayload(key, commitTime));
inserts.add(record);
@@ -367,7 +368,7 @@ public class HoodieTestDataGenerator {
public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws IOException {
List<HoodieRecord> updates = new ArrayList<>();
for (int i = 0; i < n; i++) {
KeyPartition kp = existingKeys.get(rand.nextInt(numExistingKeys - 1));
KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys - 1));
HoodieRecord record = generateUpdateRecord(kp.key, commitTime);
updates.add(record);
}
@@ -410,7 +411,7 @@ public class HoodieTestDataGenerator {
}
return IntStream.range(0, n).boxed().map(i -> {
int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
KeyPartition kp = existingKeys.get(index);
// Find the available keyPartition starting from randomly chosen one.
while (used.contains(kp)) {
@@ -440,7 +441,7 @@ public class HoodieTestDataGenerator {
}
return IntStream.range(0, n).boxed().map(i -> {
int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
KeyPartition kp = existingKeys.get(index);
// Find the available keyPartition starting from randomly chosen one.
while (used.contains(kp)) {
@@ -469,7 +470,7 @@ public class HoodieTestDataGenerator {
}
return IntStream.range(0, n).boxed().map(i -> {
int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
KeyPartition kp = existingKeys.get(index);
// Find the available keyPartition starting from randomly chosen one.
while (used.contains(kp)) {

View File

@@ -84,7 +84,7 @@ public class TestBoundedInMemoryExecutor extends HoodieClientTestHarness {
SparkBoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
try {
executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig, hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.avroSchema));
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
int result = executor.execute();
// It should buffer and write 100 records
Assert.assertEquals(result, 100);

View File

@@ -81,7 +81,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
final int numRecords = 128;
final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(commitTime, numRecords);
final BoundedInMemoryQueue<HoodieRecord, HoodieInsertValueGenResult<HoodieRecord>> queue =
new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.avroSchema));
new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
Future<Boolean> resFuture = executorService.submit(() -> {
new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue);
@@ -93,13 +93,13 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
while (queue.iterator().hasNext()) {
final HoodieRecord originalRecord = originalRecordIterator.next();
final Option<IndexedRecord> originalInsertValue =
originalRecord.getData().getInsertValue(HoodieTestDataGenerator.avroSchema);
originalRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
final HoodieInsertValueGenResult<HoodieRecord> payload = queue.iterator().next();
// Ensure that record ordering is guaranteed.
Assert.assertEquals(originalRecord, payload.record);
// cached insert value matches the expected insert value.
Assert.assertEquals(originalInsertValue,
payload.record.getData().getInsertValue(HoodieTestDataGenerator.avroSchema));
payload.record.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA));
recordsRead++;
}
Assert.assertFalse(queue.iterator().hasNext() || originalRecordIterator.hasNext());
@@ -120,7 +120,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
final List<List<HoodieRecord>> recs = new ArrayList<>();
final BoundedInMemoryQueue<HoodieRecord, HoodieInsertValueGenResult<HoodieRecord>> queue =
new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.avroSchema));
new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Record Key to <Producer Index, Rec Index within a producer>
Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
@@ -214,11 +214,11 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
final int recordLimit = 5;
final SizeEstimator<HoodieInsertValueGenResult<HoodieRecord>> sizeEstimator = new DefaultSizeEstimator<>();
HoodieInsertValueGenResult<HoodieRecord> payload =
getTransformFunction(HoodieTestDataGenerator.avroSchema).apply(hoodieRecords.get(0));
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0));
final long objSize = sizeEstimator.sizeEstimate(payload);
final long memoryLimitInBytes = recordLimit * objSize;
final BoundedInMemoryQueue<HoodieRecord, HoodieInsertValueGenResult<HoodieRecord>> queue =
new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.avroSchema));
new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
executorService.submit(() -> {
@@ -262,7 +262,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> sizeEstimator = new DefaultSizeEstimator<>();
// queue memory limit
HoodieInsertValueGenResult<HoodieRecord> payload =
getTransformFunction(HoodieTestDataGenerator.avroSchema).apply(hoodieRecords.get(0));
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA).apply(hoodieRecords.get(0));
final long objSize = sizeEstimator.sizeEstimate(new Tuple2<>(payload.record, payload.insertValue));
final long memoryLimitInBytes = 4 * objSize;
@@ -270,7 +270,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
// stops and throws
// correct exception back.
BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue1 =
new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.avroSchema));
new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
Future<Boolean> resFuture = executorService.submit(() -> {
@@ -301,7 +301,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
when(mockHoodieRecordsIterator.hasNext()).thenReturn(true);
when(mockHoodieRecordsIterator.next()).thenThrow(expectedException);
BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue2 =
new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.avroSchema));
new BoundedInMemoryQueue(memoryLimitInBytes, getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
// Produce
Future<Boolean> res = executorService.submit(() -> {

View File

@@ -150,7 +150,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
// Write them to corresponding avro logfiles
HoodieTestUtils.writeRecordsToLogFiles(fs, metaClient.getBasePath(),
HoodieTestDataGenerator.avroSchemaWithMetadataFields, updatedRecords);
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
// Verify that all data file has one log file
metaClient = HoodieTableMetaClient.reload(metaClient);

View File

@@ -45,14 +45,14 @@ public class TestHoodieStorageWriterFactory extends TestHoodieClientBase {
final HoodieWriteConfig cfg = getConfig();
HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
HoodieStorageWriter<IndexedRecord> parquetWriter = HoodieStorageWriterFactory.getStorageWriter(commitTime,
parquetPath, table, cfg, HoodieTestDataGenerator.avroSchema);
parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA);
Assert.assertTrue(parquetWriter instanceof HoodieParquetWriter);
// other file format exception.
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
try {
HoodieStorageWriter<IndexedRecord> logWriter = HoodieStorageWriterFactory.getStorageWriter(commitTime, logPath,
table, cfg, HoodieTestDataGenerator.avroSchema);
table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA);
fail("should fail since log storage writer is not supported yet.");
} catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);

View File

@@ -764,7 +764,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Write them to corresponding avro logfiles
HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
HoodieTestDataGenerator.avroSchemaWithMetadataFields, updatedRecords);
HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
// Verify that all data file has one log file
metaClient = HoodieTableMetaClient.reload(metaClient);

View File

@@ -166,7 +166,7 @@ public class TestHDFSParquetImporter implements Serializable {
"driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
.withSchema(HoodieTestDataGenerator.avroSchema).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
.withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
for (GenericRecord record : records) {
writer.write(record);
}

View File

@@ -189,7 +189,7 @@ public class UtilitiesTestBase {
public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
.withSchema(HoodieTestDataGenerator.avroSchema).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
.withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
for (GenericRecord record : records) {
writer.write(record);
}
@@ -205,7 +205,7 @@ public class UtilitiesTestBase {
public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
try {
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.AVRO_SCHEMA);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;

View File

@@ -130,7 +130,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
try {
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.AVRO_SCHEMA);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;