diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java index 90d15b6ca..c475c633f 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java @@ -83,7 +83,7 @@ public class TestCleansCommand extends CLIFunctionalTestHarness { String fileId1 = UUID.randomUUID().toString(); String fileId2 = UUID.randomUUID().toString(); FileSystem fs = FSUtils.getFs(basePath(), hadoopConf()); - HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath); + HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath); // Create four commits for (int i = 100; i < 104; i++) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index dd504476b..ba3b0be16 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -21,7 +21,6 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.SparkMemoryUtils; import org.apache.hudi.client.utils.SparkValidatorUtils; -import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroupId; @@ -55,13 +54,13 @@ import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadStat; import 
org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.storage.StorageLevel; +import scala.Tuple2; import java.io.IOException; import java.io.Serializable; @@ -77,23 +76,20 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import scala.Tuple2; - import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans; public abstract class BaseSparkCommitActionExecutor extends BaseCommitActionExecutor>, JavaRDD, JavaRDD, HoodieWriteMetadata> { private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class); - protected Option keyGeneratorOpt = Option.empty(); + protected final Option keyGeneratorOpt; public BaseSparkCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) { - super(context, config, table, instantTime, operationType, Option.empty()); - initKeyGenIfNeeded(config.populateMetaFields()); + this(context, config, table, instantTime, operationType, Option.empty()); } public BaseSparkCommitActionExecutor(HoodieEngineContext context, @@ -103,16 +99,12 @@ public abstract class BaseSparkCommitActionExecutor> existingKeysBySchema; @@ -148,27 +152,58 @@ public class HoodieTestDataGenerator implements AutoCloseable { //maintains the count of existing keys schema wise private Map numKeysBySchema; + public HoodieTestDataGenerator(long seed) { + this(seed, DEFAULT_PARTITION_PATHS, new HashMap<>()); + } + + public HoodieTestDataGenerator(long seed, String[] partitionPaths, Map keyPartitionMap) { + this.rand = new Random(seed); + this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length); + 
this.existingKeysBySchema = new HashMap<>(); + this.existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap); + this.numKeysBySchema = new HashMap<>(); + this.numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size()); + + logger.info(String.format("Test DataGenerator's seed (%s)", seed)); + } + + ////////////////////////////////////////////////////////////////////////////////// + // DEPRECATED API + ////////////////////////////////////////////////////////////////////////////////// + + @Deprecated public HoodieTestDataGenerator(String[] partitionPaths) { this(partitionPaths, new HashMap<>()); } + @Deprecated public HoodieTestDataGenerator() { this(DEFAULT_PARTITION_PATHS); } + @Deprecated public HoodieTestDataGenerator(String[] partitionPaths, Map keyPartitionMap) { - this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length); - this.existingKeysBySchema = new HashMap<>(); - existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap); - numKeysBySchema = new HashMap<>(); - numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size()); + // NOTE: This is used as a workaround to make sure that new instantiations of the generator + // always return "new" random values. + // Caveat is that if 2 successive invocations are made w/in the timespan that is smaller + // than the resolution of {@code nanoTime}, then this will produce identical results + this(System.nanoTime(), partitionPaths, keyPartitionMap); } + /** + * @deprecated please use non-static version + */ + public static void writePartitionMetadataDeprecated(FileSystem fs, String[] partitionPaths, String basePath) { + new HoodieTestDataGenerator().writePartitionMetadata(fs, partitionPaths, basePath); + } + + ////////////////////////////////////////////////////////////////////////////////// + + /** * @implNote {@link HoodieTestDataGenerator} is supposed to just generate records with schemas. Leave HoodieTable files (metafile, basefile, logfile, etc) to {@link HoodieTestTable}. 
* @deprecated Use {@link HoodieTestTable#withPartitionMetaFiles(java.lang.String...)} instead. */ - public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) { + public void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) { for (String partitionPath : partitionPaths) { new HoodiePartitionMetadata(fs, "000", new Path(basePath), new Path(basePath, partitionPath)).trySave(0); } @@ -198,7 +233,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { * @param instantTime Instant time to use. * @return Raw paylaod of a test record. */ - public static RawTripTestPayload generateRandomValue(HoodieKey key, String instantTime) throws IOException { + public RawTripTestPayload generateRandomValue(HoodieKey key, String instantTime) throws IOException { return generateRandomValue(key, instantTime, false); } @@ -212,12 +247,12 @@ public class HoodieTestDataGenerator implements AutoCloseable { * @return Raw paylaod of a test record. * @throws IOException */ - public static RawTripTestPayload generateRandomValue( + private RawTripTestPayload generateRandomValue( HoodieKey key, String instantTime, boolean isFlattened) throws IOException { return generateRandomValue(key, instantTime, isFlattened, 0); } - public static RawTripTestPayload generateRandomValue( + private RawTripTestPayload generateRandomValue( HoodieKey key, String instantTime, boolean isFlattened, int ts) throws IOException { GenericRecord rec = generateGenericRecord( key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, ts, @@ -241,7 +276,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { /** * Generates a new avro record of the above schema format for a delete. 
*/ - public static RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String instantTime) throws IOException { + private RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String instantTime) throws IOException { GenericRecord rec = generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0, true, false); return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true, 0L); @@ -250,17 +285,17 @@ public class HoodieTestDataGenerator implements AutoCloseable { /** * Generates a new avro record of the above schema format, retaining the key if optionally provided. */ - public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String instantTime) { + private HoodieAvroPayload generateAvroPayload(HoodieKey key, String instantTime) { GenericRecord rec = generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0); return new HoodieAvroPayload(Option.of(rec)); } - public static GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName, - long timestamp) { + public GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName, + long timestamp) { return generateGenericRecord(rowKey, partitionPath, riderName, driverName, timestamp, false, false); } - public static GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName, + public GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName, long timestamp, boolean isDeleteRecord, boolean isFlattened) { GenericRecord rec = new GenericData.Record(isFlattened ? 
FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA); @@ -269,25 +304,25 @@ public class HoodieTestDataGenerator implements AutoCloseable { rec.put("partition_path", partitionPath); rec.put("rider", riderName); rec.put("driver", driverName); - rec.put("begin_lat", RAND.nextDouble()); - rec.put("begin_lon", RAND.nextDouble()); - rec.put("end_lat", RAND.nextDouble()); - rec.put("end_lon", RAND.nextDouble()); + rec.put("begin_lat", rand.nextDouble()); + rec.put("begin_lon", rand.nextDouble()); + rec.put("end_lat", rand.nextDouble()); + rec.put("end_lon", rand.nextDouble()); if (isFlattened) { - rec.put("fare", RAND.nextDouble() * 100); + rec.put("fare", rand.nextDouble() * 100); rec.put("currency", "USD"); } else { - rec.put("distance_in_meters", RAND.nextInt()); - rec.put("seconds_since_epoch", RAND.nextLong()); - rec.put("weight", RAND.nextFloat()); + rec.put("distance_in_meters", rand.nextInt()); + rec.put("seconds_since_epoch", rand.nextLong()); + rec.put("weight", rand.nextFloat()); byte[] bytes = "Canada".getBytes(); rec.put("nation", ByteBuffer.wrap(bytes)); - long currentTimeMillis = System.currentTimeMillis(); - Date date = new Date(currentTimeMillis); - rec.put("current_date", (int) date.toLocalDate().toEpochDay()); - rec.put("current_ts", currentTimeMillis); + long randomMillis = genRandomTimeMillis(rand); + Instant instant = Instant.ofEpochMilli(randomMillis); + rec.put("current_date", (int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay()); + rec.put("current_ts", randomMillis); - BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat())); + BigDecimal bigDecimal = new BigDecimal(String.format("%5f", rand.nextFloat())); Schema decimalSchema = AVRO_SCHEMA.getField("height").schema(); Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion(); GenericFixed genericFixed = decimalConversions.toFixed(bigDecimal, decimalSchema, LogicalTypes.decimal(10, 6)); @@ -296,14 +331,14 @@ public class 
HoodieTestDataGenerator implements AutoCloseable { rec.put("city_to_state", Collections.singletonMap("LA", "CA")); GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema()); - fareRecord.put("amount", RAND.nextDouble() * 100); + fareRecord.put("amount", rand.nextDouble() * 100); fareRecord.put("currency", "USD"); rec.put("fare", fareRecord); GenericArray tipHistoryArray = new GenericData.Array<>(1, AVRO_SCHEMA.getField("tip_history").schema()); Schema tipSchema = new Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType(); GenericRecord tipRecord = new GenericData.Record(tipSchema); - tipRecord.put("amount", RAND.nextDouble() * 100); + tipRecord.put("amount", rand.nextDouble() * 100); tipRecord.put("currency", "USD"); tipHistoryArray.add(tipRecord); rec.put("tip_history", tipHistoryArray); @@ -326,7 +361,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { rec.put("timestamp", timestamp); rec.put("rider", riderName); rec.put("driver", driverName); - rec.put("fare", RAND.nextDouble() * 100); + rec.put("fare", rand.nextDouble() * 100); rec.put("_hoodie_is_deleted", false); return rec; } @@ -337,7 +372,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { rec.put("timestamp", timestamp); rec.put("rider", riderName); rec.put("driver", driverName); - rec.put("fare", RAND.nextDouble() * 100); + rec.put("fare", rand.nextDouble() * 100); rec.put("_hoodie_is_deleted", false); return rec; } @@ -347,7 +382,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { createCommitFile(basePath, instantTime, configuration, commitMetadata); } - public static void createCommitFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) { + private static void createCommitFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) { 
Arrays.asList(HoodieTimeline.makeCommitFileName(instantTime), HoodieTimeline.makeInflightCommitFileName(instantTime), HoodieTimeline.makeRequestedCommitFileName(instantTime)) .forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata)); @@ -383,13 +418,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { } } - public static void createReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) { - Arrays.asList(HoodieTimeline.makeReplaceFileName(instantTime), HoodieTimeline.makeInflightReplaceFileName(instantTime), - HoodieTimeline.makeRequestedReplaceFileName(instantTime)) - .forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata)); - } - - public static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) { + private static void createPendingReplaceFile(String basePath, String instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) { Arrays.asList(HoodieTimeline.makeInflightReplaceFileName(instantTime), HoodieTimeline.makeRequestedReplaceFileName(instantTime)) .forEach(f -> createMetadataFile(f, basePath, configuration, commitMetadata)); @@ -407,13 +436,6 @@ public class HoodieTestDataGenerator implements AutoCloseable { createEmptyFile(basePath, commitFile, configuration); } - public static void createCompactionRequestedFile(String basePath, String instantTime, Configuration configuration) - throws IOException { - Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" - + HoodieTimeline.makeRequestedCompactionFileName(instantTime)); - createEmptyFile(basePath, commitFile, configuration); - } - private static void createEmptyFile(String basePath, Path filePath, Configuration configuration) throws IOException { FileSystem fs = FSUtils.getFs(basePath, configuration); FSDataOutputStream os = fs.create(filePath, true); @@ 
-484,13 +506,13 @@ public class HoodieTestDataGenerator implements AutoCloseable { } public List generateInsertsForPartition(String instantTime, Integer n, String partition) { - return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, false, () -> partition, () -> UUID.randomUUID().toString()).collect(Collectors.toList()); + return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, false, () -> partition, () -> genPseudoRandomUUID(rand).toString()).collect(Collectors.toList()); } public Stream generateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) { return generateInsertsStream(commitTime, n, isFlattened, schemaStr, containsAllPartitions, - () -> partitionPaths[RAND.nextInt(partitionPaths.length)], - () -> UUID.randomUUID().toString()); + () -> partitionPaths[rand.nextInt(partitionPaths.length)], + () -> genPseudoRandomUUID(rand).toString()); } /** @@ -552,8 +574,8 @@ public class HoodieTestDataGenerator implements AutoCloseable { List inserts = new ArrayList<>(); int currSize = getNumExistingKeys(TRIP_EXAMPLE_SCHEMA); for (int i = 0; i < limit; i++) { - String partitionPath = partitionPaths[RAND.nextInt(partitionPaths.length)]; - HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); + String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)]; + HoodieKey key = new HoodieKey(genPseudoRandomUUID(rand).toString(), partitionPath); HoodieRecord record = new HoodieAvroRecord(key, generateAvroPayload(key, instantTime)); inserts.add(record); @@ -654,7 +676,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { for (int i = 0; i < n; i++) { Map existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); - KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys - 1)); + KeyPartition kp = existingKeys.get(rand.nextInt(numExistingKeys - 1)); 
HoodieRecord record = generateUpdateRecord(kp.key, instantTime); updates.add(record); } @@ -726,7 +748,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { } return IntStream.range(0, n).boxed().map(i -> { - int index = numExistingKeys == 1 ? 0 : RAND.nextInt(numExistingKeys - 1); + int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1); KeyPartition kp = existingKeys.get(index); // Find the available keyPartition starting from randomly chosen one. while (used.contains(kp)) { @@ -759,7 +781,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { List result = new ArrayList<>(); for (int i = 0; i < n; i++) { - int index = RAND.nextInt(numExistingKeys); + int index = rand.nextInt(numExistingKeys); while (!existingKeys.containsKey(index)) { index = (index + 1) % numExistingKeys; } @@ -791,7 +813,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { List result = new ArrayList<>(); for (int i = 0; i < n; i++) { - int index = RAND.nextInt(numExistingKeys); + int index = rand.nextInt(numExistingKeys); while (!existingKeys.containsKey(index)) { index = (index + 1) % numExistingKeys; } @@ -841,8 +863,8 @@ public class HoodieTestDataGenerator implements AutoCloseable { public List generateGenericRecords(int numRecords) { List list = new ArrayList<>(); IntStream.range(0, numRecords).forEach(i -> { - list.add(generateGenericRecord(UUID.randomUUID().toString(), "0", UUID.randomUUID().toString(), UUID.randomUUID() - .toString(), RAND.nextLong())); + list.add(generateGenericRecord(genPseudoRandomUUID(rand).toString(), "0", + genPseudoRandomUUID(rand).toString(), genPseudoRandomUUID(rand).toString(), rand.nextLong())); }); return list; } @@ -865,4 +887,31 @@ public class HoodieTestDataGenerator implements AutoCloseable { public void close() { existingKeysBySchema.clear(); } + + private static long genRandomTimeMillis(Random r) { + // Fri Feb 13 15:31:30 PST 2009 + long anchorTs = 1234567890L; + // NOTE: To provide for 
certainty and not generate overly random dates, we will limit + // dispersion to be w/in +/- 3 days from the anchor date + return anchorTs + r.nextLong() % 259200000L; + } + + private static UUID genPseudoRandomUUID(Random r) { + byte[] bytes = new byte[16]; + r.nextBytes(bytes); + + bytes[6] &= 0x0f; + bytes[6] |= 0x40; + bytes[8] &= 0x3f; + bytes[8] |= 0x80; + + try { + Constructor ctor = UUID.class.getDeclaredConstructor(byte[].class); + ctor.setAccessible(true); + return ctor.newInstance((Object) bytes); + } catch (InvocationTargetException | InstantiationException | IllegalAccessException | NoSuchMethodException e) { + logger.info("Failed to generate pseudo-random UUID!"); + throw new HoodieException(e); + } + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 89304d3d0..6b6ddc38e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -257,7 +257,8 @@ object HoodieSparkSqlWriter { DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean) .asInstanceOf[Comparable[_]] DataSourceUtils.createHoodieRecord(processedRecord, - orderingVal, keyGenerator.getKey(gr), + orderingVal, + keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME)) } else { DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index 0d7b7ebbc..d2257f58d 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -104,7 +104,6 @@ import java.util.stream.StreamSupport; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; -import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.generateGenericRecord; import static org.apache.spark.sql.functions.callUDF; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -564,8 +563,7 @@ public class TestBootstrap extends HoodieClientTestBase { final List records = new ArrayList<>(); IntStream.range(from, to).forEach(i -> { String id = "" + i; - records.add(generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id, - timestamp, false, false).toString()); + records.add(new HoodieTestDataGenerator().generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id, timestamp, false, false).toString()); }); if (isPartitioned) { sqlContext.udf().register("partgen", diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java index 66bf0be0e..9146cdc4e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java @@ -98,7 +98,6 @@ import java.util.stream.StreamSupport; import static java.util.stream.Collectors.mapping; import static java.util.stream.Collectors.toList; -import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.generateGenericRecord; import static org.apache.spark.sql.functions.callUDF; import static org.junit.jupiter.api.Assertions.assertEquals; 
import static org.junit.jupiter.api.Assertions.assertFalse; @@ -476,8 +475,7 @@ public class TestOrcBootstrap extends HoodieClientTestBase { final List records = new ArrayList<>(); IntStream.range(from, to).forEach(i -> { String id = "" + i; - records.add(generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id, - timestamp, false, false).toString()); + records.add(new HoodieTestDataGenerator().generateGenericRecord("trip_" + id, Long.toString(timestamp), "rider_" + id, "driver_" + id, timestamp, false, false).toString()); }); if (isPartitioned) { sqlContext.udf().register("partgen", diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala index c5c659a45..e7daf08d1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala @@ -30,10 +30,8 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config import org.apache.hudi.keygen.{ComplexKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} - import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, lit} - import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Tag import org.junit.jupiter.params.ParameterizedTest @@ -60,9 +58,14 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness { val updatedVerificationVal: String = "driver_update" @ParameterizedTest - @CsvSource(Array("true,org.apache.hudi.keygen.SimpleKeyGenerator", "true,org.apache.hudi.keygen.ComplexKeyGenerator", - 
"true,org.apache.hudi.keygen.TimestampBasedKeyGenerator", "false,org.apache.hudi.keygen.SimpleKeyGenerator", - "false,org.apache.hudi.keygen.ComplexKeyGenerator", "false,org.apache.hudi.keygen.TimestampBasedKeyGenerator")) + @CsvSource(Array( + "true,org.apache.hudi.keygen.SimpleKeyGenerator", + "true,org.apache.hudi.keygen.ComplexKeyGenerator", + "true,org.apache.hudi.keygen.TimestampBasedKeyGenerator", + "false,org.apache.hudi.keygen.SimpleKeyGenerator", + "false,org.apache.hudi.keygen.ComplexKeyGenerator", + "false,org.apache.hudi.keygen.TimestampBasedKeyGenerator" + )) def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String): Unit = { commonOpts += DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass if (classOf[ComplexKeyGenerator].getName.equals(keyGenClass)) { @@ -74,7 +77,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness { commonOpts += Config.TIMESTAMP_TYPE_FIELD_PROP -> "EPOCHMILLISECONDS" commonOpts += Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyyMMdd" } - val dataGen = new HoodieTestDataGenerator() + val dataGen = new HoodieTestDataGenerator(0xDEED) val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) // Insert Operation val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList @@ -101,9 +104,13 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness { var updateDf: DataFrame = null if (classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)) { // update current_ts to be same as original record so that partition path does not change with timestamp based key gen - val originalRow = inputDF1.filter(col("_row_key") === verificationRowKey).collectAsList().get(0) - updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) - .withColumn("current_ts", lit(originalRow.getAs("current_ts"))) + val originalRow = snapshotDF1.filter(col("_row_key") === 
verificationRowKey).collectAsList().get(0) + updateDf = inputDF1.filter(col("_row_key") === verificationRowKey) + .withColumn(verificationCol, lit(updatedVerificationVal)) + .withColumn("current_ts", lit(originalRow.getAs[Long]("current_ts"))) + .limit(1) + val updatedRow = updateDf.collectAsList().get(0) + assertEquals(originalRow.getAs[Long]("current_ts"), updatedRow.getAs[Long]("current_ts")); } else { updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java index 3ac490bf9..a57be6246 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java @@ -233,8 +233,7 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000; List records = new ArrayList(); for (long recordNum = 0; recordNum < 96; recordNum++) { - records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-" + recordNum, - "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); + records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-" + recordNum, "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); } try (ParquetWriter writer = AvroParquetWriter.builder(srcFile) .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) { @@ -251,12 +250,12 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se List records = new ArrayList(); // 10 for update for (long recordNum = 0; 
recordNum < 11; recordNum++) { - records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, + records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); } // 4 for insert for (long recordNum = 96; recordNum < 100; recordNum++) { - records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, + records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); } try (ParquetWriter writer = AvroParquetWriter.builder(srcFile) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java index f192ede73..dd25e7f8b 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotCopier.java @@ -96,7 +96,7 @@ public class TestHoodieSnapshotCopier extends FunctionalTestHarness { new File(basePath + "/2016/05/01/").mkdirs(); new File(basePath + "/2016/05/02/").mkdirs(); new File(basePath + "/2016/05/06/").mkdirs(); - HoodieTestDataGenerator.writePartitionMetadata(fs, new String[] {"2016/05/01", "2016/05/02", "2016/05/06"}, + HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, new String[] {"2016/05/01", "2016/05/02", "2016/05/06"}, basePath); // Make commit1 File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));