1
0

[HUDI-269] Limit sync frequency (#921)

* [HUDI-269] Throttle DeltaStreamer sync runs
This commit is contained in:
Xing Pan
2019-09-24 20:30:35 +08:00
committed by Balaji Varadarajan
parent 1e6dd1ca1d
commit bf05f95413

View File

@@ -223,6 +223,10 @@ public class HoodieDeltaStreamer implements Serializable {
+ " source-fetch -> Transform -> Hudi Write in loop")
public Boolean continuousMode = false;
@Parameter(names = {"--min-sync-interval-seconds"}, description = "the min sync interval of each sync in "
+ "continuous mode")
public Integer minSyncIntervalSeconds = 0;
@Parameter(names = {"--spark-master"}, description = "spark master to use.")
public String sparkMaster = "local[2]";
@@ -384,6 +388,7 @@ public class HoodieDeltaStreamer implements Serializable {
try {
while (!isShutdownRequested()) {
try {
long start = System.currentTimeMillis();
Option<String> scheduledCompactionInstant = deltaSync.syncOnce();
if (scheduledCompactionInstant.isPresent()) {
log.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
@@ -392,6 +397,12 @@ public class HoodieDeltaStreamer implements Serializable {
scheduledCompactionInstant.get()));
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
}
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
if (toSleepMs > 0) {
log.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: "
+ toSleepMs + " ms.");
Thread.sleep(toSleepMs);
}
} catch (Exception e) {
log.error("Shutting down delta-sync due to exception", e);
error = true;