From a205dd10faba0a83dcb39a12abb6f744b5224992 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sun, 1 Nov 2020 00:15:41 -0400 Subject: [PATCH] [HUDI-1338] Adding Delete support to test suite framework (#2172) - Adding Delete support to test suite. Added DeleteNode Added support to generate delete records --- .../config/test-suite/complex-dag-cow.yaml | 59 ++++++++++-- docker/demo/config/test-suite/source.avsc | 6 +- docker/demo/config/test-suite/target.avsc | 4 + docker/demo/config/test-suite/test.properties | 27 ++++++ hudi-integ-test/README.md | 39 +------- .../testsuite/configuration/DeltaConfig.java | 35 +++++-- .../testsuite/converter/DeleteConverter.java | 44 +++++++++ .../integ/testsuite/dag/nodes/DeleteNode.java | 54 +++++++++++ .../testsuite/dag/scheduler/DagScheduler.java | 11 ++- .../generator/DeleteGeneratorIterator.java | 54 +++++++++++ .../testsuite/generator/DeltaGenerator.java | 56 +++++++++-- ...lexibleSchemaRecordGenerationIterator.java | 10 +- .../GenericRecordFullPayloadGenerator.java | 96 ++++++++++--------- .../generator/UpdateGeneratorIterator.java | 4 + .../writer/DFSDeltaWriterAdapter.java | 10 +- .../converter/TestDeleteConverter.java | 84 ++++++++++++++++ 16 files changed, 472 insertions(+), 121 deletions(-) create mode 100644 docker/demo/config/test-suite/test.properties create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/DeleteConverter.java create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteNode.java create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeleteGeneratorIterator.java create mode 100644 hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/converter/TestDeleteConverter.java diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/complex-dag-cow.yaml index 45f538bb0..a10026c0b 100644 --- a/docker/demo/config/test-suite/complex-dag-cow.yaml +++ 
b/docker/demo/config/test-suite/complex-dag-cow.yaml @@ -59,15 +59,13 @@ first_hive_sync: deps: first_upsert first_hive_query: config: - hive_props: - prop2: "set spark.yarn.queue=" - prop3: "set hive.strict.checks.large.query=false" - prop4: "set hive.stats.autogather=false" + queue_name: "adhoc" + engine: "mr" hive_queries: query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" result1: 0 query2: "select count(*) from testdb.table1" - result2: 11600 + result2: 11300 type: HiveQueryNode deps: first_hive_sync second_upsert: @@ -82,14 +80,55 @@ second_upsert: deps: first_hive_query second_hive_query: config: - hive_props: - prop2: "set mapred.job.queue.name=" - prop3: "set hive.strict.checks.large.query=false" - prop4: "set hive.stats.autogather=false" + queue_name: "adhoc" + engine: "mr" hive_queries: query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" result1: 0 query2: "select count(*) from testdb.table1" - result2: 11900 + result2: 11600 type: HiveQueryNode deps: second_upsert +fourth_insert: + config: + record_size: 70000 + num_insert_partitions: 1 + repeat_count: 1 + num_records_insert: 1000 + deps: second_hive_query + type: InsertNode +third_hive_query: + config: + queue_name: "adhoc" + engine: "mr" + hive_queries: + query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" + result1: 0 + query2: "select count(*) from testdb.table1" + result2: 12600 + type: HiveQueryNode + deps: fourth_insert +first_delete: + config: + record_size: 70000 + num_partitions_delete: 1 + num_records_delete: 200 + deps: third_hive_query + type: DeleteNode +fourth_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete +fourth_hive_query: + config: + queue_name: "adhoc" + engine: "mr" + hive_queries: + query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" + result1: 0 + query2: "select count(*) from testdb.table1" + 
result2: 12400 + type: HiveQueryNode + deps: fourth_hive_sync \ No newline at end of file diff --git a/docker/demo/config/test-suite/source.avsc b/docker/demo/config/test-suite/source.avsc index c2a8d7c2a..e62d3b55e 100644 --- a/docker/demo/config/test-suite/source.avsc +++ b/docker/demo/config/test-suite/source.avsc @@ -46,6 +46,10 @@ }, { "name" : "fare", "type" : "double" - } ] + }, { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false + }] } diff --git a/docker/demo/config/test-suite/target.avsc b/docker/demo/config/test-suite/target.avsc index 29a2500a9..c71f1cec5 100644 --- a/docker/demo/config/test-suite/target.avsc +++ b/docker/demo/config/test-suite/target.avsc @@ -46,6 +46,10 @@ }, { "name" : "fare", "type" : "double" + }, { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false }, { "name" : "haversine_distance", "type" : "double" diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties new file mode 100644 index 000000000..a7fd3986a --- /dev/null +++ b/docker/demo/config/test-suite/test.properties @@ -0,0 +1,27 @@ +hoodie.insert.shuffle.parallelism=100 +hoodie.upsert.shuffle.parallelism=100 +hoodie.bulkinsert.shuffle.parallelism=100 + +hoodie.deltastreamer.source.test.num_partitions=100 +hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false +hoodie.deltastreamer.source.test.max_unique_records=100000000 +hoodie.embed.timeline.server=false +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector + +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator +hoodie.datasource.write.partitionpath.field=timestamp + +hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input 
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd + +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ +hoodie.datasource.hive_sync.database=testdb +hoodie.datasource.hive_sync.table=table1 +hoodie.datasource.hive_sync.assume_date_partitioning=false +hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path +hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor + diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md index 766db2e98..a6cdd0847 100644 --- a/hudi-integ-test/README.md +++ b/hudi-integ-test/README.md @@ -178,41 +178,6 @@ Copy the integration tests jar into the docker container docker cp packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar adhoc-2:/opt ``` -Copy the following test properties file: -``` -echo ' -hoodie.insert.shuffle.parallelism=100 -hoodie.upsert.shuffle.parallelism=100 -hoodie.bulkinsert.shuffle.parallelism=100 - -hoodie.deltastreamer.source.test.num_partitions=100 -hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false -hoodie.deltastreamer.source.test.max_unique_records=100000000 -hoodie.embed.timeline.server=false -hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector - -hoodie.datasource.write.recordkey.field=_row_key -hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator -hoodie.datasource.write.partitionpath.field=timestamp - -hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input 
-hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc -hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc -hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP -hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd - -hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ -hoodie.datasource.hive_sync.database=testdb -hoodie.datasource.hive_sync.table=table1 -hoodie.datasource.hive_sync.assume_date_partitioning=false -hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path -hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor -hoodie.datasource.hive_sync.skip_ro_suffix=true -' > test.properties - -docker cp test.properties adhoc-2:/opt -``` - ``` docker exec -it adhoc-2 /bin/bash ``` @@ -254,7 +219,7 @@ spark-submit \ --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ --input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \ --target-table table1 \ ---props test.properties \ +--props file:/var/hoodie/ws/docker/demo/config/test-suite/test.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ --source-class org.apache.hudi.utilities.sources.AvroDFSSource \ --input-file-size 125829120 \ @@ -293,7 +258,7 @@ spark-submit \ --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ --input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \ --target-table table1 \ ---props test.properties \ +--props file:/var/hoodie/ws/docker/demo/config/test-suite/test.properties \ --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ --source-class org.apache.hudi.utilities.sources.AvroDFSSource \ --input-file-size 125829120 \ diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index db1560464..81f406be3 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -18,17 +18,19 @@ package org.apache.hudi.integ.testsuite.configuration; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.integ.testsuite.reader.DeltaInputType; +import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; + import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; + import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.integ.testsuite.reader.DeltaInputType; -import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; /** * Configuration to hold the delta output type and delta input format. @@ -59,8 +61,7 @@ public class DeltaConfig implements Serializable { } /** - * Represents any kind of workload operation for new data. Each workload also contains a set of Option sequence of - * actions that can be executed in parallel. + * Represents any kind of workload operation for new data. Each workload also contains a set of Option sequence of actions that can be executed in parallel. 
*/ public static class Config { @@ -73,10 +74,12 @@ public class DeltaConfig implements Serializable { public static final String HIVE_PROPERTIES = "hive_props"; private static String NUM_RECORDS_INSERT = "num_records_insert"; private static String NUM_RECORDS_UPSERT = "num_records_upsert"; + private static String NUM_RECORDS_DELETE = "num_records_delete"; private static String REPEAT_COUNT = "repeat_count"; private static String RECORD_SIZE = "record_size"; private static String NUM_PARTITIONS_INSERT = "num_partitions_insert"; private static String NUM_PARTITIONS_UPSERT = "num_partitions_upsert"; + private static String NUM_PARTITIONS_DELETE = "num_partitions_delete"; private static String NUM_FILES_UPSERT = "num_files_upsert"; private static String FRACTION_UPSERT_PER_FILE = "fraction_upsert_per_file"; private static String DISABLE_GENERATE = "disable_generate"; @@ -103,6 +106,10 @@ public class DeltaConfig implements Serializable { return Long.valueOf(configsMap.getOrDefault(NUM_RECORDS_UPSERT, 0).toString()); } + public long getNumRecordsDelete() { + return Long.valueOf(configsMap.getOrDefault(NUM_RECORDS_DELETE, 0).toString()); + } + public int getRecordSize() { return Integer.valueOf(configsMap.getOrDefault(RECORD_SIZE, 1024).toString()); } @@ -123,6 +130,10 @@ public class DeltaConfig implements Serializable { return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString()); } + public int getNumDeletePartitions() { + return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_DELETE, 1).toString()); + } + public int getNumUpsertFiles() { return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString()); } @@ -192,6 +203,11 @@ public class DeltaConfig implements Serializable { return this; } + public Builder withNumRecordsToDelete(long numRecordsDelete) { + this.configsMap.put(NUM_RECORDS_DELETE, numRecordsDelete); + return this; + } + public Builder withNumInsertPartitions(int numInsertPartitions) { 
this.configsMap.put(NUM_PARTITIONS_INSERT, numInsertPartitions); return this; @@ -202,6 +218,11 @@ public class DeltaConfig implements Serializable { return this; } + public Builder withNumDeletePartitions(int numDeletePartitions) { + this.configsMap.put(NUM_PARTITIONS_DELETE, numDeletePartitions); + return this; + } + public Builder withNumUpsertFiles(int numUpsertFiles) { this.configsMap.put(NUM_FILES_UPSERT, numUpsertFiles); return this; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/DeleteConverter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/DeleteConverter.java new file mode 100644 index 000000000..f6dc08b5a --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/DeleteConverter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.converter; + +import org.apache.hudi.integ.testsuite.generator.DeleteGeneratorIterator; +import org.apache.hudi.integ.testsuite.generator.LazyRecordGeneratorIterator; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; + +import java.util.List; + +public class DeleteConverter implements Converter { + + private final String schemaStr; + private final int minPayloadSize; + + public DeleteConverter(String schemaStr, int minPayloadSize) { + this.schemaStr = schemaStr; + this.minPayloadSize = minPayloadSize; + } + + @Override + public JavaRDD convert(JavaRDD inputRDD) { + return inputRDD.mapPartitions(recordItr -> new LazyRecordGeneratorIterator(new DeleteGeneratorIterator(recordItr, + schemaStr, minPayloadSize))); + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteNode.java new file mode 100644 index 000000000..b538b01d1 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteNode.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; +import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; + +import org.apache.spark.api.java.JavaRDD; + +/** + * Delete node to assist in issuing deletes. + */ +public class DeleteNode extends InsertNode { + + public DeleteNode(Config config) { + super(config); + } + + @Override + protected void generate(DeltaGenerator deltaGenerator) throws Exception { + if (!config.isDisableGenerate()) { + deltaGenerator.writeRecords(deltaGenerator.generateDeletes(config)).count(); + } + } + + @Override + protected JavaRDD ingest(HoodieTestSuiteWriter hoodieTestSuiteWriter, Option commitTime) + throws Exception { + if (!config.isDisableIngest()) { + log.info("Deleting input data {}", this.getName()); + this.result = hoodieTestSuiteWriter.upsert(commitTime); + } + return this.result; + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java index 87686d42c..5c70ea164 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java @@ -24,6 +24,7 @@ import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.WriterContext; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; import org.apache.hudi.metrics.Metrics; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +39,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import static 
org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.CONFIG_NAME; + /** - * The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to - * the relations between nodes. + * The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to the relations between nodes. */ public class DagScheduler { @@ -75,7 +77,7 @@ public class DagScheduler { * Method to start executing the nodes in workflow DAGs. * * @param service ExecutorService - * @param nodes Nodes to be executed + * @param nodes Nodes to be executed * @throws Exception will be thrown if ant error occurred */ private void execute(ExecutorService service, List nodes) throws Exception { @@ -87,6 +89,7 @@ public class DagScheduler { Set childNodes = new HashSet<>(); while (queue.size() > 0) { DagNode nodeToExecute = queue.poll(); + log.info("Node to execute in dag scheduler " + nodeToExecute.getConfig().toString()); futures.add(service.submit(() -> executeNode(nodeToExecute))); if (nodeToExecute.getChildNodes().size() > 0) { childNodes.addAll(nodeToExecute.getChildNodes()); @@ -116,7 +119,7 @@ public class DagScheduler { try { int repeatCount = node.getConfig().getRepeatCount(); while (repeatCount > 0) { - log.warn("executing node: " + node.getName() + " of type: " + node.getClass()); + log.warn("executing node: \"" + node.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" of type: " + node.getClass() + " :: " + node.getConfig().toString()); node.execute(executionContext); log.info("Finished executing {}", node.getName()); repeatCount--; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeleteGeneratorIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeleteGeneratorIterator.java new file mode 100644 index 000000000..b95bd0e9d --- /dev/null +++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeleteGeneratorIterator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.generator; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Lazy delete record generator. 
+ */ +public class DeleteGeneratorIterator implements Iterator { + + // Use the full payload generator as default + private GenericRecordFullPayloadGenerator generator; + // iterator + private Iterator itr; + + public DeleteGeneratorIterator(Iterator itr, String schemaStr, int minPayloadSize) { + this.itr = itr; + Schema schema = new Schema.Parser().parse(schemaStr); + this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize); + } + + @Override + public boolean hasNext() { + return itr.hasNext(); + } + + @Override + public GenericRecord next() { + GenericRecord newRecord = itr.next(); + return this.generator.generateDeleteRecord(newRecord); + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index dc991b11e..53af8eb74 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -36,9 +36,11 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; +import org.apache.hudi.integ.testsuite.converter.Converter; +import org.apache.hudi.integ.testsuite.converter.DeleteConverter; import org.apache.hudi.integ.testsuite.converter.UpdateConverter; import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader; import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader; @@ -48,14 +50,15 @@ import 
org.apache.hudi.integ.testsuite.writer.DeltaWriteStats; import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter; import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory; import org.apache.hudi.keygen.BuiltinKeyGenerator; -import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; -import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; + import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + + import scala.Tuple2; /** @@ -127,7 +130,6 @@ public class DeltaGenerator implements Serializable { if (deltaOutputConfig.getInputParallelism() < numPartitions) { inputBatch = inputBatch.coalesce(deltaOutputConfig.getInputParallelism()); } - return inputBatch; } @@ -162,16 +164,14 @@ public class DeltaGenerator implements Serializable { // persist this since we will make multiple passes over this int numPartition = Math.min(deltaOutputConfig.getInputParallelism(), Math.max(1, config.getNumUpsertPartitions())); - log.info("Repartitioning records into " + numPartition + " partitions"); + log.info("Repartitioning records into " + numPartition + " partitions for updates"); adjustedRDD = adjustedRDD.repartition(numPartition); - log.info("Repartitioning records done"); + log.info("Repartitioning records done for updates"); + UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(), partitionPathFieldNames, recordRowKeyFieldNames); JavaRDD updates = converter.convert(adjustedRDD); - - log.info("Records converted"); updates.persist(StorageLevel.DISK_ONLY()); - if (inserts == null) { inserts = updates; } else { @@ -185,6 +185,42 @@ public class DeltaGenerator implements Serializable { } } + public JavaRDD generateDeletes(Config config) throws IOException { + if (deltaOutputConfig.getDeltaOutputMode() == DeltaOutputMode.DFS) { + 
DeltaInputReader deltaInputReader = null; + JavaRDD adjustedRDD = null; + + if (config.getNumDeletePartitions() < 1) { + // randomly generate deletes for a given number of records without regard to partitions and files + deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr, + ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty()); + adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete()); + adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsDelete()); + } else { + deltaInputReader = + new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(), + schemaStr); + if (config.getFractionUpsertPerFile() > 0) { + adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), + config.getFractionUpsertPerFile()); + } else { + adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), config + .getNumRecordsDelete()); + } + } + log.info("Repartitioning records for delete"); + // persist this since we will make multiple passes over this + adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism()); + Converter converter = new DeleteConverter(schemaStr, config.getRecordSize()); + JavaRDD deletes = converter.convert(adjustedRDD); + deletes.persist(StorageLevel.DISK_ONLY()); + return deletes; + } else { + throw new IllegalArgumentException("Other formats are not supported at the moment"); + } + } + + public Map getPartitionToCountMap(JavaRDD records) { // Requires us to keep the partitioner the same return records.mapPartitionsWithIndex((index, itr) -> { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java index 270dcd169..256dfa49e 100644 --- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java @@ -18,17 +18,17 @@ package org.apache.hudi.integ.testsuite.generator; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; /** - * A GenericRecordGeneratorIterator for the custom schema of the workload. Implements {@link Iterator} to allow for - * iteration semantics. + * A GenericRecordGeneratorIterator for the custom schema of the workload. Implements {@link Iterator} to allow for iteration semantics. */ public class FlexibleSchemaRecordGenerationIterator implements Iterator { @@ -67,7 +67,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator " + minPayloadSize); - } + if (numberOfComplexFields < 1) { + LOG.warn("The schema does not have any collections/complex fields. " + + "Cannot achieve minPayloadSize => " + minPayloadSize); + } - determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize); + determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize); } } @@ -122,8 +123,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { /** * Create a new {@link GenericRecord} with random value according to given schema. * - * Long fields which are specified within partitionPathFieldNames are constrained to the value of the partition - * for which records are being generated. + * Long fields which are specified within partitionPathFieldNames are constrained to the value of the partition for which records are being generated. 
* * @return {@link GenericRecord} with random value */ @@ -137,7 +137,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { if (isPartialLongField(f, partitionPathFieldNames)) { // This is a long field used as partition field. Set it to seconds since epoch. long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS); - result.put(f.name(), (long)value); + result.put(f.name(), (long) value); } else { result.put(f.name(), typeConvert(f)); } @@ -147,7 +147,6 @@ public class GenericRecordFullPayloadGenerator implements Serializable { /** * Return true if this is a partition field of type long which should be set to the partition index. - * @return */ private boolean isPartialLongField(Schema.Field field, Set partitionPathFieldNames) { if ((partitionPathFieldNames == null) || !partitionPathFieldNames.contains(field.name())) { @@ -165,7 +164,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { /** * Update a given {@link GenericRecord} with random value. The fields in {@code blacklistFields} will not be updated. * - * @param record GenericRecord to update + * @param record GenericRecord to update * @param blacklistFields Fields whose value should not be touched * @return The updated {@link GenericRecord} */ @@ -174,8 +173,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { } /** - * Create a new {@link GenericRecord} with random values. Not all the fields have value, it is random, and its value - * is random too. + * Create a new {@link GenericRecord} with random values. Not all the fields have value, it is random, and its value is random too. * * @param schema Schema to create with. * @return A {@link GenericRecord} with random value. 
@@ -183,11 +181,15 @@ public class GenericRecordFullPayloadGenerator implements Serializable { protected GenericRecord convertPartial(Schema schema) { GenericRecord result = new GenericData.Record(schema); for (Schema.Field f : schema.getFields()) { - boolean setNull = random.nextBoolean(); - if (!setNull) { - result.put(f.name(), typeConvert(f)); + if (f.name().equals(DEFAULT_HOODIE_IS_DELETED_COL)) { + result.put(f.name(), false); } else { - result.put(f.name(), null); + boolean setNull = random.nextBoolean(); + if (!setNull) { + result.put(f.name(), typeConvert(f)); + } else { + result.put(f.name(), null); + } } } // TODO : pack remaining bytes into a complex field @@ -195,22 +197,34 @@ public class GenericRecordFullPayloadGenerator implements Serializable { } /** - * Set random value to {@link GenericRecord} according to the schema type of field. - * The field in blacklist will not be set. + * Set random value to {@link GenericRecord} according to the schema type of field. The field in blacklist will not be set. * - * @param record GenericRecord to randomize. + * @param record GenericRecord to randomize. * @param blacklistFields blacklistFields where the filed will not be randomized. * @return Randomized GenericRecord. */ protected GenericRecord randomize(GenericRecord record, Set blacklistFields) { for (Schema.Field f : record.getSchema().getFields()) { - if (blacklistFields == null || !blacklistFields.contains(f.name())) { + if (f.name().equals(DEFAULT_HOODIE_IS_DELETED_COL)) { + record.put(f.name(), false); + } else if (blacklistFields == null || !blacklistFields.contains(f.name())) { record.put(f.name(), typeConvert(f)); } } return record; } + /** + * Set _hoodie_is_deleted column value to true. + * + * @param record GenericRecord to delete. + * @return GenericRecord representing deleted record. 
+ */ + protected GenericRecord generateDeleteRecord(GenericRecord record) { + record.put(DEFAULT_HOODIE_IS_DELETED_COL, true); + return record; + } + /** * Generate random value according to their type. */ @@ -219,6 +233,10 @@ public class GenericRecordFullPayloadGenerator implements Serializable { if (isOption(fieldSchema)) { fieldSchema = getNonNull(fieldSchema); } + if (fieldSchema.getName().equals(DEFAULT_HOODIE_IS_DELETED_COL)) { + return false; + } + switch (fieldSchema.getType()) { case BOOLEAN: return random.nextBoolean(); @@ -231,7 +249,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { case LONG: return random.nextLong(); case STRING: - return UUID.randomUUID().toString(); + return UUID.randomUUID().toString(); case ENUM: List enumSymbols = fieldSchema.getEnumSymbols(); return new GenericData.EnumSymbol(fieldSchema, enumSymbols.get(random.nextInt(enumSymbols.size() - 1))); @@ -293,14 +311,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable { } /** - * Check whether a schema is option. - * return true if it match the follows: - * 1. Its type is Type.UNION - * 2. Has two types - * 3. Has a NULL type. - * - * @param schema - * @return + * Check whether a schema is option. return true if it match the follows: 1. Its type is Type.UNION 2. Has two types 3. Has a NULL type. */ protected boolean isOption(Schema schema) { return schema.getType().equals(Schema.Type.UNION) @@ -346,7 +357,6 @@ public class GenericRecordFullPayloadGenerator implements Serializable { /** * Method help to calculate the number of entries to add. 
* - * @param elementSchema * @return Number of entries to add */ private void determineExtraEntriesRequired(int numberOfComplexFields, int numberOfBytesToAdd) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java index d9d137a42..51b1fd9ed 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java @@ -25,12 +25,16 @@ import java.util.Set; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A lazy update payload generator to generate {@link GenericRecord}s lazily. */ public class UpdateGeneratorIterator implements Iterator { + private static Logger LOG = LoggerFactory.getLogger(UpdateGeneratorIterator.class); + // Use the full payload generator as default private GenericRecordFullPayloadGenerator generator; private Set blackListedFields; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java index 4c70ac564..65e4ee13c 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java @@ -18,15 +18,15 @@ package org.apache.hudi.integ.testsuite.writer; +import org.apache.avro.generic.GenericRecord; + import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import org.apache.avro.generic.GenericRecord; /** - * {@link org.apache.hadoop.hdfs.DistributedFileSystem} (or {@link org.apache.hadoop.fs.LocalFileSystem}) based 
delta - * generator. + * {@link org.apache.hadoop.hdfs.DistributedFileSystem} (or {@link org.apache.hadoop.fs.LocalFileSystem}) based delta generator. */ public class DFSDeltaWriterAdapter implements DeltaWriterAdapter { @@ -40,10 +40,12 @@ public class DFSDeltaWriterAdapter implements DeltaWriterAdapter @Override public List write(Iterator input) throws IOException { while (input.hasNext()) { + GenericRecord next = input.next(); if (this.deltaInputGenerator.canWrite()) { - this.deltaInputGenerator.writeData(input.next()); + this.deltaInputGenerator.writeData(next); } else if (input.hasNext()) { rollOver(); + this.deltaInputGenerator.writeData(next); } } close(); diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/converter/TestDeleteConverter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/converter/TestDeleteConverter.java new file mode 100644 index 000000000..8ed98b4fb --- /dev/null +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/converter/TestDeleteConverter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.converter; + +import org.apache.hudi.integ.testsuite.utils.TestUtils; +import org.apache.hudi.utilities.UtilHelpers; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import scala.Tuple2; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator.DEFAULT_HOODIE_IS_DELETED_COL; + +/** + * Tests DeleteConverter. + */ +public class TestDeleteConverter { + + private JavaSparkContext jsc; + + @BeforeEach + public void setup() throws Exception { + jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[1]"); + + } + + @AfterEach + public void teardown() { + jsc.stop(); + } + + /** + * Test {@link DeleteConverter} by generating deletes from existing records. + */ + @Test + public void testGenerateDeleteRecordsFromInputRecords() throws Exception { + // 1. prepare input records + JavaRDD<GenericRecord> inputRDD = TestUtils.makeRDD(jsc, 10); + String schemaStr = inputRDD.take(1).get(0).getSchema().toString(); + int minPayloadSize = 1000; + + // 2. 
Converter reads existing records and generates deletes + DeleteConverter deleteConverter = new DeleteConverter(schemaStr, minPayloadSize); + List<String> insertRowKeys = inputRDD.map(r -> r.get("_row_key").toString()).collect(); + assertTrue(inputRDD.count() == 10); + JavaRDD<GenericRecord> outputRDD = deleteConverter.convert(inputRDD); + List<String> updateRowKeys = outputRDD.map(row -> row.get("_row_key").toString()).collect(); + // The insert row keys should be the same as delete row keys + assertTrue(insertRowKeys.containsAll(updateRowKeys)); + Map<String, GenericRecord> inputRecords = inputRDD.mapToPair(r -> new Tuple2<>(r.get("_row_key").toString(), r)) + .collectAsMap(); + List<GenericRecord> deleteRecords = outputRDD.collect(); + deleteRecords.stream().forEach(updateRecord -> { + GenericRecord inputRecord = inputRecords.get(updateRecord.get("_row_key").toString()); + assertTrue((boolean)inputRecord.get(DEFAULT_HOODIE_IS_DELETED_COL)); + }); + } +}