diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index de8000c90..2c4b73a6d 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -26,21 +26,19 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hudi.DataSourceUtils; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; import org.apache.hudi.integ.testsuite.dag.DagUtils; import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator; import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler; -import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; import org.apache.hudi.integ.testsuite.reader.DeltaInputType; import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; +import org.apache.hudi.integ.testsuite.dag.WriterContext; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; @@ -64,10 +62,6 @@ public class HoodieTestSuiteJob { * Bag of properties with source, hoodie client, key generator etc. */ TypedProperties props; - /** - * Schema provider that supplies the command for writing out the generated payloads. 
- */ - private transient SchemaProvider schemaProvider; /** * Filesystem used. */ @@ -95,7 +89,6 @@ public class HoodieTestSuiteJob { this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration()); this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); log.info("Creating workload generator with configs : {}", props.toString()); - this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc); this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration()); this.keyGenerator = (BuiltinKeyGenerator) DataSourceUtils.createKeyGenerator(props); @@ -138,13 +131,9 @@ public class HoodieTestSuiteJob { WorkflowDag workflowDag = createWorkflowDag(); log.info("Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag)); long startTime = System.currentTimeMillis(); - String schemaStr = schemaProvider.getSourceSchema().toString(); - final HoodieTestSuiteWriter writer = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr); - final DeltaGenerator deltaGenerator = new DeltaGenerator( - new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName), - new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath, - schemaStr, cfg.limitFileSize), jsc, sparkSession, schemaStr, keyGenerator); - DagScheduler dagScheduler = new DagScheduler(workflowDag, writer, deltaGenerator); + WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession); + writerContext.initContext(jsc); + DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext); dagScheduler.schedule(); log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime); } catch (Exception e) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index 
30fa58442..f20f84e47 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -82,6 +82,7 @@ public class DeltaConfig implements Serializable { private static String DISABLE_GENERATE = "disable_generate"; private static String DISABLE_INGEST = "disable_ingest"; private static String HIVE_LOCAL = "hive_local"; + private static String REINIT_CONTEXT = "reinitialize_context"; private Map configsMap; @@ -133,6 +134,10 @@ public class DeltaConfig implements Serializable { return Boolean.valueOf(configsMap.getOrDefault(DISABLE_INGEST, false).toString()); } + public boolean getReinitContext() { + return Boolean.valueOf(configsMap.getOrDefault(REINIT_CONTEXT, false).toString()); + } + public Map getOtherConfigs() { if (configsMap == null) { return new HashMap<>(); @@ -222,6 +227,11 @@ public class DeltaConfig implements Serializable { return this; } + public Builder reinitializeContext(boolean reinitContext) { + this.configsMap.put(REINIT_CONTEXT, reinitContext); + return this; + } + public Builder withConfig(String name, Object value) { this.configsMap.put(name, value); return this; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java index eecd763d1..17148f538 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java @@ -30,25 +30,27 @@ import org.apache.spark.api.java.JavaSparkContext; */ public class ExecutionContext implements Serializable { - private HoodieTestSuiteWriter hoodieTestSuiteWriter; - private DeltaGenerator deltaGenerator; + private WriterContext writerContext; private transient JavaSparkContext jsc; - public 
ExecutionContext(JavaSparkContext jsc, HoodieTestSuiteWriter hoodieTestSuiteWriter, DeltaGenerator deltaGenerator) { - this.hoodieTestSuiteWriter = hoodieTestSuiteWriter; - this.deltaGenerator = deltaGenerator; + public ExecutionContext(JavaSparkContext jsc, WriterContext writerContext) { + this.writerContext = writerContext; this.jsc = jsc; } public HoodieTestSuiteWriter getHoodieTestSuiteWriter() { - return hoodieTestSuiteWriter; + return writerContext.getHoodieTestSuiteWriter(); } public DeltaGenerator getDeltaGenerator() { - return deltaGenerator; + return writerContext.getDeltaGenerator(); } public JavaSparkContext getJsc() { return jsc; } + + public WriterContext getWriterContext() { + return writerContext; + } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java new file mode 100644 index 000000000..320c98632 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.dag; + +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter; +import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; +import org.apache.hudi.integ.testsuite.reader.DeltaInputType; +import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; +import org.apache.hudi.keygen.BuiltinKeyGenerator; +import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; +import org.apache.hudi.utilities.UtilHelpers; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import java.util.Map; + +/** + * WriterContext wraps the delta writer/data generator related configuration needed + * to init/reinit. 
+ */ +public class WriterContext { + + protected static Logger log = LogManager.getLogger(WriterContext.class); + + private final HoodieTestSuiteConfig cfg; + private TypedProperties props; + private HoodieTestSuiteWriter hoodieTestSuiteWriter; + private DeltaGenerator deltaGenerator; + private transient SchemaProvider schemaProvider; + private BuiltinKeyGenerator keyGenerator; + private transient SparkSession sparkSession; + private transient JavaSparkContext jsc; + public WriterContext(JavaSparkContext jsc, TypedProperties props, HoodieTestSuiteConfig cfg, + BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) { + this.cfg = cfg; + this.props = props; + this.keyGenerator = keyGenerator; + this.sparkSession = sparkSession; + this.jsc = jsc; + } + + public void initContext(JavaSparkContext jsc) throws HoodieException { + try { + this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc); + String schemaStr = schemaProvider.getSourceSchema().toString(); + this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr); + this.deltaGenerator = new DeltaGenerator( + new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName), + new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath, + schemaStr, cfg.limitFileSize), + jsc, sparkSession, schemaStr, keyGenerator); + log.info(String.format("Initialized writerContext with: %s", schemaStr)); + } catch (Exception e) { + throw new HoodieException("Failed to reinitialize writerContext", e); + } + } + + public void reinitContext(Map newConfig) throws HoodieException { + // update existing props with any config overrides; keys not already present in props are ignored. 
+ for (Map.Entry e : newConfig.entrySet()) { + if (this.props.containsKey(e.getKey())) { + this.props.setProperty(e.getKey(), e.getValue().toString()); + } + } + initContext(jsc); + } + + public HoodieTestSuiteWriter getHoodieTestSuiteWriter() { + return hoodieTestSuiteWriter; + } + + public DeltaGenerator getDeltaGenerator() { + return deltaGenerator; + } + + public String toString() { + return this.hoodieTestSuiteWriter.toString() + "\n" + this.deltaGenerator.toString() + "\n"; + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java index 62db5b67d..fdbcc1b12 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java @@ -37,6 +37,11 @@ public class InsertNode extends DagNode> { @Override public void execute(ExecutionContext executionContext) throws Exception { + // if reinitialize_context is set for this node, reinitialize the writer context with this node's config overrides. 
+ if (this.config.getReinitContext()) { + log.info(String.format("Reinitializing table with %s", this.config.getOtherConfigs().toString())); + executionContext.getWriterContext().reinitContext(this.config.getOtherConfigs()); + } generate(executionContext.getDeltaGenerator()); log.info("Configs : {}", this.config); if (!config.isDisableIngest()) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java index ce943419a..2e9d43736 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java @@ -30,10 +30,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; -import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.WorkflowDag; -import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; +import org.apache.hudi.integ.testsuite.dag.WriterContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,9 +46,9 @@ public class DagScheduler { private WorkflowDag workflowDag; private ExecutionContext executionContext; - public DagScheduler(WorkflowDag workflowDag, HoodieTestSuiteWriter hoodieTestSuiteWriter, DeltaGenerator deltaGenerator) { + public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext) { this.workflowDag = workflowDag; - this.executionContext = new ExecutionContext(null, hoodieTestSuiteWriter, deltaGenerator); + this.executionContext = new ExecutionContext(null, writerContext); } /**