1
0

[HUDI-2791] Allows duplicate files for metadata commit (#4033)

This commit is contained in:
Danny Chan
2021-11-19 14:30:17 +08:00
committed by GitHub
parent 4e067ca581
commit 7a00f867ae
2 changed files with 6 additions and 3 deletions

View File

@@ -102,8 +102,10 @@ public class HoodieTableMetadataUtil {
int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
String filename = pathWithPartition.substring(offset);
ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata");
newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
long totalWriteBytes = newFiles.containsKey(filename)
? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes()
: hoodieWriteStat.getTotalWriteBytes();
newFiles.put(filename, totalWriteBytes);
});
// New files added to a partition
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(

View File

@@ -333,7 +333,6 @@ public class StreamWriteOperatorCoordinator
this.instant = instant;
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
this.writeClient.upgradeDowngrade(this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
}
@@ -360,6 +359,8 @@ public class StreamWriteOperatorCoordinator
}
// starts a new instant
startInstant();
// upgrade downgrade
this.writeClient.upgradeDowngrade(this.instant);
}, "initialize instant %s", instant);
}