From d8af24d8a2fdbead4592a36df1bd9dda333f1513 Mon Sep 17 00:00:00 2001 From: Balajee Nagasubramaniam <47542891+nbalajee@users.noreply.github.com> Date: Tue, 9 Mar 2021 13:29:38 -0800 Subject: [PATCH] [HUDI-1635] Improvements to Hudi Test Suite (#2628) --- hudi-integ-test/README.md | 18 ++- .../integ/testsuite/HoodieTestSuiteJob.java | 61 +++++++- .../testsuite/configuration/DeltaConfig.java | 22 ++- .../hudi/integ/testsuite/dag/DagUtils.java | 144 +++++++++++++++++- .../testsuite/dag/nodes/RollbackNode.java | 26 ++-- .../testsuite/dag/scheduler/DagScheduler.java | 3 +- .../scheduler/SaferSchemaDagScheduler.java | 53 +++++++ .../testsuite/generator/DeltaGenerator.java | 2 +- ...lexibleSchemaRecordGenerationIterator.java | 8 +- .../GenericRecordFullPayloadGenerator.java | 15 +- .../dag/HiveSyncDagGeneratorMOR.java | 4 +- .../integ/testsuite/dag/TestDagUtils.java | 34 +++++ .../TestGenericRecordPayloadGenerator.java | 8 +- 13 files changed, 360 insertions(+), 38 deletions(-) create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/SaferSchemaDagScheduler.java diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md index 06de263e9..0017f8a29 100644 --- a/hudi-integ-test/README.md +++ b/hudi-integ-test/README.md @@ -471,4 +471,20 @@ hdfs dfs -rm -r /user/hive/warehouse/hudi-integ-test-suite/input/ As of now, "ValidateDatasetNode" uses spark data source and hive tables for comparison. Hence COW and real time view in MOR can be tested. - \ No newline at end of file +To run test suite jobs for validating all versions of schema, a DAG with insert, upsert nodes can be supplied with every version of schema to be evaluated, with "--saferSchemaEvolution" flag indicating the job is for schema validations. First run of the job will populate the dataset with data files with every version of schema and perform an upsert operation for verifying schema evolution. + +Second and subsequent runs will verify that the data can be inserted with latest version of schema and perform an upsert operation to evolve all older version of schema (created by older run) to the latest version of schema. + +Sample DAG: +``` +rollback with num_rollbacks = 2 +insert with schema_version = +.... +upsert with fraction_upsert_per_file = 0.5 +``` + +Spark submit with the flag: +``` +--saferSchemaEvolution +``` + diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index 00f4d1dff..da7953f98 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -18,17 +18,26 @@ package org.apache.hudi.integ.testsuite; +import org.apache.avro.Schema; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.integ.testsuite.dag.DagUtils; import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator; import org.apache.hudi.integ.testsuite.dag.WriterContext; +import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; +import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode; import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler; +import org.apache.hudi.integ.testsuite.dag.scheduler.SaferSchemaDagScheduler; import org.apache.hudi.integ.testsuite.reader.DeltaInputType; import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; import org.apache.hudi.keygen.BuiltinKeyGenerator; @@ -47,6 +56,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.List; +import java.util.Map; /** * This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency @@ -79,6 +90,7 @@ public class HoodieTestSuiteJob { private transient HiveConf hiveConf; private BuiltinKeyGenerator keyGenerator; + private transient HoodieTableMetaClient metaClient; public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException { log.warn("Running spark job w/ app id " + jsc.sc().applicationId()); @@ -92,13 +104,11 @@ public class HoodieTestSuiteJob { this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration()); this.keyGenerator = (BuiltinKeyGenerator) DataSourceUtils.createKeyGenerator(props); - if (!fs.exists(new Path(cfg.targetBasePath))) { - HoodieTableMetaClient.withPropertyBuilder() + metaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableType(cfg.tableType) .setTableName(cfg.targetTableName) .setArchiveLogFolder("archived") .initTable(jsc.hadoopConfiguration(), cfg.targetBasePath); - } if (cfg.cleanInput) { Path inputPath = new Path(cfg.inputBasePath); @@ -115,6 +125,28 @@ public class HoodieTestSuiteJob { } } + int getSchemaVersionFromCommit(int nthCommit) throws Exception { + int version = 0; + try { + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitsTimeline(); + // Pickup the schema version from nth commit from last (most recent insert/upsert will be rolled back). + HoodieInstant prevInstant = timeline.nthFromLastInstant(nthCommit).get(); + HoodieCommitMetadata commit = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(prevInstant).get(), + HoodieCommitMetadata.class); + Map extraMetadata = commit.getExtraMetadata(); + String avroSchemaStr = extraMetadata.get(HoodieCommitMetadata.SCHEMA_KEY); + Schema avroSchema = new Schema.Parser().parse(avroSchemaStr); + version = Integer.parseInt(avroSchema.getObjectProp("schemaVersion").toString()); + // DAG will generate & ingest data for 2 versions (n-th version being validated, n-1). + log.info(String.format("Last used schemaVersion from latest commit file was %d. Optimizing the DAG.", version)); + } catch (Exception e) { + // failed to open the commit to read schema version. + // continue executing the DAG without any changes. + log.info("Last used schemaVersion could not be validated from commit file. Skipping SaferSchema Optimization."); + } + return version; + } + private static HiveConf getDefaultHiveConf(Configuration cfg) { HiveConf hiveConf = new HiveConf(); hiveConf.addResource(cfg); @@ -150,8 +182,22 @@ public class HoodieTestSuiteJob { long startTime = System.currentTimeMillis(); WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession); writerContext.initContext(jsc); - DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc); - dagScheduler.schedule(); + if (this.cfg.saferSchemaEvolution) { + int numRollbacks = 2; // rollback most recent upsert/insert, by default. + + // if root is RollbackNode, get num_rollbacks + List root = workflowDag.getNodeList(); + if (!root.isEmpty() && root.get(0) instanceof RollbackNode) { + numRollbacks = root.get(0).getConfig().getNumRollbacks(); + } + + int version = getSchemaVersionFromCommit(numRollbacks - 1); + SaferSchemaDagScheduler dagScheduler = new SaferSchemaDagScheduler(workflowDag, writerContext, jsc, version); + dagScheduler.schedule(); + } else { + DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc); + dagScheduler.schedule(); + } log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime); } catch (Exception e) { log.error("Failed to run Test Suite ", e); @@ -211,5 +257,10 @@ public class HoodieTestSuiteJob { @Parameter(names = {"--clean-output"}, description = "Clean the output folders and delete all files within it " + "before starting the Job") public Boolean cleanOutput = false; + + @Parameter(names = {"--saferSchemaEvolution"}, description = "Optimize the DAG for safer schema evolution." + + "(If not provided, assumed to be false.)", + required = false) + public Boolean saferSchemaEvolution = false; } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index 193bf2ca1..9e29c9b10 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -92,6 +92,8 @@ public class DeltaConfig implements Serializable { private static String EXECUTE_ITR_COUNT = "execute_itr_count"; private static String VALIDATE_ARCHIVAL = "validate_archival"; private static String VALIDATE_CLEAN = "validate_clean"; + private static String SCHEMA_VERSION = "schema_version"; + private static String NUM_ROLLBACKS = "num_rollbacks"; private Map configsMap; @@ -131,6 +133,14 @@ public class DeltaConfig implements Serializable { return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_UPSERT, 0).toString()); } + public int getSchemaVersion() { + return Integer.valueOf(configsMap.getOrDefault(SCHEMA_VERSION, Integer.MAX_VALUE).toString()); + } + + public int getNumRollbacks() { + return Integer.valueOf(configsMap.getOrDefault(NUM_ROLLBACKS, 1).toString()); + } + public int getStartPartition() { return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString()); } @@ -140,7 +150,7 @@ public class DeltaConfig implements Serializable { } public int getNumUpsertFiles() { - return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString()); + return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 1).toString()); } public double getFractionUpsertPerFile() { @@ -248,6 +258,16 @@ public class DeltaConfig implements Serializable { return this; } + public Builder withSchemaVersion(int version) { + this.configsMap.put(SCHEMA_VERSION, version); + return this; + } + + public Builder withNumRollbacks(int numRollbacks) { + this.configsMap.put(NUM_ROLLBACKS, numRollbacks); + return this; + } + public Builder withNumUpsertFiles(int numUpsertFiles) { this.configsMap.put(NUM_FILES_UPSERT, numUpsertFiles); return this; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java index 1211c0098..700a5b820 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java @@ -18,6 +18,19 @@ package org.apache.hudi.integ.testsuite.dag; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; +import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; + import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -190,18 +203,24 @@ public class DagUtils { private static List> getHiveQueries(Entry entry) { List> queries = new ArrayList<>(); - Iterator> queriesItr = entry.getValue().fields(); - while (queriesItr.hasNext()) { - queries.add(Pair.of(queriesItr.next().getValue().textValue(), queriesItr.next().getValue().asInt())); + try { + List flattened = new ArrayList<>(); + flattened.add(entry.getValue()); + queries = (List>)getHiveQueryMapper().readValue(flattened.toString(), List.class); + } catch (Exception e) { + e.printStackTrace(); } return queries; } private static List getProperties(Entry entry) { List properties = new ArrayList<>(); - Iterator> queriesItr = entry.getValue().fields(); - while (queriesItr.hasNext()) { - properties.add(queriesItr.next().getValue().textValue()); + try { + List flattened = new ArrayList<>(); + flattened.add(entry.getValue()); + properties = (List)getHivePropertyMapper().readValue(flattened.toString(), List.class); + } catch (Exception e) { + e.printStackTrace(); } return properties; } @@ -226,6 +245,22 @@ public class DagUtils { private static JsonNode createJsonNode(DagNode node, String type) throws IOException { JsonNode configNode = MAPPER.readTree(node.getConfig().toString()); JsonNode jsonNode = MAPPER.createObjectNode(); + Iterator> itr = configNode.fields(); + while (itr.hasNext()) { + Entry entry = itr.next(); + switch (entry.getKey()) { + case DeltaConfig.Config.HIVE_QUERIES: + ((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_QUERIES, + MAPPER.readTree(getHiveQueryMapper().writeValueAsString(node.getConfig().getHiveQueries()))); + break; + case DeltaConfig.Config.HIVE_PROPERTIES: + ((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_PROPERTIES, + MAPPER.readTree(getHivePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties()))); + break; + default: + break; + } + } ((ObjectNode) jsonNode).put(DeltaConfig.Config.CONFIG_NAME, configNode); ((ObjectNode) jsonNode).put(DeltaConfig.Config.TYPE, type); ((ObjectNode) jsonNode).put(DeltaConfig.Config.DEPENDENCIES, getDependencyNames(node)); @@ -248,4 +283,101 @@ public class DagUtils { return result.toString("utf-8"); } + private static ObjectMapper getHiveQueryMapper() { + SimpleModule module = new SimpleModule(); + ObjectMapper queryMapper = new ObjectMapper(); + module.addSerializer(List.class, new HiveQuerySerializer()); + module.addDeserializer(List.class, new HiveQueryDeserializer()); + queryMapper.registerModule(module); + return queryMapper; + } + + private static final class HiveQuerySerializer extends JsonSerializer { + Integer index = 0; + @Override + public void serialize(List pairs, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeStartObject(); + for (Pair pair : (List)pairs) { + gen.writeStringField("query" + index, pair.getLeft().toString()); + gen.writeNumberField("result" + index, Integer.parseInt(pair.getRight().toString())); + index++; + } + gen.writeEndObject(); + } + } + + private static final class HiveQueryDeserializer extends JsonDeserializer { + @Override + public List deserialize(JsonParser parser, DeserializationContext context) throws IOException { + List> pairs = new ArrayList<>(); + String query = ""; + Integer result = 0; + // [{query0:, result0:,query1:, result1:}] + while (!parser.isClosed()) { + JsonToken jsonToken = parser.nextToken(); + if (jsonToken.equals(JsonToken.END_ARRAY)) { + break; + } + if (JsonToken.FIELD_NAME.equals(jsonToken)) { + String fieldName = parser.getCurrentName(); + parser.nextToken(); + + if (fieldName.contains("query")) { + query = parser.getValueAsString(); + } else if (fieldName.contains("result")) { + result = parser.getValueAsInt(); + pairs.add(Pair.of(query, result)); + } + } + } + return pairs; + } + } + + private static ObjectMapper getHivePropertyMapper() { + SimpleModule module = new SimpleModule(); + ObjectMapper propMapper = new ObjectMapper(); + module.addSerializer(List.class, new HivePropertySerializer()); + module.addDeserializer(List.class, new HivePropertyDeserializer()); + propMapper.registerModule(module); + return propMapper; + } + + private static final class HivePropertySerializer extends JsonSerializer { + Integer index = 0; + @Override + public void serialize(List props, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeStartObject(); + for (String prop : (List)props) { + gen.writeStringField("prop" + index, prop); + index++; + } + gen.writeEndObject(); + } + } + + private static final class HivePropertyDeserializer extends JsonDeserializer { + @Override + public List deserialize(JsonParser parser, DeserializationContext context) throws IOException { + List props = new ArrayList<>(); + String prop = ""; + // [{prop0:,...}] + while (!parser.isClosed()) { + JsonToken jsonToken = parser.nextToken(); + if (jsonToken.equals(JsonToken.END_ARRAY)) { + break; + } + if (JsonToken.FIELD_NAME.equals(jsonToken)) { + String fieldName = parser.getCurrentName(); + parser.nextToken(); + + if (parser.getCurrentName().contains("prop")) { + prop = parser.getValueAsString(); + props.add(prop); + } + } + } + return props; + } + } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java index c8cb62846..1e9be1b0a 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java @@ -46,22 +46,26 @@ public class RollbackNode extends DagNode> { */ @Override public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { - log.info("Executing rollback node {}", this.getName()); + int numRollbacks = config.getNumRollbacks(); + log.info(String.format("Executing rollback node %s with %d rollbacks", this.getName(), numRollbacks)); // Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer // testing for now HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath) .build(); - Option lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant(); - if (lastInstant.isPresent()) { - log.info("Rolling back last instant {}", lastInstant.get()); - log.info("Cleaning up generated data for the instant being rolled back {}", lastInstant.get()); - ValidationUtils.checkArgument(executionContext.getWriterContext().getProps().getOrDefault(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, - DFSPathSelector.class.getName()).toString().equalsIgnoreCase(DFSTestSuitePathSelector.class.getName()), "Test Suite only supports DFSTestSuitePathSelector"); - executionContext.getHoodieTestSuiteWriter().getWriteClient(this).rollback(lastInstant.get().getTimestamp()); - metaClient.getFs().delete(new Path(executionContext.getWriterContext().getCfg().inputBasePath, - executionContext.getWriterContext().getHoodieTestSuiteWriter().getLastCheckpoint().orElse("")), true); - this.result = lastInstant; + for (int i = 0; i < numRollbacks; i++) { + metaClient.reloadActiveTimeline(); + Option lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant(); + if (lastInstant.isPresent()) { + log.info("Rolling back last instant {}", lastInstant.get()); + log.info("Cleaning up generated data for the instant being rolled back {}", lastInstant.get()); + ValidationUtils.checkArgument(executionContext.getWriterContext().getProps().getOrDefault(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR, + DFSPathSelector.class.getName()).toString().equalsIgnoreCase(DFSTestSuitePathSelector.class.getName()), "Test Suite only supports DFSTestSuitePathSelector"); + executionContext.getHoodieTestSuiteWriter().getWriteClient(this).rollback(lastInstant.get().getTimestamp()); + metaClient.getFs().delete(new Path(executionContext.getWriterContext().getCfg().inputBasePath, + executionContext.getWriterContext().getHoodieTestSuiteWriter().getLastCheckpoint().orElse("")), true); + this.result = lastInstant; + } } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java index 34cb9bc0a..f65862889 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java @@ -27,6 +27,7 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode; import org.apache.hudi.metrics.Metrics; import org.apache.spark.api.java.JavaSparkContext; +import org.junit.runners.Suite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,7 +130,7 @@ public class DagScheduler { * * @param node The node to be executed */ - private void executeNode(DagNode node, int curRound) { + protected void executeNode(DagNode node, int curRound) { if (node.isCompleted()) { throw new RuntimeException("DagNode already completed! Cannot re-execute"); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/SaferSchemaDagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/SaferSchemaDagScheduler.java new file mode 100644 index 000000000..ba89675ad --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/SaferSchemaDagScheduler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.scheduler; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler; +import org.apache.hudi.integ.testsuite.dag.WorkflowDag; +import org.apache.hudi.integ.testsuite.dag.WriterContext; +import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +public class SaferSchemaDagScheduler extends DagScheduler { + private static Logger LOG = LogManager.getLogger(SaferSchemaDagScheduler.class); + int processedVersion; + + public SaferSchemaDagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext jsc) { + super(workflowDag, writerContext, jsc); + } + + public SaferSchemaDagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext jsc, int version) { + super(workflowDag, writerContext, jsc); + processedVersion = version; + } + + @Override + protected void executeNode(DagNode node, int curRound) throws HoodieException { + if (node.getConfig().getSchemaVersion() < processedVersion) { + LOG.info(String.format("----------------- Processed SaferSchema version %d is available. " + + "Skipping redundant Insert Operation. (Processed = %d) -----------------", + node.getConfig().getSchemaVersion(), processedVersion)); + return; + } + super.executeNode(node, curRound); + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index 30b2d6ce0..258b6b7d1 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -140,7 +140,7 @@ public class DeltaGenerator implements Serializable { JavaRDD inputBatch = jsc.parallelize(partitionIndexes, numPartitions) .mapPartitionsWithIndex((index, p) -> { return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition, - minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions)); + minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions, startPartition)); }, true); if (deltaOutputConfig.getInputParallelism() < numPartitions) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java index 00928f3c1..cd46b10b4 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java @@ -44,18 +44,20 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator partitionPathFieldNames, int numPartitions) { + List partitionPathFieldNames, int numPartitions, int startPartition) { this.counter = maxEntriesToProduce; this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames); if(partitionPathFieldNames != null && partitionPathFieldNames.size() > 0) { this.firstPartitionPathField = partitionPathFieldNames.get(0); } Schema schema = new Schema.Parser().parse(schemaStr); - this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions); + this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions, startPartition); } @Override diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java index c900a8e68..d7d2e87eb 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java @@ -18,6 +18,7 @@ package org.apache.hudi.integ.testsuite.generator; +import com.google.common.annotations.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; import org.apache.avro.Schema; @@ -26,6 +27,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Fixed; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; +import org.junit.runners.Suite; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +52,8 @@ public class GenericRecordFullPayloadGenerator implements Serializable { public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB public static final int DEFAULT_NUM_DATE_PARTITIONS = 50; public static final String DEFAULT_HOODIE_IS_DELETED_COL = "_hoodie_is_deleted"; + public static final int DEFAULT_START_PARTITION = 0; + protected final Random random = new Random(); // The source schema used to generate a payload private final transient Schema baseSchema; @@ -61,6 +65,8 @@ public class GenericRecordFullPayloadGenerator implements Serializable { private int estimatedFullPayloadSize; // Number of extra entries to add in a complex/collection field to achieve the desired record size Map extraEntriesMap = new HashMap<>(); + // Start partition - default 0 + private int startPartition = DEFAULT_START_PARTITION; // The number of unique dates to create private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS; @@ -77,9 +83,11 @@ public class GenericRecordFullPayloadGenerator implements Serializable { this(schema, DEFAULT_PAYLOAD_SIZE); } - public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) { + public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, + int numDatePartitions, int startPartition) { this(schema, minPayloadSize); this.numDatePartitions = numDatePartitions; + this.startPartition = startPartition; } public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize) { @@ -329,9 +337,10 @@ public class GenericRecordFullPayloadGenerator implements Serializable { * Note: When generating records, number of records to be generated must be more than numDatePartitions * parallelism, * to guarantee that at least numDatePartitions are created. */ + @VisibleForTesting public GenericRecord updateTimestamp(GenericRecord record, String fieldName) { - long delta = TimeUnit.MILLISECONDS.convert(++partitionIndex % numDatePartitions, TimeUnit.DAYS); - record.put(fieldName, (System.currentTimeMillis() - delta)/1000); + long delta = TimeUnit.SECONDS.convert((++partitionIndex % numDatePartitions) + startPartition, TimeUnit.DAYS); + record.put(fieldName, delta); return record; } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/HiveSyncDagGeneratorMOR.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/HiveSyncDagGeneratorMOR.java index 07dd467a2..4082d2962 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/HiveSyncDagGeneratorMOR.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/HiveSyncDagGeneratorMOR.java @@ -47,7 +47,9 @@ public class HiveSyncDagGeneratorMOR implements WorkflowDagGenerator { root.addChildNode(child1); DagNode child2 = new HiveQueryNode(Config.newBuilder().withHiveLocal(true).withHiveQueryAndResults(Arrays - .asList(Pair.of("select " + "count(*) from testdb1.table1_rt group " + "by rider having count(*) < 1", 0))) + .asList(Pair.of("select " + "count(*) from testdb1.hive_trips group " + "by rider having count(*) < 1", 0), + Pair.of("select " + "count(*) from testdb1.hive_trips ", 100))) + .withHiveProperties(Arrays.asList("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat")) .build()); child1.addChildNode(child2); diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java index 70e6da7d3..5267e8876 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java @@ -43,6 +43,40 @@ public class TestDagUtils { System.out.println(yaml); } + @Test + public void testConvertDagToYamlHiveQuery() throws Exception { + WorkflowDag dag = new HiveSyncDagGenerator().build(); + DagNode insert1 = (DagNode) dag.getNodeList().get(0); + DagNode hiveSync1 = (DagNode)insert1.getChildNodes().get(0); + DagNode hiveQuery1 = (DagNode)hiveSync1.getChildNodes().get(0); + + String yaml = DagUtils.convertDagToYaml(dag); + + WorkflowDag dag2 = DagUtils.convertYamlToDag(yaml); + DagNode insert2 = (DagNode) dag2.getNodeList().get(0); + DagNode hiveSync2 = (DagNode)insert2.getChildNodes().get(0); + DagNode hiveQuery2 = (DagNode)hiveSync2.getChildNodes().get(0); + assertEquals(hiveQuery1.getConfig().getHiveQueries().get(0), + hiveQuery2.getConfig().getHiveQueries().get(0)); + assertEquals(hiveQuery1.getConfig().getHiveProperties().get(0), + hiveQuery2.getConfig().getHiveProperties().get(0)); + } + + @Test + public void testConvertDagToYamlAndBack() throws Exception { + final ComplexDagGenerator dag = new ComplexDagGenerator(); + final WorkflowDag originalWorkflowDag = dag.build(); + final String yaml = DagUtils.convertDagToYaml(dag.build()); + final WorkflowDag regeneratedWorkflowDag = DagUtils.convertYamlToDag(yaml); + + final List originalWorkflowDagNodes = originalWorkflowDag.getNodeList(); + final List regeneratedWorkflowDagNodes = regeneratedWorkflowDag.getNodeList(); + + assertEquals(originalWorkflowDagNodes.size(), regeneratedWorkflowDagNodes.size()); + assertEquals(originalWorkflowDagNodes.get(0).getChildNodes().size(), + regeneratedWorkflowDagNodes.get(0).getChildNodes().size()); + } + @Test public void testConvertYamlToDag() throws Exception { WorkflowDag dag = DagUtils.convertYamlToDag(UtilitiesTestBase.Helpers diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java index 2b3a65c71..53340ac98 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java @@ -144,9 +144,8 @@ public class TestGenericRecordPayloadGenerator { List insertTimeStamps = new ArrayList<>(); List updateTimeStamps = new ArrayList<>(); List records = new ArrayList<>(); - Long startMillis = System.currentTimeMillis() - TimeUnit.MILLISECONDS - .convert(GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS, TimeUnit.DAYS); - + Long startSeconds = 0L; + Long endSeconds = TimeUnit.SECONDS.convert(10, TimeUnit.DAYS); // Generate 10 new records IntStream.range(0, 10).forEach(a -> { GenericRecord record = payloadGenerator.getNewPayloadWithTimestamp("timestamp"); @@ -165,7 +164,6 @@ public class TestGenericRecordPayloadGenerator { assertTrue(insertRowKeys.containsAll(updateRowKeys)); // The timestamp field for the insert payloads should not all match with the update payloads assertFalse(insertTimeStamps.containsAll(updateTimeStamps)); - Long currentMillis = System.currentTimeMillis(); - assertTrue(insertTimeStamps.stream().allMatch(t -> t >= startMillis && t <= currentMillis)); + assertTrue(insertTimeStamps.stream().allMatch(t -> t >= startSeconds && t <= endSeconds)); } }