diff --git a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml new file mode 100644 index 000000000..0cd4108cb --- /dev/null +++ b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# to be used with test-aggressive-clean-archival.properties + +dag_name: deltastreamer-medium-clustering.yaml +dag_rounds: 20 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 5 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 1000 + num_partitions_insert: 50 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 1000 + num_partitions_insert: 2 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 2 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 1 + type: UpsertNode + deps: third_insert + first_delete: + config: + num_partitions_delete: 50 + num_records_delete: 8000 + type: DeleteNode + deps: first_upsert + second_validate: + config: + validate_hive: false + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete + last_validate: + config: + execute_itr_count: 20 + type: ValidateAsyncOperations + deps: second_validate diff --git a/docker/demo/config/test-suite/spark-medium-clustering.yaml b/docker/demo/config/test-suite/spark-medium-clustering.yaml new file mode 100644 index 000000000..09537a23d --- /dev/null +++ b/docker/demo/config/test-suite/spark-medium-clustering.yaml @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +#      http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: spark-medium-clustering.yaml +dag_rounds: 20 +dag_intermittent_delay_mins: 0 +dag_content: +  first_insert: +    config: +      record_size: 200 +      num_partitions_insert: 50 +      repeat_count: 1 +      num_records_insert: 10000 +    type: SparkInsertNode +    deps: none +  first_validate: +    config: +      validate_hive: false +    type: ValidateDatasetNode +    deps: first_insert +  first_upsert: +    config: +      record_size: 200 +      num_partitions_insert: 50 +      num_records_insert: 300 +      repeat_count: 1 +      num_records_upsert: 3000 +      num_partitions_upsert: 50 +    type: SparkUpsertNode +    deps: first_validate +  first_delete: +    config: +      num_partitions_delete: 50 +      num_records_delete: 8000 +    type: SparkDeleteNode +    deps: first_upsert +  second_validate: +    config: +      validate_hive: false +      delete_input_data: true +    type: ValidateDatasetNode +    deps: first_delete +  last_validate: +    config: +      execute_itr_count: 20 +    type: ValidateAsyncOperations +    deps: second_validate diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java index a7c3245f6..7b5d4dd41 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java @@ -18,8 +18,14 @@ package org.apache.hudi.integ.testsuite.dag.nodes; +import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.common.fs.FSUtils; import 
org.apache.hudi.common.util.Option; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CleanerUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; @@ -59,21 +65,19 @@ public class ValidateAsyncOperations extends DagNode> { int maxCommitsRetained = executionContext.getHoodieTestSuiteWriter().getWriteConfig().getCleanerCommitsRetained() + 1; FileSystem fs = FSUtils.getFs(basePath, executionContext.getHoodieTestSuiteWriter().getConfiguration()); - Map fileIdCount = new HashMap<>(); - - AtomicInteger maxVal = new AtomicInteger(); - List partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, basePath); - for (String partitionPath : partitionPaths) { - List fileStatuses = Arrays.stream(FSUtils.getAllDataFilesInPartition(fs, new Path(basePath + "/" + partitionPath))).collect(Collectors.toList()); - fileStatuses.forEach(entry -> { - String fileId = FSUtils.getFileId(entry.getPath().getName()); - fileIdCount.computeIfAbsent(fileId, k -> 0); - fileIdCount.put(fileId, fileIdCount.get(fileId) + 1); - maxVal.set(Math.max(maxVal.get(), fileIdCount.get(fileId))); - }); - } - if (maxVal.get() > maxCommitsRetained) { - throw new AssertionError("Total commits (" + maxVal + ") retained exceeds max value of " + maxCommitsRetained + ", total commits : "); + + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath) + .setConf(executionContext.getJsc().hadoopConfiguration()).build(); + Option latestCleanInstant = metaClient.getActiveTimeline().filter(instant -> instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)).lastInstant(); + if 
(latestCleanInstant.isPresent()) { + log.warn("Latest clean commit " + latestCleanInstant.get()); + HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, latestCleanInstant.get()); + String earliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain(); + log.warn("Earliest commit to retain : " + earliestCommitToRetain); + long unCleanedInstants = metaClient.getActiveTimeline().filterCompletedInstants().filter(instant -> + HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestCommitToRetain)).getInstants().count(); + ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants + + " mismatched with max commits retained " + (maxCommitsRetained + 1)); } if (config.validateArchival() || config.validateClean()) {