1
0

[HUDI-791] Replace null by Option in Delta Streamer (#2171)

This commit is contained in:
lw0090
2020-10-12 09:29:57 +08:00
committed by GitHub
parent 032bc3b08f
commit 2126f13e13
3 changed files with 28 additions and 34 deletions

View File

@@ -99,22 +99,22 @@ public class HoodieDeltaStreamer implements Serializable {
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException { public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
jssc.hadoopConfiguration(), null); jssc.hadoopConfiguration(), Option.empty());
} }
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, TypedProperties props) throws IOException { public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, Option<TypedProperties> props) throws IOException {
this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
jssc.hadoopConfiguration(), props); jssc.hadoopConfiguration(), props);
} }
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException { public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) throws IOException {
this(cfg, jssc, fs, conf, null); this(cfg, jssc, fs, conf, Option.empty());
} }
public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
TypedProperties props) throws IOException { Option<TypedProperties> props) throws IOException {
// Resolving the properties first in a consistent way // Resolving the properties first in a consistent way
this.properties = props != null ? props : UtilHelpers.readConfig( this.properties = props.isPresent() ? props.get() : UtilHelpers.readConfig(
FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()), FSUtils.getFs(cfg.propsFilePath, jssc.hadoopConfiguration()),
new Path(cfg.propsFilePath), cfg.configs).getConfig(); new Path(cfg.propsFilePath), cfg.configs).getConfig();
@@ -128,7 +128,7 @@ public class HoodieDeltaStreamer implements Serializable {
this.bootstrapExecutor = Option.ofNullable( this.bootstrapExecutor = Option.ofNullable(
cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null); cfg.runBootstrap ? new BootstrapExecutor(cfg, jssc, fs, conf, this.properties) : null);
this.deltaSyncService = Option.ofNullable( this.deltaSyncService = Option.ofNullable(
cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, this.properties)); cfg.runBootstrap ? null : new DeltaSyncService(cfg, jssc, fs, conf, Option.ofNullable(this.properties)));
} }
public void shutdownGracefully() { public void shutdownGracefully() {
@@ -503,7 +503,7 @@ public class HoodieDeltaStreamer implements Serializable {
/** /**
* Async Compactor Service. * Async Compactor Service.
*/ */
private AsyncCompactService asyncCompactService; private Option<AsyncCompactService> asyncCompactService;
/** /**
* Table Type. * Table Type.
@@ -516,10 +516,11 @@ public class HoodieDeltaStreamer implements Serializable {
private transient DeltaSync deltaSync; private transient DeltaSync deltaSync;
public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf, public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf,
TypedProperties properties) throws IOException { Option<TypedProperties> properties) throws IOException {
this.cfg = cfg; this.cfg = cfg;
this.jssc = jssc; this.jssc = jssc;
this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate(); this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
this.asyncCompactService = Option.empty();
if (fs.exists(new Path(cfg.targetBasePath))) { if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = HoodieTableMetaClient meta =
@@ -547,7 +548,7 @@ public class HoodieDeltaStreamer implements Serializable {
ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT, ValidationUtils.checkArgument(!cfg.filterDupes || cfg.operation != Operation.UPSERT,
"'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed."); "'--filter-dupes' needs to be disabled when '--op' is 'UPSERT' to ensure updates are not missed.");
this.props = properties; this.props = properties.get();
LOG.info("Creating delta streamer with configs : " + props.toString()); LOG.info("Creating delta streamer with configs : " + props.toString());
this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor( this.schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc); UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc), props, jssc);
@@ -558,7 +559,7 @@ public class HoodieDeltaStreamer implements Serializable {
public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf) public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, Configuration conf)
throws IOException { throws IOException {
this(cfg, jssc, fs, conf, null); this(cfg, jssc, fs, conf, Option.empty());
} }
public DeltaSync getDeltaSync() { public DeltaSync getDeltaSync() {
@@ -579,12 +580,12 @@ public class HoodieDeltaStreamer implements Serializable {
while (!isShutdownRequested()) { while (!isShutdownRequested()) {
try { try {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Pair<Option<String>, JavaRDD<WriteStatus>> scheduledCompactionInstantAndRDD = deltaSync.syncOnce(); Option<Pair<Option<String>, JavaRDD<WriteStatus>>> scheduledCompactionInstantAndRDD = Option.ofNullable(deltaSync.syncOnce());
if (null != scheduledCompactionInstantAndRDD && scheduledCompactionInstantAndRDD.getLeft().isPresent()) { if (scheduledCompactionInstantAndRDD.isPresent() && scheduledCompactionInstantAndRDD.get().getLeft().isPresent()) {
LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.getLeft() + ")"); LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstantAndRDD.get().getLeft() + ")");
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, asyncCompactService.get().enqueuePendingCompaction(new HoodieInstant(State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.getLeft().get())); HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get()));
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
} }
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
if (toSleepMs > 0) { if (toSleepMs > 0) {
@@ -610,9 +611,9 @@ public class HoodieDeltaStreamer implements Serializable {
*/ */
private void shutdownCompactor(boolean error) { private void shutdownCompactor(boolean error) {
LOG.info("Delta Sync shutdown. Error ?" + error); LOG.info("Delta Sync shutdown. Error ?" + error);
if (asyncCompactService != null) { if (asyncCompactService.isPresent()) {
LOG.warn("Gracefully shutting down compactor"); LOG.warn("Gracefully shutting down compactor");
asyncCompactService.shutdown(false); asyncCompactService.get().shutdown(false);
} }
} }
@@ -624,23 +625,23 @@ public class HoodieDeltaStreamer implements Serializable {
*/ */
protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) { protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
if (cfg.isAsyncCompactionEnabled()) { if (cfg.isAsyncCompactionEnabled()) {
if (null != asyncCompactService) { if (asyncCompactService.isPresent()) {
// Update the write client used by Async Compactor. // Update the write client used by Async Compactor.
asyncCompactService.updateWriteClient(writeClient); asyncCompactService.get().updateWriteClient(writeClient);
} else { } else {
asyncCompactService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient); asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient));
// Enqueue existing pending compactions first // Enqueue existing pending compactions first
HoodieTableMetaClient meta = HoodieTableMetaClient meta =
new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true); new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta); List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant)); pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingCompaction(hoodieInstant));
asyncCompactService.start((error) -> { asyncCompactService.get().start((error) -> {
// Shutdown DeltaSync // Shutdown DeltaSync
shutdown(false); shutdown(false);
return true; return true;
}); });
try { try {
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions); asyncCompactService.get().waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new HoodieException(ie); throw new HoodieException(ie);
} }
@@ -666,14 +667,6 @@ public class HoodieDeltaStreamer implements Serializable {
return sparkSession; return sparkSession;
} }
public JavaSparkContext getJavaSparkContext() {
return jssc;
}
public AsyncCompactService getAsyncCompactService() {
return asyncCompactService;
}
public TypedProperties getProps() { public TypedProperties getProps() {
return props; return props;
} }

View File

@@ -22,6 +22,7 @@ import com.beust.jcommander.Parameter;
import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
@@ -351,7 +352,7 @@ public class HoodieMultiTableDeltaStreamer {
public void sync() { public void sync() {
for (TableExecutionContext context : tableExecutionContexts) { for (TableExecutionContext context : tableExecutionContexts) {
try { try {
new HoodieDeltaStreamer(context.getConfig(), jssc, context.getProperties()).sync(); new HoodieDeltaStreamer(context.getConfig(), jssc, Option.ofNullable(context.getProperties())).sync();
successTables.add(Helpers.getTableWithDatabase(context)); successTables.add(Helpers.getTableWithDatabase(context));
} catch (Exception e) { } catch (Exception e) {
logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e); logger.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);

View File

@@ -520,7 +520,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
// generate parquet files using kafka connect naming convention // generate parquet files using kafka connect naming convention
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100)), new Path(filePath)); Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100)), new Path(filePath));
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, hdfsTestService.getHadoopConf(), props); HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, hdfsTestService.getHadoopConf(), Option.ofNullable(props));
assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint); assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint);
} }