diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 434638875..ee826ac10 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -223,6 +223,10 @@ public class HoodieDeltaStreamer implements Serializable { + " source-fetch -> Transform -> Hudi Write in loop") public Boolean continuousMode = false; + @Parameter(names = {"--min-sync-interval-seconds"}, description = "the min sync interval of each sync in " + + "continuous mode") + public Integer minSyncIntervalSeconds = 0; + @Parameter(names = {"--spark-master"}, description = "spark master to use.") public String sparkMaster = "local[2]"; @@ -384,6 +388,7 @@ public class HoodieDeltaStreamer implements Serializable { try { while (!isShutdownRequested()) { try { + long start = System.currentTimeMillis(); Option scheduledCompactionInstant = deltaSync.syncOnce(); if (scheduledCompactionInstant.isPresent()) { log.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")"); @@ -392,6 +397,12 @@ public class HoodieDeltaStreamer implements Serializable { scheduledCompactionInstant.get())); asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); } + long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); + if (toSleepMs > 0) { + log.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: " + + toSleepMs + " ms."); + Thread.sleep(toSleepMs); + } } catch (Exception e) { log.error("Shutting down delta-sync due to exception", e); error = true;