From 7a00f867aef79f2c0f411f0f9ff22572fb61093e Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 19 Nov 2021 14:30:17 +0800 Subject: [PATCH] [HUDI-2791] Allows duplicate files for metadata commit (#4033) --- .../org/apache/hudi/metadata/HoodieTableMetadataUtil.java | 6 ++++-- .../apache/hudi/sink/StreamWriteOperatorCoordinator.java | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index b028056bb..9078bd08d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -102,8 +102,10 @@ public class HoodieTableMetadataUtil { int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1; String filename = pathWithPartition.substring(offset); - ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata"); - newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes()); + long totalWriteBytes = newFiles.containsKey(filename) + ? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes() + : hoodieWriteStat.getTotalWriteBytes(); + newFiles.put(filename, totalWriteBytes); }); // New files added to a partition HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord( diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 08a04e3f8..c1b56b95f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -333,7 +333,6 @@ public class StreamWriteOperatorCoordinator this.instant = instant; this.writeClient.startCommitWithTime(instant, tableState.commitAction); this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); - this.writeClient.upgradeDowngrade(this.instant); LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } @@ -360,6 +359,8 @@ public class StreamWriteOperatorCoordinator } // starts a new instant startInstant(); + // upgrade downgrade + this.writeClient.upgradeDowngrade(this.instant); }, "initialize instant %s", instant); }