[HUDI-1151] Fix NPE when no new data in kafka using HoodieDeltaStreamer (#1921)
This commit is contained in:
@@ -577,7 +577,7 @@ public class HoodieDeltaStreamer implements Serializable {
|
|||||||
try {
|
try {
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
Pair<Option<String>, JavaRDD<WriteStatus>> scheduledCompactionInstantAndRDD = deltaSync.syncOnce();
|
Pair<Option<String>, JavaRDD<WriteStatus>> scheduledCompactionInstantAndRDD = deltaSync.syncOnce();
|
||||||
if (scheduledCompactionInstantAndRDD.getLeft().isPresent()) {
|
if (null != scheduledCompactionInstantAndRDD && scheduledCompactionInstantAndRDD.getLeft().isPresent()) {
|
||||||
LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.getLeft() + ")");
|
LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.getLeft() + ")");
|
||||||
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED,
|
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED,
|
||||||
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.getLeft().get()));
|
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.getLeft().get()));
|
||||||
|
|||||||
Reference in New Issue
Block a user