1
0

[HUDI-3469] Refactor HoodieTestDataGenerator to provide for reproducible Builds (#4866)

This commit is contained in:
Alexey Kudinkin
2022-03-01 22:15:26 -08:00
committed by GitHub
parent 3b2da9f138
commit 85f47b53df
13 changed files with 159 additions and 115 deletions

View File

@@ -83,7 +83,7 @@ public class TestCleansCommand extends CLIFunctionalTestHarness {
String fileId1 = UUID.randomUUID().toString();
String fileId2 = UUID.randomUUID().toString();
FileSystem fs = FSUtils.getFs(basePath(), hadoopConf());
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath);
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath);
// Create four commits
for (int i = 100; i < 104; i++) {

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -55,13 +54,13 @@ import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
@@ -77,23 +76,20 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import scala.Tuple2;
import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans;
public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class);
protected Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
protected final Option<BaseKeyGenerator> keyGeneratorOpt;
public BaseSparkCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
WriteOperationType operationType) {
super(context, config, table, instantTime, operationType, Option.empty());
initKeyGenIfNeeded(config.populateMetaFields());
this(context, config, table, instantTime, operationType, Option.empty());
}
public BaseSparkCommitActionExecutor(HoodieEngineContext context,
@@ -103,18 +99,14 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
WriteOperationType operationType,
Option extraMetadata) {
super(context, config, table, instantTime, operationType, extraMetadata);
initKeyGenIfNeeded(config.populateMetaFields());
}
private void initKeyGenIfNeeded(boolean populateMetaFields) {
if (!populateMetaFields) {
try {
keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
keyGeneratorOpt = config.populateMetaFields()
? Option.empty()
: Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(this.config.getProps()));
} catch (IOException e) {
throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
}
}
}
private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");

View File

@@ -75,7 +75,7 @@ public class TestClientRollback extends HoodieClientTestBase {
HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
/**
* Write 1 (only inserts)

View File

@@ -53,7 +53,7 @@ public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
//just generate two partitions
dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
//1. prepare data
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
/**
* Write 1 (only inserts)
@@ -107,7 +107,7 @@ public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
boolean commitSecondInsertOverwrite) throws IOException {
//just generate two partitions
dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
/**
* Write 1 (upsert)

View File

@@ -178,7 +178,7 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()).withRollbackUsingMarkers(false).withAutoCommit(false).build();
//1. prepare data
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH}, basePath);
new HoodieTestDataGenerator().writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH}, basePath);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
// Write 1 (only inserts)
String newCommitTime = "001";

View File

@@ -534,7 +534,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
//just generate two partitions
dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
//1. prepare data
HoodieTestDataGenerator.writePartitionMetadata(metaClient.getFs(), new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
HoodieTestDataGenerator.writePartitionMetadataDeprecated(metaClient.getFs(), new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH}, basePath);
/**
* Write 1 (only inserts)
*/

View File

@@ -19,6 +19,17 @@
package org.apache.hudi.common.testutils;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
@@ -34,29 +45,22 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -140,7 +144,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
public static final TypeDescription ORC_TRIP_SCHEMA = AvroOrcUtils.createOrcSchema(new Schema.Parser().parse(TRIP_SCHEMA));
public static final Schema FLATTENED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
private static final Random RAND = new Random(46474747);
private final Random rand;
//Maintains all the existing keys schema wise
private final Map<String, Map<Integer, KeyPartition>> existingKeysBySchema;
@@ -148,27 +152,58 @@ public class HoodieTestDataGenerator implements AutoCloseable {
//maintains the count of existing keys schema wise
private Map<String, Integer> numKeysBySchema;
public HoodieTestDataGenerator(long seed) {
this(seed, DEFAULT_PARTITION_PATHS, new HashMap<>());
}
public HoodieTestDataGenerator(long seed, String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
this.rand = new Random(seed);
this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
this.existingKeysBySchema = new HashMap<>();
this.existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap);
this.numKeysBySchema = new HashMap<>();
this.numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size());
logger.info(String.format("Test DataGenerator's seed (%s)", seed));
}
//////////////////////////////////////////////////////////////////////////////////
// DEPRECATED API
//////////////////////////////////////////////////////////////////////////////////
@Deprecated
public HoodieTestDataGenerator(String[] partitionPaths) {
this(partitionPaths, new HashMap<>());
}
@Deprecated
public HoodieTestDataGenerator() {
this(DEFAULT_PARTITION_PATHS);
}
@Deprecated
public HoodieTestDataGenerator(String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
this.existingKeysBySchema = new HashMap<>();
existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap);
numKeysBySchema = new HashMap<>();
numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size());
// NOTE: This used as a workaround to make sure that new instantiations of the generator
// always return "new" random values.
// Caveat is that if 2 successive invocations are made w/in the timespan that is smaller
// than the resolution of {@code nanoTime}, then this will produce identical results
this(System.nanoTime(), partitionPaths, keyPartitionMap);
}
/**
* @deprecated please use non-static version
*/
public static void writePartitionMetadataDeprecated(FileSystem fs, String[] partitionPaths, String basePath) {
new HoodieTestDataGenerator().writePartitionMetadata(fs, partitionPaths, basePath);
}
//////////////////////////////////////////////////////////////////////////////////
/**
* @implNote {@link HoodieTestDataGenerator} is supposed to just generate records with schemas. Leave HoodieTable files (metafile, basefile, logfile, etc) to {@link HoodieTestTable}.
* @deprecated Use {@link HoodieTestTable#withPartitionMetaFiles(java.lang.String...)} instead.
*/
public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) {
public void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) {
for (String partitionPath : partitionPaths) {
new HoodiePartitionMetadata(fs, "000", new Path(basePath), new Path(basePath, partitionPath)).trySave(0);
}
@@ -198,7 +233,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
* @param instantTime Instant time to use.
* @return Raw paylaod of a test record.
*/
public static RawTripTestPayload generateRandomValue(HoodieKey key, String instantTime) throws IOException {
public RawTripTestPayload generateRandomValue(HoodieKey key, String instantTime) throws IOException {
return generateRandomValue(key, instantTime, false);
}
@@ -212,12 +247,12 @@ public class HoodieTestDataGenerator implements AutoCloseable {
* @return Raw paylaod of a test record.
* @throws IOException
*/
public static RawTripTestPayload generateRandomValue(
private RawTripTestPayload generateRandomValue(
HoodieKey key, String instantTime, boolean isFlattened) throws IOException {
return generateRandomValue(key, instantTime, isFlattened, 0);
}
public static RawTripTestPayload generateRandomValue(
private RawTripTestPayload generateRandomValue(
HoodieKey key, String instantTime, boolean isFlattened, int ts) throws IOException {
GenericRecord rec = generateGenericRecord(
key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, ts,
@@ -241,7 +276,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
/**
* Generates a new avro record of the above schema format for a delete.
*/
public static RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String instantTime) throws IOException {
private RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String instantTime) throws IOException {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0,
true, false);
return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true, 0L);
@@ -250,17 +285,17 @@ public class HoodieTestDataGenerator implements AutoCloseable {
/**
* Generates a new avro record of the above schema format, retaining the key if optionally provided.
*/
public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String instantTime) {
private HoodieAvroPayload generateAvroPayload(HoodieKey key, String instantTime) {
GenericRecord rec = generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0);
return new HoodieAvroPayload(Option.of(rec));
}
public static GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName,
public GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName,
long timestamp) {
return generateGenericRecord(rowKey, partitionPath, riderName, driverName, timestamp, false, false);
}
public static GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName,
public GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName,
long timestamp, boolean isDeleteRecord,
boolean isFlattened) {
GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
@@ -269,25 +304,25 @@ public class HoodieTestDataGenerator implements AutoCloseable {
rec.put("partition_path", partitionPath);
rec.put("rider", riderName);
rec.put("driver", driverName);
rec.put("begin_lat", RAND.nextDouble());
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
rec.put("begin_lat", rand.nextDouble());
rec.put("begin_lon", rand.nextDouble());
rec.put("end_lat", rand.nextDouble());
rec.put("end_lon", rand.nextDouble());
if (isFlattened) {
rec.put("fare", RAND.nextDouble() * 100);
rec.put("fare", rand.nextDouble() * 100);
rec.put("currency", "USD");
} else {
rec.put("distance_in_meters", RAND.nextInt());
rec.put("seconds_since_epoch", RAND.nextLong());
rec.put("weight", RAND.nextFloat());
rec.put("distance_in_meters", rand.nextInt());
rec.put("seconds_since_epoch", rand.nextLong());
rec.put("weight", rand.nextFloat());
byte[] bytes = "Canada".getBytes();
rec.put("nation", ByteBuffer.wrap(bytes));
long currentTimeMillis = System.currentTimeMillis();
Date date = new Date(currentTimeMillis);
rec.put("current_date", (int) date.toLocalDate().toEpochDay());
rec.put("current_ts", currentTimeMillis);
long randomMillis = genRandomTimeMillis(rand);
Instant instant = Instant.ofEpochMilli(randomMillis);
rec.put("current_date", (int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay());
rec.put("current_ts", randomMillis);
BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat()));
BigDecimal bigDecimal = new BigDecimal(String.format("%5f", rand.nextFloat()));
Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion();
GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6));
@@ -296,14 +331,14 @@ public class HoodieTestDataGenerator implements AutoCloseable {
rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
fareRecord.put("amount", RAND.nextDouble() * 100);
fareRecord.put("amount", rand.nextDouble() * 100);
fareRecord.put("currency", "USD");
rec.put("fare", fareRecord);
GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, AVRO_SCHEMA.getField("tip_history").schema());
Schema tipSchema = new Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
GenericRecord tipRecord = new GenericData.Record(tipSchema);
tipRecord.put("amount", RAND.nextDouble() * 100);
tipRecord.put("amount", rand.nextDouble() * 100);
tipRecord.put("currency", "USD");
tipHistoryArray.add(tipRecord);
rec.put("tip_history", tipHistoryArray);
@@ -326,7 +361,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
rec.put("timestamp", timestamp);
rec.put("rider", riderName);
rec.put("driver", driverName);
rec.put("fare", RAND.nextDouble() * 100);
rec.put("fare", rand.nextDouble() * 100);
rec.put("_hoodie_is_deleted", false);
return rec;
}
@@ -337,7 +372,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
rec.put("timestamp", timestamp);
rec.put("rider", riderName);
rec.put("driver", driverName);
rec.put("fare", RAND.nextDouble() * 100);
rec.put("fare", rand.nextDouble() * 100);
rec.put("_hoodie_is_deleted", false);
return rec;
}
@@ -347,7 +382,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
createCommitFile(basePath, instantTime, configuration, commitMetadata);
}
public static void createCommitFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
private static void createCommitFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
Arrays.asList(HoodieTimeline.makeCommitFileName(instantTime), HoodieTimeline.makeInflightCommitFileName(instantTime),
HoodieTimeline.makeRequestedCommitFileName(instantTime))
.forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata));
@@ -383,13 +418,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
}
}
public static void createReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
Arrays.asList(HoodieTimeline.makeReplaceFileName(instantTime), HoodieTimeline.makeInflightReplaceFileName(instantTime),
HoodieTimeline.makeRequestedReplaceFileName(instantTime))
.forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata));
}
public static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
private static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
Arrays.asList(HoodieTimeline.makeInflightReplaceFileName(instantTime),
HoodieTimeline.makeRequestedReplaceFileName(instantTime))
.forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata));
@@ -407,13 +436,6 @@ public class HoodieTestDataGenerator implements AutoCloseable {
createEmptyFile(basePath, commitFile, configuration);
}
public static void createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration)
throws IOException {
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeRequestedCompactionFileName(instantTime));
createEmptyFile(basePath, commitFile, configuration);
}
private static void createEmptyFile(String basePath, Path filePath, Configuration configuration) throws IOException {
FileSystem fs = FSUtils.getFs(basePath, configuration);
FSDataOutputStream os = fs.create(filePath, true);
@@ -484,13 +506,13 @@ public class HoodieTestDataGenerator implements AutoCloseable {
}
public List<HoodieRecord> generateInsertsForPartition(String instantTime, Integer n, String partition) {
return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, false, () -> partition, () -> UUID.randomUUID().toString()).collect(Collectors.toList());
return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, false, () -> partition, () -> genPseudoRandomUUID(rand).toString()).collect(Collectors.toList());
}
public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
return generateInsertsStream(commitTime, n, isFlattened, schemaStr, containsAllPartitions,
() -> partitionPaths[RAND.nextInt(partitionPaths.length)],
() -> UUID.randomUUID().toString());
() -> partitionPaths[rand.nextInt(partitionPaths.length)],
() -> genPseudoRandomUUID(rand).toString());
}
/**
@@ -552,8 +574,8 @@ public class HoodieTestDataGenerator implements AutoCloseable {
List<HoodieRecord> inserts = new ArrayList<>();
int currSize = getNumExistingKeys(TRIP_EXAMPLE_SCHEMA);
for (int i = 0; i < limit; i++) {
String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)];
HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
HoodieKey key = new HoodieKey(genPseudoRandomUUID(rand).toString(), partitionPath);
HoodieRecord record = new HoodieAvroRecord(key, generateAvroPayload(key, instantTime));
inserts.add(record);
@@ -654,7 +676,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
for (int i = 0; i < n; i++) {
Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys - 1));
KeyPartition kp = existingKeys.get(rand.nextInt(numExistingKeys - 1));
HoodieRecord record = generateUpdateRecord(kp.key, instantTime);
updates.add(record);
}
@@ -726,7 +748,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
}
return IntStream.range(0, n).boxed().map(i -> {
int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1);
int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
KeyPartition kp = existingKeys.get(index);
// Find the available keyPartition starting from randomly chosen one.
while (used.contains(kp)) {
@@ -759,7 +781,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
List<HoodieKey> result = new ArrayList<>();
for (int i = 0; i < n; i++) {
int index = RAND.nextInt(numExistingKeys);
int index = rand.nextInt(numExistingKeys);
while (!existingKeys.containsKey(index)) {
index = (index + 1) % numExistingKeys;
}
@@ -791,7 +813,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
List<HoodieRecord> result = new ArrayList<>();
for (int i = 0; i < n; i++) {
int index = RAND.nextInt(numExistingKeys);
int index = rand.nextInt(numExistingKeys);
while (!existingKeys.containsKey(index)) {
index = (index + 1) % numExistingKeys;
}
@@ -841,8 +863,8 @@ public class HoodieTestDataGenerator implements AutoCloseable {
public List<GenericRecord> generateGenericRecords(int numRecords) {
List<GenericRecord> list = new ArrayList<>();
IntStream.range(0, numRecords).forEach(i -> {
list.add(generateGenericRecord(UUID.randomUUID().toString(), "0", UUID.randomUUID().toString(), UUID.randomUUID()
.toString(), RAND.nextLong()));
list.add(generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0",
genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong()));
});
return list;
}
@@ -865,4 +887,31 @@ public class HoodieTestDataGenerator implements AutoCloseable {
public void close() {
existingKeysBySchema.clear();
}
private static long genRandomTimeMillis(Random r) {
// Fri Feb 13 15:31:30 PST 2009
long anchorTs = 1234567890L;
// NOTE: To provide for certainty and not generate overly random dates, we will limit
// dispersion to be w/in +/- 3 days from the anchor date
return anchorTs + r.nextLong() % 259200000L;
}
private static UUID genPseudoRandomUUID(Random r) {
byte[] bytes = new byte[16];
r.nextBytes(bytes);
bytes[6] &= 0x0f;
bytes[6] |= 0x40;
bytes[8] &= 0x3f;
bytes[8] |= 0x80;
try {
Constructor<UUID> ctor = UUID.class.getDeclaredConstructor(byte[].class);
ctor.setAccessible(true);
return ctor.newInstance((Object) bytes);
} catch (InvocationTargetException | InstantiationException | IllegalAccessException | NoSuchMethodException e) {
logger.info("Failed to generate pseudo-random UUID!");
throw new HoodieException(e);
}
}
}

View File

@@ -257,7 +257,8 @@ object HoodieSparkSqlWriter {
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(processedRecord,
orderingVal, keyGenerator.getKey(gr),
orderingVal,
keyGenerator.getKey(gr),
hoodieConfig.getString(PAYLOAD_CLASS_NAME))
} else {
DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))

View File

@@ -104,7 +104,6 @@ import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.generateGenericRecord;
import static org.apache.spark.sql.functions.callUDF;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -564,8 +563,7 @@ public class TestBootstrap extends HoodieClientTestBase {
final List<String> records = new ArrayList<>();
IntStream.range(from, to).forEach(i -> {
String id = "" + i;
records.add(generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id,
timestamp, false, false).toString());
records.add(new HoodieTestDataGenerator().generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id, timestamp, false, false).toString());
});
if (isPartitioned) {
sqlContext.udf().register("partgen",

View File

@@ -98,7 +98,6 @@ import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.generateGenericRecord;
import static org.apache.spark.sql.functions.callUDF;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -476,8 +475,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase {
final List<String> records = new ArrayList<>();
IntStream.range(from, to).forEach(i -> {
String id = "" + i;
records.add(generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id,
timestamp, false, false).toString());
records.add(new HoodieTestDataGenerator().generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id, timestamp, false, false).toString());
});
if (isPartitioned) {
sqlContext.udf().register("partgen",

View File

@@ -30,10 +30,8 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
@@ -60,9 +58,14 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
val updatedVerificationVal: String = "driver_update"
@ParameterizedTest
@CsvSource(Array("true,org.apache.hudi.keygen.SimpleKeyGenerator", "true,org.apache.hudi.keygen.ComplexKeyGenerator",
"true,org.apache.hudi.keygen.TimestampBasedKeyGenerator", "false,org.apache.hudi.keygen.SimpleKeyGenerator",
"false,org.apache.hudi.keygen.ComplexKeyGenerator", "false,org.apache.hudi.keygen.TimestampBasedKeyGenerator"))
@CsvSource(Array(
"true,org.apache.hudi.keygen.SimpleKeyGenerator",
"true,org.apache.hudi.keygen.ComplexKeyGenerator",
"true,org.apache.hudi.keygen.TimestampBasedKeyGenerator",
"false,org.apache.hudi.keygen.SimpleKeyGenerator",
"false,org.apache.hudi.keygen.ComplexKeyGenerator",
"false,org.apache.hudi.keygen.TimestampBasedKeyGenerator"
))
def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String): Unit = {
commonOpts += DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass
if (classOf[ComplexKeyGenerator].getName.equals(keyGenClass)) {
@@ -74,7 +77,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
commonOpts += Config.TIMESTAMP_TYPE_FIELD_PROP -> "EPOCHMILLISECONDS"
commonOpts += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd"
}
val dataGen = new HoodieTestDataGenerator()
val dataGen = new HoodieTestDataGenerator(0xDEED)
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
// Insert Operation
val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
@@ -101,9 +104,13 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
var updateDf: DataFrame = null
if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) {
// update current_ts to be same as original record so that partition path does not change with timestamp based key gen
val originalRow = inputDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0)
updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
.withColumn("current_ts", lit(originalRow.getAs("current_ts")))
val originalRow = snapshotDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0)
updateDf = inputDF1.filter(col("_row_key") === verificationRowKey)
.withColumn(verificationCol, lit(updatedVerificationVal))
.withColumn("current_ts", lit(originalRow.getAs[Long]("current_ts")))
.limit(1)
val updatedRow = updateDf.collectAsList().get(0)
assertEquals(originalRow.getAs[Long]("current_ts"), updatedRow.getAs[Long]("current_ts"));
} else {
updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
}

View File

@@ -233,8 +233,7 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se
long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
for (long recordNum = 0; recordNum < 96; recordNum++) {
records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-" + recordNum,
"driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-" + recordNum, "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
.withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
@@ -251,12 +250,12 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se
List<GenericRecord> records = new ArrayList<GenericRecord>();
// 10 for update
for (long recordNum = 0; recordNum < 11; recordNum++) {
records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum,
records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum,
"driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
// 4 for insert
for (long recordNum = 96; recordNum < 100; recordNum++) {
records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum,
records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum,
"driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)

View File

@@ -96,7 +96,7 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness {
new File(basePath + "/2016/05/01/").mkdirs();
new File(basePath + "/2016/05/02/").mkdirs();
new File(basePath + "/2016/05/06/").mkdirs();
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[] {"2016/05/01", "2016/05/02", "2016/05/06"},
basePath);
// Make commit1
File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));