diff --git a/docker/demo/config/test-suite/insert-overwrite-table.yaml b/docker/demo/config/test-suite/insert-overwrite-table.yaml new file mode 100644 index 000000000..8b5a26e46 --- /dev/null +++ b/docker/demo/config/test-suite/insert-overwrite-table.yaml @@ -0,0 +1,104 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+dag_name: simple-deltastreamer.yaml +dag_rounds: 1 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: none + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: first_insert + second_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: first_upsert + second_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: second_insert + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: second_upsert + first_insert_overwrite_table: + config: + record_size: 1000 + repeat_count: 10 + num_records_insert: 10 + type: SparkInsertOverwriteTableNode + deps: first_hive_sync + delete_all_input_except_last: + config: + delete_input_data_except_latest: true + type: DeleteInputDatasetNode + deps: first_insert_overwrite_table + third_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: delete_all_input_except_last + third_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: third_insert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: third_upsert + second_validate: + config: + validate_full_data : true + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: second_hive_sync diff --git 
a/docker/demo/config/test-suite/insert-overwrite.yaml b/docker/demo/config/test-suite/insert-overwrite.yaml new file mode 100644 index 000000000..f2299c50c --- /dev/null +++ b/docker/demo/config/test-suite/insert-overwrite.yaml @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+dag_name: simple-deltastreamer.yaml +dag_rounds: 1 +dag_intermittent_delay_mins: 1 +dag_content: + + first_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: none + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: first_insert + second_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: first_upsert + second_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: second_insert + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: second_upsert + first_insert_overwrite: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10 + type: SparkInsertOverwriteNode + deps: first_hive_sync + delete_all_input_except_last: + config: + delete_input_data_except_latest: true + type: DeleteInputDatasetNode + deps: first_insert_overwrite + third_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: delete_all_input_except_last + third_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: third_insert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: third_upsert + second_validate: + config: + validate_full_data : true + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: second_hive_sync diff --git 
a/docker/demo/config/test-suite/spark-clustering.yaml b/docker/demo/config/test-suite/spark-clustering.yaml new file mode 100644 index 000000000..e8e722ca7 --- /dev/null +++ b/docker/demo/config/test-suite/spark-clustering.yaml @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+dag_name: cow-spark-simple.yaml +dag_rounds: 1 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: none + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: first_insert + second_insert: + config: + record_size: 1000 + num_partitions_insert: 10 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: first_upsert + second_upsert: + config: + record_size: 1000 + num_partitions_insert: 10 + num_records_insert: 1000 + repeat_count: 1 + num_records_upsert: 8000 + num_partitions_upsert: 10 + type: SparkUpsertNode + deps: second_insert + first_delete: + config: + num_partitions_delete: 10 + num_records_delete: 16000 + type: SparkDeleteNode + deps: second_upsert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete + second_validate: + config: + validate_hive: false + delete_input_data: false + type: ValidateDatasetNode + deps: second_hive_sync \ No newline at end of file diff --git a/docker/demo/config/test-suite/test-metadata.properties b/docker/demo/config/test-suite/test-metadata.properties new file mode 100644 index 000000000..48da77c51 --- /dev/null +++ b/docker/demo/config/test-suite/test-metadata.properties @@ -0,0 +1,56 @@ + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +hoodie.insert.shuffle.parallelism=100 +hoodie.upsert.shuffle.parallelism=100 +hoodie.bulkinsert.shuffle.parallelism=100 + +hoodie.metadata.enable=true + +hoodie.deltastreamer.source.test.num_partitions=100 +hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false +hoodie.deltastreamer.source.test.max_unique_records=100000000 +hoodie.embed.timeline.server=false +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator +hoodie.datasource.write.partitionpath.field=timestamp + +hoodie.clustering.plan.strategy.sort.columns=_row_key +hoodie.clustering.plan.strategy.daybased.lookback.partitions=0 +hoodie.clustering.inline.max.commits=1 + +hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input +hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd + +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ 
+hoodie.datasource.hive_sync.database=testdb +hoodie.datasource.hive_sync.table=table1 +hoodie.datasource.hive_sync.assume_date_partitioning=false +hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path +hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor + diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties index 30cd1c1f0..509b9f4ba 100644 --- a/docker/demo/config/test-suite/test.properties +++ b/docker/demo/config/test-suite/test.properties @@ -19,6 +19,8 @@ hoodie.insert.shuffle.parallelism=100 hoodie.upsert.shuffle.parallelism=100 hoodie.bulkinsert.shuffle.parallelism=100 +hoodie.metadata.enable=false + hoodie.deltastreamer.source.test.num_partitions=100 hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false hoodie.deltastreamer.source.test.max_unique_records=100000000 @@ -32,10 +34,6 @@ hoodie.datasource.write.recordkey.field=_row_key hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator hoodie.datasource.write.partitionpath.field=timestamp -hoodie.clustering.plan.strategy.sort.columns=_row_key -hoodie.clustering.plan.strategy.daybased.lookback.partitions=0 -hoodie.clustering.inline.max.commits=1 - hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index 56aa390ff..2c39f5f93 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -96,6 +96,8 @@ public class DeltaConfig implements Serializable { private static String NUM_ROLLBACKS = "num_rollbacks"; private static String ENABLE_ROW_WRITING = "enable_row_writing"; private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate"; + private static String VALIDATE_FULL_DATA = "validate_full_data"; + private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest"; // Spark SQL Create Table private static String TABLE_TYPE = "table_type"; @@ -206,10 +208,18 @@ public class DeltaConfig implements Serializable { return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString()); } + public boolean isDeleteInputDataExceptLatest() { + return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA_EXCEPT_LATEST, false).toString()); + } + public boolean isValidateHive() { return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString()); } + public boolean isValidateFullData() { + return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString()); + } + public int getIterationCountToExecute() { return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString()); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java index 986e97328..09d44d986 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java @@ -19,16 +19,14 @@ package org.apache.hudi.integ.testsuite.dag.nodes; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - import org.apache.hudi.DataSourceWriteOptions; 
-import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.schema.SchemaUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.Dataset; @@ -42,13 +40,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; +import java.util.List; +import java.util.stream.Collectors; + import scala.Tuple2; import scala.collection.JavaConversions; import scala.collection.JavaConverters; -import java.util.List; -import java.util.stream.Collectors; - /** * This nodes validates contents from input path are in tact with Hudi. By default no configs are required for this node. But there is an * optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites. @@ -78,6 +76,7 @@ public abstract class BaseValidateDatasetNode extends DagNode { public void execute(ExecutionContext context, int curItrCount) throws Exception { SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate(); + // todo: Fix partitioning schemes. For now, assumes data based partitioning. String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*"; log.warn("Validation using data from input path " + inputPath); @@ -97,46 +96,60 @@ public abstract class BaseValidateDatasetNode extends DagNode { // read from hudi and remove meta columns. 
Dataset trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema()); - Dataset intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf); - long inputCount = inputSnapshotDf.count(); - long outputCount = trimmedHudiDf.count(); - log.debug("Input count: " + inputCount + "; output count: " + outputCount); - // the intersected df should be same as inputDf. if not, there is some mismatch. - if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) { - log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount); - throw new AssertionError("Hudi contents does not match contents input data. "); - } - - if (config.isValidateHive()) { - String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key()); - String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key()); - log.warn("Validating hive table with db : " + database + " and table : " + tableName); - session.sql("REFRESH TABLE " + database + "." + tableName); - Dataset cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " + - "test_suite_source_ordering_field FROM " + database + "." + tableName); - Dataset reorderedInputDf = inputSnapshotDf.select("_row_key","rider","driver","begin_lat","begin_lon","end_lat","end_lon","fare", - "_hoodie_is_deleted","test_suite_source_ordering_field"); - - Dataset intersectedHiveDf = reorderedInputDf.intersect(cowDf); - outputCount = trimmedHudiDf.count(); - log.warn("Input count: " + inputCount + "; output count: " + outputCount); - // the intersected df should be same as inputDf. if not, there is some mismatch. - if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) { - log.error("Data set validation failed for COW hive table. 
Total count in hudi " + outputCount + ", input df count " + inputCount); - throw new AssertionError("Hudi hive table contents does not match contents input data. "); + if (config.isValidateFullData()) { + log.debug("Validating full dataset"); + Dataset exceptInputDf = inputSnapshotDf.except(trimmedHudiDf); + Dataset exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf); + long exceptInputCount = exceptInputDf.count(); + long exceptHudiCount = exceptHudiDf.count(); + log.debug("Except input df count " + exceptInputCount + ", except hudi count " + exceptHudiCount); + if (exceptInputCount != 0 || exceptHudiCount != 0) { + log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count() + + ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount); + throw new AssertionError("Hudi contents does not match contents input data. "); + } + } else { + Dataset intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf); + long inputCount = inputSnapshotDf.count(); + long outputCount = trimmedHudiDf.count(); + log.debug("Input count: " + inputCount + "; output count: " + outputCount); + // the intersected df should be same as inputDf. if not, there is some mismatch. + if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) { + log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount); + throw new AssertionError("Hudi contents does not match contents input data. "); + } - } - // if delete input data is enabled, erase input data. - if (config.isDeleteInputData()) { - // clean up input data for current group of writes. 
- inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath; - FileSystem fs = new Path(inputPathStr) - .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); - FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr)); - for (FileStatus fileStatus : fileStatuses) { - log.debug("Micro batch to be deleted " + fileStatus.getPath().toString()); - fs.delete(fileStatus.getPath(), true); + if (config.isValidateHive()) { + String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key()); + String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key()); + log.warn("Validating hive table with db : " + database + " and table : " + tableName); + session.sql("REFRESH TABLE " + database + "." + tableName); + Dataset cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " + + "test_suite_source_ordering_field FROM " + database + "." + tableName); + Dataset reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare", + "_hoodie_is_deleted", "test_suite_source_ordering_field"); + + Dataset intersectedHiveDf = reorderedInputDf.intersect(cowDf); + outputCount = trimmedHudiDf.count(); + log.warn("Input count: " + inputCount + "; output count: " + outputCount); + // the intersected df should be same as inputDf. if not, there is some mismatch. + if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) { + log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount); + throw new AssertionError("Hudi hive table contents does not match contents input data. "); + } + } + + // if delete input data is enabled, erase input data. + if (config.isDeleteInputData()) { + // clean up input data for current group of writes. 
+ inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath; + FileSystem fs = new Path(inputPathStr) + .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); + FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr)); + for (FileStatus fileStatus : fileStatuses) { + log.debug("Micro batch to be deleted " + fileStatus.getPath().toString()); + fs.delete(fileStatus.getPath(), true); + } } } } @@ -149,8 +162,8 @@ public abstract class BaseValidateDatasetNode extends DagNode { Dataset inputDf = session.read().format("avro").load(inputPath); ExpressionEncoder encoder = getEncoder(inputDf.schema()); return inputDf.groupByKey( - (MapFunction) value -> - value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField), Encoders.STRING()) + (MapFunction) value -> + value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField), Encoders.STRING()) .reduceGroups((ReduceFunction) (v1, v2) -> { int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD); int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteInputDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteInputDatasetNode.java new file mode 100644 index 000000000..2836f240e --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteInputDatasetNode.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; +import org.apache.hudi.integ.testsuite.dag.ExecutionContext; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Deletes all input except latest batch. Mostly used in insert_overwrite operations. + */ +public class DeleteInputDatasetNode extends DagNode { + + public DeleteInputDatasetNode(DeltaConfig.Config config) { + this.config = config; + } + + @Override + public void execute(ExecutionContext context, int curItrCount) throws Exception { + + String latestBatch = String.valueOf(context.getWriterContext().getDeltaGenerator().getBatchId()); + + if (config.isDeleteInputDataExceptLatest()) { + String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath; + FileSystem fs = new Path(inputPathStr) + .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); + FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr)); + for (FileStatus fileStatus : fileStatuses) { + if (!fileStatus.getPath().getName().equals(latestBatch)) { + log.debug("Micro batch to be deleted " + fileStatus.getPath().toString()); + fs.delete(fileStatus.getPath(), true); + } + } + } + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index 6d5bc4ffe..69e32dfbc 100644 --- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -110,6 +110,10 @@ public class DeltaGenerator implements Serializable { return ws; } + public int getBatchId() { + return batchId; + } + public JavaRDD generateInserts(Config operation) { int numPartitions = operation.getNumInsertPartitions(); long recordsPerPartition = operation.getNumRecordsInsert(); diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala index db17a6ee3..b8c46cad3 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala @@ -54,13 +54,18 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { context.getWriterContext.getSparkSession) inputDF.write.format("hudi") .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap)) + .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field") .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) .option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.OPERATION.key, getOperation()) .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key") .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")) .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) 
.mode(SaveMode.Append) .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath) } + + def getOperation(): String = { + DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL + } } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteNode.scala new file mode 100644 index 000000000..6dd2eac52 --- /dev/null +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteNode.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.dag.nodes + +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config + +class SparkInsertOverwriteNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) { + + override def getOperation(): String = { + DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL + } + +} diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteTableNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteTableNode.scala new file mode 100644 index 000000000..a6b80b3a9 --- /dev/null +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertOverwriteTableNode.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.dag.nodes + +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config + +class SparkInsertOverwriteTableNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) { + + override def getOperation(): String = { + DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL + } +} diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala index 858827a7b..113de93ad 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala @@ -18,49 +18,17 @@ package org.apache.hudi.integ.testsuite.dag.nodes -import org.apache.hudi.client.WriteStatus -import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config -import org.apache.hudi.integ.testsuite.dag.ExecutionContext -import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SaveMode - -import scala.collection.JavaConverters._ /** * Spark datasource based upsert node * * @param dagNodeConfig DAG node configurations. */ -class SparkUpsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { +class SparkUpsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) { - config = dagNodeConfig - - /** - * Execute the {@link DagNode}. - * - * @param context The context needed for an execution of a node. - * @param curItrCount iteration count for executing the node. - * @throws Exception Thrown if the execution failed. 
- */ - override def execute(context: ExecutionContext, curItrCount: Int): Unit = { - if (!config.isDisableGenerate) { - println("Generating input data for node {}", this.getName) - context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count() - } - val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch, - context.getWriterContext.getHoodieTestSuiteWriter.getSchema, - context.getWriterContext.getSparkSession) - inputDF.write.format("hudi") - .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap)) - .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) - .option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key") - .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")) - .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) - .mode(SaveMode.Append) - .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath) + override def getOperation(): String = { + DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL } }