From c8a2033c275e21a752893fc89311e1f6846f5a78 Mon Sep 17 00:00:00 2001 From: zhangyue19921010 <69956021+zhangyue19921010@users.noreply.github.com> Date: Tue, 13 Jul 2021 09:14:17 +0800 Subject: [PATCH] [HUDI-2144] Bug fix: offline clustering (HoodieClusteringJob) causes insert actions to lose data (#3240) * fix: filter small files by the pending clustering file groups instead of the clustering-enabled config flag * add testUpsertPartitionerWithSmallFileHandlingAndClusteringPlan unit test * fix Checkstyle violations Co-authored-by: yuezhang --- .../action/commit/UpsertPartitioner.java | 2 +- .../action/commit/TestUpsertPartitioner.java | 45 +++++++++++++++- .../common/testutils/ClusteringTestUtils.java | 54 +++++++++++++++++++ 3 files changed, 99 insertions(+), 2 deletions(-) create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/testutils/ClusteringTestUtils.java diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 3ac81510b..3c0a51155 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -146,7 +146,7 @@ public class UpsertPartitioner> extends Partiti * @return smallFiles not in clustering */ private List filterSmallFilesInClustering(final Set pendingClusteringFileGroupsId, final List smallFiles) { - if (this.config.isClusteringEnabled()) { + if (!pendingClusteringFileGroupsId.isEmpty()) { return smallFiles.stream() .filter(smallFile -> !pendingClusteringFileGroupsId.contains(smallFile.location.getFileId())).collect(Collectors.toList()); } else { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index c495ff600..3be0bb340 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -18,21 +18,26 @@ package org.apache.hudi.table.action.commit; +import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.ClusteringTestUtils; import org.apache.hudi.common.testutils.CompactionTestUtils; import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieHBaseIndexConfig; +import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; @@ -350,6 +355,44 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { "Should be assigned to only file id not pending compaction which is 2"); } + @Test + public void testUpsertPartitionerWithSmallFileHandlingAndClusteringPlan() throws Exception { + final String testPartitionPath = DEFAULT_PARTITION_PATHS[0]; + + // create HoodieWriteConfig and set inline and 
async clustering disable here. + HoodieWriteConfig config = makeHoodieClientConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder().build()) + .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(false).withAsyncClustering(false).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1000 * 1024).parquetMaxFileSize(1000 * 1024).build()) + .build(); + + // create file slice with instantTime 001 and build clustering plan including this created 001 file slice. + HoodieClusteringPlan clusteringPlan = ClusteringTestUtils.createClusteringPlan(metaClient, "001", "1"); + // create requested replace commit + HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() + .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build(); + FileCreateUtils.createRequestedReplaceCommit(basePath,"002", Option.of(requestedReplaceMetadata)); + + // create file slice 002 + FileCreateUtils.createBaseFile(basePath, testPartitionPath, "002", "2", 1); + FileCreateUtils.createCommit(basePath, "002"); + + metaClient = HoodieTableMetaClient.reload(metaClient); + + // generate new data to be ingested + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); + List insertRecords = dataGenerator.generateInserts("003", 100); + WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords))); + + HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient); + // create UpsertPartitioner + UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, table, config); + + // for now we have file slice1 and file slice2 and file slice1 is contained in pending clustering plan + // So that only file slice2 can be used for ingestion. 
+ assertEquals(1, partitioner.smallFiles.size(), "Should have 1 small file to be ingested."); + } + @Test public void testUpsertPartitionerWithSmallFileHandlingWithCanIndexLogFiles() throws Exception { // Note this is used because it is same partition path used in CompactionTestUtils.createCompactionPlan() diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/ClusteringTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/ClusteringTestUtils.java new file mode 100644 index 000000000..b142fe90b --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/ClusteringTestUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.testutils; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.exception.HoodieException; + +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; +import static org.apache.hudi.common.testutils.FileCreateUtils.createBaseFile; +import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS; + +public class ClusteringTestUtils { + + public static HoodieClusteringPlan createClusteringPlan(HoodieTableMetaClient metaClient, String instantTime, String fileId) { + try { + String basePath = metaClient.getBasePath(); + String partition = DEFAULT_PARTITION_PATHS[0]; + createBaseFile(basePath, partition, instantTime, fileId, 1); + FileSlice slice = new FileSlice(partition, instantTime, fileId); + slice.setBaseFile(new CompactionTestUtils.DummyHoodieBaseFile(Paths.get(basePath, partition, + baseFileName(instantTime, fileId)).toString())); + List[] fileSliceGroups = new List[] {Collections.singletonList(slice)}; + HoodieClusteringPlan clusteringPlan = ClusteringUtils.createClusteringPlan("strategy", new HashMap<>(), + fileSliceGroups, Collections.emptyMap()); + return clusteringPlan; + } catch (Exception e) { + throw new HoodieException(e.getMessage(), e); + } + } +}