From 7fed7352bd506e20e5316bb0b3ed9e5c1e9c76df Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Thu, 27 May 2021 13:38:33 +0800 Subject: [PATCH] [HUDI-1865] Make embedded time line service singleton (#2899) --- .../EmbeddedTimelineServerHelper.java | 41 +++++++++++++------ .../embedded/EmbeddedTimelineService.java | 6 +++ .../apache/hudi/config/HoodieWriteConfig.java | 13 ++++++ .../org/apache/hudi/util/StreamerUtil.java | 1 + 4 files changed, 48 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java index e5a719eb3..558b5ff62 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java @@ -35,6 +35,8 @@ public class EmbeddedTimelineServerHelper { private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class); + private static Option TIMELINE_SERVER = Option.empty(); + /** * Instantiate Embedded Timeline Server. * @param context Hoodie Engine Context @@ -42,21 +44,34 @@ public class EmbeddedTimelineServerHelper { * @return TimelineServer if configured to run * @throws IOException */ - public static Option createEmbeddedTimelineService( + public static synchronized Option createEmbeddedTimelineService( HoodieEngineContext context, HoodieWriteConfig config) throws IOException { - Option timelineServer = Option.empty(); - if (config.isEmbeddedTimelineServerEnabled()) { - // Run Embedded Timeline Server - LOG.info("Starting Timeline service !!"); - Option hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST); - timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null), config.getEmbeddedTimelineServerPort(), - config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath(), - config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(), - config.getEmbeddedTimelineServerUseAsync())); - timelineServer.get().startServer(); - updateWriteConfigWithTimelineServer(timelineServer.get(), config); + if (config.isEmbeddedTimelineServerReuseEnabled()) { + if (!TIMELINE_SERVER.isPresent() || !TIMELINE_SERVER.get().canReuseFor(config.getBasePath())) { + TIMELINE_SERVER = Option.of(startTimelineService(context, config)); + } + return TIMELINE_SERVER; } - return timelineServer; + if (config.isEmbeddedTimelineServerEnabled()) { + return Option.of(startTimelineService(context, config)); + } else { + return Option.empty(); + } + } + + private static EmbeddedTimelineService startTimelineService( + HoodieEngineContext context, HoodieWriteConfig config) throws IOException { + // Run Embedded Timeline Server + LOG.info("Starting Timeline service !!"); + Option hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST); + EmbeddedTimelineService timelineService = new EmbeddedTimelineService( + context, hostAddr.orElse(null),config.getEmbeddedTimelineServerPort(), + config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath(), + config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(), + config.getEmbeddedTimelineServerUseAsync()); + timelineService.startServer(); + updateWriteConfigWithTimelineServer(timelineService, config); + return timelineService; } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index a2bc7116d..a6d27cedf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -113,6 +113,12 @@ public class EmbeddedTimelineService { return viewManager; } + public boolean canReuseFor(String basePath) { + return this.server != null + && this.viewManager != null + && this.basePath.equals(basePath); + } + public void stop() { if (null != server) { LOG.info("Closing Timeline server"); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index cf461fa08..a2784b596 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -115,6 +115,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server"; public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true"; + public static final String EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = "hoodie.embed.timeline.server.reuse.enabled"; + public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = "false"; public static final String EMBEDDED_TIMELINE_SERVER_PORT = "hoodie.embed.timeline.server.port"; public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT = "0"; public static final String EMBEDDED_TIMELINE_SERVER_THREADS = "hoodie.embed.timeline.server.threads"; @@ -332,6 +334,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED)); } + public boolean isEmbeddedTimelineServerReuseEnabled() { + return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED)); + } + public int getEmbeddedTimelineServerPort() { return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_PORT, DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT)); } @@ -1271,6 +1277,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withEmbeddedTimelineServerReuseEnabled(boolean enabled) { + props.setProperty(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED, String.valueOf(enabled)); + return this; + } + public Builder withEmbeddedTimelineServerPort(int port) { props.setProperty(EMBEDDED_TIMELINE_SERVER_PORT, String.valueOf(port)); return this; @@ -1362,6 +1373,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { DEFAULT_MARKERS_DELETE_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED), EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED); + setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED), + EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED); setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP), INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS)); setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP), diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 0a1b0a5bf..6f69101d3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -219,6 +219,7 @@ public class StreamerUtil { .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) .logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) .build()) + .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton .withAutoCommit(false) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));