diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 291b4b8db..8dc438eb7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -195,7 +195,16 @@ public class DeltaSync implements Serializable { if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, cfg.payloadClassName); - this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()); + switch (meta.getTableType()) { + case COPY_ON_WRITE: + this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants()); + break; + case MERGE_ON_READ: + this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()); + break; + default: + throw new HoodieException("Unsupported table type :" + meta.getTableType()); + } } else { this.commitTimelineOpt = Option.empty(); HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, @@ -378,7 +387,7 @@ public class DeltaSync implements Serializable { // Schedule compaction if needed if (cfg.isAsyncCompactionEnabled()) { - scheduledCompactionInstant = writeClient.scheduleCompaction(Option.of(checkpointCommitMetadata)); + scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty()); } if (!isEmpty) {