diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index e23d88586..ed7a4582e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -68,7 +68,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-
/**
* Class to be used in tests to keep generating test inserts and updates against a corpus.
*
@@ -98,7 +97,8 @@ public class HoodieTestDataGenerator {
+ "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},";
public static final String FARE_FLATTENED_SCHEMA = "{\"name\": \"fare\", \"type\": \"double\"},"
+ "{\"name\": \"currency\", \"type\": \"string\"},";
- public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": \"tip_history\", \"fields\": ["
+ public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"default\": null, \"type\": {\"type\": "
+ + "\"array\", \"items\": {\"type\": \"record\", \"default\": null, \"name\": \"tip_history\", \"fields\": ["
+ "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},";
public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},";
public static final String EXTRA_TYPE_SCHEMA = "{\"name\": \"distance_in_meters\", \"type\": \"int\"},"
@@ -254,7 +254,6 @@ public class HoodieTestDataGenerator {
rec.put("begin_lon", RAND.nextDouble());
rec.put("end_lat", RAND.nextDouble());
rec.put("end_lon", RAND.nextDouble());
-
if (isFlattened) {
rec.put("fare", RAND.nextDouble() * 100);
rec.put("currency", "USD");
@@ -730,7 +729,7 @@ public class HoodieTestDataGenerator {
public boolean deleteExistingKeyIfPresent(HoodieKey key) {
Map existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
- for (Map.Entry entry: existingKeys.entrySet()) {
+ for (Map.Entry entry : existingKeys.entrySet()) {
if (entry.getValue().key.equals(key)) {
int index = entry.getKey();
existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
@@ -740,10 +739,18 @@ public class HoodieTestDataGenerator {
return true;
}
}
-
return false;
}
+ public List generateGenericRecords(int numRecords) {
+ List list = new ArrayList<>();
+ IntStream.range(0, numRecords).forEach(i -> {
+ list.add(generateGenericRecord(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID()
+ .toString(), RAND.nextDouble()));
+ });
+ return list;
+ }
+
public String[] getPartitionPaths() {
return partitionPaths;
}
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 877ba4791..d8292526e 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -112,6 +112,15 @@ public class HiveSyncTool {
+ " of type " + hoodieHiveClient.getTableType());
// Check if the necessary table exists
boolean tableExists = hoodieHiveClient.doesTableExist(tableName);
+
+ // check if the database exists else create it
+ try {
+ hoodieHiveClient.updateHiveSQL("create database if not exists " + cfg.databaseName);
+ } catch (Exception e) {
+      // this is harmless since table creation will fail anyway; creation of the DB is needed for in-memory testing
+ LOG.warn("Unable to create database", e);
+ }
+
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();
// Sync schema if needed
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index f1034e3cd..2bb3fd112 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -572,4 +572,8 @@ public class HoodieHiveClient {
return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
}
}
+
+ public IMetaStoreClient getClient() {
+ return client;
+ }
}
\ No newline at end of file
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
index 2f9880304..51a5bf5ed 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
@@ -126,10 +126,21 @@ public class HiveTestService {
public void stop() {
resetSystemProperties();
if (tServer != null) {
- tServer.stop();
+ try {
+ tServer.stop();
+ } catch (Exception e) {
+ LOG.error("Stop meta store failed", e);
+ }
}
if (hiveServer != null) {
- hiveServer.stop();
+ try {
+ hiveServer.stop();
+ } catch (Exception e) {
+ LOG.error("Stop hive server failed", e);
+ }
+ }
+ if (executorService != null) {
+ executorService.shutdownNow();
}
LOG.info("Hive Minicluster service shut down.");
tServer = null;
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
new file mode 100644
index 000000000..d87fec3ad
--- /dev/null
+++ b/hudi-integ-test/README.md
@@ -0,0 +1,262 @@
+
+
+This page describes in detail how to run end to end tests on a hudi dataset. These tests help in improving our
+confidence in a release as well as in performing large scale performance benchmarks.
+
+# Objectives
+
+1. Test with different versions of core libraries and components such as `hdfs`, `parquet`, `spark`,
+`hive` and `avro`.
+2. Generate different types of workloads across different dimensions such as `payload size`, `number of updates`,
+`number of inserts`, `number of partitions`
+3. Perform multiple types of operations such as `insert`, `bulk_insert`, `upsert`, `compact`, `query`
+4. Support custom post process actions and validations
+
+# High Level Design
+
+The Hudi test suite runs as a long running spark job. The suite is divided into the following high level components :
+
+## Workload Generation
+
+This component does the work of generating the workload; `inserts`, `upserts` etc.
+
+## Workload Scheduling
+
+Depending on the type of workload generated, data is either ingested into the target hudi
+dataset or the corresponding workload operation is executed. For example compaction does not necessarily need a workload
+to be generated/ingested but can require an execution.
+
+## Other actions/operations
+
+The test suite supports different types of operations besides ingestion such as Hive Query execution, Clean action etc.
+
+# Usage instructions
+
+
+## Entry class to the test suite
+
+```
+org.apache.hudi.testsuite.HoodieTestSuiteJob.java - Entry Point of the hudi test suite job. This
+class wraps all the functionalities required to run a configurable integration suite.
+```
+
+## Configurations required to run the job
+```
+org.apache.hudi.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig - Config class that drives the behavior of the
+integration test suite. This class extends from com.uber.hoodie.utilities.DeltaStreamerConfig. Look at
+link#HudiDeltaStreamer page to learn about all the available configs applicable to your test suite.
+```
+
+## Generating a custom Workload Pattern
+
+There are 2 ways to generate a workload pattern
+
+ 1. Programmatically
+
+Choose to write up the entire DAG of operations programmatically, take a look at the `WorkflowDagGenerator` class.
+Once you're ready with the DAG you want to execute, simply pass the class name as follows:
+
+```
+spark-submit
+...
+...
+--class org.apache.hudi.testsuite.HoodieTestSuiteJob
+--workload-generator-classname org.apache.hudi.testsuite.dag.scheduler.
+...
+```
+
+ 2. YAML file
+
+Choose to write up the entire DAG of operations in YAML, take a look at `complex-workload-dag-cow.yaml` or
+`complex-workload-dag-mor.yaml`.
+Once you're ready with the DAG you want to execute, simply pass the yaml file path as follows:
+
+```
+spark-submit
+...
+...
+--class org.apache.hudi.testsuite.HoodieTestSuiteJob
+--workload-yaml-path /path/to/your-workflow-dag.yaml
+...
+```
+
+## Building the test suite
+
+The test suite can be found in the `hudi-integ-test` module. Use the
+`prepare_integration_suite.sh` script to build the test suite; you can
+provide different parameters to the script.
+
+```
+shell$ ./prepare_integration_suite.sh --help
+Usage: prepare_integration_suite.sh
+ --spark-command, prints the spark command
+   -h, hadoop version
+ -s, spark version
+ -p, parquet version
+ -a, avro version
+   -i, hive version
+```
+
+```
+shell$ ./prepare_integration_suite.sh
+....
+....
+Final command : mvn clean install -DskipTests
+```
+
+## Running on the cluster or in your local machine
+Copy over the necessary files and jars that are required to your cluster and then run the following spark-submit
+command after replacing the correct values for the parameters.
+NOTE : The properties-file should have all the necessary information required to ingest into a Hudi dataset. For more
+ information on what properties need to be set, take a look at the test suite section under demo steps.
+```
+shell$ ./prepare_integration_suite.sh --spark-command
+spark-submit --packages com.databricks:spark-avro_2.11:4.0.0 --master prepare_integration_suite.sh --deploy-mode
+--properties-file --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob target/hudi-integ-test-0.6
+.0-SNAPSHOT.jar --source-class --source-ordering-field --input-base-path --target-base-path --target-table --props --storage-type --payload-class --workload-yaml-path --input-file-size --
+```
+
+## Running through a test-case (local)
+Take a look at the `TestHoodieTestSuiteJob` to check how you can run the entire suite using JUnit.
+
+## Running an end to end test suite in Local Docker environment
+
+```
+docker exec -it adhoc-2 /bin/bash
+# COPY_ON_WRITE tables
+=========================
+## Run the following command to start the test suite
+spark-submit \
+--packages com.databricks:spark-avro_2.11:4.0.0 \
+--conf spark.task.cpus=1 \
+--conf spark.executor.cores=1 \
+--conf spark.task.maxFailures=100 \
+--conf spark.memory.fraction=0.4 \
+--conf spark.rdd.compress=true \
+--conf spark.kryoserializer.buffer.max=2000m \
+--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.memory.storageFraction=0.1 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.sql.hive.convertMetastoreParquet=false \
+--conf spark.ui.port=5555 \
+--conf spark.driver.maxResultSize=12g \
+--conf spark.executor.heartbeatInterval=120s \
+--conf spark.network.timeout=600s \
+--conf spark.eventLog.overwrite=true \
+--conf spark.eventLog.enabled=true \
+--conf spark.yarn.max.executor.failures=10 \
+--conf spark.sql.catalogImplementation=hive \
+--conf spark.sql.shuffle.partitions=1000 \
+--class org.apache.hudi.testsuite.HoodieTestSuiteJob $HUDI_TEST_SUITE_BUNDLE \
+--source-ordering-field timestamp \
+--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
+--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
+--target-table test_table \
+--props /var/hoodie/ws/docker/demo/config/testsuite/test-source.properties \
+--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
+--source-limit 300000 \
+--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+--input-file-size 125829120 \
+--workload-yaml-path /var/hoodie/ws/docker/demo/config/testsuite/complex-workflow-dag-cow.yaml \
+--storage-type COPY_ON_WRITE \
+--compact-scheduling-minshare 1 \
+--hoodie-conf "hoodie.deltastreamer.source.test.num_partitions=100" \
+--hoodie-conf "hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false" \
+--hoodie-conf "hoodie.deltastreamer.source.test.max_unique_records=100000000" \
+--hoodie-conf "hoodie.embed.timeline.server=false" \
+--hoodie-conf "hoodie.datasource.write.recordkey.field=_row_key" \
+--hoodie-conf "hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input" \
+--hoodie-conf "hoodie.datasource.write.keygenerator.class=org.apache.hudi.ComplexKeyGenerator" \
+--hoodie-conf "hoodie.datasource.write.partitionpath.field=timestamp" \
+--hoodie-conf "hoodie.deltastreamer.schemaprovider.source.schema.file=/var/hoodie/ws/docker/demo/config/testsuite/source.avsc" \
+--hoodie-conf "hoodie.datasource.hive_sync.assume_date_partitioning=false" \
+--hoodie-conf "hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/" \
+--hoodie-conf "hoodie.datasource.hive_sync.database=testdb" \
+--hoodie-conf "hoodie.datasource.hive_sync.table=test_table" \
+--hoodie-conf "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor" \
+--hoodie-conf "hoodie.datasource.hive_sync.assume_date_partitioning=true" \
+--hoodie-conf "hoodie.datasource.write.keytranslator.class=org.apache.hudi.DayBasedPartitionPathKeyTranslator" \
+--hoodie-conf "hoodie.deltastreamer.schemaprovider.target.schema.file=/var/hoodie/ws/docker/demo/config/testsuite/source.avsc"
+...
+...
+2019-11-03 05:44:47 INFO DagScheduler:69 - ----------- Finished workloads ----------
+2019-11-03 05:44:47 INFO HoodieTestSuiteJob:138 - Finished scheduling all tasks
+...
+2019-11-03 05:44:48 INFO SparkContext:54 - Successfully stopped SparkContext
+# MERGE_ON_READ tables
+=========================
+## Run the following command to start the test suite
+spark-submit \
+--packages com.databricks:spark-avro_2.11:4.0.0 \
+--conf spark.task.cpus=1 \
+--conf spark.executor.cores=1 \
+--conf spark.task.maxFailures=100 \
+--conf spark.memory.fraction=0.4 \
+--conf spark.rdd.compress=true \
+--conf spark.kryoserializer.buffer.max=2000m \
+--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.memory.storageFraction=0.1 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.sql.hive.convertMetastoreParquet=false \
+--conf spark.ui.port=5555 \
+--conf spark.driver.maxResultSize=12g \
+--conf spark.executor.heartbeatInterval=120s \
+--conf spark.network.timeout=600s \
+--conf spark.eventLog.overwrite=true \
+--conf spark.eventLog.enabled=true \
+--conf spark.yarn.max.executor.failures=10 \
+--conf spark.sql.catalogImplementation=hive \
+--conf spark.sql.shuffle.partitions=1000 \
+--class org.apache.hudi.testsuite.HoodieTestSuiteJob $HUDI_TEST_SUITE_BUNDLE \
+--source-ordering-field timestamp \
+--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
+--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
+--target-table test_table \
+--props /var/hoodie/ws/docker/demo/config/testsuite/test-source.properties \
+--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
+--source-limit 300000 \
+--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+--input-file-size 125829120 \
+--workload-yaml-path /var/hoodie/ws/docker/demo/config/testsuite/complex-workflow-dag-mor.yaml \
+--storage-type MERGE_ON_READ \
+--compact-scheduling-minshare 1 \
+--hoodie-conf "hoodie.deltastreamer.source.test.num_partitions=100" \
+--hoodie-conf "hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false" \
+--hoodie-conf "hoodie.deltastreamer.source.test.max_unique_records=100000000" \
+--hoodie-conf "hoodie.embed.timeline.server=false" \
+--hoodie-conf "hoodie.datasource.write.recordkey.field=_row_key" \
+--hoodie-conf "hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input" \
+--hoodie-conf "hoodie.datasource.write.keygenerator.class=org.apache.hudi.ComplexKeyGenerator" \
+--hoodie-conf "hoodie.datasource.write.partitionpath.field=timestamp" \
+--hoodie-conf "hoodie.deltastreamer.schemaprovider.source.schema.file=/var/hoodie/ws/docker/demo/config/testsuite/source.avsc" \
+--hoodie-conf "hoodie.datasource.hive_sync.assume_date_partitioning=false" \
+--hoodie-conf "hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/" \
+--hoodie-conf "hoodie.datasource.hive_sync.database=testdb" \
+--hoodie-conf "hoodie.datasource.hive_sync.table=test_table" \
+--hoodie-conf "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor" \
+--hoodie-conf "hoodie.datasource.hive_sync.assume_date_partitioning=true" \
+--hoodie-conf "hoodie.datasource.write.keytranslator.class=org.apache.hudi.DayBasedPartitionPathKeyTranslator" \
+--hoodie-conf "hoodie.deltastreamer.schemaprovider.target.schema.file=/var/hoodie/ws/docker/demo/config/testsuite/source.avsc"
+...
+...
+2019-11-03 05:44:47 INFO DagScheduler:69 - ----------- Finished workloads ----------
+2019-11-03 05:44:47 INFO HoodieTestSuiteJob:138 - Finished scheduling all tasks
+...
+2019-11-03 05:44:48 INFO SparkContext:54 - Successfully stopped SparkContext
+```
+
\ No newline at end of file
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index b9bcc6d96..e81f335ba 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -51,12 +51,69 @@
test
+
+
+
+ org.eclipse.jetty.aggregate
+ jetty-all
+ ${jetty.version}
+ uber
+ test
+
+
+
+ org.apache.spark
+ spark-sql_${scala.binary.version}
+
+
+ org.mortbay.jetty
+ *
+
+
+ javax.servlet.jsp
+ *
+
+
+ javax.servlet
+ *
+
+
+ org.eclipse.jetty
+ *
+
+
+
+
org.apache.hudi
hudi-spark_${scala.binary.version}
${project.version}
+
+ org.apache.hudi
+ hudi-utilities_${scala.binary.version}
+ ${project.version}
+ provided
+
+
+ org.mortbay.jetty
+ *
+
+
+ javax.servlet.jsp
+ *
+
+
+ javax.servlet
+ *
+
+
+ com.databricks
+ spark-avro_${scala.binary.version}
+
+
+
@@ -89,6 +146,67 @@
test-jar
test
+
+ org.apache.hudi
+ hudi-hive-sync
+ ${project.version}
+
+
+ org.mortbay.jetty
+ *
+
+
+ javax.servlet.jsp
+ *
+
+
+ javax.servlet
+ *
+
+
+ tests
+ test-jar
+
+
+ org.apache.hudi
+ hudi-utilities_${scala.binary.version}
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+ org.mortbay.jetty
+ *
+
+
+ javax.servlet.jsp
+ *
+
+
+ javax.servlet
+ *
+
+
+
+
+ org.apache.hudi
+ hudi-client
+ ${project.version}
+ test-jar
+
+
+
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-yaml
+ 2.7.4
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.6.7.3
+
@@ -107,6 +225,87 @@
test
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ tests
+
+
+ org.mortbay.jetty
+ *
+
+
+ javax.servlet.jsp
+ *
+
+
+ javax.servlet
+ *
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ tests
+
+
+ javax.servlet
+ *
+
+
+ netty
+ io.netty
+
+
+ netty-all
+ io.netty
+
+
+
+
+
+
+ ${hive.groupid}
+ hive-exec
+ ${hive.version}
+ ${hive.exec.classifier}
+
+
+ javax.servlet
+ servlet-api
+
+
+
+
+
+ ${hive.groupid}
+ hive-jdbc
+ ${hive.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+ javax.servlet.jsp
+ *
+
+
+ javax.servlet
+ *
+
+
+ org.eclipse.jetty
+ *
+
+
+ test
+
+
org.awaitility
awaitility
@@ -138,6 +337,12 @@
test
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
diff --git a/hudi-integ-test/prepare_integration_suite.sh b/hudi-integ-test/prepare_integration_suite.sh
new file mode 100644
index 000000000..06999d9ca
--- /dev/null
+++ b/hudi-integ-test/prepare_integration_suite.sh
@@ -0,0 +1,174 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#!/bin/bash
+
+# Determine the current working directory
+_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+# Preserve the calling directory
+_CALLING_DIR="$(pwd)"
+
+#########################
+# The command line help #
+#########################
+usage() {
+ echo "Usage: $0"
+ echo " --spark-command, prints the spark command"
+ echo " -h | --hadoop, hadoop-version"
+ echo " -s | --spark, spark version"
+ echo " -p | --parquet, parquet version"
+ echo " -a | --avro, avro version"
+ echo " -i | --hive, hive version"
+ echo " -l | --scala, scala version"
+ exit 1
+}
+
+get_spark_command() {
+if [ -z "$scala" ]
+then
+ scala="2.11"
+else
+ scala=$1
+fi
+echo "spark-submit --packages org.apache.spark:spark-avro_${scala}:2.4.4 \
+--master $0 \
+--deploy-mode $1 \
+--properties-file $2 \
+--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
+`ls target/hudi-integ-test-*-SNAPSHOT.jar` \
+--source-class $3 \
+--source-ordering-field $4 \
+--input-base-path $5 \
+--target-base-path $6 \
+--target-table $7 \
+--props $8 \
+--storage-type $9 \
+--payload-class "${10}" \
+--workload-yaml-path "${11}" \
+--input-file-size "${12}" \
+--"
+}
+
+case "$1" in
+ --help)
+ usage
+ exit 0
+ ;;
+esac
+
+case "$1" in
+ --spark-command)
+ get_spark_command
+ exit 0
+ ;;
+esac
+
+while getopts ":h:s:p:a:i:l:-:" opt; do
+ case $opt in
+ h) hadoop="$OPTARG"
+ printf "Argument hadoop is %s\n" "$hadoop"
+ ;;
+ s) spark="$OPTARG"
+ printf "Argument spark is %s\n" "$spark"
+ ;;
+ p) parquet="$OPTARG"
+ printf "Argument parquet is %s\n" "$parquet"
+ ;;
+ a) avro="$OPTARG"
+ printf "Argument avro is %s\n" "$avro"
+ ;;
+ i) hive="$OPTARG"
+ printf "Argument hive is %s\n" "$hive"
+ ;;
+ l) scala="$OPTARG"
+ printf "Argument scala is %s\n" "$scala"
+ ;;
+ -)
+ case "$OPTARG" in
+ hadoop)
+ hadoop="${!OPTIND}"; OPTIND=$(( $OPTIND + 1 ))
+ printf "Argument hadoop is %s\n" "$hadoop"
+ ;;
+ spark)
+ spark="${!OPTIND}"; OPTIND=$(( $OPTIND + 1 ))
+ printf "Argument spark is %s\n" "$spark"
+ ;;
+ parquet)
+ parquet="${!OPTIND}"; OPTIND=$(( $OPTIND + 1 ))
+ printf "Argument parquet is %s\n" "$parquet"
+ ;;
+ avro)
+ avro="${!OPTIND}"; OPTIND=$(( $OPTIND + 1 ))
+ printf "Argument avro is %s\n" "$avro"
+ ;;
+ hive)
+ hive="${!OPTIND}"; OPTIND=$(( $OPTIND + 1 ))
+ printf "Argument hive is %s\n" "$hive"
+ ;;
+ scala)
+ scala="${!OPTIND}"; OPTIND=$(( $OPTIND + 1 ))
+ printf "Argument scala is %s\n" "$scala"
+ ;;
+ *) echo "Invalid option --$OPTARG" >&2
+ ;;
+ esac ;;
+ \?) echo "Invalid option -$OPTARG" >&2
+ ;;
+ esac
+done
+
+
+get_versions () {
+ base_command=''
+ if [ -z "$hadoop" ]
+ then
+ base_command=$base_command
+ else
+ hadoop=$1
+ base_command+=' -Dhadoop.version='$hadoop
+ fi
+
+ if [ -z "$hive" ]
+ then
+ base_command=$base_command
+ else
+ hive=$2
+ base_command+=' -Dhive.version='$hive
+ fi
+
+ if [ -z "$scala" ]
+ then
+ base_command=$base_command
+ else
+ scala=$3
+ base_command+=' -Dscala-'$scala
+ fi
+ echo $base_command
+}
+
+versions=$(get_versions $hadoop $hive $scala)
+
+final_command='mvn clean install -DskipTests '$versions
+printf "Final command $final_command \n"
+
+# change to the project root directory to run maven command
+move_to_root='cd ..'
+$move_to_root && $final_command
+
+# change back to original working directory
+cd $_CALLING_DIR
+
+printf "A sample spark command to start the integration suite \n"
+get_spark_command $scala
\ No newline at end of file
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
new file mode 100644
index 000000000..a489bacc3
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Extends the {@link HoodieDeltaStreamer} to expose certain operations helpful in running the Test Suite.
+ * This is done to achieve 2 things: 1) Leverage some components of {@link HoodieDeltaStreamer} 2)
+ * Piggyback on the suite to test {@link HoodieDeltaStreamer}
+ */
+public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
+
+ public HoodieDeltaStreamerWrapper(Config cfg, JavaSparkContext jssc) throws Exception {
+ super(cfg, jssc);
+ }
+
+ public HoodieDeltaStreamerWrapper(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf conf) throws Exception {
+ super(cfg, jssc, fs, conf);
+ }
+
+ public JavaRDD upsert(Operation operation) throws
+ Exception {
+ cfg.operation = operation;
+ return deltaSyncService.getDeltaSync().syncOnce().getRight();
+ }
+
+ public JavaRDD insert() throws Exception {
+ return upsert(Operation.INSERT);
+ }
+
+ public JavaRDD bulkInsert() throws
+ Exception {
+ return upsert(Operation.BULK_INSERT);
+ }
+
+ public void scheduleCompact() throws Exception {
+ // Since we don't support scheduleCompact() operation in delta-streamer, assume upsert without any data that will
+ // trigger scheduling compaction
+ upsert(Operation.UPSERT);
+ }
+
+ public JavaRDD compact() throws Exception {
+ // Since we don't support compact() operation in delta-streamer, assume upsert without any data that will trigger
+ // inline compaction
+ return upsert(Operation.UPSERT);
+ }
+
+ public Pair>> fetchSource() throws Exception {
+ return deltaSyncService.getDeltaSync().readFromSource(deltaSyncService.getDeltaSync().getCommitTimelineOpt());
+ }
+
+}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
new file mode 100644
index 000000000..be036c16d
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
+import org.apache.hudi.integ.testsuite.dag.DagUtils;
+import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
+import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
+import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
+import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
+import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
+import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the entry point for running a Hudi Test Suite. Although this class has similarities with
+ * {@link HoodieDeltaStreamer}, this class does not extend it since we do not want to create a dependency on the
+ * changes in DeltaStreamer.
+ */
+public class HoodieTestSuiteJob {
+
+ private static volatile Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class);
+
+ private final HoodieTestSuiteConfig cfg;
+ /**
+ * Bag of properties with source, hoodie client, key generator etc.
+ */
+ TypedProperties props;
+ /**
+ * Schema provider that supplies the command for writing out the generated payloads.
+ */
+ private transient SchemaProvider schemaProvider;
+ /**
+ * Filesystem used.
+ */
+ private transient FileSystem fs;
+ /**
+ * Spark context.
+ */
+ private transient JavaSparkContext jsc;
+ /**
+ * Spark Session.
+ */
+ private transient SparkSession sparkSession;
+ /**
+ * Hive Config.
+ */
+ private transient HiveConf hiveConf;
+
+ private KeyGenerator keyGenerator;
+
+ public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
+ this.cfg = cfg;
+ this.jsc = jsc;
+ this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+ this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
+ this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
+ log.info("Creating workload generator with configs : {}", props.toString());
+ this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
+ this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
+ this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
+ if (!fs.exists(new Path(cfg.targetBasePath))) {
+ HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath,
+ HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived");
+ }
+ }
+
+ private static HiveConf getDefaultHiveConf(Configuration cfg) {
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.addResource(cfg);
+ return hiveConf;
+ }
+
+ public static void main(String[] args) throws Exception {
+ final HoodieTestSuiteConfig cfg = new HoodieTestSuiteConfig();
+ JCommander cmd = new JCommander(cfg, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+
+ JavaSparkContext jssc = UtilHelpers.buildSparkContext("workload-generator-" + cfg.outputTypeName
+ + "-" + cfg.inputFormatName, cfg.sparkMaster);
+ new HoodieTestSuiteJob(cfg, jssc).runTestSuite();
+ }
+
+ public void runTestSuite() {
+ try {
+ WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ? ((WorkflowDagGenerator) ReflectionUtils
+ .loadClass((this.cfg).workloadDagGenerator)).build()
+ : DagUtils.convertYamlPathToDag(this.fs, this.cfg.workloadYamlPath);
+ log.info("Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag));
+ long startTime = System.currentTimeMillis();
+ String schemaStr = schemaProvider.getSourceSchema().toString();
+ final HoodieTestSuiteWriter writer = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr);
+ final DeltaGenerator deltaGenerator = new DeltaGenerator(
+ new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
+ new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath,
+ schemaStr, cfg.limitFileSize), jsc, sparkSession, schemaStr, keyGenerator);
+ DagScheduler dagScheduler = new DagScheduler(workflowDag, writer, deltaGenerator);
+ dagScheduler.schedule();
+ log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime);
+ } catch (Exception e) {
+ log.error("Failed to run Test Suite ", e);
+ throw new HoodieException("Failed to run Test Suite ", e);
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ /**
+ * The Hudi test suite uses {@link HoodieDeltaStreamer} to run some operations hence extend delta streamer config.
+ */
+ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
+
+ @Parameter(names = {"--input-base-path"}, description = "base path for input data"
+ + "(Will be created if did not exist first time around. If exists, more data will be added to that path)",
+ required = true)
+ public String inputBasePath;
+
+ @Parameter(names = {
+ "--workload-generator-classname"}, description = "WorkflowDag of operations to generate the workload",
+ required = true)
+ public String workloadDagGenerator = WorkflowDagGenerator.class.getName();
+
+ @Parameter(names = {
+ "--workload-yaml-path"}, description = "Workflow Dag yaml path to generate the workload")
+ public String workloadYamlPath;
+
+ @Parameter(names = {"--delta-output-type"}, description = "Subclass of "
+ + "org.apache.hudi.testsuite.workload.DeltaOutputMode to readAvro data.")
+ public String outputTypeName = DeltaOutputMode.DFS.name();
+
+ @Parameter(names = {"--delta-input-format"}, description = "Subclass of "
+ + "org.apache.hudi.testsuite.workload.DeltaOutputMode to read avro data.")
+ public String inputFormatName = DeltaInputType.AVRO.name();
+
+ @Parameter(names = {"--input-file-size"}, description = "The min/max size of the input files to generate",
+ required = true)
+ public Long limitFileSize = 1024 * 1024 * 120L;
+
+ @Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to "
+ + "perform"
+ + " ingestion. If set to false, HoodieWriteClient will be used")
+ public Boolean useDeltaStreamer = false;
+
+ }
+}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
new file mode 100644
index 000000000..b22faca22
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieReadClient;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
+import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
+import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
+import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
+import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform
+ * write operations into the target hudi dataset. Current supported writers are {@link HoodieDeltaStreamerWrapper}
+ * and {@link HoodieWriteClient}.
+ */
+public class HoodieTestSuiteWriter {
+
+ private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
+ private HoodieWriteClient writeClient;
+ protected HoodieTestSuiteConfig cfg;
+ private Option<String> lastCheckpoint;
+ private HoodieReadClient hoodieReadClient;
+ private Properties props;
+ private String schema;
+ private transient Configuration configuration;
+ private transient JavaSparkContext sparkContext;
+ private static Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
+ Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName()));
+
+ public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws
+ Exception {
+ this(jsc, props, cfg, schema, true);
+ }
+
+ public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema,
+ boolean rollbackInflight) throws Exception {
+ // We ensure that only 1 instance of HoodieWriteClient is instantiated for a HoodieTestSuiteWriter
+ // This does not instantiate a HoodieWriteClient until a
+ // {@link HoodieDeltaStreamer#commit(HoodieWriteClient, JavaRDD, Option)} is invoked.
+ this.deltaStreamerWrapper = new HoodieDeltaStreamerWrapper(cfg, jsc);
+ this.hoodieReadClient = new HoodieReadClient(jsc, cfg.targetBasePath);
+ if (!cfg.useDeltaStreamer) {
+ this.writeClient = new HoodieWriteClient(jsc, getHoodieClientConfig(cfg, props, schema), rollbackInflight);
+ }
+ this.cfg = cfg;
+ this.configuration = jsc.hadoopConfiguration();
+ this.sparkContext = jsc;
+ this.props = props;
+ this.schema = schema;
+ }
+
+ private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) {
+ HoodieWriteConfig.Builder builder =
+ HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
+ .withAutoCommit(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
+ .forTable(cfg.targetTableName)
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+ .withProps(props);
+ builder = builder.withSchema(schema);
+ return builder.build();
+ }
+
+ private boolean allowWriteClientAccess(DagNode dagNode) {
+ if (VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE.contains(dagNode.getClass().getName())) {
+ return true;
+ }
+ return false;
+ }
+
+ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
+ return this.deltaStreamerWrapper.fetchSource();
+ }
+
+ public Option<String> startCommit() {
+ if (cfg.useDeltaStreamer) {
+ return Option.of(HoodieActiveTimeline.createNewInstantTime());
+ } else {
+ return Option.of(writeClient.startCommit());
+ }
+ }
+
+ public JavaRDD<WriteStatus> upsert(Option<String> instantTime) throws Exception {
+ if (cfg.useDeltaStreamer) {
+ return deltaStreamerWrapper.upsert(Operation.UPSERT);
+ } else {
+ Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
+ lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
+ return writeClient.upsert(nextBatch.getRight().getRight(), instantTime.get());
+ }
+ }
+
+ public JavaRDD<WriteStatus> insert(Option<String> instantTime) throws Exception {
+ if (cfg.useDeltaStreamer) {
+ return deltaStreamerWrapper.insert();
+ } else {
+ Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
+ lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
+ return writeClient.insert(nextBatch.getRight().getRight(), instantTime.get());
+ }
+ }
+
+ public JavaRDD<WriteStatus> bulkInsert(Option<String> instantTime) throws Exception {
+ if (cfg.useDeltaStreamer) {
+ return deltaStreamerWrapper.bulkInsert();
+ } else {
+ Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource();
+ lastCheckpoint = Option.of(nextBatch.getValue().getLeft());
+ return writeClient.bulkInsert(nextBatch.getRight().getRight(), instantTime.get());
+ }
+ }
+
+ public JavaRDD<WriteStatus> compact(Option<String> instantTime) throws Exception {
+ if (cfg.useDeltaStreamer) {
+ return deltaStreamerWrapper.compact();
+ } else {
+ if (!instantTime.isPresent()) {
+ Option<Pair<String, HoodieCompactionPlan>> compactionPlanPair = Option
+ .fromJavaOptional(hoodieReadClient.getPendingCompactions()
+ .stream().findFirst());
+ if (compactionPlanPair.isPresent()) {
+ instantTime = Option.of(compactionPlanPair.get().getLeft());
+ }
+ }
+ if (instantTime.isPresent()) {
+ return writeClient.compact(instantTime.get());
+ } else {
+ return null;
+ }
+ }
+ }
+
+ public Option scheduleCompaction(Option