[HUDI-3483] Adding insert override nodes to integ test suite and few clean ups (#4895)
This commit is contained in:
committed by
GitHub
parent
6a5cfb45b9
commit
1379300b5b
104
docker/demo/config/test-suite/insert-overwrite-table.yaml
Normal file
104
docker/demo/config/test-suite/insert-overwrite-table.yaml
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
dag_name: simple-deltastreamer.yaml
|
||||||
|
dag_rounds: 1
|
||||||
|
dag_intermittent_delay_mins: 1
|
||||||
|
dag_content:
|
||||||
|
first_insert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_insert: 10000
|
||||||
|
type: SparkInsertNode
|
||||||
|
deps: none
|
||||||
|
first_upsert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
num_records_insert: 1000
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_upsert: 8000
|
||||||
|
num_partitions_upsert: 10
|
||||||
|
type: SparkUpsertNode
|
||||||
|
deps: first_insert
|
||||||
|
second_insert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_insert: 10000
|
||||||
|
type: SparkInsertNode
|
||||||
|
deps: first_upsert
|
||||||
|
second_upsert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
num_records_insert: 1000
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_upsert: 8000
|
||||||
|
num_partitions_upsert: 10
|
||||||
|
type: SparkUpsertNode
|
||||||
|
deps: second_insert
|
||||||
|
first_hive_sync:
|
||||||
|
config:
|
||||||
|
queue_name: "adhoc"
|
||||||
|
engine: "mr"
|
||||||
|
type: HiveSyncNode
|
||||||
|
deps: second_upsert
|
||||||
|
first_insert_overwrite_table:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
repeat_count: 10
|
||||||
|
num_records_insert: 10
|
||||||
|
type: SparkInsertOverwriteTableNode
|
||||||
|
deps: first_hive_sync
|
||||||
|
delete_all_input_except_last:
|
||||||
|
config:
|
||||||
|
delete_input_data_except_latest: true
|
||||||
|
type: DeleteInputDatasetNode
|
||||||
|
deps: first_insert_overwrite_table
|
||||||
|
third_insert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_insert: 10000
|
||||||
|
type: SparkInsertNode
|
||||||
|
deps: delete_all_input_except_last
|
||||||
|
third_upsert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
num_records_insert: 1000
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_upsert: 8000
|
||||||
|
num_partitions_upsert: 10
|
||||||
|
type: SparkUpsertNode
|
||||||
|
deps: third_insert
|
||||||
|
second_hive_sync:
|
||||||
|
config:
|
||||||
|
queue_name: "adhoc"
|
||||||
|
engine: "mr"
|
||||||
|
type: HiveSyncNode
|
||||||
|
deps: third_upsert
|
||||||
|
second_validate:
|
||||||
|
config:
|
||||||
|
validate_full_data : true
|
||||||
|
validate_hive: false
|
||||||
|
delete_input_data: false
|
||||||
|
type: ValidateDatasetNode
|
||||||
|
deps: second_hive_sync
|
||||||
106
docker/demo/config/test-suite/insert-overwrite.yaml
Normal file
106
docker/demo/config/test-suite/insert-overwrite.yaml
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
dag_name: simple-deltastreamer.yaml
|
||||||
|
dag_rounds: 1
|
||||||
|
dag_intermittent_delay_mins: 1
|
||||||
|
dag_content:
|
||||||
|
|
||||||
|
first_insert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_insert: 10000
|
||||||
|
type: SparkInsertNode
|
||||||
|
deps: none
|
||||||
|
first_upsert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
num_records_insert: 1000
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_upsert: 8000
|
||||||
|
num_partitions_upsert: 10
|
||||||
|
type: SparkUpsertNode
|
||||||
|
deps: first_insert
|
||||||
|
second_insert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_insert: 10000
|
||||||
|
type: SparkInsertNode
|
||||||
|
deps: first_upsert
|
||||||
|
second_upsert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
num_records_insert: 1000
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_upsert: 8000
|
||||||
|
num_partitions_upsert: 10
|
||||||
|
type: SparkUpsertNode
|
||||||
|
deps: second_insert
|
||||||
|
first_hive_sync:
|
||||||
|
config:
|
||||||
|
queue_name: "adhoc"
|
||||||
|
engine: "mr"
|
||||||
|
type: HiveSyncNode
|
||||||
|
deps: second_upsert
|
||||||
|
first_insert_overwrite:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_insert: 10
|
||||||
|
type: SparkInsertOverwriteNode
|
||||||
|
deps: first_hive_sync
|
||||||
|
delete_all_input_except_last:
|
||||||
|
config:
|
||||||
|
delete_input_data_except_latest: true
|
||||||
|
type: DeleteInputDatasetNode
|
||||||
|
deps: first_insert_overwrite
|
||||||
|
third_insert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_insert: 10000
|
||||||
|
type: SparkInsertNode
|
||||||
|
deps: delete_all_input_except_last
|
||||||
|
third_upsert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
num_records_insert: 1000
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_upsert: 8000
|
||||||
|
num_partitions_upsert: 10
|
||||||
|
type: SparkUpsertNode
|
||||||
|
deps: third_insert
|
||||||
|
second_hive_sync:
|
||||||
|
config:
|
||||||
|
queue_name: "adhoc"
|
||||||
|
engine: "mr"
|
||||||
|
type: HiveSyncNode
|
||||||
|
deps: third_upsert
|
||||||
|
second_validate:
|
||||||
|
config:
|
||||||
|
validate_full_data : true
|
||||||
|
validate_hive: false
|
||||||
|
delete_input_data: false
|
||||||
|
type: ValidateDatasetNode
|
||||||
|
deps: second_hive_sync
|
||||||
73
docker/demo/config/test-suite/spark-clustering.yaml
Normal file
73
docker/demo/config/test-suite/spark-clustering.yaml
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
dag_name: cow-spark-simple.yaml
|
||||||
|
dag_rounds: 1
|
||||||
|
dag_intermittent_delay_mins: 1
|
||||||
|
dag_content:
|
||||||
|
first_insert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_insert: 10000
|
||||||
|
type: SparkInsertNode
|
||||||
|
deps: none
|
||||||
|
first_upsert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
num_records_insert: 1000
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_upsert: 8000
|
||||||
|
num_partitions_upsert: 10
|
||||||
|
type: SparkUpsertNode
|
||||||
|
deps: first_insert
|
||||||
|
second_insert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_insert: 10000
|
||||||
|
type: SparkInsertNode
|
||||||
|
deps: first_upsert
|
||||||
|
second_upsert:
|
||||||
|
config:
|
||||||
|
record_size: 1000
|
||||||
|
num_partitions_insert: 10
|
||||||
|
num_records_insert: 1000
|
||||||
|
repeat_count: 1
|
||||||
|
num_records_upsert: 8000
|
||||||
|
num_partitions_upsert: 10
|
||||||
|
type: SparkUpsertNode
|
||||||
|
deps: second_insert
|
||||||
|
first_delete:
|
||||||
|
config:
|
||||||
|
num_partitions_delete: 10
|
||||||
|
num_records_delete: 16000
|
||||||
|
type: SparkDeleteNode
|
||||||
|
deps: second_upsert
|
||||||
|
second_hive_sync:
|
||||||
|
config:
|
||||||
|
queue_name: "adhoc"
|
||||||
|
engine: "mr"
|
||||||
|
type: HiveSyncNode
|
||||||
|
deps: first_delete
|
||||||
|
second_validate:
|
||||||
|
config:
|
||||||
|
validate_hive: false
|
||||||
|
delete_input_data: false
|
||||||
|
type: ValidateDatasetNode
|
||||||
|
deps: second_hive_sync
|
||||||
56
docker/demo/config/test-suite/test-metadata.properties
Normal file
56
docker/demo/config/test-suite/test-metadata.properties
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
|
||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
#
|
||||||
|
|
||||||
|
hoodie.insert.shuffle.parallelism=100
|
||||||
|
hoodie.upsert.shuffle.parallelism=100
|
||||||
|
hoodie.bulkinsert.shuffle.parallelism=100
|
||||||
|
|
||||||
|
hoodie.metadata.enable=true
|
||||||
|
|
||||||
|
hoodie.deltastreamer.source.test.num_partitions=100
|
||||||
|
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
|
||||||
|
hoodie.deltastreamer.source.test.max_unique_records=100000000
|
||||||
|
hoodie.embed.timeline.server=false
|
||||||
|
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
|
||||||
|
|
||||||
|
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
|
||||||
|
hoodie.datasource.hive_sync.skip_ro_suffix=true
|
||||||
|
|
||||||
|
hoodie.datasource.write.recordkey.field=_row_key
|
||||||
|
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
|
||||||
|
hoodie.datasource.write.partitionpath.field=timestamp
|
||||||
|
|
||||||
|
hoodie.clustering.plan.strategy.sort.columns=_row_key
|
||||||
|
hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
|
||||||
|
hoodie.clustering.inline.max.commits=1
|
||||||
|
|
||||||
|
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
|
||||||
|
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
|
||||||
|
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
|
||||||
|
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
|
||||||
|
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
|
||||||
|
|
||||||
|
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
|
||||||
|
hoodie.datasource.hive_sync.database=testdb
|
||||||
|
hoodie.datasource.hive_sync.table=table1
|
||||||
|
hoodie.datasource.hive_sync.assume_date_partitioning=false
|
||||||
|
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
|
||||||
|
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
|
||||||
|
|
||||||
@@ -19,6 +19,8 @@ hoodie.insert.shuffle.parallelism=100
|
|||||||
hoodie.upsert.shuffle.parallelism=100
|
hoodie.upsert.shuffle.parallelism=100
|
||||||
hoodie.bulkinsert.shuffle.parallelism=100
|
hoodie.bulkinsert.shuffle.parallelism=100
|
||||||
|
|
||||||
|
hoodie.metadata.enable=false
|
||||||
|
|
||||||
hoodie.deltastreamer.source.test.num_partitions=100
|
hoodie.deltastreamer.source.test.num_partitions=100
|
||||||
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
|
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
|
||||||
hoodie.deltastreamer.source.test.max_unique_records=100000000
|
hoodie.deltastreamer.source.test.max_unique_records=100000000
|
||||||
@@ -32,10 +34,6 @@ hoodie.datasource.write.recordkey.field=_row_key
|
|||||||
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
|
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
|
||||||
hoodie.datasource.write.partitionpath.field=timestamp
|
hoodie.datasource.write.partitionpath.field=timestamp
|
||||||
|
|
||||||
hoodie.clustering.plan.strategy.sort.columns=_row_key
|
|
||||||
hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
|
|
||||||
hoodie.clustering.inline.max.commits=1
|
|
||||||
|
|
||||||
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
|
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
|
||||||
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
|
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
|
||||||
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
|
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
|
||||||
|
|||||||
@@ -96,6 +96,8 @@ public class DeltaConfig implements Serializable {
|
|||||||
private static String NUM_ROLLBACKS = "num_rollbacks";
|
private static String NUM_ROLLBACKS = "num_rollbacks";
|
||||||
private static String ENABLE_ROW_WRITING = "enable_row_writing";
|
private static String ENABLE_ROW_WRITING = "enable_row_writing";
|
||||||
private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate";
|
private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate";
|
||||||
|
private static String VALIDATE_FULL_DATA = "validate_full_data";
|
||||||
|
private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest";
|
||||||
|
|
||||||
// Spark SQL Create Table
|
// Spark SQL Create Table
|
||||||
private static String TABLE_TYPE = "table_type";
|
private static String TABLE_TYPE = "table_type";
|
||||||
@@ -206,10 +208,18 @@ public class DeltaConfig implements Serializable {
|
|||||||
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString());
|
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isDeleteInputDataExceptLatest() {
|
||||||
|
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA_EXCEPT_LATEST, false).toString());
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isValidateHive() {
|
public boolean isValidateHive() {
|
||||||
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
|
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isValidateFullData() {
|
||||||
|
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
|
||||||
|
}
|
||||||
|
|
||||||
public int getIterationCountToExecute() {
|
public int getIterationCountToExecute() {
|
||||||
return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString());
|
return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,16 +19,14 @@
|
|||||||
|
|
||||||
package org.apache.hudi.integ.testsuite.dag.nodes;
|
package org.apache.hudi.integ.testsuite.dag.nodes;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
|
|
||||||
import org.apache.hudi.DataSourceWriteOptions;
|
import org.apache.hudi.DataSourceWriteOptions;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
|
||||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
|
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
|
||||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||||
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
|
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.api.java.function.ReduceFunction;
|
import org.apache.spark.api.java.function.ReduceFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
@@ -42,13 +40,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute;
|
|||||||
import org.apache.spark.sql.types.StructType;
|
import org.apache.spark.sql.types.StructType;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
import scala.collection.JavaConversions;
|
import scala.collection.JavaConversions;
|
||||||
import scala.collection.JavaConverters;
|
import scala.collection.JavaConverters;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This nodes validates contents from input path are in tact with Hudi. By default no configs are required for this node. But there is an
|
* This nodes validates contents from input path are in tact with Hudi. By default no configs are required for this node. But there is an
|
||||||
* optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites.
|
* optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites.
|
||||||
@@ -78,6 +76,7 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
|
|||||||
public void execute(ExecutionContext context, int curItrCount) throws Exception {
|
public void execute(ExecutionContext context, int curItrCount) throws Exception {
|
||||||
|
|
||||||
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
|
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
|
||||||
|
|
||||||
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
|
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
|
||||||
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
|
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
|
||||||
log.warn("Validation using data from input path " + inputPath);
|
log.warn("Validation using data from input path " + inputPath);
|
||||||
@@ -97,46 +96,60 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
|
|||||||
|
|
||||||
// read from hudi and remove meta columns.
|
// read from hudi and remove meta columns.
|
||||||
Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
|
Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
|
||||||
Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
|
if (config.isValidateFullData()) {
|
||||||
long inputCount = inputSnapshotDf.count();
|
log.debug("Validating full dataset");
|
||||||
long outputCount = trimmedHudiDf.count();
|
Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
|
||||||
log.debug("Input count: " + inputCount + "; output count: " + outputCount);
|
Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
|
||||||
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
long exceptInputCount = exceptInputDf.count();
|
||||||
if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
|
long exceptHudiCount = exceptHudiDf.count();
|
||||||
log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
|
log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
|
||||||
throw new AssertionError("Hudi contents does not match contents input data. ");
|
if (exceptInputCount != 0 || exceptHudiCount != 0) {
|
||||||
}
|
log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
|
||||||
|
+ ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
|
||||||
if (config.isValidateHive()) {
|
throw new AssertionError("Hudi contents does not match contents input data. ");
|
||||||
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
|
}
|
||||||
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
|
} else {
|
||||||
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
|
Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
|
||||||
session.sql("REFRESH TABLE " + database + "." + tableName);
|
long inputCount = inputSnapshotDf.count();
|
||||||
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
|
long outputCount = trimmedHudiDf.count();
|
||||||
"test_suite_source_ordering_field FROM " + database + "." + tableName);
|
log.debug("Input count: " + inputCount + "; output count: " + outputCount);
|
||||||
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key","rider","driver","begin_lat","begin_lon","end_lat","end_lon","fare",
|
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
||||||
"_hoodie_is_deleted","test_suite_source_ordering_field");
|
if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
|
||||||
|
log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
|
||||||
Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
|
throw new AssertionError("Hudi contents does not match contents input data. ");
|
||||||
outputCount = trimmedHudiDf.count();
|
|
||||||
log.warn("Input count: " + inputCount + "; output count: " + outputCount);
|
|
||||||
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
|
||||||
if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
|
|
||||||
log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
|
|
||||||
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// if delete input data is enabled, erase input data.
|
if (config.isValidateHive()) {
|
||||||
if (config.isDeleteInputData()) {
|
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
|
||||||
// clean up input data for current group of writes.
|
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
|
||||||
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
|
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
|
||||||
FileSystem fs = new Path(inputPathStr)
|
session.sql("REFRESH TABLE " + database + "." + tableName);
|
||||||
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
|
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
|
||||||
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
|
"test_suite_source_ordering_field FROM " + database + "." + tableName);
|
||||||
for (FileStatus fileStatus : fileStatuses) {
|
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
|
||||||
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
|
"_hoodie_is_deleted", "test_suite_source_ordering_field");
|
||||||
fs.delete(fileStatus.getPath(), true);
|
|
||||||
|
Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
|
||||||
|
outputCount = trimmedHudiDf.count();
|
||||||
|
log.warn("Input count: " + inputCount + "; output count: " + outputCount);
|
||||||
|
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
||||||
|
if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
|
||||||
|
log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
|
||||||
|
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if delete input data is enabled, erase input data.
|
||||||
|
if (config.isDeleteInputData()) {
|
||||||
|
// clean up input data for current group of writes.
|
||||||
|
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
|
||||||
|
FileSystem fs = new Path(inputPathStr)
|
||||||
|
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
|
||||||
|
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
|
||||||
|
for (FileStatus fileStatus : fileStatuses) {
|
||||||
|
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
|
||||||
|
fs.delete(fileStatus.getPath(), true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -149,8 +162,8 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
|
|||||||
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
|
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
|
||||||
ExpressionEncoder encoder = getEncoder(inputDf.schema());
|
ExpressionEncoder encoder = getEncoder(inputDf.schema());
|
||||||
return inputDf.groupByKey(
|
return inputDf.groupByKey(
|
||||||
(MapFunction<Row, String>) value ->
|
(MapFunction<Row, String>) value ->
|
||||||
value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField), Encoders.STRING())
|
value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField), Encoders.STRING())
|
||||||
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
|
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
|
||||||
int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
|
int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
|
||||||
int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
|
int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.integ.testsuite.dag.nodes;
|
||||||
|
|
||||||
|
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
|
||||||
|
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes all input except latest batch. Mostly used in insert_overwrite operations.
|
||||||
|
*/
|
||||||
|
public class DeleteInputDatasetNode extends DagNode<Boolean> {
|
||||||
|
|
||||||
|
public DeleteInputDatasetNode(DeltaConfig.Config config) {
|
||||||
|
this.config = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(ExecutionContext context, int curItrCount) throws Exception {
|
||||||
|
|
||||||
|
String latestBatch = String.valueOf(context.getWriterContext().getDeltaGenerator().getBatchId());
|
||||||
|
|
||||||
|
if (config.isDeleteInputDataExceptLatest()) {
|
||||||
|
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
|
||||||
|
FileSystem fs = new Path(inputPathStr)
|
||||||
|
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
|
||||||
|
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
|
||||||
|
for (FileStatus fileStatus : fileStatuses) {
|
||||||
|
if (!fileStatus.getPath().getName().equals(latestBatch)) {
|
||||||
|
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
|
||||||
|
fs.delete(fileStatus.getPath(), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -110,6 +110,10 @@ public class DeltaGenerator implements Serializable {
|
|||||||
return ws;
|
return ws;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getBatchId() {
|
||||||
|
return batchId;
|
||||||
|
}
|
||||||
|
|
||||||
public JavaRDD<GenericRecord> generateInserts(Config operation) {
|
public JavaRDD<GenericRecord> generateInserts(Config operation) {
|
||||||
int numPartitions = operation.getNumInsertPartitions();
|
int numPartitions = operation.getNumInsertPartitions();
|
||||||
long recordsPerPartition = operation.getNumRecordsInsert();
|
long recordsPerPartition = operation.getNumRecordsInsert();
|
||||||
|
|||||||
@@ -54,13 +54,18 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
|
|||||||
context.getWriterContext.getSparkSession)
|
context.getWriterContext.getSparkSession)
|
||||||
inputDF.write.format("hudi")
|
inputDF.write.format("hudi")
|
||||||
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
|
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
|
||||||
|
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field")
|
||||||
.option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
.option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
||||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
|
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
|
||||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
.option(DataSourceWriteOptions.OPERATION.key, getOperation())
|
||||||
.option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key")
|
.option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key")
|
||||||
.option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
|
.option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
|
||||||
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
|
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def getOperation(): String = {
|
||||||
|
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,31 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.integ.testsuite.dag.nodes
|
||||||
|
|
||||||
|
import org.apache.hudi.DataSourceWriteOptions
|
||||||
|
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
|
||||||
|
|
||||||
|
class SparkInsertOverwriteNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) {
|
||||||
|
|
||||||
|
override def getOperation(): String = {
|
||||||
|
DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.integ.testsuite.dag.nodes
|
||||||
|
|
||||||
|
import org.apache.hudi.DataSourceWriteOptions
|
||||||
|
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
|
||||||
|
|
||||||
|
class SparkInsertOverwriteTableNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) {
|
||||||
|
|
||||||
|
override def getOperation(): String = {
|
||||||
|
DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -18,49 +18,17 @@
|
|||||||
|
|
||||||
package org.apache.hudi.integ.testsuite.dag.nodes
|
package org.apache.hudi.integ.testsuite.dag.nodes
|
||||||
|
|
||||||
import org.apache.hudi.client.WriteStatus
|
import org.apache.hudi.DataSourceWriteOptions
|
||||||
import org.apache.hudi.config.HoodieWriteConfig
|
|
||||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
|
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
|
||||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
|
|
||||||
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions}
|
|
||||||
import org.apache.spark.rdd.RDD
|
|
||||||
import org.apache.spark.sql.SaveMode
|
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Spark datasource based upsert node
|
* Spark datasource based upsert node
|
||||||
*
|
*
|
||||||
* @param dagNodeConfig DAG node configurations.
|
* @param dagNodeConfig DAG node configurations.
|
||||||
*/
|
*/
|
||||||
class SparkUpsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
|
class SparkUpsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) {
|
||||||
|
|
||||||
config = dagNodeConfig
|
override def getOperation(): String = {
|
||||||
|
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL
|
||||||
/**
|
|
||||||
* Execute the {@link DagNode}.
|
|
||||||
*
|
|
||||||
* @param context The context needed for an execution of a node.
|
|
||||||
* @param curItrCount iteration count for executing the node.
|
|
||||||
* @throws Exception Thrown if the execution failed.
|
|
||||||
*/
|
|
||||||
override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
|
|
||||||
if (!config.isDisableGenerate) {
|
|
||||||
println("Generating input data for node {}", this.getName)
|
|
||||||
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
|
|
||||||
}
|
|
||||||
val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch,
|
|
||||||
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
|
|
||||||
context.getWriterContext.getSparkSession)
|
|
||||||
inputDF.write.format("hudi")
|
|
||||||
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
|
|
||||||
.option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
|
||||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
|
|
||||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
|
||||||
.option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key")
|
|
||||||
.option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
|
|
||||||
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
|
||||||
.mode(SaveMode.Append)
|
|
||||||
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user