1
0

[HUDI-1635] Improvements to Hudi Test Suite (#2628)

This commit is contained in:
Balajee Nagasubramaniam
2021-03-09 13:29:38 -08:00
committed by GitHub
parent d3a451611c
commit d8af24d8a2
13 changed files with 360 additions and 38 deletions

View File

@@ -18,17 +18,26 @@
package org.apache.hudi.integ.testsuite;
import org.apache.avro.Schema;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.DagUtils;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
import org.apache.hudi.integ.testsuite.dag.scheduler.SaferSchemaDagScheduler;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
@@ -47,6 +56,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer}, this class does not extend it, since we do not want to create a dependency
@@ -79,6 +90,7 @@ public class HoodieTestSuiteJob {
private transient HiveConf hiveConf;
private BuiltinKeyGenerator keyGenerator;
private transient HoodieTableMetaClient metaClient;
public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
log.warn("Running spark job w/ app id " + jsc.sc().applicationId());
@@ -92,13 +104,11 @@ public class HoodieTestSuiteJob {
this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
this.keyGenerator = (BuiltinKeyGenerator) DataSourceUtils.createKeyGenerator(props);
if (!fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient.withPropertyBuilder()
metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder("archived")
.initTable(jsc.hadoopConfiguration(), cfg.targetBasePath);
}
if (cfg.cleanInput) {
Path inputPath = new Path(cfg.inputBasePath);
@@ -115,6 +125,28 @@ public class HoodieTestSuiteJob {
}
}
/**
 * Reads the schema version recorded in the extra metadata of the n-th commit from the end of
 * the active timeline.
 *
 * @param nthCommit 0-based offset from the latest commit (0 = latest).
 * @return the parsed "schemaVersion" object property of the commit's Avro schema, or 0 when the
 *         commit (or the property) cannot be read.
 * @throws Exception declared for caller compatibility; read/parse failures are handled internally.
 */
int getSchemaVersionFromCommit(int nthCommit) throws Exception {
  int version = 0;
  try {
    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitsTimeline();
    // Pick up the schema version from the nth commit from the last
    // (the most recent insert/upsert will be rolled back).
    HoodieInstant prevInstant = timeline.nthFromLastInstant(nthCommit).get();
    HoodieCommitMetadata commit = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(prevInstant).get(),
        HoodieCommitMetadata.class);
    Map<String, String> extraMetadata = commit.getExtraMetadata();
    String avroSchemaStr = extraMetadata.get(HoodieCommitMetadata.SCHEMA_KEY);
    Schema avroSchema = new Schema.Parser().parse(avroSchemaStr);
    version = Integer.parseInt(avroSchema.getObjectProp("schemaVersion").toString());
    // DAG will generate & ingest data for 2 versions (n-th version being validated, n-1).
    log.info(String.format("Last used schemaVersion from latest commit file was %d. Optimizing the DAG.", version));
  } catch (Exception e) {
    // Failed to open/parse the commit to read the schema version; continue executing the DAG
    // without any changes. Include the cause so the silent fallback stays diagnosable.
    log.info("Last used schemaVersion could not be validated from commit file. Skipping SaferSchema Optimization.", e);
  }
  return version;
}
private static HiveConf getDefaultHiveConf(Configuration cfg) {
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(cfg);
@@ -150,8 +182,22 @@ public class HoodieTestSuiteJob {
long startTime = System.currentTimeMillis();
WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession);
writerContext.initContext(jsc);
DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc);
dagScheduler.schedule();
if (this.cfg.saferSchemaEvolution) {
int numRollbacks = 2; // rollback most recent upsert/insert, by default.
// if root is RollbackNode, get num_rollbacks
List<DagNode> root = workflowDag.getNodeList();
if (!root.isEmpty() && root.get(0) instanceof RollbackNode) {
numRollbacks = root.get(0).getConfig().getNumRollbacks();
}
int version = getSchemaVersionFromCommit(numRollbacks - 1);
SaferSchemaDagScheduler dagScheduler = new SaferSchemaDagScheduler(workflowDag, writerContext, jsc, version);
dagScheduler.schedule();
} else {
DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc);
dagScheduler.schedule();
}
log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime);
} catch (Exception e) {
log.error("Failed to run Test Suite ", e);
@@ -211,5 +257,10 @@ public class HoodieTestSuiteJob {
@Parameter(names = {"--clean-output"}, description = "Clean the output folders and delete all files within it "
+ "before starting the Job")
public Boolean cleanOutput = false;
@Parameter(names = {"--saferSchemaEvolution"}, description = "Optimize the DAG for safer schema evolution."
+ "(If not provided, assumed to be false.)",
required = false)
public Boolean saferSchemaEvolution = false;
}
}

View File

@@ -92,6 +92,8 @@ public class DeltaConfig implements Serializable {
private static String EXECUTE_ITR_COUNT = "execute_itr_count";
private static String VALIDATE_ARCHIVAL = "validate_archival";
private static String VALIDATE_CLEAN = "validate_clean";
private static String SCHEMA_VERSION = "schema_version";
private static String NUM_ROLLBACKS = "num_rollbacks";
private Map<String, Object> configsMap;
@@ -131,6 +133,14 @@ public class DeltaConfig implements Serializable {
return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_UPSERT, 0).toString());
}
/** Schema version configured for this node; defaults to {@link Integer#MAX_VALUE} when unset. */
public int getSchemaVersion() {
  Object raw = configsMap.getOrDefault(SCHEMA_VERSION, Integer.MAX_VALUE);
  return Integer.parseInt(raw.toString());
}
/** Number of rollbacks configured for this node; defaults to 1 when unset. */
public int getNumRollbacks() {
  Object raw = configsMap.getOrDefault(NUM_ROLLBACKS, 1);
  return Integer.parseInt(raw.toString());
}
public int getStartPartition() {
return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString());
}
@@ -140,7 +150,7 @@ public class DeltaConfig implements Serializable {
}
public int getNumUpsertFiles() {
return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString());
return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 1).toString());
}
public double getFractionUpsertPerFile() {
@@ -248,6 +258,16 @@ public class DeltaConfig implements Serializable {
return this;
}
public Builder withSchemaVersion(int version) {
this.configsMap.put(SCHEMA_VERSION, version);
return this;
}
public Builder withNumRollbacks(int numRollbacks) {
this.configsMap.put(NUM_ROLLBACKS, numRollbacks);
return this;
}
public Builder withNumUpsertFiles(int numUpsertFiles) {
this.configsMap.put(NUM_FILES_UPSERT, numUpsertFiles);
return this;

View File

@@ -18,6 +18,19 @@
package org.apache.hudi.integ.testsuite.dag;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -190,18 +203,24 @@ public class DagUtils {
private static List<Pair<String, Integer>> getHiveQueries(Entry<String, JsonNode> entry) {
List<Pair<String, Integer>> queries = new ArrayList<>();
Iterator<Entry<String, JsonNode>> queriesItr = entry.getValue().fields();
while (queriesItr.hasNext()) {
queries.add(Pair.of(queriesItr.next().getValue().textValue(), queriesItr.next().getValue().asInt()));
try {
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
queries = (List<Pair<String, Integer>>)getHiveQueryMapper().readValue(flattened.toString(), List.class);
} catch (Exception e) {
e.printStackTrace();
}
return queries;
}
private static List<String> getProperties(Entry<String, JsonNode> entry) {
List<String> properties = new ArrayList<>();
Iterator<Entry<String, JsonNode>> queriesItr = entry.getValue().fields();
while (queriesItr.hasNext()) {
properties.add(queriesItr.next().getValue().textValue());
try {
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
properties = (List<String>)getHivePropertyMapper().readValue(flattened.toString(), List.class);
} catch (Exception e) {
e.printStackTrace();
}
return properties;
}
@@ -226,6 +245,22 @@ public class DagUtils {
private static JsonNode createJsonNode(DagNode node, String type) throws IOException {
JsonNode configNode = MAPPER.readTree(node.getConfig().toString());
JsonNode jsonNode = MAPPER.createObjectNode();
Iterator<Entry<String, JsonNode>> itr = configNode.fields();
while (itr.hasNext()) {
Entry<String, JsonNode> entry = itr.next();
switch (entry.getKey()) {
case DeltaConfig.Config.HIVE_QUERIES:
((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_QUERIES,
MAPPER.readTree(getHiveQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
break;
case DeltaConfig.Config.HIVE_PROPERTIES:
((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_PROPERTIES,
MAPPER.readTree(getHivePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
break;
default:
break;
}
}
((ObjectNode) jsonNode).put(DeltaConfig.Config.CONFIG_NAME, configNode);
((ObjectNode) jsonNode).put(DeltaConfig.Config.TYPE, type);
((ObjectNode) jsonNode).put(DeltaConfig.Config.DEPENDENCIES, getDependencyNames(node));
@@ -248,4 +283,101 @@ public class DagUtils {
return result.toString("utf-8");
}
// Builds an ObjectMapper whose List (de)serialization uses the flat
// {queryN:..., resultN:...} representation for hive query/result pairs.
private static ObjectMapper getHiveQueryMapper() {
  ObjectMapper mapper = new ObjectMapper();
  SimpleModule hiveQueryModule = new SimpleModule();
  hiveQueryModule.addSerializer(List.class, new HiveQuerySerializer());
  hiveQueryModule.addDeserializer(List.class, new HiveQueryDeserializer());
  mapper.registerModule(hiveQueryModule);
  return mapper;
}
/**
 * Serializes a list of (query, expected-result) pairs into a flat JSON object:
 * {query0:&lt;query&gt;, result0:&lt;result&gt;, query1:&lt;query&gt;, result1:&lt;result&gt;, ...}.
 */
private static final class HiveQuerySerializer extends JsonSerializer<List> {
  @Override
  public void serialize(List pairs, JsonGenerator gen, SerializerProvider serializers) throws IOException {
    // Use a method-local counter. The original instance field kept incrementing across
    // serialize() calls, so a reused serializer instance would emit keys that never
    // restarted at 0 (query2/result2, ...), breaking round-trips.
    int index = 0;
    gen.writeStartObject();
    for (Pair pair : (List<Pair>) pairs) {
      gen.writeStringField("query" + index, pair.getLeft().toString());
      gen.writeNumberField("result" + index, Integer.parseInt(pair.getRight().toString()));
      index++;
    }
    gen.writeEndObject();
  }
}
/**
 * Deserializes the flat JSON object written by {@link HiveQuerySerializer} back into a list of
 * (query, expected-result) pairs.
 */
private static final class HiveQueryDeserializer extends JsonDeserializer<List> {
@Override
public List deserialize(JsonParser parser, DeserializationContext context) throws IOException {
List<Pair<String, Integer>> pairs = new ArrayList<>();
String query = "";
Integer result = 0;
// Expected token stream: [{query0:<query>, result0:<result>,query1:<query>, result1:<result>}]
while (!parser.isClosed()) {
JsonToken jsonToken = parser.nextToken();
// Stop at the end of the wrapping array; tokens past it belong to no pair.
if (jsonToken.equals(JsonToken.END_ARRAY)) {
break;
}
if (JsonToken.FIELD_NAME.equals(jsonToken)) {
// Capture the field name, then advance to its value token.
String fieldName = parser.getCurrentName();
parser.nextToken();
if (fieldName.contains("query")) {
query = parser.getValueAsString();
} else if (fieldName.contains("result")) {
result = parser.getValueAsInt();
// A pair is emitted only when its result arrives, pairing it with the last seen query —
// relies on queryN always preceding resultN in the serialized form.
pairs.add(Pair.of(query, result));
}
}
}
return pairs;
}
}
// Builds an ObjectMapper whose List (de)serialization uses the flat
// {propN: ...} representation for hive property strings.
private static ObjectMapper getHivePropertyMapper() {
  ObjectMapper mapper = new ObjectMapper();
  SimpleModule hivePropertyModule = new SimpleModule();
  hivePropertyModule.addSerializer(List.class, new HivePropertySerializer());
  hivePropertyModule.addDeserializer(List.class, new HivePropertyDeserializer());
  mapper.registerModule(hivePropertyModule);
  return mapper;
}
/**
 * Serializes a list of hive property strings into a flat JSON object:
 * {prop0:&lt;property&gt;, prop1:&lt;property&gt;, ...}.
 */
private static final class HivePropertySerializer extends JsonSerializer<List> {
  @Override
  public void serialize(List props, JsonGenerator gen, SerializerProvider serializers) throws IOException {
    // Use a method-local counter. The original instance field kept incrementing across
    // serialize() calls, so a reused serializer instance would emit keys that never
    // restarted at 0, breaking round-trips.
    int index = 0;
    gen.writeStartObject();
    for (String prop : (List<String>) props) {
      gen.writeStringField("prop" + index, prop);
      index++;
    }
    gen.writeEndObject();
  }
}
/**
 * Deserializes the flat JSON object written by {@link HivePropertySerializer} back into a list
 * of hive property strings.
 */
private static final class HivePropertyDeserializer extends JsonDeserializer<List> {
  @Override
  public List deserialize(JsonParser parser, DeserializationContext context) throws IOException {
    List<String> props = new ArrayList<>();
    // Expected token stream: [{prop0:<property>,...}]
    while (!parser.isClosed()) {
      JsonToken jsonToken = parser.nextToken();
      // Stop at the end of the wrapping array.
      if (jsonToken.equals(JsonToken.END_ARRAY)) {
        break;
      }
      if (JsonToken.FIELD_NAME.equals(jsonToken)) {
        // Capture the field name before advancing to its value token, and check the captured
        // name rather than re-querying the parser afterwards — consistent with
        // HiveQueryDeserializer and independent of the parser's current-name semantics.
        String fieldName = parser.getCurrentName();
        parser.nextToken();
        if (fieldName.contains("prop")) {
          props.add(parser.getValueAsString());
        }
      }
    }
    return props;
  }
}
}

View File

@@ -46,22 +46,26 @@ public class RollbackNode extends DagNode<Option<HoodieInstant>> {
*/
@Override
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing rollback node {}", this.getName());
int numRollbacks = config.getNumRollbacks();
log.info(String.format("Executing rollback node %s with %d rollbacks", this.getName(), numRollbacks));
// Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer
// testing for now
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath)
.build();
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
if (lastInstant.isPresent()) {
log.info("Rolling back last instant {}", lastInstant.get());
log.info("Cleaning up generated data for the instant being rolled back {}", lastInstant.get());
ValidationUtils.checkArgument(executionContext.getWriterContext().getProps().getOrDefault(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
DFSPathSelector.class.getName()).toString().equalsIgnoreCase(DFSTestSuitePathSelector.class.getName()), "Test Suite only supports DFSTestSuitePathSelector");
executionContext.getHoodieTestSuiteWriter().getWriteClient(this).rollback(lastInstant.get().getTimestamp());
metaClient.getFs().delete(new Path(executionContext.getWriterContext().getCfg().inputBasePath,
executionContext.getWriterContext().getHoodieTestSuiteWriter().getLastCheckpoint().orElse("")), true);
this.result = lastInstant;
for (int i = 0; i < numRollbacks; i++) {
metaClient.reloadActiveTimeline();
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
if (lastInstant.isPresent()) {
log.info("Rolling back last instant {}", lastInstant.get());
log.info("Cleaning up generated data for the instant being rolled back {}", lastInstant.get());
ValidationUtils.checkArgument(executionContext.getWriterContext().getProps().getOrDefault(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
DFSPathSelector.class.getName()).toString().equalsIgnoreCase(DFSTestSuitePathSelector.class.getName()), "Test Suite only supports DFSTestSuitePathSelector");
executionContext.getHoodieTestSuiteWriter().getWriteClient(this).rollback(lastInstant.get().getTimestamp());
metaClient.getFs().delete(new Path(executionContext.getWriterContext().getCfg().inputBasePath,
executionContext.getWriterContext().getHoodieTestSuiteWriter().getLastCheckpoint().orElse("")), true);
this.result = lastInstant;
}
}
}

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
import org.apache.hudi.metrics.Metrics;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.runners.Suite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,7 +130,7 @@ public class DagScheduler {
*
* @param node The node to be executed
*/
private void executeNode(DagNode node, int curRound) {
protected void executeNode(DagNode node, int curRound) {
if (node.isCompleted()) {
throw new RuntimeException("DagNode already completed! Cannot re-execute");
}

View File

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.dag.scheduler;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
public class SaferSchemaDagScheduler extends DagScheduler {
private static Logger LOG = LogManager.getLogger(SaferSchemaDagScheduler.class);
int processedVersion;
public SaferSchemaDagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext jsc) {
super(workflowDag, writerContext, jsc);
}
public SaferSchemaDagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext jsc, int version) {
super(workflowDag, writerContext, jsc);
processedVersion = version;
}
@Override
protected void executeNode(DagNode node, int curRound) throws HoodieException {
if (node.getConfig().getSchemaVersion() < processedVersion) {
LOG.info(String.format("----------------- Processed SaferSchema version %d is available. "
+ "Skipping redundant Insert Operation. (Processed = %d) -----------------",
node.getConfig().getSchemaVersion(), processedVersion));
return;
}
super.executeNode(node, curRound);
}
}

View File

@@ -140,7 +140,7 @@ public class DeltaGenerator implements Serializable {
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, numPartitions)
.mapPartitionsWithIndex((index, p) -> {
return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions));
minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions, startPartition));
}, true);
if (deltaOutputConfig.getInputParallelism() < numPartitions) {

View File

@@ -44,18 +44,20 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
private String firstPartitionPathField;
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) {
this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null,
GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS,
GenericRecordFullPayloadGenerator.DEFAULT_START_PARTITION);
}
public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr,
List<String> partitionPathFieldNames, int numPartitions) {
List<String> partitionPathFieldNames, int numPartitions, int startPartition) {
this.counter = maxEntriesToProduce;
this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames);
if(partitionPathFieldNames != null && partitionPathFieldNames.size() > 0) {
this.firstPartitionPathField = partitionPathFieldNames.get(0);
}
Schema schema = new Schema.Parser().parse(schemaStr);
this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions);
this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions, startPartition);
}
@Override

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.integ.testsuite.generator;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
@@ -26,6 +27,7 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Fixed;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.junit.runners.Suite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +52,8 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
public static final String DEFAULT_HOODIE_IS_DELETED_COL = "_hoodie_is_deleted";
public static final int DEFAULT_START_PARTITION = 0;
protected final Random random = new Random();
// The source schema used to generate a payload
private final transient Schema baseSchema;
@@ -61,6 +65,8 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
private int estimatedFullPayloadSize;
// Number of extra entries to add in a complex/collection field to achieve the desired record size
Map<String, Integer> extraEntriesMap = new HashMap<>();
// Start partition - default 0
private int startPartition = DEFAULT_START_PARTITION;
// The number of unique dates to create
private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
@@ -77,9 +83,11 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
this(schema, DEFAULT_PAYLOAD_SIZE);
}
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) {
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize,
int numDatePartitions, int startPartition) {
this(schema, minPayloadSize);
this.numDatePartitions = numDatePartitions;
this.startPartition = startPartition;
}
public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize) {
@@ -329,9 +337,10 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
* Note: When generating records, number of records to be generated must be more than numDatePartitions * parallelism,
* to guarantee that at least numDatePartitions are created.
*/
@VisibleForTesting
public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
long delta = TimeUnit.MILLISECONDS.convert(++partitionIndex % numDatePartitions, TimeUnit.DAYS);
record.put(fieldName, (System.currentTimeMillis() - delta)/1000);
long delta = TimeUnit.SECONDS.convert((++partitionIndex % numDatePartitions) + startPartition, TimeUnit.DAYS);
record.put(fieldName, delta);
return record;
}

View File

@@ -47,7 +47,9 @@ public class HiveSyncDagGeneratorMOR implements WorkflowDagGenerator {
root.addChildNode(child1);
DagNode child2 = new HiveQueryNode(Config.newBuilder().withHiveLocal(true).withHiveQueryAndResults(Arrays
.asList(Pair.of("select " + "count(*) from testdb1.table1_rt group " + "by rider having count(*) < 1", 0)))
.asList(Pair.of("select " + "count(*) from testdb1.hive_trips group " + "by rider having count(*) < 1", 0),
Pair.of("select " + "count(*) from testdb1.hive_trips ", 100)))
.withHiveProperties(Arrays.asList("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat"))
.build());
child1.addChildNode(child2);

View File

@@ -43,6 +43,40 @@ public class TestDagUtils {
System.out.println(yaml);
}
@Test
public void testConvertDagToYamlHiveQuery() throws Exception {
  // Round-trip a DAG containing a hive-query node through YAML and verify that the first
  // hive query and the first hive property survive serialization unchanged.
  WorkflowDag original = new HiveSyncDagGenerator().build();
  DagNode originalInsert = (DagNode) original.getNodeList().get(0);
  DagNode originalHiveSync = (DagNode) originalInsert.getChildNodes().get(0);
  DagNode originalHiveQuery = (DagNode) originalHiveSync.getChildNodes().get(0);

  WorkflowDag roundTripped = DagUtils.convertYamlToDag(DagUtils.convertDagToYaml(original));
  DagNode rtInsert = (DagNode) roundTripped.getNodeList().get(0);
  DagNode rtHiveSync = (DagNode) rtInsert.getChildNodes().get(0);
  DagNode rtHiveQuery = (DagNode) rtHiveSync.getChildNodes().get(0);

  assertEquals(originalHiveQuery.getConfig().getHiveQueries().get(0),
      rtHiveQuery.getConfig().getHiveQueries().get(0));
  assertEquals(originalHiveQuery.getConfig().getHiveProperties().get(0),
      rtHiveQuery.getConfig().getHiveProperties().get(0));
}
// Verifies that YAML round-tripping a complex DAG preserves its node structure
// (same number of root nodes and same number of children under the first root).
@Test
public void testConvertDagToYamlAndBack() throws Exception {
final ComplexDagGenerator dag = new ComplexDagGenerator();
final WorkflowDag originalWorkflowDag = dag.build();
final String yaml = DagUtils.convertDagToYaml(dag.build());
final WorkflowDag regeneratedWorkflowDag = DagUtils.convertYamlToDag(yaml);
final List<DagNode> originalWorkflowDagNodes = originalWorkflowDag.getNodeList();
final List<DagNode> regeneratedWorkflowDagNodes = regeneratedWorkflowDag.getNodeList();
assertEquals(originalWorkflowDagNodes.size(), regeneratedWorkflowDagNodes.size());
assertEquals(originalWorkflowDagNodes.get(0).getChildNodes().size(),
regeneratedWorkflowDagNodes.get(0).getChildNodes().size());
}
@Test
public void testConvertYamlToDag() throws Exception {
WorkflowDag dag = DagUtils.convertYamlToDag(UtilitiesTestBase.Helpers

View File

@@ -144,9 +144,8 @@ public class TestGenericRecordPayloadGenerator {
List<Long> insertTimeStamps = new ArrayList<>();
List<Long> updateTimeStamps = new ArrayList<>();
List<GenericRecord> records = new ArrayList<>();
Long startMillis = System.currentTimeMillis() - TimeUnit.MILLISECONDS
.convert(GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS, TimeUnit.DAYS);
Long startSeconds = 0L;
Long endSeconds = TimeUnit.SECONDS.convert(10, TimeUnit.DAYS);
// Generate 10 new records
IntStream.range(0, 10).forEach(a -> {
GenericRecord record = payloadGenerator.getNewPayloadWithTimestamp("timestamp");
@@ -165,7 +164,6 @@ public class TestGenericRecordPayloadGenerator {
assertTrue(insertRowKeys.containsAll(updateRowKeys));
// The timestamp field for the insert payloads should not all match with the update payloads
assertFalse(insertTimeStamps.containsAll(updateTimeStamps));
Long currentMillis = System.currentTimeMillis();
assertTrue(insertTimeStamps.stream().allMatch(t -> t >= startMillis && t <= currentMillis));
assertTrue(insertTimeStamps.stream().allMatch(t -> t >= startSeconds && t <= endSeconds));
}
}