1
0

[HUDI-1303] Some improvements for the HUDI Test Suite. (#2128)

1. Use the DAG node's label from the yaml as its name, instead of UUID-based names, which are not descriptive when debugging issues from logs.
2. Fix CleanNode constructor which is not correctly implemented
3. When generating upserts, allow more granular control over the number of inserts and upserts — zero or more of each can be specified, instead of always requiring both inserts and upserts.
4. Fixed generation of records of specific size
   - The current code was using a class variable "shouldAddMore" which was reset to false after the first record generation causing subsequent records to be of minimum size.
   - In this change, we pre-calculate the extra size of the complex fields. When generating records, for complex fields we read the field size from this map.
5. Refresh the timeline of the DeltaSync service before calling readFromSource. This ensures that only the newest generated data is read and data generated in the older Dag Nodes is ignored (as their AVRO files will have an older timestamp).
6. Make --workload-generator-classname an optional parameter, since in most cases the default will be used.
This commit is contained in:
Prashant Wason
2020-10-07 05:33:51 -07:00
committed by GitHub
parent 524193eb4b
commit 788d236c44
12 changed files with 134 additions and 123 deletions

View File

@@ -21,12 +21,13 @@ HIVE_SITE_CONF_javax_jdo_option_ConnectionUserName=hive
HIVE_SITE_CONF_javax_jdo_option_ConnectionPassword=hive
HIVE_SITE_CONF_datanucleus_autoCreateSchema=false
HIVE_SITE_CONF_hive_metastore_uris=thrift://hivemetastore:9083
HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false
HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false
HDFS_CONF_dfs_webhdfs_enabled=true
HDFS_CONF_dfs_permissions_enabled=false
#HDFS_CONF_dfs_client_use_datanode_hostname=true
#HDFS_CONF_dfs_namenode_use_datanode_hostname=true
HDFS_CONF_dfs_replication=1
CORE_CONF_fs_defaultFS=hdfs://namenode:8020
CORE_CONF_hadoop_http_staticuser_user=root

View File

@@ -66,6 +66,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
DeltaSync service = deltaSyncService.get().getDeltaSync();
service.refreshTimeline();
return service.readFromSource(service.getCommitTimelineOpt());
}

View File

@@ -156,8 +156,7 @@ public class HoodieTestSuiteJob {
public String inputBasePath;
@Parameter(names = {
"--workload-generator-classname"}, description = "WorkflowDag of operations to generate the workload",
required = true)
"--workload-generator-classname"}, description = "WorkflowDag of operations to generate the workload")
public String workloadDagGenerator = WorkflowDagGenerator.class.getName();
@Parameter(names = {
@@ -177,8 +176,7 @@ public class HoodieTestSuiteJob {
public Long limitFileSize = 1024 * 1024 * 120L;
@Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to "
+ "perform"
+ " ingestion. If set to false, HoodieWriteClient will be used")
+ "perform ingestion. If set to false, HoodieWriteClient will be used")
public Boolean useDeltaStreamer = false;
}

View File

@@ -257,6 +257,11 @@ public class DeltaConfig implements Serializable {
return this;
}
public Builder withName(String name) {
this.configsMap.put(CONFIG_NAME, name);
return this;
}
public Config build() {
return new Config(configsMap);
}

View File

@@ -68,7 +68,7 @@ public class DagUtils {
Iterator<Entry<String, JsonNode>> itr = jsonNode.fields();
while (itr.hasNext()) {
Entry<String, JsonNode> dagNode = itr.next();
allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getValue()));
allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getKey(), dagNode.getValue()));
}
return new WorkflowDag(findRootNodes(allNodes));
}
@@ -94,9 +94,10 @@ public class DagUtils {
}
}
private static DagNode convertJsonToDagNode(Map<String, DagNode> allNodes, JsonNode node) throws IOException {
private static DagNode convertJsonToDagNode(Map<String, DagNode> allNodes, String name, JsonNode node)
throws IOException {
String type = node.get(DeltaConfig.Config.TYPE).asText();
final DagNode retNode = convertJsonToDagNode(node, type);
final DagNode retNode = convertJsonToDagNode(node, type, name);
Arrays.asList(node.get(DeltaConfig.Config.DEPENDENCIES).textValue().split(",")).stream().forEach(dep -> {
DagNode parentNode = allNodes.get(dep);
if (parentNode != null) {
@@ -116,9 +117,10 @@ public class DagUtils {
return rootNodes;
}
private static DagNode convertJsonToDagNode(JsonNode node, String type) {
private static DagNode convertJsonToDagNode(JsonNode node, String type, String name) {
try {
DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node)).build();
DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node))
.withName(name).build();
return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
/**
@@ -26,7 +27,8 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
*/
public class CleanNode extends DagNode<Boolean> {
public CleanNode() {
public CleanNode(Config config) {
this.config = config;
}
@Override

View File

@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.converter.Converter;
import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
@@ -93,11 +92,11 @@ public class DeltaGenerator implements Serializable {
}
public JavaRDD<GenericRecord> generateInserts(Config operation) {
long recordsPerPartition = operation.getNumRecordsInsert();
int numPartitions = operation.getNumInsertPartitions();
long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
int minPayloadSize = operation.getRecordSize();
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
.repartition(operation.getNumInsertPartitions()).mapPartitions(p -> {
.repartition(numPartitions).mapPartitions(p -> {
return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions));
});
@@ -112,34 +111,44 @@ public class DeltaGenerator implements Serializable {
}
DeltaInputReader deltaInputReader = null;
JavaRDD<GenericRecord> adjustedRDD = null;
if (config.getNumUpsertPartitions() < 1) {
// randomly generate updates for a given number of records without regard to partitions and files
deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr,
((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty());
adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert());
adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert());
} else {
deltaInputReader =
new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
schemaStr);
if (config.getFractionUpsertPerFile() > 0) {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(),
config.getFractionUpsertPerFile());
if (config.getNumUpsertPartitions() != 0) {
if (config.getNumUpsertPartitions() < 0) {
// randomly generate updates for a given number of records without regard to partitions and files
deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr,
((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty());
adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert());
adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert());
} else {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config
.getNumRecordsUpsert());
deltaInputReader =
new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
schemaStr);
if (config.getFractionUpsertPerFile() > 0) {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(),
config.getFractionUpsertPerFile());
} else {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config
.getNumRecordsUpsert());
}
}
log.info("Repartitioning records");
// persist this since we will make multiple passes over this
adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
log.info("Repartitioning records done");
UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
partitionPathFieldNames, recordRowKeyFieldNames);
JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
log.info("Records converted");
updates.persist(StorageLevel.DISK_ONLY());
if (inserts == null) {
inserts = updates;
} else {
inserts = inserts.union(updates);
}
}
log.info("Repartitioning records");
// persist this since we will make multiple passes over this
adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
log.info("Repartitioning records done");
Converter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
partitionPathFieldNames, recordRowKeyFieldNames);
JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
log.info("Records converted");
updates.persist(StorageLevel.DISK_ONLY());
return inserts != null ? inserts.union(updates) : updates;
return inserts;
// TODO : Generate updates for only N partitions.
} else {
throw new IllegalArgumentException("Other formats are not supported at the moment");

View File

@@ -44,25 +44,22 @@ import org.slf4j.LoggerFactory;
* Every field of a generic record created using this generator contains a random value.
*/
public class GenericRecordFullPayloadGenerator implements Serializable {
private static Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
private static Logger log = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
protected final Random random = new Random();
// The source schema used to generate a payload
private final transient Schema baseSchema;
// Used to validate a generic record
private final transient GenericData genericData = new GenericData();
// Number of more bytes to add based on the estimated full record payload size and min payload size
private int numberOfBytesToAdd;
// If more elements should be packed to meet the minPayloadSize
private boolean shouldAddMore;
// How many complex fields have we visited that can help us pack more entries and increase the size of the record
private int numberOfComplexFields;
// The size of a full record where every field of a generic record created contains 1 random value
private int estimatedFullPayloadSize;
// The number of unique dates to create
private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
// The size of a full record where every field of a generic record created contains 1 random value
private final int estimatedFullPayloadSize;
// Number of extra entries to add in a complex/collection field to achieve the desired record size
Map<String, Integer> extraEntriesMap = new HashMap<>();
// LogicalTypes in Avro 1.8.2
private static final String DECIMAL = "decimal";
private static final String UUID_NAME = "uuid";
@@ -80,17 +77,18 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
Pair<Integer, Integer> sizeInfo = new GenericRecordFullPayloadSizeEstimator(schema)
.typeEstimateAndNumComplexFields();
this.estimatedFullPayloadSize = sizeInfo.getLeft();
this.numberOfComplexFields = sizeInfo.getRight();
this.baseSchema = schema;
this.shouldAddMore = estimatedFullPayloadSize < minPayloadSize;
if (this.shouldAddMore) {
this.numberOfBytesToAdd = minPayloadSize - estimatedFullPayloadSize;
if (numberOfComplexFields < 1) {
log.warn("The schema does not have any collections/complex fields. Cannot achieve minPayloadSize : {}",
minPayloadSize);
}
if (estimatedFullPayloadSize < minPayloadSize) {
int numberOfComplexFields = sizeInfo.getRight();
if (numberOfComplexFields < 1) {
LOG.warn("The schema does not have any collections/complex fields. "
+ "Cannot achieve minPayloadSize => " + minPayloadSize);
}
determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize);
}
}
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) {
this(schema, minPayloadSize);
this.numDatePartitions = numDatePartitions;
@@ -113,7 +111,11 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
* @return {@link GenericRecord} with random value
*/
public GenericRecord getNewPayload() {
return convert(baseSchema);
return getNewPayload(baseSchema);
}
protected GenericRecord getNewPayload(Schema schema) {
return randomize(new GenericData.Record(schema), null);
}
/**
@@ -127,20 +129,6 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
return randomize(record, blacklistFields);
}
/**
* Create a {@link GenericRecord} with random value according to given schema.
*
* @param schema Schema to create record with
* @return {@link GenericRecord} with random value
*/
protected GenericRecord convert(Schema schema) {
GenericRecord result = new GenericData.Record(schema);
for (Schema.Field f : schema.getFields()) {
result.put(f.name(), typeConvert(f.schema()));
}
return result;
}
/**
* Create a new {@link GenericRecord} with random values. Not all the fields have value, it is random, and its value
* is random too.
@@ -153,7 +141,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
for (Schema.Field f : schema.getFields()) {
boolean setNull = random.nextBoolean();
if (!setNull) {
result.put(f.name(), typeConvert(f.schema()));
result.put(f.name(), typeConvert(f));
} else {
result.put(f.name(), null);
}
@@ -173,7 +161,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
protected GenericRecord randomize(GenericRecord record, List<String> blacklistFields) {
for (Schema.Field f : record.getSchema().getFields()) {
if (blacklistFields == null || !blacklistFields.contains(f.name())) {
record.put(f.name(), typeConvert(f.schema()));
record.put(f.name(), typeConvert(f));
}
}
return record;
@@ -188,12 +176,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
/**
* Generate random value according to their type.
*/
private Object typeConvert(Schema schema) {
Schema localSchema = schema;
if (isOption(schema)) {
localSchema = getNonNull(schema);
private Object typeConvert(Schema.Field field) {
Schema fieldSchema = field.schema();
if (isOption(fieldSchema)) {
fieldSchema = getNonNull(fieldSchema);
}
switch (localSchema.getType()) {
switch (fieldSchema.getType()) {
case BOOLEAN:
return random.nextBoolean();
case DOUBLE:
@@ -205,45 +193,35 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
case LONG:
return getNextConstrainedLong();
case STRING:
return UUID.randomUUID().toString();
return UUID.randomUUID().toString();
case ENUM:
List<String> enumSymbols = localSchema.getEnumSymbols();
return new GenericData.EnumSymbol(localSchema, enumSymbols.get(random.nextInt(enumSymbols.size() - 1)));
List<String> enumSymbols = fieldSchema.getEnumSymbols();
return new GenericData.EnumSymbol(fieldSchema, enumSymbols.get(random.nextInt(enumSymbols.size() - 1)));
case RECORD:
return convert(localSchema);
return getNewPayload(fieldSchema);
case ARRAY:
Schema elementSchema = localSchema.getElementType();
Schema.Field elementField = new Schema.Field(field.name(), fieldSchema.getElementType(), "", null);
List listRes = new ArrayList();
if (isPrimitive(elementSchema) && this.shouldAddMore) {
int numEntriesToAdd = numEntriesToAdd(elementSchema);
while (numEntriesToAdd > 0) {
listRes.add(typeConvert(elementSchema));
numEntriesToAdd--;
}
} else {
listRes.add(typeConvert(elementSchema));
int numEntriesToAdd = extraEntriesMap.getOrDefault(field.name(), 1);
while (numEntriesToAdd-- > 0) {
listRes.add(typeConvert(elementField));
}
return listRes;
case MAP:
Schema valueSchema = localSchema.getValueType();
Schema.Field valueField = new Schema.Field(field.name(), fieldSchema.getValueType(), "", null);
Map<String, Object> mapRes = new HashMap<String, Object>();
if (isPrimitive(valueSchema) && this.shouldAddMore) {
int numEntriesToAdd = numEntriesToAdd(valueSchema);
while (numEntriesToAdd > 0) {
mapRes.put(UUID.randomUUID().toString(), typeConvert(valueSchema));
numEntriesToAdd--;
}
} else {
mapRes.put(UUID.randomUUID().toString(), typeConvert(valueSchema));
numEntriesToAdd = extraEntriesMap.getOrDefault(field.name(), 1);
while (numEntriesToAdd > 0) {
mapRes.put(UUID.randomUUID().toString(), typeConvert(valueField));
numEntriesToAdd--;
}
return mapRes;
case BYTES:
return ByteBuffer.wrap(UUID.randomUUID().toString().getBytes(Charset.defaultCharset()));
case FIXED:
return generateFixedType(localSchema);
return generateFixedType(fieldSchema);
default:
throw new IllegalArgumentException(
"Cannot handle type: " + localSchema.getType());
throw new IllegalArgumentException("Cannot handle type: " + fieldSchema.getType());
}
}
@@ -333,23 +311,37 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
* @param elementSchema
* @return Number of entries to add
*/
private int numEntriesToAdd(Schema elementSchema) {
// Find the size of the primitive data type in bytes
int primitiveDataTypeSize = getSize(elementSchema);
int numEntriesToAdd = numberOfBytesToAdd / primitiveDataTypeSize;
// If more than 10 entries are being added for this same complex field and there are still more complex fields to
// be visited in the schema, reduce the number of entries to add by a factor of 10 to allow for other complex
// fields to pack some entries
if (numEntriesToAdd % 10 > 0 && this.numberOfComplexFields > 1) {
numEntriesToAdd = numEntriesToAdd / 10;
numberOfBytesToAdd -= numEntriesToAdd * primitiveDataTypeSize;
this.shouldAddMore = true;
} else {
this.numberOfBytesToAdd = 0;
this.shouldAddMore = false;
private void determineExtraEntriesRequired(int numberOfComplexFields, int numberOfBytesToAdd) {
for (Schema.Field f : baseSchema.getFields()) {
Schema elementSchema = f.schema();
// Find the size of the primitive data type in bytes
int primitiveDataTypeSize = 0;
if (elementSchema.getType() == Type.ARRAY && isPrimitive(elementSchema.getElementType())) {
primitiveDataTypeSize = getSize(elementSchema.getElementType());
} else if (elementSchema.getType() == Type.MAP && isPrimitive(elementSchema.getValueType())) {
primitiveDataTypeSize = getSize(elementSchema.getValueType());
} else {
continue;
}
int numEntriesToAdd = numberOfBytesToAdd / primitiveDataTypeSize;
// If more than 10 entries are being added for this same complex field and there are still more complex fields to
// be visited in the schema, reduce the number of entries to add by a factor of 10 to allow for other complex
// fields to pack some entries
if (numEntriesToAdd > 10 && numberOfComplexFields > 1) {
numEntriesToAdd = 10;
numberOfBytesToAdd -= numEntriesToAdd * primitiveDataTypeSize;
} else {
numberOfBytesToAdd = 0;
}
extraEntriesMap.put(f.name(), numEntriesToAdd);
numberOfComplexFields -= 1;
if (numberOfBytesToAdd <= 0) {
break;
}
}
this.numberOfComplexFields -= 1;
return numEntriesToAdd;
}
}

View File

@@ -38,7 +38,7 @@ public class GenericRecordPartialPayloadGenerator extends GenericRecordFullPaylo
}
@Override
protected GenericRecord convert(Schema schema) {
protected GenericRecord getNewPayload(Schema schema) {
GenericRecord record = super.convertPartial(schema);
return record;
}

View File

@@ -132,7 +132,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
Option<Long> numRecordsToUpdate, Option<Double> percentageRecordsPerFile) throws IOException {
log.info("NumPartitions : {}, NumFiles : {}, numRecordsToUpdate : {}, percentageRecordsPerFile : {}",
numPartitions, numFiles, numRecordsToUpdate, percentageRecordsPerFile);
List<String> partitionPaths = getPartitions(numPartitions);
final List<String> partitionPaths = getPartitions(numPartitions);
// Read all file slices in the partition
JavaPairRDD<String, Iterator<FileSlice>> partitionToFileSlice = getPartitionToFileSlice(metaClient,
partitionPaths);
@@ -156,7 +156,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
}
// Adjust the number of files to read per partition based on the requested partition & file counts
Map<String, Integer> adjustedPartitionToFileIdCountMap = getFilesToReadPerPartition(partitionToFileSlice,
getPartitions(numPartitions).size(), numFilesToUpdate);
partitionPaths.size(), numFilesToUpdate);
JavaRDD<GenericRecord> updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap,
partitionToFileSlice, numFilesToUpdate, (int) numRecordsToUpdatePerFile));
if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && numFiles.get() != 0 && numRecordsToUpdate.get()

View File

@@ -219,7 +219,7 @@ public class DeltaSync implements Serializable {
*
* @throws IOException in case of any IOException
*/
private void refreshTimeline() throws IOException {
public void refreshTimeline() throws IOException {
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath,
cfg.payloadClassName);

View File

@@ -78,6 +78,7 @@ public class DFSPathSelector {
while (fitr.hasNext()) {
LocatedFileStatus fileStatus = fitr.next();
if (fileStatus.isDirectory()
|| fileStatus.getLen() == 0
|| IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
continue;
}