1
0

Adding fixes to test suite framework. Adding clustering node and validate async operations node. (#2400)

This commit is contained in:
Sivabalan Narayanan
2021-02-12 12:29:21 -05:00
committed by GitHub
parent ff0e3f5669
commit d5f202821b
31 changed files with 531 additions and 123 deletions

View File

@@ -13,13 +13,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: cow-long-running-example.yaml
dag_rounds: 2
dag_name: complex-dag-cow.yaml
dag_rounds: 1
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
config:
record_size: 100
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 1000
@@ -27,7 +27,7 @@ dag_content:
deps: none
second_insert:
config:
record_size: 100
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 10000
@@ -35,19 +35,26 @@ dag_content:
type: InsertNode
third_insert:
config:
record_size: 100
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
first_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: third_insert
first_validate:
config:
validate_hive: true
type: ValidateDatasetNode
deps: third_insert
deps: first_hive_sync
first_upsert:
config:
record_size: 100
record_size: 1000
num_partitions_insert: 1
num_records_insert: 300
repeat_count: 1
@@ -61,8 +68,15 @@ dag_content:
num_records_delete: 2000
type: DeleteNode
deps: first_upsert
second_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: first_delete
second_validate:
config:
validate_hive: true
delete_input_data: true
type: ValidateDatasetNode
deps: first_delete
deps: second_hive_sync

View File

@@ -15,105 +15,70 @@
# limitations under the License.
dag_name: complex-dag-mor.yaml
dag_rounds: 1
dag_intermittent_delay_mins: 10
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
config:
record_size: 70000
record_size: 1000
num_partitions_insert: 1
repeat_count: 5
repeat_count: 1
num_records_insert: 100
type: InsertNode
deps: none
second_insert:
config:
record_size: 70000
record_size: 1000
num_partitions_insert: 1
repeat_count: 5
num_records_insert: 100
repeat_count: 1
num_records_insert: 1000
deps: first_insert
type: InsertNode
third_insert:
config:
record_size: 70000
record_size: 1000
num_partitions_insert: 1
repeat_count: 2
repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
first_rollback:
config:
deps: third_insert
type: RollbackNode
first_upsert:
config:
record_size: 70000
num_partitions_insert: 1
num_records_insert: 300
repeat_count: 1
num_records_upsert: 100
num_partitions_upsert: 10
type: UpsertNode
deps: first_rollback
first_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: first_upsert
first_hive_query:
deps: third_insert
first_validate:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveQueryNode
type: ValidateDatasetNode
deps: first_hive_sync
second_upsert:
first_upsert:
config:
record_size: 70000
record_size: 1000
num_partitions_insert: 1
num_records_insert: 300
repeat_count: 1
num_records_upsert: 100
num_partitions_upsert: 10
num_partitions_upsert: 1
type: UpsertNode
deps: first_hive_query
second_hive_query:
config:
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 1100
type: HiveQueryNode
deps: second_upsert
deps: first_validate
first_schedule_compact:
config:
type: ScheduleCompactNode
deps: second_hive_query
third_upsert:
deps: first_upsert
first_delete:
config:
record_size: 70000
num_partitions_insert: 1
num_records_insert: 300
repeat_count: 1
num_records_upsert: 100
num_partitions_upsert: 10
type: UpsertNode
num_partitions_delete: 1
num_records_delete: 500
type: DeleteNode
deps: first_schedule_compact
first_compact:
config:
type: CompactNode
deps: first_schedule_compact
third_hive_query:
second_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 1400
type: HiveQueryNode
deps: first_compact
type: HiveSyncNode
deps: first_delete
second_validate:
config:
delete_input_data: true
type: ValidateDatasetNode
deps: second_hive_sync

View File

@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Test-suite dag for COW table with inline clustering:
# insert -> delete -> hive sync + validate -> cluster -> hive sync + validate.
# Runs for 3 rounds with no intermittent delay.
dag_name: cow-clustering-example.yaml
dag_rounds: 3
dag_intermittent_delay_mins: 0
dag_content:
  # Seed the dataset with three consecutive inserts.
  first_insert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      repeat_count: 1
      num_records_insert: 1000
    type: InsertNode
    deps: none
  second_insert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      repeat_count: 1
      num_records_insert: 10000
    deps: first_insert
    type: InsertNode
  third_insert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      repeat_count: 1
      num_records_insert: 300
    deps: second_insert
    type: InsertNode
  # Delete a large chunk of the inserted records before clustering.
  first_delete:
    config:
      num_partitions_delete: 1
      num_records_delete: 9000
    type: DeleteNode
    deps: third_insert
  first_hive_sync:
    config:
      queue_name: "adhoc"
      engine: "mr"
    type: HiveSyncNode
    deps: first_delete
  first_validate:
    config:
      validate_hive: true
    type: ValidateDatasetNode
    deps: first_hive_sync
  # Trigger inline clustering; execute_itr_count restricts it to iteration 2 only.
  first_cluster:
    config:
      execute_itr_count: 2
    type: ClusteringNode
    deps: first_validate
  # Re-sync and re-validate after clustering has rewritten the data files.
  second_hive_sync:
    config:
      queue_name: "adhoc"
      engine: "mr"
    type: HiveSyncNode
    deps: first_cluster
  second_validate:
    config:
      validate_hive: true
    type: ValidateDatasetNode
    deps: second_hive_sync

View File

@@ -14,12 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: cow-long-running-example.yaml
dag_rounds: 20
dag_intermittent_delay_mins: 10
dag_rounds: 50
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
config:
record_size: 100
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 1000
@@ -27,7 +27,7 @@ dag_content:
deps: none
second_insert:
config:
record_size: 100
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 10000
@@ -35,19 +35,26 @@ dag_content:
type: InsertNode
third_insert:
config:
record_size: 100
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
first_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: third_insert
first_validate:
config:
validate_hive: true
type: ValidateDatasetNode
deps: third_insert
deps: first_hive_sync
first_upsert:
config:
record_size: 100
record_size: 1000
num_partitions_insert: 1
num_records_insert: 300
repeat_count: 1
@@ -58,11 +65,25 @@ dag_content:
first_delete:
config:
num_partitions_delete: 1
num_records_delete: 2000
num_records_delete: 8000
type: DeleteNode
deps: first_upsert
second_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: first_delete
second_validate:
config:
validate_hive: true
delete_input_data: true
type: ValidateDatasetNode
deps: first_delete
deps: second_hive_sync
last_validate:
config:
execute_itr_count: 50
validate_clean: true
validate_archival: true
type: ValidateAsyncOperations
deps: second_validate

View File

@@ -0,0 +1,89 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Long-running multi-partition test dag for COW table:
# inserts across several partitions, upsert, delete, hive sync + validation,
# and a final async-operations check (clean + archival) on the last iteration.
dag_name: cow-long-running-multi-partitions.yaml
dag_rounds: 50
dag_intermittent_delay_mins: 1
dag_content:
  # Inserts spread across multiple partitions (5, 50 and 2 respectively).
  first_insert:
    config:
      record_size: 1000
      num_partitions_insert: 5
      repeat_count: 1
      num_records_insert: 1000
    type: InsertNode
    deps: none
  second_insert:
    config:
      record_size: 1000
      num_partitions_insert: 50
      repeat_count: 1
      num_records_insert: 10000
    deps: first_insert
    type: InsertNode
  third_insert:
    config:
      record_size: 1000
      num_partitions_insert: 2
      repeat_count: 1
      num_records_insert: 300
    deps: second_insert
    type: InsertNode
  first_hive_sync:
    config:
      queue_name: "adhoc"
      engine: "mr"
    type: HiveSyncNode
    deps: third_insert
  first_validate:
    config:
      validate_hive: true
    type: ValidateDatasetNode
    deps: first_hive_sync
  first_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 2
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 100
      num_partitions_upsert: 1
    type: UpsertNode
    deps: first_validate
  # Delete records across all 50 partitions touched by second_insert.
  first_delete:
    config:
      num_partitions_delete: 50
      num_records_delete: 8000
    type: DeleteNode
    deps: first_upsert
  second_hive_sync:
    config:
      queue_name: "adhoc"
      engine: "mr"
    type: HiveSyncNode
    deps: first_delete
  # delete_input_data clears consumed input so later rounds start fresh.
  second_validate:
    config:
      validate_hive: true
      delete_input_data: true
    type: ValidateDatasetNode
    deps: second_hive_sync
  # On the final (50th) iteration, verify cleaning and archival actually happened.
  last_validate:
    config:
      execute_itr_count: 50
      validate_clean: true
      validate_archival: true
    type: ValidateAsyncOperations
    deps: second_validate

View File

@@ -20,6 +20,10 @@ hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
hoodie.clustering.plan.strategy.sort.columns=_row_key
hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
hoodie.clustering.inline.max.commits=1
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc

View File

@@ -371,7 +371,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
* Common method containing steps to be performed before write (upsert/insert/...
*
* @param instantTime Instant Time
* @param hoodieTable Hoodie Table
* @return Write Status
*/
protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
@@ -719,7 +718,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
*/
protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writeStatuses,
HoodieTable<T, I, K, O> table, String compactionCommitTime);
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
*

View File

@@ -90,6 +90,11 @@
<artifactId>hudi-spark_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-utilities_${scala.binary.version}</artifactId>
@@ -201,6 +206,12 @@
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<!-- Fasterxml -->
<dependency>

View File

@@ -82,6 +82,7 @@ public class HoodieTestSuiteJob {
private BuiltinKeyGenerator keyGenerator;
public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
log.warn("Running spark job w/ app id " + jsc.sc().applicationId());
this.cfg = cfg;
this.jsc = jsc;
cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.integ.testsuite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -30,6 +28,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
@@ -41,9 +40,14 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
@@ -53,13 +57,15 @@ import java.util.Properties;
import java.util.Set;
/**
* A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform
* write operations into the target hudi dataset. Current supported writers are {@link HoodieDeltaStreamerWrapper}
* and {@link SparkRDDWriteClient}.
* A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform write operations into the target hudi dataset. Current supported writers are
* {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}.
*/
public class HoodieTestSuiteWriter {
private static Logger log = LoggerFactory.getLogger(HoodieTestSuiteWriter.class);
private HoodieDeltaStreamerWrapper deltaStreamerWrapper;
private HoodieWriteConfig writeConfig;
private SparkRDDWriteClient writeClient;
protected HoodieTestSuiteConfig cfg;
private Option<String> lastCheckpoint;
@@ -85,8 +91,9 @@ public class HoodieTestSuiteWriter {
HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc);
this.deltaStreamerWrapper = new HoodieDeltaStreamerWrapper(cfg, jsc);
this.hoodieReadClient = new HoodieReadClient(context, cfg.targetBasePath);
this.writeConfig = getHoodieClientConfig(cfg, props, schema);
if (!cfg.useDeltaStreamer) {
this.writeClient = new SparkRDDWriteClient(context, getHoodieClientConfig(cfg, props, schema), rollbackInflight);
this.writeClient = new SparkRDDWriteClient(context, writeConfig, rollbackInflight);
}
this.cfg = cfg;
this.configuration = jsc.hadoopConfiguration();
@@ -95,6 +102,10 @@ public class HoodieTestSuiteWriter {
this.schema = schema;
}
public HoodieWriteConfig getWriteConfig() {
return this.writeConfig;
}
private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) {
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
@@ -178,6 +189,20 @@ public class HoodieTestSuiteWriter {
}
}
public void inlineClustering() {
if (!cfg.useDeltaStreamer) {
Option<String> clusteringInstantOpt = writeClient.scheduleClustering(Option.empty());
clusteringInstantOpt.ifPresent(clusteringInstant -> {
// inline cluster should auto commit as the user is never given control
log.warn("Clustering instant :: " + clusteringInstant);
writeClient.cluster(clusteringInstant, true);
});
} else {
// TODO: fix clustering to be done async https://issues.apache.org/jira/browse/HUDI-1590
throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer");
}
}
public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws
Exception {
if (!cfg.useDeltaStreamer) {
@@ -189,7 +214,7 @@ public class HoodieTestSuiteWriter {
}
public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
Option<String> instantTime) {
Option<String> instantTime) {
if (!cfg.useDeltaStreamer) {
Map<String, String> extraMetadata = new HashMap<>();
/** Store the checkpoint in the commit metadata just like

View File

@@ -88,6 +88,10 @@ public class DeltaConfig implements Serializable {
private static String REINIT_CONTEXT = "reinitialize_context";
private static String START_PARTITION = "start_partition";
private static String DELETE_INPUT_DATA = "delete_input_data";
private static String VALIDATE_HIVE = "validate_hive";
private static String EXECUTE_ITR_COUNT = "execute_itr_count";
private static String VALIDATE_ARCHIVAL = "validate_archival";
private static String VALIDATE_CLEAN = "validate_clean";
private Map<String, Object> configsMap;
@@ -159,6 +163,22 @@ public class DeltaConfig implements Serializable {
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString());
}
public boolean isValidateHive() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
}
public int getIterationCountToExecute() {
return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString());
}
public boolean validateArchival() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_ARCHIVAL, false).toString());
}
public boolean validateClean() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_CLEAN, false).toString());
}
public Map<String, Object> getOtherConfigs() {
if (configsMap == null) {
return new HashMap<>();

View File

@@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaSparkContext;
/**
* This wraps the context needed for an execution of
* a {@link DagNode#execute(ExecutionContext)}.
* a {@link DagNode#execute(ExecutionContext, int)}.
*/
public class ExecutionContext implements Serializable {

View File

@@ -32,7 +32,7 @@ public class CleanNode extends DagNode<Boolean> {
}
@Override
public void execute(ExecutionContext executionContext) throws Exception {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing clean node {}", this.getName());
executionContext.getHoodieTestSuiteWriter().getWriteClient(this).clean();
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
/**
* Triggers inline clustering. Works only with writeClient. Also, add this as last node and end with validation if possible. As of now, after clustering, further inserts/upserts may not work since we
* call deltaStreamer.
*/
public class ClusteringNode extends DagNode<Option<String>> {

  public ClusteringNode(Config config) {
    this.config = config;
  }

  /**
   * Triggers inline clustering through the test suite writer, but only when the current
   * iteration matches the {@code execute_itr_count} configured for this node; on every
   * other iteration this is a no-op. Works only with the write client path — with
   * deltastreamer the writer throws (see {@code HoodieTestSuiteWriter#inlineClustering}).
   *
   * @param executionContext context giving access to the {@code HoodieTestSuiteWriter}
   * @param curItrCount      current iteration count of the dag round
   * @throws Exception rethrown after logging if clustering fails
   */
  @Override
  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
    // getIterationCountToExecute() defaults to -1 when execute_itr_count is unset,
    // so this node never fires unless explicitly configured.
    if (config.getIterationCountToExecute() == curItrCount) {
      try {
        log.warn("Executing ClusteringNode node {}", this.getName());
        executionContext.getHoodieTestSuiteWriter().inlineClustering();
      } catch (Exception e) {
        log.warn("Exception thrown in ClusteringNode Node :: " + e.getCause() + ", msg :: " + e.getMessage());
        throw e;
      }
    }
  }
}

View File

@@ -40,10 +40,11 @@ public class CompactNode extends DagNode<JavaRDD<WriteStatus>> {
* if it has one.
*
* @param executionContext Execution context to run this compaction
* @param curItrCount cur interation count.
* @throws Exception will be thrown if any error occurred.
*/
@Override
public void execute(ExecutionContext executionContext) throws Exception {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline()

View File

@@ -91,9 +91,10 @@ public abstract class DagNode<O> implements Comparable<DagNode<O>> {
* Execute the {@link DagNode}.
*
* @param context The context needed for an execution of a node.
* @param curItrCount iteration count for executing the node.
* @throws Exception Thrown if the execution failed.
*/
public abstract void execute(ExecutionContext context) throws Exception;
public abstract void execute(ExecutionContext context, int curItrCount) throws Exception;
public boolean isCompleted() {
return isCompleted;

View File

@@ -36,7 +36,7 @@ public class DelayNode extends DagNode<Boolean> {
}
@Override
public void execute(ExecutionContext context) throws Exception {
public void execute(ExecutionContext context, int curItrCount) throws Exception {
log.warn("Waiting for "+ delayMins+" mins before going for next test run");
Thread.sleep(delayMins * 60 * 1000);
}

View File

@@ -43,7 +43,7 @@ public class HiveQueryNode extends DagNode<Boolean> {
}
@Override
public void execute(ExecutionContext executionContext) throws Exception {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive query node {}", this.getName());
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
HiveSyncConfig hiveSyncConfig = DataSourceUtils

View File

@@ -35,7 +35,7 @@ public class HiveSyncNode extends DagNode<Boolean> {
}
@Override
public void execute(ExecutionContext executionContext) throws Exception {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing hive sync node");
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());

View File

@@ -39,7 +39,7 @@ public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
}
@Override
public void execute(ExecutionContext executionContext) throws Exception {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
// if the insert node has schema override set, reinitialize the table with new schema.
if (this.config.getReinitContext()) {
log.info(String.format("Reinitializing table with %s", this.config.getOtherConfigs().toString()));

View File

@@ -23,7 +23,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector;
@@ -42,10 +41,11 @@ public class RollbackNode extends DagNode<Option<HoodieInstant>> {
* Method helps to rollback the last commit instant in the timeline, if it has one.
*
* @param executionContext Execution context to perform this rollback
* @param curItrCount current iteration count.
* @throws Exception will be thrown if any error occurred
*/
@Override
public void execute(ExecutionContext executionContext) throws Exception {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing rollback node {}", this.getName());
// Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer
// testing for now

View File

@@ -35,7 +35,7 @@ public class ScheduleCompactNode extends DagNode<Option<String>> {
}
@Override
public void execute(ExecutionContext executionContext) throws Exception {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing schedule compact node {}", this.getName());
// Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer
// testing for now
@@ -48,7 +48,7 @@ public class ScheduleCompactNode extends DagNode<Option<String>> {
HoodieCommitMetadata metadata = org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(metaClient
.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
Option<String> scheduledInstant = executionContext.getHoodieTestSuiteWriter().scheduleCompaction(Option.of(metadata
.getExtraMetadata()));
.getExtraMetadata()));
if (scheduledInstant.isPresent()) {
log.info("Scheduling compaction instant {}", scheduledInstant.get());
}

View File

@@ -42,10 +42,11 @@ public class SparkSQLQueryNode extends DagNode<Boolean> {
* Method helps to execute a sparkSql query from a hive table.
*
* @param executionContext Execution context to perform this query.
* @param curItrCount current iteration count.
* @throws Exception will be thrown if ant error occurred
*/
@Override
public void execute(ExecutionContext executionContext) throws Exception {
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
log.info("Executing spark sql query node");
this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration());
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());

View File

@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
 * Node to validate dataset sanity like total file versions retained, has cleaning happened,
 * has archival happened, etc. Runs only on the iteration configured via
 * {@code execute_itr_count}; every other iteration is a no-op.
 */
public class ValidateAsyncOperations extends DagNode<Option<String>> {

  private static final Logger log = LoggerFactory.getLogger(ValidateAsyncOperations.class);

  // Compiled once: matches archived commit files (e.g. ".commits_.archive.1_...") under .hoodie.
  private static final Pattern ARCHIVE_FILE_PATTERN = Pattern.compile("\\.commits_\\.archive\\..*");
  // Compiled once: matches clean metadata files under .hoodie.
  private static final Pattern CLEAN_FILE_PATTERN = Pattern.compile(".*\\.clean\\..*");

  public ValidateAsyncOperations(Config config) {
    this.config = config;
  }

  /**
   * Validates that no file id has more data-file versions than the cleaner retention allows,
   * and optionally that archival and/or clean artifacts exist in the .hoodie metadata folder.
   *
   * @param executionContext context giving access to the test suite writer and hadoop conf
   * @param curItrCount      current iteration count of the dag round
   * @throws Exception       rethrown after logging if any validation fails
   */
  @Override
  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
    // getIterationCountToExecute() defaults to -1 when execute_itr_count is unset.
    if (config.getIterationCountToExecute() != curItrCount) {
      return;
    }
    try {
      log.warn("Executing ValidateHoodieAsyncOperations node {} with target base path {} ", this.getName(),
          executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
      String basePath = executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath;
      // +1: the latest (in-flight) version may exist on top of the retained ones.
      int maxCommitsRetained = executionContext.getHoodieTestSuiteWriter().getWriteConfig().getCleanerCommitsRetained() + 1;
      FileSystem fs = FSUtils.getFs(basePath, executionContext.getHoodieTestSuiteWriter().getConfiguration());

      // Count data-file versions per file id across every partition; the max must not
      // exceed the cleaner retention, otherwise cleaning has not kept up.
      Map<String, Integer> fileIdCount = new HashMap<>();
      AtomicInteger maxVal = new AtomicInteger();
      List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, basePath);
      for (String partitionPath : partitionPaths) {
        List<FileStatus> fileStatuses =
            Arrays.stream(FSUtils.getAllDataFilesInPartition(fs, new Path(basePath + "/" + partitionPath))).collect(Collectors.toList());
        fileStatuses.forEach(entry -> {
          String fileId = FSUtils.getFileId(entry.getPath().getName());
          int count = fileIdCount.merge(fileId, 1, Integer::sum);
          maxVal.set(Math.max(maxVal.get(), count));
        });
      }
      if (maxVal.get() > maxCommitsRetained) {
        throw new AssertionError("Total versions retained for a file id (" + maxVal.get()
            + ") exceeds max allowed value of " + maxCommitsRetained);
      }

      if (config.validateArchival() || config.validateClean()) {
        String metadataPath = basePath + "/.hoodie";
        FileStatus[] metaFileStatuses = fs.listStatus(new Path(metadataPath));
        boolean archFound = false;
        boolean cleanFound = false;
        for (FileStatus fileStatus : metaFileStatuses) {
          String fileName = fileStatus.getPath().getName();
          if (ARCHIVE_FILE_PATTERN.matcher(fileName).matches()) {
            archFound = true;
          }
          if (CLEAN_FILE_PATTERN.matcher(fileName).matches()) {
            cleanFound = true;
          }
          // Stop scanning once every requested artifact kind has been seen.
          if ((!config.validateArchival() || archFound) && (!config.validateClean() || cleanFound)) {
            break;
          }
        }
        if (config.validateArchival() && !archFound) {
          throw new AssertionError("Archival NotFound in " + metadataPath);
        }
        if (config.validateClean() && !cleanFound) {
          throw new AssertionError("Clean commits NotFound in " + metadataPath);
        }
      }
    } catch (Exception e) {
      log.warn("Exception thrown in ValidateHoodieAsyncOperations Node :: " + e.getCause() + ", msg :: " + e.getMessage());
      throw e;
    }
  }
}

View File

@@ -62,7 +62,7 @@ public class ValidateDatasetNode extends DagNode<Boolean> {
}
@Override
public void execute(ExecutionContext context) throws Exception {
public void execute(ExecutionContext context, int curItrCount) throws Exception {
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
@@ -111,17 +111,19 @@ public class ValidateDatasetNode extends DagNode<Boolean> {
throw new AssertionError("Hudi contents does not match contents input data. ");
}
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
intersectionDf = inputSnapshotDf.intersect(trimmedDf);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (inputSnapshotDf.except(intersectionDf).count() != 0) {
log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
if (config.isValidateHive()) {
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
intersectionDf = inputSnapshotDf.intersect(trimmedDf);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (inputSnapshotDf.except(intersectionDf).count() != 0) {
log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
}
}
// if delete input data is enabled, erase input data.

View File

@@ -40,9 +40,10 @@ public class ValidateNode<R> extends DagNode {
* was set to true or default, but the parent nodes have not completed yet.
*
* @param executionContext Context to execute this node
* @param curItrCount current iteration count.
*/
@Override
public void execute(ExecutionContext executionContext) {
public void execute(ExecutionContext executionContext, int curItrCount) {
if (this.getParentNodes().size() > 0 && (Boolean) this.config.getOtherConfigs().getOrDefault("WAIT_FOR_PARENTS",
true)) {
for (DagNode node : (List<DagNode>) this.getParentNodes()) {

View File

@@ -101,7 +101,8 @@ public class DagScheduler {
while (queue.size() > 0) {
DagNode nodeToExecute = queue.poll();
log.warn("Executing node \"" + nodeToExecute.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" :: " + nodeToExecute.getConfig());
futures.add(service.submit(() -> executeNode(nodeToExecute)));
int finalCurRound = curRound;
futures.add(service.submit(() -> executeNode(nodeToExecute, finalCurRound)));
if (nodeToExecute.getChildNodes().size() > 0) {
childNodes.addAll(nodeToExecute.getChildNodes());
}
@@ -114,7 +115,7 @@ public class DagScheduler {
} while (queue.size() > 0);
log.info("Finished workloads for round num " + curRound);
if (curRound < workflowDag.getRounds()) {
new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext);
new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext, curRound);
}
// After each level, report and flush the metrics
@@ -128,14 +129,14 @@ public class DagScheduler {
*
* @param node The node to be executed
*/
private void executeNode(DagNode node) {
private void executeNode(DagNode node, int curRound) {
if (node.isCompleted()) {
throw new RuntimeException("DagNode already completed! Cannot re-execute");
}
try {
int repeatCount = node.getConfig().getRepeatCount();
while (repeatCount > 0) {
node.execute(executionContext);
node.execute(executionContext, curRound);
log.info("Finished executing {}", node.getName());
repeatCount--;
}

View File

@@ -70,7 +70,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
if (lastRecord == null) {
GenericRecord record = partitionPathsNonEmpty
? this.generator.getNewPayloadWithTimestamp(this.firstPartitionPathField)
: this.generator.getNewPayload();
: this.generator.getNewPayload(partitionPathFieldNames);
lastRecord = record;
return record;
} else {

View File

@@ -331,7 +331,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
*/
public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
long delta = TimeUnit.MILLISECONDS.convert(++partitionIndex % numDatePartitions, TimeUnit.DAYS);
record.put(fieldName, System.currentTimeMillis() - delta);
record.put(fieldName, (System.currentTimeMillis() - delta)/1000);
return record;
}

View File

@@ -309,7 +309,10 @@ public class DeltaSync implements Serializable {
if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
}
} else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
} else if (commitMetadata.getOperationType() == WriteOperationType.CLUSTER) {
// incase of CLUSTER commit, no checkpoint will be available in metadata.
resumeCheckpointStr = Option.empty();
} else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
@@ -373,7 +376,7 @@ public class DeltaSync implements Serializable {
if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=("
+ resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
+ resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
return null;
}

View File

@@ -70,6 +70,7 @@
<include>org.apache.hudi:hudi-common</include>
<include>org.apache.hudi:hudi-client-common</include>
<include>org.apache.hudi:hudi-spark-client</include>
<include>org.apache.hudi:hudi-spark-common</include>
<include>org.apache.hudi:hudi-utilities_${scala.binary.version}</include>
<include>org.apache.hudi:hudi-spark_${scala.binary.version}</include>
<include>org.apache.hudi:hudi-spark2_${scala.binary.version}</include>