From 2126f13e130ea0394bb395aa38bc6df4052576e6 Mon Sep 17 00:00:00 2001 From: lw0090 Date: Mon, 12 Oct 2020 09:29:57 +0800 Subject: [PATCH] [HUDI-791] Replace null by Option in Delta Streamer (#2171) --- .../deltastreamer/HoodieDeltaStreamer.java | 57 ++++++++----------- .../HoodieMultiTableDeltaStreamer.java | 3 +- .../functional/TestHoodieDeltaStreamer.java | 2 +- 3 files changed, 28 insertions(+), 34 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index f86a94214..002ead385 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -99,22 +99,22 @@ public class HoodieDeltaStreamer implements Serializable { public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException { this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), - jssc.hadoopConfiguration(), null); + jssc.hadoopConfiguration(), Option.empty()); } - public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, TypedProperties props) throws IOException { + public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, Option<TypedProperties> props) throws IOException { this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), jssc.hadoopConfiguration(), props); } public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException { - this(cfg, jssc, fs, conf, null); + this(cfg, jssc, fs, conf, Option.empty()); } public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, - TypedProperties props) throws IOException { + Option<TypedProperties> props) throws IOException { // Resolving the properties first in a consistent way - this.properties = props != null ? 
props : UtilHelpers.readConfig( + this.properties = props.isPresent() ? props.get() : UtilHelpers.readConfig( FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()), new Path(cfg.propsFilePath), cfg.configs).getConfig(); @@ -128,7 +128,7 @@ public class HoodieDeltaStreamer implements Serializable { this.bootstrapExecutor = Option.ofNullable( cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null); this.deltaSyncService = Option.ofNullable( - cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, this.properties)); + cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, Option.ofNullable(this.properties))); } public void shutdownGracefully() { @@ -503,7 +503,7 @@ public class HoodieDeltaStreamer implements Serializable { /** * Async Compactor Service. */ - private AsyncCompactService asyncCompactService; + private Option<AsyncCompactService> asyncCompactService; /** * Table Type. @@ -516,10 +516,11 @@ public class HoodieDeltaStreamer implements Serializable { private transient DeltaSync deltaSync; public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, - TypedProperties properties) throws IOException { + Option<TypedProperties> properties) throws IOException { this.cfg = cfg; this.jssc = jssc; this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); + this.asyncCompactService = Option.empty(); if (fs.exists(new Path(cfg.targetBasePath))) { HoodieTableMetaClient meta = @@ -547,7 +548,7 @@ public class HoodieDeltaStreamer implements Serializable { ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT, "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."); - this.props = properties; + this.props = properties.get(); LOG.info("Creating delta streamer with configs : " + props.toString()); this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor( 
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc); @@ -558,7 +559,7 @@ public class HoodieDeltaStreamer implements Serializable { public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException { - this(cfg, jssc, fs, conf, null); + this(cfg, jssc, fs, conf, Option.empty()); } public DeltaSync getDeltaSync() { @@ -579,12 +580,12 @@ public class HoodieDeltaStreamer implements Serializable { while (!isShutdownRequested()) { try { long start = System.currentTimeMillis(); - Pair<Option<String>, JavaRDD<WriteStatus>> scheduledCompactionInstantAndRDD = deltaSync.syncOnce(); - if (null != scheduledCompactionInstantAndRDD && scheduledCompactionInstantAndRDD.getLeft().isPresent()) { - LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.getLeft() + ")"); - asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, - HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.getLeft().get())); - asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); + Option<Pair<Option<String>, JavaRDD<WriteStatus>>> scheduledCompactionInstantAndRDD = Option.ofNullable(deltaSync.syncOnce()); + if (scheduledCompactionInstantAndRDD.isPresent() && scheduledCompactionInstantAndRDD.get().getLeft().isPresent()) { + LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.get().getLeft() + ")"); + asyncCompactService.get().enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, + HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get())); + asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); } long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); if (toSleepMs > 0) { @@ -610,9 +611,9 @@ public class HoodieDeltaStreamer implements Serializable { */ private void shutdownCompactor(boolean error) { LOG.info("Delta Sync 
shutdown. Error ?" + error); - if (asyncCompactService != null) { + if (asyncCompactService.isPresent()) { LOG.warn("Gracefully shutting down compactor"); - asyncCompactService.shutdown(false); + asyncCompactService.get().shutdown(false); } } @@ -624,23 +625,23 @@ public class HoodieDeltaStreamer implements Serializable { */ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) { if (cfg.isAsyncCompactionEnabled()) { - if (null != asyncCompactService) { + if (asyncCompactService.isPresent()) { // Update the write client used by Async Compactor. - asyncCompactService.updateWriteClient(writeClient); + asyncCompactService.get().updateWriteClient(writeClient); } else { - asyncCompactService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient); + asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient)); // Enqueue existing pending compactions first HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true); List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta); - pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant)); - asyncCompactService.start((error) -> { + pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingCompaction(hoodieInstant)); + asyncCompactService.get().start((error) -> { // Shutdown DeltaSync shutdown(false); return true; }); try { - asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); + asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); } catch (InterruptedException ie) { throw new HoodieException(ie); } @@ -666,14 +667,6 @@ public class HoodieDeltaStreamer implements Serializable { return sparkSession; } - public JavaSparkContext getJavaSparkContext() { - return jssc; - } - - public AsyncCompactService getAsyncCompactService() { - 
return asyncCompactService; - } - public TypedProperties getProps() { return props; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java index 1b3073115..5333b64fe 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java @@ -22,6 +22,7 @@ import com.beust.jcommander.Parameter; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.ValidationUtils; @@ -351,7 +352,7 @@ public class HoodieMultiTableDeltaStreamer { public void sync() { for (TableExecutionContext context : tableExecutionContexts) { try { - new HoodieDeltaStreamer(context.getConfig(), jssc, context.getProperties()).sync(); + new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync(); successTables.add(Helpers.getTableWithDatabase(context)); } catch (Exception e) { logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 5916a5be0..e7bb15a3c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -520,7 +520,7 @@ public class 
TestHoodieDeltaStreamer extends UtilitiesTestBase { // generate parquet files using kafka connect naming convention HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100)), new Path(filePath)); - HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, hdfsTestService.getHadoopConf(), props); + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, hdfsTestService.getHadoopConf(), Option.ofNullable(props)); assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint); }