1
0

[HUDI-1338] Adding Delete support to test suite framework (#2172)

- Adding Delete support to test suite. 
         Added DeleteNode 
         Added support to generate delete records
This commit is contained in:
Sivabalan Narayanan
2020-11-01 00:15:41 -04:00
committed by GitHub
parent 6310a2307a
commit a205dd10fa
16 changed files with 472 additions and 121 deletions

View File

@@ -59,15 +59,13 @@ first_hive_sync:
deps: first_upsert
first_hive_query:
config:
hive_props:
prop2: "set spark.yarn.queue="
prop3: "set hive.strict.checks.large.query=false"
prop4: "set hive.stats.autogather=false"
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 11600
result2: 11300
type: HiveQueryNode
deps: first_hive_sync
second_upsert:
@@ -82,14 +80,55 @@ second_upsert:
deps: first_hive_query
second_hive_query:
config:
hive_props:
prop2: "set mapred.job.queue.name="
prop3: "set hive.strict.checks.large.query=false"
prop4: "set hive.stats.autogather=false"
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 11900
result2: 11600
type: HiveQueryNode
deps: second_upsert
fourth_insert:
config:
record_size: 70000
num_insert_partitions: 1
repeat_count: 1
num_records_insert: 1000
deps: second_hive_query
type: InsertNode
third_hive_query:
config:
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 12600
type: HiveQueryNode
deps: fourth_insert
first_delete:
config:
record_size: 70000
num_partitions_delete: 1
num_records_delete: 200
deps: third_hive_query
type: DeleteNode
fourth_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: first_delete
fourth_hive_query:
config:
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 12400
type: HiveQueryNode
deps: fourth_hive_sync

View File

@@ -46,6 +46,10 @@
}, {
"name" : "fare",
"type" : "double"
} ]
}, {
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
}]
}

View File

@@ -46,6 +46,10 @@
}, {
"name" : "fare",
"type" : "double"
}, {
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
}, {
"name" : "haversine_distance",
"type" : "double"

View File

@@ -0,0 +1,27 @@
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor

View File

@@ -178,41 +178,6 @@ Copy the integration tests jar into the docker container
docker cp packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar adhoc-2:/opt
```
Copy the following test properties file:
```
echo '
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
hoodie.datasource.hive_sync.skip_ro_suffix=true
' > test.properties
docker cp test.properties adhoc-2:/opt
```
```
docker exec -it adhoc-2 /bin/bash
```
@@ -254,7 +219,7 @@ spark-submit \
--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
--target-table table1 \
--props test.properties \
--props file:/var/hoodie/ws/docker/demo/config/test-suite/test.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--input-file-size 125829120 \
@@ -293,7 +258,7 @@ spark-submit \
--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
--target-table table1 \
--props test.properties \
--props file:/var/hoodie/ws/docker/demo/config/test-suite/test.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--input-file-size 125829120 \

View File

@@ -18,17 +18,19 @@
package org.apache.hudi.integ.testsuite.configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
/**
* Configuration to hold the delta output type and delta input format.
@@ -59,8 +61,7 @@ public class DeltaConfig implements Serializable {
}
/**
* Represents any kind of workload operation for new data. Each workload also contains a set of Option sequence of
* actions that can be executed in parallel.
* Represents any kind of workload operation for new data. Each workload also contains a set of Option sequence of actions that can be executed in parallel.
*/
public static class Config {
@@ -73,10 +74,12 @@ public class DeltaConfig implements Serializable {
public static final String HIVE_PROPERTIES = "hive_props";
private static String NUM_RECORDS_INSERT = "num_records_insert";
private static String NUM_RECORDS_UPSERT = "num_records_upsert";
private static String NUM_RECORDS_DELETE = "num_records_delete";
private static String REPEAT_COUNT = "repeat_count";
private static String RECORD_SIZE = "record_size";
private static String NUM_PARTITIONS_INSERT = "num_partitions_insert";
private static String NUM_PARTITIONS_UPSERT = "num_partitions_upsert";
private static String NUM_PARTITIONS_DELETE = "num_partitions_delete";
private static String NUM_FILES_UPSERT = "num_files_upsert";
private static String FRACTION_UPSERT_PER_FILE = "fraction_upsert_per_file";
private static String DISABLE_GENERATE = "disable_generate";
@@ -103,6 +106,10 @@ public class DeltaConfig implements Serializable {
return Long.valueOf(configsMap.getOrDefault(NUM_RECORDS_UPSERT, 0).toString());
}
public long getNumRecordsDelete() {
return Long.valueOf(configsMap.getOrDefault(NUM_RECORDS_DELETE, 0).toString());
}
public int getRecordSize() {
return Integer.valueOf(configsMap.getOrDefault(RECORD_SIZE, 1024).toString());
}
@@ -123,6 +130,10 @@ public class DeltaConfig implements Serializable {
return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString());
}
public int getNumDeletePartitions() {
return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_DELETE, 1).toString());
}
public int getNumUpsertFiles() {
return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString());
}
@@ -192,6 +203,11 @@ public class DeltaConfig implements Serializable {
return this;
}
public Builder withNumRecordsToDelete(long numRecordsDelete) {
this.configsMap.put(NUM_RECORDS_DELETE, numRecordsDelete);
return this;
}
public Builder withNumInsertPartitions(int numInsertPartitions) {
this.configsMap.put(NUM_PARTITIONS_INSERT, numInsertPartitions);
return this;
@@ -202,6 +218,11 @@ public class DeltaConfig implements Serializable {
return this;
}
public Builder withNumDeletePartitions(int numDeletePartitions) {
this.configsMap.put(NUM_PARTITIONS_DELETE, numDeletePartitions);
return this;
}
public Builder withNumUpsertFiles(int numUpsertFiles) {
this.configsMap.put(NUM_FILES_UPSERT, numUpsertFiles);
return this;

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.converter;
import org.apache.hudi.integ.testsuite.generator.DeleteGeneratorIterator;
import org.apache.hudi.integ.testsuite.generator.LazyRecordGeneratorIterator;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
/**
 * {@link Converter} implementation that turns incoming records into delete records.
 * Each input {@link GenericRecord} is passed through a {@link DeleteGeneratorIterator},
 * which marks the record as deleted before it is written out.
 */
public class DeleteConverter implements Converter<GenericRecord, GenericRecord> {
// Avro schema (as a JSON string); parsed once per partition inside the iterator.
private final String schemaStr;
// Minimum payload size handed to the underlying payload generator.
private final int minPayloadSize;
public DeleteConverter(String schemaStr, int minPayloadSize) {
this.schemaStr = schemaStr;
this.minPayloadSize = minPayloadSize;
}
/**
 * Converts the input RDD into an RDD of delete records.
 * Uses mapPartitions so the schema is parsed once per partition rather than per record.
 */
@Override
public JavaRDD<GenericRecord> convert(JavaRDD<GenericRecord> inputRDD) {
return inputRDD.mapPartitions(recordItr -> new LazyRecordGeneratorIterator(new DeleteGeneratorIterator(recordItr,
schemaStr, minPayloadSize)));
}
}

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.spark.api.java.JavaRDD;
/**
* Delete node to assist in issuing deletes.
*/
public class DeleteNode extends InsertNode {
public DeleteNode(Config config) {
super(config);
}
/**
 * Generates delete records (unless generation is disabled) and writes them to the
 * input path. The trailing count() forces evaluation of the lazy RDD so records are
 * actually materialized.
 */
@Override
protected void generate(DeltaGenerator deltaGenerator) throws Exception {
if (!config.isDisableGenerate()) {
deltaGenerator.writeRecords(deltaGenerator.generateDeletes(config)).count();
}
}
/**
 * Ingests the generated delete records. Deletes are applied through the writer's
 * upsert path — the generated records carry the delete marker (set during generation)
 * rather than being issued as hard deletes.
 */
@Override
protected JavaRDD<WriteStatus> ingest(HoodieTestSuiteWriter hoodieTestSuiteWriter, Option<String> commitTime)
throws Exception {
if (!config.isDisableIngest()) {
log.info("Deleting input data {}", this.getName());
this.result = hoodieTestSuiteWriter.upsert(commitTime);
}
return this.result;
}
}

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +39,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.CONFIG_NAME;
/**
* The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to
* the relations between nodes.
* The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to the relations between nodes.
*/
public class DagScheduler {
@@ -75,7 +77,7 @@ public class DagScheduler {
* Method to start executing the nodes in workflow DAGs.
*
* @param service ExecutorService
* @param nodes Nodes to be executed
* @param nodes Nodes to be executed
* @throws Exception will be thrown if any error occurred
*/
private void execute(ExecutorService service, List<DagNode> nodes) throws Exception {
@@ -87,6 +89,7 @@ public class DagScheduler {
Set<DagNode> childNodes = new HashSet<>();
while (queue.size() > 0) {
DagNode nodeToExecute = queue.poll();
log.info("Node to execute in dag scheduler " + nodeToExecute.getConfig().toString());
futures.add(service.submit(() -> executeNode(nodeToExecute)));
if (nodeToExecute.getChildNodes().size() > 0) {
childNodes.addAll(nodeToExecute.getChildNodes());
@@ -116,7 +119,7 @@ public class DagScheduler {
try {
int repeatCount = node.getConfig().getRepeatCount();
while (repeatCount > 0) {
log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
log.warn("executing node: \"" + node.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" of type: " + node.getClass() + " :: " + node.getConfig().toString());
node.execute(executionContext);
log.info("Finished executing {}", node.getName());
repeatCount--;

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.generator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * Lazy delete record generator.
 *
 * <p>Wraps an iterator of source records and, for each record pulled, returns the same
 * record marked as a delete via
 * {@link GenericRecordFullPayloadGenerator#generateDeleteRecord(GenericRecord)}.
 */
public class DeleteGeneratorIterator implements Iterator<GenericRecord> {

  // Use the full payload generator as default; parsed schema is reused for every record.
  private final GenericRecordFullPayloadGenerator generator;
  // Source records to be converted into deletes.
  private final Iterator<GenericRecord> itr;

  public DeleteGeneratorIterator(Iterator<GenericRecord> itr, String schemaStr, int minPayloadSize) {
    this.itr = itr;
    // Parse the schema once up front rather than per record.
    Schema schema = new Schema.Parser().parse(schemaStr);
    this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize);
  }

  @Override
  public boolean hasNext() {
    return itr.hasNext();
  }

  @Override
  public GenericRecord next() {
    GenericRecord newRecord = itr.next();
    return this.generator.generateDeleteRecord(newRecord);
  }
}

View File

@@ -36,9 +36,11 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.converter.Converter;
import org.apache.hudi.integ.testsuite.converter.DeleteConverter;
import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
@@ -48,14 +50,15 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter;
import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -127,7 +130,6 @@ public class DeltaGenerator implements Serializable {
if (deltaOutputConfig.getInputParallelism() < numPartitions) {
inputBatch = inputBatch.coalesce(deltaOutputConfig.getInputParallelism());
}
return inputBatch;
}
@@ -162,16 +164,14 @@ public class DeltaGenerator implements Serializable {
// persist this since we will make multiple passes over this
int numPartition = Math.min(deltaOutputConfig.getInputParallelism(),
Math.max(1, config.getNumUpsertPartitions()));
log.info("Repartitioning records into " + numPartition + " partitions");
log.info("Repartitioning records into " + numPartition + " partitions for updates");
adjustedRDD = adjustedRDD.repartition(numPartition);
log.info("Repartitioning records done");
log.info("Repartitioning records done for updates");
UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
partitionPathFieldNames, recordRowKeyFieldNames);
JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
log.info("Records converted");
updates.persist(StorageLevel.DISK_ONLY());
if (inserts == null) {
inserts = updates;
} else {
@@ -185,6 +185,42 @@ public class DeltaGenerator implements Serializable {
}
}
/**
 * Generates delete records by reading back previously written records and converting
 * them into deletes via {@link DeleteConverter}.
 *
 * <p>If the configured number of delete partitions is less than 1, records are sampled
 * from the raw DFS avro input without regard to partitions and files; otherwise records
 * are read back from the dataset output path partition-wise.
 *
 * <p>NOTE(review): the partition-wise branch reuses the upsert sizing knobs
 * (num_files_upsert / fraction_upsert_per_file) to size the delete batch — presumably
 * intentional reuse; TODO confirm there are no delete-specific equivalents.
 *
 * @param config workload config describing how many records/partitions to delete
 * @return RDD of records marked for deletion, persisted to disk since callers may make
 *         multiple passes over it
 * @throws IOException if reading existing data fails
 */
public JavaRDD<GenericRecord> generateDeletes(Config config) throws IOException {
// Only DFS-backed output is supported for delete generation at the moment.
if (deltaOutputConfig.getDeltaOutputMode() == DeltaOutputMode.DFS) {
DeltaInputReader deltaInputReader = null;
JavaRDD<GenericRecord> adjustedRDD = null;
if (config.getNumDeletePartitions() < 1) {
// randomly generate deletes for a given number of records without regard to partitions and files
deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr,
((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty());
adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete());
// trim/expand the sample to exactly the requested number of delete records
adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsDelete());
} else {
// read back records partition-wise from the written dataset
deltaInputReader =
new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
schemaStr);
if (config.getFractionUpsertPerFile() > 0) {
adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(),
config.getFractionUpsertPerFile());
} else {
adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), config
.getNumRecordsDelete());
}
}
log.info("Repartitioning records for delete");
// persist this since we will make multiple passes over this
adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
// flip the delete marker on each record
Converter converter = new DeleteConverter(schemaStr, config.getRecordSize());
JavaRDD<GenericRecord> deletes = converter.convert(adjustedRDD);
deletes.persist(StorageLevel.DISK_ONLY());
return deletes;
} else {
throw new IllegalArgumentException("Other formats are not supported at the moment");
}
}
public Map<Integer, Long> getPartitionToCountMap(JavaRDD<GenericRecord> records) {
// Requires us to keep the partitioner the same
return records.mapPartitionsWithIndex((index, itr) -> {

View File

@@ -18,17 +18,17 @@
package org.apache.hudi.integ.testsuite.generator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
/**
* A GenericRecordGeneratorIterator for the custom schema of the workload. Implements {@link Iterator} to allow for
* iteration semantics.
* A GenericRecordGeneratorIterator for the custom schema of the workload. Implements {@link Iterator} to allow for iteration semantics.
*/
public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericRecord> {
@@ -67,7 +67,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
lastRecord = record;
return record;
} else {
return this.generator.randomize(lastRecord, this.partitionPathFieldNames);
return this.generator.randomize(lastRecord, partitionPathFieldNames);
}
}
}

View File

@@ -18,11 +18,21 @@
package org.apache.hudi.integ.testsuite.generator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Fixed;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,24 +41,15 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Fixed;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a GenericRecord payload generator that generates full generic records {@link GenericRecord}.
* Every field of a generic record created using this generator contains a random value.
* This is a GenericRecord payload generator that generates full generic records {@link GenericRecord}. Every field of a generic record created using this generator contains a random value.
*/
public class GenericRecordFullPayloadGenerator implements Serializable {
private static Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
public static final String DEFAULT_HOODIE_IS_DELETED_COL = "_hoodie_is_deleted";
protected final Random random = new Random();
// The source schema used to generate a payload
private final transient Schema baseSchema;
@@ -81,12 +82,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
this.baseSchema = schema;
if (estimatedFullPayloadSize < minPayloadSize) {
int numberOfComplexFields = sizeInfo.getRight();
if (numberOfComplexFields < 1) {
LOG.warn("The schema does not have any collections/complex fields. "
+ "Cannot achieve minPayloadSize => " + minPayloadSize);
}
if (numberOfComplexFields < 1) {
LOG.warn("The schema does not have any collections/complex fields. "
+ "Cannot achieve minPayloadSize => " + minPayloadSize);
}
determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize);
determineExtraEntriesRequired(numberOfComplexFields, minPayloadSize - estimatedFullPayloadSize);
}
}
@@ -122,8 +123,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
/**
* Create a new {@link GenericRecord} with random value according to given schema.
*
* Long fields which are specified within partitionPathFieldNames are constrained to the value of the partition
* for which records are being generated.
* Long fields which are specified within partitionPathFieldNames are constrained to the value of the partition for which records are being generated.
*
* @return {@link GenericRecord} with random value
*/
@@ -137,7 +137,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
if (isPartialLongField(f, partitionPathFieldNames)) {
// This is a long field used as partition field. Set it to seconds since epoch.
long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS);
result.put(f.name(), (long)value);
result.put(f.name(), (long) value);
} else {
result.put(f.name(), typeConvert(f));
}
@@ -147,7 +147,6 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
/**
* Return true if this is a partition field of type long which should be set to the partition index.
* @return
*/
private boolean isPartialLongField(Schema.Field field, Set<String> partitionPathFieldNames) {
if ((partitionPathFieldNames == null) || !partitionPathFieldNames.contains(field.name())) {
@@ -165,7 +164,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
/**
* Update a given {@link GenericRecord} with random value. The fields in {@code blacklistFields} will not be updated.
*
* @param record GenericRecord to update
* @param record GenericRecord to update
* @param blacklistFields Fields whose value should not be touched
* @return The updated {@link GenericRecord}
*/
@@ -174,8 +173,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
}
/**
* Create a new {@link GenericRecord} with random values. Not all the fields have value, it is random, and its value
* is random too.
* Create a new {@link GenericRecord} with random values. Not all the fields have value, it is random, and its value is random too.
*
* @param schema Schema to create with.
* @return A {@link GenericRecord} with random value.
@@ -183,11 +181,15 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
protected GenericRecord convertPartial(Schema schema) {
GenericRecord result = new GenericData.Record(schema);
for (Schema.Field f : schema.getFields()) {
boolean setNull = random.nextBoolean();
if (!setNull) {
result.put(f.name(), typeConvert(f));
if (f.name().equals(DEFAULT_HOODIE_IS_DELETED_COL)) {
result.put(f.name(), false);
} else {
result.put(f.name(), null);
boolean setNull = random.nextBoolean();
if (!setNull) {
result.put(f.name(), typeConvert(f));
} else {
result.put(f.name(), null);
}
}
}
// TODO : pack remaining bytes into a complex field
@@ -195,22 +197,34 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
}
/**
* Set random value to {@link GenericRecord} according to the schema type of field.
* The field in blacklist will not be set.
* Set random value to {@link GenericRecord} according to the schema type of field. The field in blacklist will not be set.
*
* @param record GenericRecord to randomize.
* @param record GenericRecord to randomize.
* @param blacklistFields blacklistFields where the filed will not be randomized.
* @return Randomized GenericRecord.
*/
protected GenericRecord randomize(GenericRecord record, Set<String> blacklistFields) {
for (Schema.Field f : record.getSchema().getFields()) {
// The delete-marker column is always reset to false here — this check runs before the
// blacklist check, so randomized records never read as deleted even if the column is
// blacklisted.
if (f.name().equals(DEFAULT_HOODIE_IS_DELETED_COL)) {
record.put(f.name(), false);
} else if (blacklistFields == null || !blacklistFields.contains(f.name())) {
record.put(f.name(), typeConvert(f));
}
}
return record;
}
}
/**
* Set _hoodie_is_deleted column value to true.
*
* @param record GenericRecord to delete.
* @return GenericRecord representing deleted record.
*/
protected GenericRecord generateDeleteRecord(GenericRecord record) {
// Soft delete: mutates the given record in place by setting the delete-marker column
// to true, then returns the same instance.
record.put(DEFAULT_HOODIE_IS_DELETED_COL, true);
return record;
}
/**
* Generate random value according to their type.
*/
@@ -219,6 +233,10 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
if (isOption(fieldSchema)) {
fieldSchema = getNonNull(fieldSchema);
}
if (fieldSchema.getName().equals(DEFAULT_HOODIE_IS_DELETED_COL)) {
return false;
}
switch (fieldSchema.getType()) {
case BOOLEAN:
return random.nextBoolean();
@@ -231,7 +249,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
case LONG:
return random.nextLong();
case STRING:
return UUID.randomUUID().toString();
return UUID.randomUUID().toString();
case ENUM:
List<String> enumSymbols = fieldSchema.getEnumSymbols();
return new GenericData.EnumSymbol(fieldSchema, enumSymbols.get(random.nextInt(enumSymbols.size() - 1)));
@@ -293,14 +311,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
}
/**
* Check whether a schema is option.
* return true if it match the follows:
* 1. Its type is Type.UNION
* 2. Has two types
* 3. Has a NULL type.
*
* @param schema
* @return
* Check whether a schema is an option (nullable union). Returns true if it matches all of the following: 1. Its type is Type.UNION 2. Has two types 3. Has a NULL type.
*/
protected boolean isOption(Schema schema) {
return schema.getType().equals(Schema.Type.UNION)
@@ -346,7 +357,6 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
/**
* Method help to calculate the number of entries to add.
*
* @param elementSchema
* @return Number of entries to add
*/
private void determineExtraEntriesRequired(int numberOfComplexFields, int numberOfBytesToAdd) {

View File

@@ -25,12 +25,16 @@ import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A lazy update payload generator to generate {@link GenericRecord}s lazily.
*/
public class UpdateGeneratorIterator implements Iterator<GenericRecord> {
private static Logger LOG = LoggerFactory.getLogger(UpdateGeneratorIterator.class);
// Use the full payload generator as default
private GenericRecordFullPayloadGenerator generator;
private Set<String> blackListedFields;

View File

@@ -18,15 +18,15 @@
package org.apache.hudi.integ.testsuite.writer;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
/**
* {@link org.apache.hadoop.hdfs.DistributedFileSystem} (or {@link org.apache.hadoop.fs.LocalFileSystem}) based delta
* generator.
* {@link org.apache.hadoop.hdfs.DistributedFileSystem} (or {@link org.apache.hadoop.fs.LocalFileSystem}) based delta generator.
*/
public class DFSDeltaWriterAdapter implements DeltaWriterAdapter<GenericRecord> {
@@ -40,10 +40,12 @@ public class DFSDeltaWriterAdapter implements DeltaWriterAdapter<GenericRecord>
@Override
public List<DeltaWriteStats> write(Iterator<GenericRecord> input) throws IOException {
while (input.hasNext()) {
GenericRecord next = input.next();
if (this.deltaInputGenerator.canWrite()) {
this.deltaInputGenerator.writeData(input.next());
this.deltaInputGenerator.writeData(next);
} else if (input.hasNext()) {
rollOver();
this.deltaInputGenerator.writeData(next);
}
}
close();

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.converter;
import org.apache.hudi.integ.testsuite.utils.TestUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import scala.Tuple2;
import static junit.framework.TestCase.assertTrue;
import static org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator.DEFAULT_HOODIE_IS_DELETED_COL;
/**
 * Unit tests against {@link DeleteConverter}, verifying that delete records are correctly
 * generated from existing input records.
 */
public class TestDeleteConverter {

  private JavaSparkContext jsc;

  @BeforeEach
  public void setup() throws Exception {
    // Local single-core Spark context is sufficient for the small RDDs used here.
    jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[1]");
  }

  @AfterEach
  public void teardown() {
    jsc.stop();
  }

  /**
   * Test {@link DeleteConverter} by generating random deletes from existing records.
   */
  @Test
  public void testGenerateDeleteRecordsFromInputRecords() throws Exception {
    // 1. prepare input records
    JavaRDD<GenericRecord> inputRDD = TestUtils.makeRDD(jsc, 10);
    String schemaStr = inputRDD.take(1).get(0).getSchema().toString();
    int minPayloadSize = 1000;
    // 2. Converter reads existing records and generates deletes
    DeleteConverter deleteConverter = new DeleteConverter(schemaStr, minPayloadSize);
    List<String> insertRowKeys = inputRDD.map(r -> r.get("_row_key").toString()).collect();
    assertTrue(inputRDD.count() == 10);
    JavaRDD<GenericRecord> outputRDD = deleteConverter.convert(inputRDD);
    List<String> deleteRowKeys = outputRDD.map(row -> row.get("_row_key").toString()).collect();
    // Every delete row key must come from the set of inserted row keys.
    assertTrue(insertRowKeys.containsAll(deleteRowKeys));
    // Match each generated delete record back to its source record by _row_key and verify the
    // _hoodie_is_deleted flag was set.
    Map<String, GenericRecord> inputRecords = inputRDD.mapToPair(r -> new Tuple2<>(r.get("_row_key").toString(), r))
        .collectAsMap();
    List<GenericRecord> deleteRecords = outputRDD.collect();
    deleteRecords.forEach(deleteRecord -> {
      GenericRecord inputRecord = inputRecords.get(deleteRecord.get("_row_key").toString());
      assertTrue((boolean) inputRecord.get(DEFAULT_HOODIE_IS_DELETED_COL));
    });
  }
}