[HUDI-603]: DeltaStreamer can now fetch schema before every run in continuous mode (#1566)
Co-authored-by: Balaji Varadarajan <balaji.varadarajan@robinhood.com>
This commit is contained in:
@@ -41,7 +41,8 @@ public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends Abstrac
|
||||
@Override
|
||||
public void compact(HoodieInstant instant) throws IOException {
|
||||
LOG.info("Compactor executing compaction " + instant);
|
||||
JavaRDD<WriteStatus> res = compactionClient.compact(instant.getTimestamp());
|
||||
SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>)compactionClient;
|
||||
JavaRDD<WriteStatus> res = writeClient.compact(instant.getTimestamp());
|
||||
long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();
|
||||
if (numWriteErrors != 0) {
|
||||
// We treat even a single error in compaction as fatal
|
||||
@@ -50,6 +51,6 @@ public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends Abstrac
|
||||
"Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
|
||||
}
|
||||
// Commit compaction
|
||||
compactionClient.commitCompaction(instant.getTimestamp(), res, Option.empty());
|
||||
writeClient.commitCompaction(instant.getTimestamp(), res, Option.empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user