1
0

[HUDI-4084] Add support to test async table services with integ test suite framework (#5557)

* Add support to test async table services with integ test suite framework

* Make await time for validation configurable
This commit is contained in:
Sivabalan Narayanan
2022-05-23 23:05:56 -04:00
committed by GitHub
parent 47b764ec33
commit af1128acf9
17 changed files with 612 additions and 188 deletions

View File

@@ -593,6 +593,56 @@ Sample spark-submit command to test one delta streamer and a spark data source w
--use-hudi-data-to-generate-updates
```
### Testing async table services
We can test async table services with deltastreamer using the command below. Three additional arguments are required to test async
table services compared to the previous command.
```shell
--continuous \
--test-continuous-mode \
--min-sync-interval-seconds 20
```
Here is the full command:
```shell
./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \
--conf spark.task.cpus=1 --conf spark.executor.cores=1 \
--conf spark.task.maxFailures=100 \
--conf spark.memory.fraction=0.4 \
--conf spark.rdd.compress=true \
--conf spark.kryoserializer.buffer.max=2000m \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.memory.storageFraction=0.1 \
--conf spark.shuffle.service.enabled=true \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf spark.driver.maxResultSize=12g \
--conf spark.executor.heartbeatInterval=120s \
--conf spark.network.timeout=600s \
--conf spark.yarn.max.executor.failures=10 \
--conf spark.sql.catalogImplementation=hive \
--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob <PATH_TO_BUNDLE>/hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \
--source-ordering-field test_suite_source_ordering_field \
--use-deltastreamer \
--target-base-path /tmp/hudi/output \
--input-base-path /tmp/hudi/input \
--target-table table1 \
--props file:/tmp/test.properties \
--schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--input-file-size 125829120 \
--workload-yaml-path file:/tmp/simple-deltastreamer.yaml \
--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
--table-type COPY_ON_WRITE \
--compact-scheduling-minshare 1 \
--clean-input \
--clean-output \
--continuous \
--test-continuous-mode \
--min-sync-interval-seconds 20
```
We can use any yaml and properties file with the above spark-submit command to test deltastreamer with async table services.
## Automated tests for N no of yamls in Local Docker environment

View File

@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
/**
* Test suite Writer that assists in testing async table operations with Deltastreamer continuous mode.
*
* Sample command
* ./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \
* --conf spark.task.cpus=1 --conf spark.executor.cores=1 \
* --conf spark.task.maxFailures=100 \
* --conf spark.memory.fraction=0.4 \
* --conf spark.rdd.compress=true \
* --conf spark.kryoserializer.buffer.max=2000m \
* --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
* --conf spark.memory.storageFraction=0.1 \
* --conf spark.shuffle.service.enabled=true \
* --conf spark.sql.hive.convertMetastoreParquet=false \
* --conf spark.driver.maxResultSize=12g \
* --conf spark.executor.heartbeatInterval=120s \
* --conf spark.network.timeout=600s \
* --conf spark.yarn.max.executor.failures=10 \
* --conf spark.sql.catalogImplementation=hive \
* --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob <PATH_TO_BUNDLE>/hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \
* --source-ordering-field test_suite_source_ordering_field \
* --use-deltastreamer \
* --target-base-path /tmp/hudi/output \
* --input-base-path /tmp/hudi/input \
* --target-table table1 \
* -props file:/tmp/test.properties \
* --schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \
* --source-class org.apache.hudi.utilities.sources.AvroDFSSource \
* --input-file-size 125829120 \
* --workload-yaml-path file:/tmp/simple-deltastreamer.yaml \
* --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
* --table-type COPY_ON_WRITE \
* --compact-scheduling-minshare 1 \
* --clean-input \
* --clean-output \
* --continuous \
* --test-continuous-mode \
* --min-sync-interval-seconds 20
*/
public class HoodieContinousTestSuiteWriter extends HoodieTestSuiteWriter {
private static Logger log = LoggerFactory.getLogger(HoodieContinousTestSuiteWriter.class);
public HoodieContinousTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws Exception {
super(jsc, props, cfg, schema);
}
@Override
public void shutdownResources() {
log.info("Shutting down deltastreamer gracefully ");
this.deltaStreamerWrapper.shutdownGracefully();
}
@Override
public RDD<GenericRecord> getNextBatch() throws Exception {
return null;
}
@Override
public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
return null;
}
@Override
public Option<String> startCommit() {
return null;
}
public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
return null;
}
@Override
public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception {
return null;
}
@Override
public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception {
return null;
}
@Override
public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception {
return null;
}
@Override
public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception {
return null;
}
@Override
public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception {
return null;
}
@Override
public void inlineClustering() {
}
@Override
public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws
Exception {
return Option.empty();
}
@Override
public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
Option<String> instantTime) {
}
@Override
public void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
Option<String> instantTime) throws IOException {
}
}

View File

@@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * {@link HoodieTestSuiteWriter} implementation for inline (non-continuous) test suite runs. Each write
 * operation either delegates to the {@link HoodieDeltaStreamerWrapper} (when {@code cfg.useDeltaStreamer}
 * is set) or performs the corresponding operation directly through the {@link SparkRDDWriteClient}.
 */
public class HoodieInlineTestSuiteWriter extends HoodieTestSuiteWriter {

  private static final Logger LOG = LoggerFactory.getLogger(HoodieInlineTestSuiteWriter.class);

  // Commit-metadata key under which the path of the generated input batch is recorded.
  private static final String GENERATED_DATA_PATH = "generated.data.path";

  public HoodieInlineTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws Exception {
    super(jsc, props, cfg, schema);
  }

  @Override
  public void shutdownResources() {
    // no-op for non continuous mode test suite writer.
  }

  /**
   * Fetches the next input batch, records its checkpoint and returns the records as Avro
   * {@link GenericRecord}s deserialized against the writer schema.
   */
  @Override
  public RDD<GenericRecord> getNextBatch() throws Exception {
    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
    lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
    JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight();
    return inputRDD.map(r -> (GenericRecord) ((HoodieAvroRecord) r).getData()
        .getInsertValue(new Schema.Parser().parse(schema)).get()).rdd();
  }

  @Override
  public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
    return this.deltaStreamerWrapper.fetchSource();
  }

  /**
   * Starts a new commit: in deltastreamer mode only an instant time is generated (the deltastreamer
   * manages the commit itself); otherwise the write client starts the commit.
   */
  @Override
  public Option<String> startCommit() {
    if (cfg.useDeltaStreamer) {
      return Option.of(HoodieActiveTimeline.createNewInstantTime());
    } else {
      return Option.of(writeClient.startCommit());
    }
  }

  @Override
  public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.upsert(WriteOperationType.UPSERT);
    } else {
      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
      return writeClient.upsert(nextBatch.getRight().getRight(), instantTime.get());
    }
  }

  @Override
  public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.insert();
    } else {
      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
      return writeClient.insert(nextBatch.getRight().getRight(), instantTime.get());
    }
  }

  @Override
  public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.insertOverwrite();
    } else {
      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
      return writeClient.insertOverwrite(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses();
    }
  }

  @Override
  public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.insertOverwriteTable();
    } else {
      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
      return writeClient.insertOverwriteTable(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses();
    }
  }

  @Override
  public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.bulkInsert();
    } else {
      Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
      lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
      return writeClient.bulkInsert(nextBatch.getRight().getRight(), instantTime.get());
    }
  }

  /**
   * Runs compaction. When no instant time is supplied, falls back to the first pending compaction
   * found via the read client; returns {@code null} if there is nothing to compact.
   */
  @Override
  public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception {
    if (cfg.useDeltaStreamer) {
      return deltaStreamerWrapper.compact();
    } else {
      if (!instantTime.isPresent()) {
        Option<Pair<String, HoodieCompactionPlan>> compactionPlanPair = Option
            .fromJavaOptional(hoodieReadClient.getPendingCompactions()
                .stream().findFirst());
        if (compactionPlanPair.isPresent()) {
          instantTime = Option.of(compactionPlanPair.get().getLeft());
        }
      }
      if (instantTime.isPresent()) {
        HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(instantTime.get());
        return compactionMetadata.getWriteStatuses();
      } else {
        return null;
      }
    }
  }

  /**
   * Schedules and runs clustering inline via the write client. Not supported in deltastreamer mode.
   *
   * @throws IllegalArgumentException when invoked in deltastreamer mode
   */
  @Override
  public void inlineClustering() {
    if (!cfg.useDeltaStreamer) {
      Option<String> clusteringInstantOpt = writeClient.scheduleClustering(Option.empty());
      clusteringInstantOpt.ifPresent(clusteringInstant -> {
        // inline cluster should auto commit as the user is never given control
        LOG.warn("Clustering instant :: " + clusteringInstant);
        writeClient.cluster(clusteringInstant, true);
      });
    } else {
      // TODO: fix clustering to be done async https://issues.apache.org/jira/browse/HUDI-1590
      throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer");
    }
  }

  @Override
  public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws
      Exception {
    if (cfg.useDeltaStreamer) {
      // The deltastreamer schedules internally; no instant is surfaced to the caller.
      deltaStreamerWrapper.scheduleCompact();
      return Option.empty();
    } else {
      return writeClient.scheduleCompaction(previousCommitExtraMetadata);
    }
  }

  @Override
  public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
                     Option<String> instantTime) {
    if (!cfg.useDeltaStreamer) {
      writeClient.commit(instantTime.get(), records, Option.of(buildExtraMetadata(generatedDataStats)));
    }
  }

  @Override
  public void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
                               Option<String> instantTime) throws IOException {
    if (!cfg.useDeltaStreamer) {
      Map<String, String> extraMetadata = buildExtraMetadata(generatedDataStats);
      HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(writeClient.getConfig(), writeClient.getEngineContext());
      HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(table, instantTime.get(), HoodieJavaRDD.of(records), writeClient.getConfig().getSchema());
      writeClient.commitCompaction(instantTime.get(), metadata, Option.of(extraMetadata));
    }
  }

  /**
   * Builds the extra commit metadata shared by {@link #commit} and {@link #commitCompaction}:
   * the last checkpoint (stored just like
   * {@code HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)}) and, when available,
   * the path where the generated input batch was written.
   */
  private Map<String, String> buildExtraMetadata(JavaRDD<DeltaWriteStats> generatedDataStats) {
    Map<String, String> extraMetadata = new HashMap<>();
    extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
    // NOTE(review): the "> 1" guard skips the single-stats case yet records entry 0 — confirm
    // whether "> 0" was intended; preserved as-is to keep behavior unchanged.
    if (generatedDataStats != null && generatedDataStats.count() > 1) {
      // Just stores the path where this batch of data is generated to
      extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
    }
    return extraMetadata;
  }
}

View File

@@ -190,11 +190,12 @@ public class HoodieTestSuiteJob {
}
public void runTestSuite() {
WriterContext writerContext = null;
try {
WorkflowDag workflowDag = createWorkflowDag();
log.info("Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag));
long startTime = System.currentTimeMillis();
WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession);
writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession);
writerContext.initContext(jsc);
startOtherServicesIfNeeded(writerContext);
if (this.cfg.saferSchemaEvolution) {
@@ -217,6 +218,9 @@ public class HoodieTestSuiteJob {
log.error("Failed to run Test Suite ", e);
throw new HoodieException("Failed to run Test Suite ", e);
} finally {
if (writerContext != null) {
writerContext.shutdownResources();
}
if (stopJsc) {
stopQuietly();
}
@@ -310,5 +314,8 @@ public class HoodieTestSuiteJob {
@Parameter(names = {"--use-hudi-data-to-generate-updates"}, description = "Use data from hudi to generate updates for new batches ")
public Boolean useHudiToGenerateUpdates = false;
@Parameter(names = {"--test-continuous-mode"}, description = "Tests continuous mode in deltastreamer.")
public Boolean testContinousMode = false;
}
}

View File

@@ -18,37 +18,25 @@
package org.apache.hudi.integ.testsuite;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
@@ -57,38 +45,31 @@ import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
* A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform write operations into the target hudi dataset. Current supported writers are
* {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}.
*/
public class HoodieTestSuiteWriter implements Serializable {
public abstract class HoodieTestSuiteWriter implements Serializable {
private static Logger log = LoggerFactory.getLogger(HoodieTestSuiteWriter.class);
private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
private HoodieWriteConfig writeConfig;
private SparkRDDWriteClient writeClient;
protected HoodieTestSuiteConfig cfg;
private Option<String> lastCheckpoint;
private HoodieReadClient hoodieReadClient;
private Properties props;
private String schema;
private transient Configuration configuration;
private transient JavaSparkContext sparkContext;
private static Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
protected HoodieDeltaStreamerWrapper deltaStreamerWrapper;
protected HoodieWriteConfig writeConfig;
protected SparkRDDWriteClient writeClient;
protected HoodieTestSuiteJob.HoodieTestSuiteConfig cfg;
protected Option<String> lastCheckpoint;
protected HoodieReadClient hoodieReadClient;
protected Properties props;
protected String schema;
protected transient Configuration configuration;
protected transient JavaSparkContext sparkContext;
protected static Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName()));
private static final String GENERATED_DATA_PATH = "generated.data.path";
public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws Exception {
public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws Exception {
// We ensure that only 1 instance of HoodieWriteClient is instantiated for a HoodieTestSuiteWriter
// This does not instantiate a HoodieWriteClient until a
// {@link HoodieDeltaStreamer#commit(HoodieWriteClient, JavaRDD, Option)} is invoked.
@@ -110,7 +91,7 @@ public class HoodieTestSuiteWriter implements Serializable {
return this.writeConfig;
}
private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) {
private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, Properties props, String schema) {
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
.withAutoCommit(false)
@@ -131,159 +112,35 @@ public class HoodieTestSuiteWriter implements Serializable {
return false;
}
public RDD<GenericRecord> getNextBatch() throws Exception {
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight();
return inputRDD.map(r -> (GenericRecord) ((HoodieAvroRecord) r).getData()
.getInsertValue(new Schema.Parser().parse(schema)).get()).rdd();
}
public abstract void shutdownResources();
public void getNextBatchForDeletes() throws Exception {
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight();
inputRDD.collect();
}
public abstract RDD<GenericRecord> getNextBatch() throws Exception;
public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
return this.deltaStreamerWrapper.fetchSource();
}
public abstract Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception ;
public Option<String> startCommit() {
if (cfg.useDeltaStreamer) {
return Option.of(HoodieActiveTimeline.createNewInstantTime());
} else {
return Option.of(writeClient.startCommit());
}
}
public abstract Option<String> startCommit();
public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
if (cfg.useDeltaStreamer) {
return deltaStreamerWrapper.upsert(WriteOperationType.UPSERT);
} else {
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
return writeClient.upsert(nextBatch.getRight().getRight(), instantTime.get());
}
}
public abstract JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception;
public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception {
if (cfg.useDeltaStreamer) {
return deltaStreamerWrapper.insert();
} else {
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
return writeClient.insert(nextBatch.getRight().getRight(), instantTime.get());
}
}
public abstract JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception;
public JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception {
if (cfg.useDeltaStreamer) {
return deltaStreamerWrapper.insertOverwrite();
} else {
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
return writeClient.insertOverwrite(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses();
}
}
public abstract JavaRDD<WriteStatus> insertOverwrite(Option<String> instantTime) throws Exception;
public JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception {
if (cfg.useDeltaStreamer) {
return deltaStreamerWrapper.insertOverwriteTable();
} else {
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
return writeClient.insertOverwriteTable(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses();
}
}
public abstract JavaRDD<WriteStatus> insertOverwriteTable(Option<String> instantTime) throws Exception;
public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception {
if (cfg.useDeltaStreamer) {
return deltaStreamerWrapper.bulkInsert();
} else {
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
return writeClient.bulkInsert(nextBatch.getRight().getRight(), instantTime.get());
}
}
public abstract JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception;
public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception {
if (cfg.useDeltaStreamer) {
return deltaStreamerWrapper.compact();
} else {
if (!instantTime.isPresent()) {
Option<Pair<String, HoodieCompactionPlan>> compactionPlanPair = Option
.fromJavaOptional(hoodieReadClient.getPendingCompactions()
.stream().findFirst());
if (compactionPlanPair.isPresent()) {
instantTime = Option.of(compactionPlanPair.get().getLeft());
}
}
if (instantTime.isPresent()) {
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeClient.compact(instantTime.get());
return compactionMetadata.getWriteStatuses();
} else {
return null;
}
}
}
public abstract JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception;
public void inlineClustering() {
if (!cfg.useDeltaStreamer) {
Option<String> clusteringInstantOpt = writeClient.scheduleClustering(Option.empty());
clusteringInstantOpt.ifPresent(clusteringInstant -> {
// inline cluster should auto commit as the user is never given control
log.warn("Clustering instant :: " + clusteringInstant);
writeClient.cluster(clusteringInstant, true);
});
} else {
// TODO: fix clustering to be done async https://issues.apache.org/jira/browse/HUDI-1590
throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer");
}
}
public abstract void inlineClustering() throws Exception ;
public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws
Exception {
if (cfg.useDeltaStreamer) {
deltaStreamerWrapper.scheduleCompact();
return Option.empty();
} else {
return writeClient.scheduleCompaction(previousCommitExtraMetadata);
}
}
public abstract Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws Exception;
public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
Option<String> instantTime) {
if (!cfg.useDeltaStreamer) {
Map<String, String> extraMetadata = new HashMap<>();
/** Store the checkpoint in the commit metadata just like
* {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
if (generatedDataStats != null && generatedDataStats.count() > 1) {
// Just stores the path where this batch of data is generated to
extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
}
writeClient.commit(instantTime.get(), records, Option.of(extraMetadata));
}
}
public abstract void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
Option<String> instantTime);
public void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
Option<String> instantTime) throws IOException {
if (!cfg.useDeltaStreamer) {
Map<String, String> extraMetadata = new HashMap<>();
/** Store the checkpoint in the commit metadata just like
* {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
if (generatedDataStats != null && generatedDataStats.count() > 1) {
// Just stores the path where this batch of data is generated to
extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
}
HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(writeClient.getConfig(), writeClient.getEngineContext());
HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(table, instantTime.get(), HoodieJavaRDD.of(records), writeClient.getConfig().getSchema());
writeClient.commitCompaction(instantTime.get(), metadata, Option.of(extraMetadata));
}
}
public abstract void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
Option<String> instantTime) throws Exception;
public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws IllegalAccessException {
if (cfg.useDeltaStreamer & !allowWriteClientAccess(dagNode)) {
@@ -301,7 +158,7 @@ public class HoodieTestSuiteWriter implements Serializable {
return deltaStreamerWrapper;
}
public HoodieTestSuiteConfig getCfg() {
public HoodieTestSuiteJob.HoodieTestSuiteConfig getCfg() {
return cfg;
}
@@ -325,3 +182,4 @@ public class HoodieTestSuiteWriter implements Serializable {
return schema;
}
}

View File

@@ -103,6 +103,7 @@ public class DeltaConfig implements Serializable {
private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest";
private static String PARTITIONS_TO_DELETE = "partitions_to_delete";
private static String INPUT_PARTITIONS_TO_SKIP_VALIDATE = "input_partitions_to_skip_validate";
private static String MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS = "max_wait_time_for_deltastreamer_catch_up_ms";
// Spark SQL Create Table
private static String TABLE_TYPE = "table_type";
@@ -253,6 +254,10 @@ public class DeltaConfig implements Serializable {
return Boolean.valueOf(configsMap.getOrDefault(ENABLE_ROW_WRITING, false).toString());
}
/**
 * Maximum time, in milliseconds, to wait for deltastreamer to catch up with ingested
 * data before validation gives up. Defaults to 5 minutes when unset.
 */
public long maxWaitTimeForDeltastreamerToCatchupMs() {
  Object configuredValue = configsMap.getOrDefault(MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS, 5 * 60 * 1000);
  return Long.parseLong(configuredValue.toString());
}
public Option<String> getTableType() {
return !configsMap.containsKey(TABLE_TYPE) ? Option.empty()
: Option.of(configsMap.get(TABLE_TYPE).toString());

View File

@@ -21,7 +21,9 @@ package org.apache.hudi.integ.testsuite.dag;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.HoodieContinousTestSuiteWriter;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.HoodieInlineTestSuiteWriter;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
@@ -37,6 +39,8 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* WriterContext wraps the delta writer/data generator related configuration needed to init/reinit.
@@ -53,6 +57,7 @@ public class WriterContext {
private BuiltinKeyGenerator keyGenerator;
private transient SparkSession sparkSession;
private transient JavaSparkContext jsc;
private ExecutorService executorService;
public WriterContext(JavaSparkContext jsc, TypedProperties props, HoodieTestSuiteConfig cfg,
BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) {
@@ -67,7 +72,8 @@ public class WriterContext {
try {
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
String schemaStr = schemaProvider.getSourceSchema().toString();
this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr);
this.hoodieTestSuiteWriter = (cfg.testContinousMode && cfg.useDeltaStreamer) ? new HoodieContinousTestSuiteWriter(jsc, props, cfg, schemaStr)
: new HoodieInlineTestSuiteWriter(jsc, props, cfg, schemaStr);
int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : jsc.defaultParallelism();
this.deltaGenerator = new DeltaGenerator(
new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
@@ -75,6 +81,10 @@ public class WriterContext {
schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput, cfg.useHudiToGenerateUpdates),
jsc, sparkSession, schemaStr, keyGenerator);
log.info(String.format("Initialized writerContext with: %s", schemaStr));
if (cfg.testContinousMode) {
executorService = Executors.newFixedThreadPool(1);
executorService.execute(new TestSuiteWriterRunnable(hoodieTestSuiteWriter));
}
} catch (Exception e) {
throw new HoodieException("Failed to reinitialize writerContext", e);
}
@@ -113,4 +123,35 @@ public class WriterContext {
public SparkSession getSparkSession() {
return sparkSession;
}
/**
 * Releases the test-suite writer's resources and stops the background
 * deltastreamer thread, if one was started for continuous mode.
 */
public void shutdownResources() {
  hoodieTestSuiteWriter.shutdownResources();
  ExecutorService pool = this.executorService;
  if (pool != null) {
    pool.shutdownNow();
  }
}
/**
* TestSuiteWriterRunnable to spin up a thread to execute deltastreamer with async table services.
*/
class TestSuiteWriterRunnable implements Runnable {
private HoodieTestSuiteWriter hoodieTestSuiteWriter;
TestSuiteWriterRunnable(HoodieTestSuiteWriter hoodieTestSuiteWriter) {
this.hoodieTestSuiteWriter = hoodieTestSuiteWriter;
}
@Override
public void run() {
try {
Thread.sleep(20000);
log.info("Starting continuous sync with deltastreamer ");
hoodieTestSuiteWriter.getDeltaStreamerWrapper().sync();
log.info("Completed continuous sync with deltastreamer ");
} catch (Exception e) {
log.error("Deltastreamer failed in continuous mode " + e.getMessage());
throw new HoodieException("Shutting down deltastreamer in continuous mode failed ", e);
}
}
}
}

View File

@@ -20,10 +20,17 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,6 +47,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@@ -47,6 +57,8 @@ import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
/**
* This nodes validates contents from input path are in tact with Hudi. By default no configs are required for this node. But there is an
* optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites.
@@ -78,6 +90,12 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
int itrCountToExecute = config.getIterationCountToExecute();
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
(itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
FileSystem fs = new Path(context.getHoodieTestSuiteWriter().getCfg().inputBasePath)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
if (context.getHoodieTestSuiteWriter().getCfg().testContinousMode) {
awaitUntilDeltaStreamerCaughtUp(context, context.getHoodieTestSuiteWriter().getCfg().targetBasePath, fs,
context.getHoodieTestSuiteWriter().getCfg().inputBasePath);
}
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
@@ -85,8 +103,6 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
// listing batches to be validated
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
if (log.isDebugEnabled()) {
FileSystem fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
log.info("fileStatuses length: " + fileStatuses.length);
for (FileStatus fileStatus : fileStatuses) {
@@ -145,8 +161,6 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
if (config.isDeleteInputData()) {
// clean up input data for current group of writes.
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
FileSystem fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
for (FileStatus fileStatus : fileStatuses) {
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
@@ -157,6 +171,50 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
}
}
/**
 * Blocks until deltastreamer's latest committed checkpoint matches the most recent input
 * batch directory, polling the table's commit timeline every 20 seconds.
 *
 * @param context       execution context for the current DAG node (unused directly; kept for parity with callers)
 * @param hudiTablePath base path of the target Hudi table whose timeline is polled
 * @param fs            filesystem used to list input batch directories
 * @param inputPath     input base path whose numerically-named subdirectories are the ingested batches
 * @throws IOException          if listing the input path or reading the timeline fails
 * @throws InterruptedException if the polling sleep is interrupted
 * @throws AssertionError       if deltastreamer does not catch up within the configured max wait time
 */
private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hudiTablePath, FileSystem fs, String inputPath) throws IOException, InterruptedException {
  HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(hudiTablePath).build();
  HoodieTimeline commitTimeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
  Option<String> latestCheckpoint = getLatestCheckpoint(commitTimeline);
  // Input batch directories are named by a monotonically increasing long; the largest one
  // is the batch the deltastreamer checkpoint must reach.
  FileStatus[] subDirs = fs.listStatus(new Path(inputPath));
  List<FileStatus> subDirList = Arrays.asList(subDirs);
  subDirList.sort(Comparator.comparingLong(entry -> Long.parseLong(entry.getPath().getName())));
  String latestSubDir = subDirList.get(subDirList.size() - 1).getPath().getName();
  log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
      (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
  long maxWaitTime = config.maxWaitTimeForDeltastreamerToCatchupMs();
  long waitedSoFar = 0;
  while (!(latestCheckpoint.isPresent() && latestCheckpoint.get().equals(latestSubDir))) {
    log.warn("Sleeping for 20 secs awaiting for deltastreamer to catch up with ingested data");
    Thread.sleep(20000);
    meta.reloadActiveTimeline();
    commitTimeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    latestCheckpoint = getLatestCheckpoint(commitTimeline);
    waitedSoFar += 20000;
    if (waitedSoFar >= maxWaitTime) {
      // Report the actual configured wait time rather than a hard-coded duration.
      throw new AssertionError("DeltaStreamer has not caught up after " + maxWaitTime + " ms of wait time. Last known checkpoint " +
          (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caught up " + latestSubDir);
    }
    log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
        (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
  }
}
/**
 * Scans the timeline's completed instants from newest to oldest and returns the first
 * deltastreamer checkpoint found in any commit's metadata, or empty if none is recorded.
 *
 * @param timeline completed-commits timeline to scan
 * @return latest checkpoint value stored under {@code CHECKPOINT_KEY}, or {@code Option.empty()}
 * @throws HoodieIOException if an instant's commit metadata cannot be deserialized
 */
private Option<String> getLatestCheckpoint(HoodieTimeline timeline) {
  // The lambda returns Option.of(...) or Option.empty() (raw element type), so the stream's
  // result needs this unchecked cast back to Option<String>.
  return (Option<String>) timeline.getReverseOrderedInstants().map(instant -> {
    try {
      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
          .fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
      if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) {
        return Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
      } else {
        // Commit has no checkpoint entry (e.g. written outside deltastreamer); keep scanning.
        return Option.empty();
      }
    } catch (IOException e) {
      throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
    }
  }).filter(Option::isPresent).findFirst().orElse(Option.empty());
}
private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session, String inputPath) {
String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());

View File

@@ -216,15 +216,22 @@ public class DeltaGenerator implements Serializable {
adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete());
adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsDelete());
} else {
deltaInputReader =
new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
schemaStr);
if (config.getFractionUpsertPerFile() > 0) {
adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(),
config.getFractionUpsertPerFile());
if (((DFSDeltaConfig) deltaOutputConfig).shouldUseHudiToGenerateUpdates()) {
deltaInputReader =
new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
schemaStr);
if (config.getFractionUpsertPerFile() > 0) {
adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(),
config.getFractionUpsertPerFile());
} else {
adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), config
.getNumRecordsDelete());
}
} else {
adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), config
.getNumRecordsDelete());
deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr,
((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty());
adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete());
adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsDelete());
}
}