1
0

[HUDI-3656] Adding medium sized dataset for clustering and minor fixes to integ tests (#5063)

This commit is contained in:
Sivabalan Narayanan
2022-03-18 09:44:56 -07:00
committed by GitHub
parent 6fe4d6e2f6
commit 2551c26183
3 changed files with 152 additions and 15 deletions

View File

@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# to be used with test-aggressive-clean-archival.properties
# Integration-test DAG: a medium-sized deltastreamer clustering workload.
# Runs the node chain below for `dag_rounds` rounds with a short delay
# between rounds, ending each round with dataset + async-operation validation.
dag_name: deltastreamer-medium-clustering.yaml
dag_rounds: 20
# Delay between successive rounds, in minutes.
dag_intermittent_delay_mins: 1
dag_content:
  # Seed a small batch: 1000 records (~1 KB each) across 5 partitions.
  first_insert:
    config:
      record_size: 1000
      num_partitions_insert: 5
      repeat_count: 1
      num_records_insert: 1000
    type: InsertNode
    deps: none
  # Bulk of the data: 10000 records spread over 50 partitions.
  second_insert:
    config:
      record_size: 1000
      num_partitions_insert: 50
      repeat_count: 1
      num_records_insert: 10000
    deps: first_insert
    type: InsertNode
  # Small trailing insert into 2 partitions.
  third_insert:
    config:
      record_size: 1000
      num_partitions_insert: 2
      repeat_count: 1
      num_records_insert: 300
    deps: second_insert
    type: InsertNode
  # Mixed batch: 300 new records into 2 partitions plus 100 updates in 1 partition.
  first_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 2
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 100
      num_partitions_upsert: 1
    type: UpsertNode
    deps: third_insert
  # Delete 8000 records across 50 partitions to exercise clean/archival paths.
  first_delete:
    config:
      num_partitions_delete: 50
      num_records_delete: 8000
    type: DeleteNode
    deps: first_upsert
  # Validate dataset contents (Hive validation disabled); input data is
  # removed afterwards so each round starts fresh.
  second_validate:
    config:
      validate_hive: false
      delete_input_data: true
    type: ValidateDatasetNode
    deps: first_delete
  # Validate async table services (clean/archival) only on the final
  # iteration — execute_itr_count matches dag_rounds above.
  last_validate:
    config:
      execute_itr_count: 20
    type: ValidateAsyncOperations
    deps: second_validate

View File

@@ -0,0 +1,59 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Integration-test DAG: a medium-sized Spark-datasource clustering workload.
# Same shape as the deltastreamer variant but driven through Spark*Node
# types and with no delay between rounds.
dag_name: spark-medium-clustering.yaml
dag_rounds: 20
# No pause between rounds for this variant.
dag_intermittent_delay_mins: 0
dag_content:
  # Seed the table: 10000 records (~200 bytes each) across 50 partitions.
  first_insert:
    config:
      record_size: 200
      num_partitions_insert: 50
      repeat_count: 1
      num_records_insert: 10000
    type: SparkInsertNode
    deps: none
  # Validate dataset contents after the initial insert (Hive validation off).
  first_validate:
    config:
      validate_hive: false
    type: ValidateDatasetNode
    deps: first_insert
  # Mixed batch: 300 new records plus 3000 updates, both touching all 50 partitions.
  first_upsert:
    config:
      record_size: 200
      num_partitions_insert: 50
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 3000
      num_partitions_upsert: 50
    type: SparkUpsertNode
    deps: first_validate
  # Delete 8000 records across 50 partitions to exercise clean/archival paths.
  first_delete:
    config:
      num_partitions_delete: 50
      num_records_delete: 8000
    type: SparkDeleteNode
    deps: first_upsert
  # Re-validate dataset contents; input data is removed afterwards so the
  # next round starts fresh.
  second_validate:
    config:
      validate_hive: false
      delete_input_data: true
    type: ValidateDatasetNode
    deps: first_delete
  # Validate async table services only on the final iteration —
  # execute_itr_count matches dag_rounds above.
  last_validate:
    config:
      execute_itr_count: 20
    type: ValidateAsyncOperations
    deps: second_validate

View File

@@ -18,8 +18,15 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
@@ -59,21 +66,19 @@ public class ValidateAsyncOperations extends DagNode<Option<String>> {
int maxCommitsRetained = executionContext.getHoodieTestSuiteWriter().getWriteConfig().getCleanerCommitsRetained() + 1;
FileSystem fs = FSUtils.getFs(basePath, executionContext.getHoodieTestSuiteWriter().getConfiguration());
Map<String, Integer> fileIdCount = new HashMap<>();
AtomicInteger maxVal = new AtomicInteger();
List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, basePath);
for (String partitionPath : partitionPaths) {
List<FileStatus> fileStatuses = Arrays.stream(FSUtils.getAllDataFilesInPartition(fs, new Path(basePath + "/" + partitionPath))).collect(Collectors.toList());
fileStatuses.forEach(entry -> {
String fileId = FSUtils.getFileId(entry.getPath().getName());
fileIdCount.computeIfAbsent(fileId, k -> 0);
fileIdCount.put(fileId, fileIdCount.get(fileId) + 1);
maxVal.set(Math.max(maxVal.get(), fileIdCount.get(fileId)));
});
}
if (maxVal.get() > maxCommitsRetained) {
throw new AssertionError("Total commits (" + maxVal + ") retained exceeds max value of " + maxCommitsRetained + ", total commits : ");
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath)
.setConf(executionContext.getJsc().hadoopConfiguration()).build();
Option<HoodieInstant> latestCleanInstant = metaClient.getActiveTimeline().filter(instant -> instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)).lastInstant();
if (latestCleanInstant.isPresent()) {
log.warn("Latest clean commit " + latestCleanInstant.get());
HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, latestCleanInstant.get());
String earliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain();
log.warn("Earliest commit to retain : " + earliestCommitToRetain);
long unCleanedInstants = metaClient.getActiveTimeline().filterCompletedInstants().filter(instant ->
HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestCommitToRetain)).getInstants().count();
ValidationUtils.checkArgument(unCleanedInstants >= (maxCommitsRetained + 1), "Total uncleaned instants " + unCleanedInstants +
" mismatched with max commits retained " + (maxCommitsRetained + 1));
}
if (config.validateArchival() || config.validateClean()) {