From 971c7d41bd912b0e00ae8606adbe36548e2c49b3 Mon Sep 17 00:00:00 2001 From: harveyyue Date: Fri, 10 Jan 2020 15:22:44 +0800 Subject: [PATCH] [HUDI-322] DeltaStreamer should pick checkpoints off only deltacommits for MOR tables --- .../hudi/utilities/deltastreamer/DeltaSync.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 291b4b8db..8dc438eb7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -195,7 +195,16 @@ public class DeltaSync implements Serializable { if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, cfg.payloadClassName); - this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); + switch (meta.getTableType()) { + case COPY_ON_WRITE: + this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); + break; + case MERGE_ON_READ: + this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()); + break; + default: + throw new HoodieException("Unsupported table type :" + meta.getTableType()); + } } else { this.commitTimelineOpt = Option.empty(); HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, @@ -378,7 +387,7 @@ public class DeltaSync implements Serializable { // Schedule compaction if needed if (cfg.isAsyncCompactionEnabled()) { - scheduledCompactionInstant = writeClient.scheduleCompaction(Option.of(checkpointCommitMetadata)); + scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty()); } if 
(!isEmpty) {