diff --git a/docker/compose/hadoop.env b/docker/compose/hadoop.env index 474c3db70..4e8a94246 100644 --- a/docker/compose/hadoop.env +++ b/docker/compose/hadoop.env @@ -21,12 +21,13 @@ HIVE_SITE_CONF_javax_jdo_option_ConnectionUserName=hive HIVE_SITE_CONF_javax_jdo_option_ConnectionPassword=hive HIVE_SITE_CONF_datanucleus_autoCreateSchema=false HIVE_SITE_CONF_hive_metastore_uris=thrift://hivemetastore:9083 -HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false +HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false HDFS_CONF_dfs_webhdfs_enabled=true HDFS_CONF_dfs_permissions_enabled=false #HDFS_CONF_dfs_client_use_datanode_hostname=true #HDFS_CONF_dfs_namenode_use_datanode_hostname=true +HDFS_CONF_dfs_replication=1 CORE_CONF_fs_defaultFS=hdfs://namenode:8020 CORE_CONF_hadoop_http_staticuser_user=root diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java index 5179e892c..6e5027b7f 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java @@ -66,6 +66,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer { public Pair>> fetchSource() throws Exception { DeltaSync service = deltaSyncService.get().getDeltaSync(); + service.refreshTimeline(); return service.readFromSource(service.getCommitTimelineOpt()); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index 2c4b73a6d..c2c242ab1 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -156,8 +156,7 @@ public class HoodieTestSuiteJob { public String inputBasePath; @Parameter(names = { - "--workload-generator-classname"}, description = "WorkflowDag of operations to generate the workload", - required = true) + "--workload-generator-classname"}, description = "WorkflowDag of operations to generate the workload") public String workloadDagGenerator = WorkflowDagGenerator.class.getName(); @Parameter(names = { @@ -177,8 +176,7 @@ public class HoodieTestSuiteJob { public Long limitFileSize = 1024 * 1024 * 120L; @Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to " - + "perform" - + " ingestion. If set to false, HoodieWriteClient will be used") + + "perform ingestion. If set to false, HoodieWriteClient will be used") public Boolean useDeltaStreamer = false; } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index f20f84e47..7a66681e5 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -257,6 +257,11 @@ public class DeltaConfig implements Serializable { return this; } + public Builder withName(String name) { + this.configsMap.put(CONFIG_NAME, name); + return this; + } + public Config build() { return new Config(configsMap); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java index 288986719..d5358238d 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java @@ -68,7 +68,7 @@ public class DagUtils { Iterator> itr = jsonNode.fields(); while (itr.hasNext()) { Entry dagNode = itr.next(); - allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getValue())); + allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getKey(), dagNode.getValue())); } return new WorkflowDag(findRootNodes(allNodes)); } @@ -94,9 +94,10 @@ public class DagUtils { } } - private static DagNode convertJsonToDagNode(Map allNodes, JsonNode node) throws IOException { + private static DagNode convertJsonToDagNode(Map allNodes, String name, JsonNode node) + throws IOException { String type = node.get(DeltaConfig.Config.TYPE).asText(); - final DagNode retNode = convertJsonToDagNode(node, type); + final DagNode retNode = convertJsonToDagNode(node, type, name); Arrays.asList(node.get(DeltaConfig.Config.DEPENDENCIES).textValue().split(",")).stream().forEach(dep -> { DagNode parentNode = allNodes.get(dep); if (parentNode != null) { @@ -116,9 +117,10 @@ public class DagUtils { return rootNodes; } - private static DagNode convertJsonToDagNode(JsonNode node, String type) { + private static DagNode convertJsonToDagNode(JsonNode node, String type, String name) { try { - DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node)).build(); + DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node)) + .withName(name).build(); return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config); } catch (ClassNotFoundException e) { throw new RuntimeException(e); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java index 2c0fcba2c..83a8d5e10 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java @@ -18,6 +18,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; /** @@ -26,7 +27,8 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext; */ public class CleanNode extends DagNode { - public CleanNode() { + public CleanNode(Config config) { + this.config = config; } @Override diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index c42705d07..8dc7f4be5 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -31,7 +31,6 @@ import java.util.Map; import java.util.stream.StreamSupport; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.util.Option; -import org.apache.hudi.integ.testsuite.converter.Converter; import org.apache.hudi.integ.testsuite.converter.UpdateConverter; import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader; import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader; @@ -93,11 +92,11 @@ public class DeltaGenerator implements Serializable { } public JavaRDD generateInserts(Config operation) { - long recordsPerPartition = operation.getNumRecordsInsert(); int numPartitions = operation.getNumInsertPartitions(); + long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions; int minPayloadSize = operation.getRecordSize(); JavaRDD inputBatch = jsc.parallelize(Collections.EMPTY_LIST) - .repartition(operation.getNumInsertPartitions()).mapPartitions(p -> { + .repartition(numPartitions).mapPartitions(p -> { return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition, minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions)); }); @@ -112,34 +111,44 @@ public class DeltaGenerator implements Serializable { } DeltaInputReader deltaInputReader = null; JavaRDD adjustedRDD = null; - if (config.getNumUpsertPartitions() < 1) { - // randomly generate updates for a given number of records without regard to partitions and files - deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr, - ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty()); - adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert()); - adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert()); - } else { - deltaInputReader = - new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(), - schemaStr); - if (config.getFractionUpsertPerFile() > 0) { - adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), - config.getFractionUpsertPerFile()); + if (config.getNumUpsertPartitions() != 0) { + if (config.getNumUpsertPartitions() < 0) { + // randomly generate updates for a given number of records without regard to partitions and files + deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr, + ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty()); + adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert()); + adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert()); } else { - adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config - .getNumRecordsUpsert()); + deltaInputReader = + new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(), + schemaStr); + if (config.getFractionUpsertPerFile() > 0) { + adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), + config.getFractionUpsertPerFile()); + } else { + adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config + .getNumRecordsUpsert()); + } + } + + log.info("Repartitioning records"); + // persist this since we will make multiple passes over this + adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism()); + log.info("Repartitioning records done"); + UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(), + partitionPathFieldNames, recordRowKeyFieldNames); + JavaRDD updates = converter.convert(adjustedRDD); + + log.info("Records converted"); + updates.persist(StorageLevel.DISK_ONLY()); + + if (inserts == null) { + inserts = updates; + } else { + inserts = inserts.union(updates); } } - log.info("Repartitioning records"); - // persist this since we will make multiple passes over this - adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism()); - log.info("Repartitioning records done"); - Converter converter = new UpdateConverter(schemaStr, config.getRecordSize(), - partitionPathFieldNames, recordRowKeyFieldNames); - JavaRDD updates = converter.convert(adjustedRDD); - log.info("Records converted"); - updates.persist(StorageLevel.DISK_ONLY()); - return inserts != null ? inserts.union(updates) : updates; + return inserts; // TODO : Generate updates for only N partitions. } else { throw new IllegalArgumentException("Other formats are not supported at the moment"); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java index cdb219699..df9a44992 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java @@ -44,25 +44,22 @@ import org.slf4j.LoggerFactory; * Every field of a generic record created using this generator contains a random value. */ public class GenericRecordFullPayloadGenerator implements Serializable { + private static Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class); public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB public static final int DEFAULT_NUM_DATE_PARTITIONS = 50; - private static Logger log = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class); protected final Random random = new Random(); // The source schema used to generate a payload private final transient Schema baseSchema; // Used to validate a generic record private final transient GenericData genericData = new GenericData(); - // Number of more bytes to add based on the estimated full record payload size and min payload size - private int numberOfBytesToAdd; - // If more elements should be packed to meet the minPayloadSize - private boolean shouldAddMore; - // How many complex fields have we visited that can help us pack more entries and increase the size of the record - private int numberOfComplexFields; - // The size of a full record where every field of a generic record created contains 1 random value - private int estimatedFullPayloadSize; // The number of unique dates to create private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS; + // The size of a full record where every field of a generic record created contains 1 random value + private final int estimatedFullPayloadSize; + // Number of extra entries to add in a complex/collection field to achieve the desired record size + Map extraEntriesMap = new HashMap<>(); + // LogicalTypes in Avro 1.8.2 private static final String DECIMAL = "decimal"; private static final String UUID_NAME = "uuid"; @@ -80,17 +77,18 @@ public class GenericRecordFullPayloadGenerator implements Serializable { Pair sizeInfo = new GenericRecordFullPayloadSizeEstimator(schema) .typeEstimateAndNumComplexFields(); this.estimatedFullPayloadSize = sizeInfo.getLeft(); - this.numberOfComplexFields = sizeInfo.getRight(); this.baseSchema = schema; - this.shouldAddMore = estimatedFullPayloadSize < minPayloadSize; - if (this.shouldAddMore) { - this.numberOfBytesToAdd = minPayloadSize - estimatedFullPayloadSize; - if (numberOfComplexFields < 1) { - log.warn("The schema does not have any collections/complex fields. Cannot achieve minPayloadSize : {}", - minPayloadSize); - } + if (estimatedFullPayloadSize < minPayloadSize) { + int numberOfComplexFields = sizeInfo.getRight(); + if (numberOfComplexFields < 1) { + LOG.warn("The schema does not have any collections/complex fields. " + + "Cannot achieve minPayloadSize => " + minPayloadSize); + } + + determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize); } } + public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) { this(schema, minPayloadSize); this.numDatePartitions = numDatePartitions; @@ -113,7 +111,11 @@ public class GenericRecordFullPayloadGenerator implements Serializable { * @return {@link GenericRecord} with random value */ public GenericRecord getNewPayload() { - return convert(baseSchema); + return getNewPayload(baseSchema); + } + + protected GenericRecord getNewPayload(Schema schema) { + return randomize(new GenericData.Record(schema), null); } /** @@ -127,20 +129,6 @@ public class GenericRecordFullPayloadGenerator implements Serializable { return randomize(record, blacklistFields); } - /** - * Create a {@link GenericRecord} with random value according to given schema. - * - * @param schema Schema to create record with - * @return {@link GenericRecord} with random value - */ - protected GenericRecord convert(Schema schema) { - GenericRecord result = new GenericData.Record(schema); - for (Schema.Field f : schema.getFields()) { - result.put(f.name(), typeConvert(f.schema())); - } - return result; - } - /** * Create a new {@link GenericRecord} with random values. Not all the fields have value, it is random, and its value * is random too. @@ -153,7 +141,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { for (Schema.Field f : schema.getFields()) { boolean setNull = random.nextBoolean(); if (!setNull) { - result.put(f.name(), typeConvert(f.schema())); + result.put(f.name(), typeConvert(f)); } else { result.put(f.name(), null); } @@ -173,7 +161,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { protected GenericRecord randomize(GenericRecord record, List blacklistFields) { for (Schema.Field f : record.getSchema().getFields()) { if (blacklistFields == null || !blacklistFields.contains(f.name())) { - record.put(f.name(), typeConvert(f.schema())); + record.put(f.name(), typeConvert(f)); } } return record; @@ -188,12 +176,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable { /** * Generate random value according to their type. */ - private Object typeConvert(Schema schema) { - Schema localSchema = schema; - if (isOption(schema)) { - localSchema = getNonNull(schema); + private Object typeConvert(Schema.Field field) { + Schema fieldSchema = field.schema(); + if (isOption(fieldSchema)) { + fieldSchema = getNonNull(fieldSchema); } - switch (localSchema.getType()) { + switch (fieldSchema.getType()) { case BOOLEAN: return random.nextBoolean(); case DOUBLE: @@ -205,45 +193,35 @@ public class GenericRecordFullPayloadGenerator implements Serializable { case LONG: return getNextConstrainedLong(); case STRING: - return UUID.randomUUID().toString(); + return UUID.randomUUID().toString(); case ENUM: - List enumSymbols = localSchema.getEnumSymbols(); - return new GenericData.EnumSymbol(localSchema, enumSymbols.get(random.nextInt(enumSymbols.size() - 1))); + List enumSymbols = fieldSchema.getEnumSymbols(); + return new GenericData.EnumSymbol(fieldSchema, enumSymbols.get(random.nextInt(enumSymbols.size() - 1))); case RECORD: - return convert(localSchema); + return getNewPayload(fieldSchema); case ARRAY: - Schema elementSchema = localSchema.getElementType(); + Schema.Field elementField = new Schema.Field(field.name(), fieldSchema.getElementType(), "", null); List listRes = new ArrayList(); - if (isPrimitive(elementSchema) && this.shouldAddMore) { - int numEntriesToAdd = numEntriesToAdd(elementSchema); - while (numEntriesToAdd > 0) { - listRes.add(typeConvert(elementSchema)); - numEntriesToAdd--; - } - } else { - listRes.add(typeConvert(elementSchema)); + int numEntriesToAdd = extraEntriesMap.getOrDefault(field.name(), 1); + while (numEntriesToAdd-- > 0) { + listRes.add(typeConvert(elementField)); } return listRes; case MAP: - Schema valueSchema = localSchema.getValueType(); + Schema.Field valueField = new Schema.Field(field.name(), fieldSchema.getValueType(), "", null); Map mapRes = new HashMap(); - if (isPrimitive(valueSchema) && this.shouldAddMore) { - int numEntriesToAdd = numEntriesToAdd(valueSchema); - while (numEntriesToAdd > 0) { - mapRes.put(UUID.randomUUID().toString(), typeConvert(valueSchema)); - numEntriesToAdd--; - } - } else { - mapRes.put(UUID.randomUUID().toString(), typeConvert(valueSchema)); + numEntriesToAdd = extraEntriesMap.getOrDefault(field.name(), 1); + while (numEntriesToAdd > 0) { + mapRes.put(UUID.randomUUID().toString(), typeConvert(valueField)); + numEntriesToAdd--; } return mapRes; case BYTES: return ByteBuffer.wrap(UUID.randomUUID().toString().getBytes(Charset.defaultCharset())); case FIXED: - return generateFixedType(localSchema); + return generateFixedType(fieldSchema); default: - throw new IllegalArgumentException( - "Cannot handle type: " + localSchema.getType()); + throw new IllegalArgumentException("Cannot handle type: " + fieldSchema.getType()); } } @@ -333,23 +311,37 @@ public class GenericRecordFullPayloadGenerator implements Serializable { * @param elementSchema * @return Number of entries to add */ - private int numEntriesToAdd(Schema elementSchema) { - // Find the size of the primitive data type in bytes - int primitiveDataTypeSize = getSize(elementSchema); - int numEntriesToAdd = numberOfBytesToAdd / primitiveDataTypeSize; - // If more than 10 entries are being added for this same complex field and there are still more complex fields to - // be visited in the schema, reduce the number of entries to add by a factor of 10 to allow for other complex - // fields to pack some entries - if (numEntriesToAdd % 10 > 0 && this.numberOfComplexFields > 1) { - numEntriesToAdd = numEntriesToAdd / 10; - numberOfBytesToAdd -= numEntriesToAdd * primitiveDataTypeSize; - this.shouldAddMore = true; - } else { - this.numberOfBytesToAdd = 0; - this.shouldAddMore = false; + private void determineExtraEntriesRequired(int numberOfComplexFields, int numberOfBytesToAdd) { + for (Schema.Field f : baseSchema.getFields()) { + Schema elementSchema = f.schema(); + // Find the size of the primitive data type in bytes + int primitiveDataTypeSize = 0; + if (elementSchema.getType() == Type.ARRAY && isPrimitive(elementSchema.getElementType())) { + primitiveDataTypeSize = getSize(elementSchema.getElementType()); + } else if (elementSchema.getType() == Type.MAP && isPrimitive(elementSchema.getValueType())) { + primitiveDataTypeSize = getSize(elementSchema.getValueType()); + } else { + continue; + } + + int numEntriesToAdd = numberOfBytesToAdd / primitiveDataTypeSize; + // If more than 10 entries are being added for this same complex field and there are still more complex fields to + // be visited in the schema, reduce the number of entries to add by a factor of 10 to allow for other complex + // fields to pack some entries + if (numEntriesToAdd > 10 && numberOfComplexFields > 1) { + numEntriesToAdd = 10; + numberOfBytesToAdd -= numEntriesToAdd * primitiveDataTypeSize; + } else { + numberOfBytesToAdd = 0; + } + + extraEntriesMap.put(f.name(), numEntriesToAdd); + + numberOfComplexFields -= 1; + if (numberOfBytesToAdd <= 0) { + break; + } } - this.numberOfComplexFields -= 1; - return numEntriesToAdd; } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordPartialPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordPartialPayloadGenerator.java index f7e4174e6..999f83de6 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordPartialPayloadGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordPartialPayloadGenerator.java @@ -38,7 +38,7 @@ public class GenericRecordPartialPayloadGenerator extends GenericRecordFullPaylo } @Override - protected GenericRecord convert(Schema schema) { + protected GenericRecord getNewPayload(Schema schema) { GenericRecord record = super.convertPartial(schema); return record; } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index 209aa469c..e209118db 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -132,7 +132,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { Option numRecordsToUpdate, Option percentageRecordsPerFile) throws IOException { log.info("NumPartitions : {}, NumFiles : {}, numRecordsToUpdate : {}, percentageRecordsPerFile : {}", numPartitions, numFiles, numRecordsToUpdate, percentageRecordsPerFile); - List partitionPaths = getPartitions(numPartitions); + final List partitionPaths = getPartitions(numPartitions); // Read all file slices in the partition JavaPairRDD> partitionToFileSlice = getPartitionToFileSlice(metaClient, partitionPaths); @@ -156,7 +156,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { } // Adjust the number of files to read per partition based on the requested partition & file counts Map adjustedPartitionToFileIdCountMap = getFilesToReadPerPartition(partitionToFileSlice, - getPartitions(numPartitions).size(), numFilesToUpdate); + partitionPaths.size(), numFilesToUpdate); JavaRDD updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap, partitionToFileSlice, numFilesToUpdate, (int) numRecordsToUpdatePerFile)); if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && numFiles.get() != 0 && numRecordsToUpdate.get() diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 5a1756cc2..a268c7baa 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -219,7 +219,7 @@ public class DeltaSync implements Serializable { * * @throws IOException in case of any IOException */ - private void refreshTimeline() throws IOException { + public void refreshTimeline() throws IOException { if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, cfg.payloadClassName); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java index 5d56f2a2a..c8690f553 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java @@ -78,6 +78,7 @@ public class DFSPathSelector { while (fitr.hasNext()) { LocatedFileStatus fileStatus = fitr.next(); if (fileStatus.isDirectory() + || fileStatus.getLen() == 0 || IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) { continue; }