From 6310a2307abba94c7ff8a770f45462deae2c312e Mon Sep 17 00:00:00 2001
From: Prashant Wason
Date: Thu, 29 Oct 2020 06:50:37 -0700
Subject: [PATCH] [HUDI-1351] Improvements to the hudi test suite for scalability and repeated testing. (#2197)

1. Added the --clean-input and --clean-output parameters to clean the input and output directories before starting the job.
2. Added the --delete-old-input parameter to delete older batches of input data that have already been ingested. This helps keep the number of redundant files low.
3. Added the --input-parallelism parameter to restrict the parallelism when generating input data. This helps keep the number of generated input files low.
4. Added a start_offset option to DAG nodes. Without the ability to specify a start offset, data is generated into existing partitions. With a start offset, the DAG can control which partitions the data is written to.
5. Fixed record generation to produce the correct number of partitions. In the existing implementation, the partition is chosen as a random long, which does not guarantee that exactly the requested number of partitions is created.
6. Changed the blacklistedFields variable to a Set, which is faster than a List for membership checks.
7. Fixed integer division before Math.ceil. When two integers are divided, the result is not a double unless one of the operands is cast to double.
---
 .../integ/testsuite/HoodieTestSuiteJob.java | 29 ++++++++
 .../configuration/DFSDeltaConfig.java | 17 ++++-
 .../testsuite/configuration/DeltaConfig.java | 12 +++-
 .../integ/testsuite/dag/WriterContext.java | 3 +-
 .../testsuite/generator/DeltaGenerator.java | 48 ++++++++++---
 ...lexibleSchemaRecordGenerationIterator.java | 15 ++--
 .../GenericRecordFullPayloadGenerator.java | 68 +++++++++++++++----
 .../generator/UpdateGeneratorIterator.java | 8 ++-
 .../reader/DFSHoodieDatasetInputReader.java | 26 ++++---
 .../TestDFSHoodieTestSuiteWriterAdapter.java | 2 +-
 .../TestGenericRecordPayloadGenerator.java | 5 +-
 11 files changed, 187 insertions(+), 46 deletions(-)

diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index c2c242ab1..7b3324e4b 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -96,6 +96,20 @@ public class HoodieTestSuiteJob { HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath, HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived"); } + + if (cfg.cleanInput) { + Path inputPath = new Path(cfg.inputBasePath); + if (fs.exists(inputPath)) { + fs.delete(inputPath, true); + } + } + + if (cfg.cleanOutput) { + Path outputPath = new Path(cfg.targetBasePath); + if (fs.exists(outputPath)) { + fs.delete(outputPath, true); + } + } } private static HiveConf getDefaultHiveConf(Configuration cfg) { @@ -175,9 +189,24 @@ public class HoodieTestSuiteJob { required = true) public Long limitFileSize = 1024 * 1024 * 120L; + @Parameter(names = {"--input-parallelism"}, description = "Parallelism to use when generating input files", + required = false) + public Integer inputParallelism = 0; + + @Parameter(names = {"--delete-old-input"}, description = "Delete older input files once they have been ingested", + required = false) + public Boolean deleteOldInput = false; + @Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to " + 
"perform ingestion. If set to false, HoodieWriteClient will be used") public Boolean useDeltaStreamer = false; + @Parameter(names = {"--clean-input"}, description = "Clean the input folders and delete all files within it " + + "before starting the Job") + public Boolean cleanInput = false; + + @Parameter(names = {"--clean-output"}, description = "Clean the output folders and delete all files within it " + + "before starting the Job") + public Boolean cleanOutput = false; } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java index 291562808..0ac36687f 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java @@ -36,15 +36,22 @@ public class DFSDeltaConfig extends DeltaConfig { private final Long maxFileSize; // The current batch id private Integer batchId; + // Paralleism to use when generating input data + private int inputParallelism; + // Whether to delete older input data once it has been ingested + private boolean deleteOldInputData; public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType, SerializableConfiguration configuration, - String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize) { + String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize, + int inputParallelism, boolean deleteOldInputData) { super(deltaOutputMode, deltaInputType, configuration); this.deltaBasePath = deltaBasePath; this.schemaStr = schemaStr; this.maxFileSize = maxFileSize; this.datasetOutputPath = targetBasePath; + this.inputParallelism = inputParallelism; + this.deleteOldInputData = deleteOldInputData; } public String getDeltaBasePath() { @@ -70,4 +77,12 @@ public class DFSDeltaConfig extends DeltaConfig { public void setBatchId(Integer batchId) { this.batchId = batchId; } + + public int getInputParallelism() { + return inputParallelism; + } + + public boolean shouldDeleteOldInputData() { + return deleteOldInputData; + } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index 7a66681e5..db1560464 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -83,6 +83,7 @@ public class DeltaConfig implements Serializable { private static String DISABLE_INGEST = "disable_ingest"; private static String HIVE_LOCAL = "hive_local"; private static String REINIT_CONTEXT = "reinitialize_context"; + private static String START_PARTITION = "start_partition"; private Map configsMap; @@ -118,8 +119,12 @@ public class DeltaConfig implements Serializable { return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_UPSERT, 0).toString()); } + public int getStartPartition() { + return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString()); + } + public int getNumUpsertFiles() { - return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 1).toString()); + return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString()); } public double getFractionUpsertPerFile() { @@ -207,6 +212,11 @@ public class DeltaConfig 
implements Serializable { return this; } + public Builder withStartPartition(int startPartition) { + this.configsMap.put(START_PARTITION, startPartition); + return this; + } + public Builder withNumTimesToRepeat(int repeatCount) { this.configsMap.put(REPEAT_COUNT, repeatCount); return this; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java index 21a84db4a..e457f0a8d 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java @@ -67,10 +67,11 @@ public class WriterContext { this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc); String schemaStr = schemaProvider.getSourceSchema().toString(); this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr); + int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : jsc.defaultParallelism(); this.deltaGenerator = new DeltaGenerator( new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName), new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath, - schemaStr, cfg.limitFileSize), + schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput), jsc, sparkSession, schemaStr, keyGenerator); log.info(String.format("Initialized writerContext with: %s", schemaStr)); } catch (Exception e) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index 8dc7f4be5..dc991b11e 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -28,9 +28,17 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.StreamSupport; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.integ.testsuite.converter.UpdateConverter; import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader; import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader; @@ -41,7 +49,6 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter; import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; -import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -58,7 +65,7 @@ public class DeltaGenerator implements Serializable { private static Logger log = LoggerFactory.getLogger(DeltaGenerator.class); - private DeltaConfig deltaOutputConfig; + private DFSDeltaConfig deltaOutputConfig; private transient 
JavaSparkContext jsc; private transient SparkSession sparkSession; private String schemaStr; @@ -66,7 +73,7 @@ public class DeltaGenerator implements Serializable { private List partitionPathFieldNames; private int batchId; - public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession, + public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession, String schemaStr, BuiltinKeyGenerator keyGenerator) { this.deltaOutputConfig = deltaOutputConfig; this.jsc = jsc; @@ -77,6 +84,16 @@ public class DeltaGenerator implements Serializable { } public JavaRDD writeRecords(JavaRDD records) { + if (deltaOutputConfig.shouldDeleteOldInputData() && batchId > 1) { + Path oldInputDir = new Path(deltaOutputConfig.getDeltaBasePath(), Integer.toString(batchId - 1)); + try { + FileSystem fs = FSUtils.getFs(oldInputDir.toString(), deltaOutputConfig.getConfiguration()); + fs.delete(oldInputDir, true); + } catch (IOException e) { + log.error("Failed to delete older input data directory " + oldInputDir, e); + } + } + // The following creates a new anonymous function for iterator and hence results in serialization issues JavaRDD ws = records.mapPartitions(itr -> { try { @@ -95,11 +112,22 @@ public class DeltaGenerator implements Serializable { int numPartitions = operation.getNumInsertPartitions(); long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions; int minPayloadSize = operation.getRecordSize(); - JavaRDD inputBatch = jsc.parallelize(Collections.EMPTY_LIST) - .repartition(numPartitions).mapPartitions(p -> { return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition, - minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions)); - }); + int startPartition = operation.getStartPartition(); + + // Each spark partition below will generate records for a single partition given by the integer index. + List partitionIndexes = IntStream.rangeClosed(0 + startPartition, numPartitions + startPartition) + .boxed().collect(Collectors.toList()); + + JavaRDD inputBatch = jsc.parallelize(partitionIndexes, numPartitions) + .mapPartitionsWithIndex((index, p) -> { return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition, - minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions)); - }); + minPayloadSize, schemaStr, partitionPathFieldNames, (Integer)index)); + }, true); + + if (deltaOutputConfig.getInputParallelism() < numPartitions) { + inputBatch = inputBatch.coalesce(deltaOutputConfig.getInputParallelism()); + } + return inputBatch; } @@ -131,9 +159,11 @@ public class DeltaGenerator implements Serializable { } } - log.info("Repartitioning records"); // persist this since we will make multiple passes over this - adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism()); + int numPartition = Math.min(deltaOutputConfig.getInputParallelism(), + Math.max(1, config.getNumUpsertPartitions())); + log.info("Repartitioning records into " + numPartition + " partitions"); + adjustedRDD = adjustedRDD.repartition(numPartition); log.info("Repartitioning records done"); UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(), partitionPathFieldNames, recordRowKeyFieldNames); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java index 512118fa1..270dcd169 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java +++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java @@ -18,8 +18,11 @@ package org.apache.hudi.integ.testsuite.generator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -37,18 +40,18 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator partitionPathFieldNames; + private Set partitionPathFieldNames; public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) { - this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS); + this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0); } public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr, - List partitionPathFieldNames, int numPartitions) { + List partitionPathFieldNames, int partitionIndex) { this.counter = maxEntriesToProduce; - this.partitionPathFieldNames = partitionPathFieldNames; + this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames); Schema schema = new Schema.Parser().parse(schemaStr); - this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions); + this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, partitionIndex); } @Override @@ -60,7 +63,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator partitionPathFieldNames) { + return create(baseSchema, partitionPathFieldNames); + } + + protected GenericRecord create(Schema schema, Set partitionPathFieldNames) { + GenericRecord result = new GenericData.Record(schema); + for (Schema.Field f : schema.getFields()) { + if (isPartialLongField(f, partitionPathFieldNames)) { + // This is a long field used as partition field. Set it to seconds since epoch. + long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS); + result.put(f.name(), (long)value); + } else { + result.put(f.name(), typeConvert(f)); + } + } + return result; + } + + /** + * Return true if this is a partition field of type long which should be set to the partition index. + * @return + */ + private boolean isPartialLongField(Schema.Field field, Set partitionPathFieldNames) { + if ((partitionPathFieldNames == null) || !partitionPathFieldNames.contains(field.name())) { + return false; + } + + Schema fieldSchema = field.schema(); + if (isOption(fieldSchema)) { + fieldSchema = getNonNull(fieldSchema); + } + + return fieldSchema.getType() == org.apache.avro.Schema.Type.LONG; } /** @@ -125,7 +169,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { * @param blacklistFields Fields whose value should not be touched * @return The updated {@link GenericRecord} */ - public GenericRecord getUpdatePayload(GenericRecord record, List blacklistFields) { + public GenericRecord getUpdatePayload(GenericRecord record, Set blacklistFields) { return randomize(record, blacklistFields); } @@ -158,7 +202,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { * @param blacklistFields blacklistFields where the filed will not be randomized. * @return Randomized GenericRecord. 
*/ - protected GenericRecord randomize(GenericRecord record, List blacklistFields) { + protected GenericRecord randomize(GenericRecord record, Set blacklistFields) { for (Schema.Field f : record.getSchema().getFields()) { if (blacklistFields == null || !blacklistFields.contains(f.name())) { record.put(f.name(), typeConvert(f)); @@ -167,12 +211,6 @@ public class GenericRecordFullPayloadGenerator implements Serializable { return record; } - private long getNextConstrainedLong() { - int numPartitions = random.nextInt(numDatePartitions); - long unixTimeStamp = TimeUnit.SECONDS.convert(numPartitions, TimeUnit.DAYS); - return unixTimeStamp; - } - /** * Generate random value according to their type. */ @@ -191,7 +229,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { case INT: return random.nextInt(); case LONG: - return getNextConstrainedLong(); + return random.nextLong(); case STRING: return UUID.randomUUID().toString(); case ENUM: diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java index a33ef0c7f..d9d137a42 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java @@ -18,9 +18,11 @@ package org.apache.hudi.integ.testsuite.generator; -import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -31,14 +33,14 @@ public class UpdateGeneratorIterator implements Iterator { // Use the full payload generator as default private GenericRecordFullPayloadGenerator generator; - private List blackListedFields; + private Set blackListedFields; // iterator private Iterator itr; public UpdateGeneratorIterator(Iterator itr, String schemaStr, List partitionPathFieldNames, List recordKeyFieldNames, int minPayloadSize) { this.itr = itr; - this.blackListedFields = new ArrayList<>(); + this.blackListedFields = new HashSet<>(); this.blackListedFields.addAll(partitionPathFieldNames); this.blackListedFields.addAll(recordKeyFieldNames); Schema schema = new Schema.Parser().parse(schemaStr); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index e209118db..cfe7991f4 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -36,6 +36,7 @@ import java.util.stream.StreamSupport; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; @@ -78,8 +79,10 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { } protected List getPartitions(Option partitionsLimit) throws IOException { - List partitionPaths = FSUtils - .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), false); + // Using FSUtils.getFS here instead of metaClient.getFS() since we 
don't want to count these listStatus + // calls in metrics as they are not part of normal HUDI operation. + FileSystem fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf()); + List partitionPaths = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath(), false); // Sort partition so we can pick last N partitions by default Collections.sort(partitionPaths); if (!partitionPaths.isEmpty()) { @@ -136,6 +139,9 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { // Read all file slices in the partition JavaPairRDD> partitionToFileSlice = getPartitionToFileSlice(metaClient, partitionPaths); + Map partitionToFileIdCountMap = partitionToFileSlice + .mapToPair(p -> new Tuple2<>(p._1, iteratorSize(p._2))).collectAsMap(); + // TODO : read record count from metadata // Read the records in a single file long recordsInSingleFile = iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice))); @@ -144,7 +150,11 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { if (!numFiles.isPresent() || numFiles.get() == 0) { // If num files are not passed, find the number of files to update based on total records to update and records // per file - numFilesToUpdate = (int) (numRecordsToUpdate.get() / recordsInSingleFile); + numFilesToUpdate = (int)Math.ceil((double)numRecordsToUpdate.get() / recordsInSingleFile); + // recordsInSingleFile is not an average so we still need to account for bias in the records distribution + // in the files. Limit to the maximum number of files available. + int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get(); + numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount); log.info("Files to update {}", numFilesToUpdate); numRecordsToUpdatePerFile = recordsInSingleFile; } else { @@ -154,9 +164,10 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { numRecordsToUpdatePerFile = percentageRecordsPerFile.isPresent() ? 
(long) (recordsInSingleFile * percentageRecordsPerFile.get()) : numRecordsToUpdate.get() / numFilesToUpdate; } + // Adjust the number of files to read per partition based on the requested partition & file counts Map adjustedPartitionToFileIdCountMap = getFilesToReadPerPartition(partitionToFileSlice, - partitionPaths.size(), numFilesToUpdate); + partitionPaths.size(), numFilesToUpdate, partitionToFileIdCountMap); JavaRDD updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap, partitionToFileSlice, numFilesToUpdate, (int) numRecordsToUpdatePerFile)); if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && numFiles.get() != 0 && numRecordsToUpdate.get() @@ -190,10 +201,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { } private Map getFilesToReadPerPartition(JavaPairRDD> - partitionToFileSlice, Integer numPartitions, Integer numFiles) { - int numFilesPerPartition = (int) Math.ceil(numFiles / numPartitions); - Map partitionToFileIdCountMap = partitionToFileSlice - .mapToPair(p -> new Tuple2<>(p._1, iteratorSize(p._2))).collectAsMap(); + partitionToFileSlice, Integer numPartitions, Integer numFiles, Map partitionToFileIdCountMap) { long totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get(); ValidationUtils.checkArgument(totalExistingFilesCount >= numFiles, "Cannot generate updates " + "for more files than present in the dataset, file requested " + numFiles + ", files present " @@ -204,7 +212,9 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { .sorted(comparingByValue()) .collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2, LinkedHashMap::new)); + // Limit files to be read per partition + int numFilesPerPartition = (int) Math.ceil((double)numFiles / numPartitions); Map adjustedPartitionToFileIdCountMap = new HashMap<>(); partitionToFileIdCountSortedMap.entrySet().stream().forEach(e -> { if (e.getValue() <= numFilesPerPartition) { diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java index ff41b44dd..ff92bd037 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java @@ -125,7 +125,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase { public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws IOException { DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO, new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, dfsBasePath, - schemaProvider.getSourceSchema().toString(), 10240L); + schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false); DeltaWriterAdapter dfsDeltaWriterAdapter = DeltaWriterFactory .getDeltaWriterAdapter(dfsSinkConfig, 1); FlexibleSchemaRecordGenerationIterator itr = new FlexibleSchemaRecordGenerationIterator(1000, diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java index 7524d4af8..94515959d 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java +++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java @@ -24,7 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.IntStream; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -92,7 +94,8 @@ public class TestGenericRecordPayloadGenerator { insertRowKeys.add(record.get("_row_key").toString()); insertTimeStamps.add((Long) record.get("timestamp")); }); - List blacklistFields = Arrays.asList("_row_key"); + Set blacklistFields = new HashSet<>(); + blacklistFields.add("_row_key"); records.stream().forEach(a -> { // Generate 10 updated records GenericRecord record = payloadGenerator.getUpdatePayload(a, blacklistFields);
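Reviewer note (not part of the patch): a sketch of how the new --input-parallelism and --delete-old-input knobs end up in DFSDeltaConfig, mirroring the WriterContext change above. The constructor signature matches this patch; the paths, schema string and values are placeholders, and imports for DFSDeltaConfig, DeltaOutputMode, DeltaInputType, SerializableConfiguration and JavaSparkContext are elided because their packages are not visible in these hunks.

```java
// Sketch only: wiring of the new CLI parameters into the extended DFSDeltaConfig constructor.
// Imports elided; the classes come from the hudi-integ-test and hudi-common modules touched by this patch.
public class DeltaConfigWiringSketch {

  static DFSDeltaConfig buildConfig(JavaSparkContext jsc, String inputBasePath, String targetBasePath,
                                    String schemaStr, int cliInputParallelism, boolean deleteOldInput) {
    // Same fallback the patched WriterContext applies: the CLI default of 0 means
    // "use the Spark default parallelism".
    int inputParallelism = cliInputParallelism > 0 ? cliInputParallelism : jsc.defaultParallelism();

    return new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO,
        new SerializableConfiguration(jsc.hadoopConfiguration()),
        inputBasePath, targetBasePath, schemaStr,
        1024 * 1024 * 120L,   // limitFileSize default from HoodieTestSuiteJob
        inputParallelism,     // new: caps the number of files generated per input batch
        deleteOldInput);      // new: drop the previous input batch once the next one is written
  }
}
```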
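Reviewer note (not part of the patch): a small standalone illustration of how the start_offset / start_partition handling behaves. The patched generator maps each partition index to a fixed day, so a long partition-path field gets that day's timestamp in seconds since the epoch instead of a random long; the index and count below are illustrative values, not defaults from the patch.

```java
import java.util.concurrent.TimeUnit;

// Sketch of the timestamp the patched GenericRecordFullPayloadGenerator assigns to a long
// partition-path field: partition index N becomes day N expressed in epoch seconds.
public class PartitionTimestampDemo {
  public static void main(String[] args) {
    int startPartition = 10;  // e.g. a DAG node configured with start_partition = 10
    int numPartitions = 3;    // number of insert partitions requested by that node

    for (int index = startPartition; index < startPartition + numPartitions; index++) {
      long secondsSinceEpoch = TimeUnit.SECONDS.convert(index, TimeUnit.DAYS);
      // partition 10 -> 864000, partition 11 -> 950400, partition 12 -> 1036800
      System.out.println("partition index " + index + " -> " + secondsSinceEpoch);
    }
  }
}
```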
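Reviewer note (not part of the patch): a standalone illustration of the integer-division issue described in point 7 of the commit message. With two integer operands, the division truncates before Math.ceil ever sees a fraction, which is why DFSHoodieDatasetInputReader previously under-counted the files to update.

```java
// Minimal sketch of the Math.ceil pitfall fixed in this patch:
// integer division truncates before Math.ceil is applied.
public class CeilDivisionDemo {
  public static void main(String[] args) {
    long numRecordsToUpdate = 7;
    long recordsInSingleFile = 2;

    // 7 / 2 is evaluated as long division first, yielding 3, so Math.ceil(3.0) == 3.
    int truncated = (int) Math.ceil(numRecordsToUpdate / recordsInSingleFile);

    // Casting one operand to double keeps the fraction: Math.ceil(3.5) == 4.
    int roundedUp = (int) Math.ceil((double) numRecordsToUpdate / recordsInSingleFile);

    System.out.println(truncated + " files vs " + roundedUp + " files"); // prints "3 files vs 4 files"
  }
}
```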