[HUDI-1865] Make embedded time line service singleton (#2899)
This commit is contained in:
@@ -35,6 +35,8 @@ public class EmbeddedTimelineServerHelper {
|
|||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class);
|
private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class);
|
||||||
|
|
||||||
|
private static Option<EmbeddedTimelineService> TIMELINE_SERVER = Option.empty();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiate Embedded Timeline Server.
|
* Instantiate Embedded Timeline Server.
|
||||||
* @param context Hoodie Engine Context
|
* @param context Hoodie Engine Context
|
||||||
@@ -42,21 +44,34 @@ public class EmbeddedTimelineServerHelper {
|
|||||||
* @return TimelineServer if configured to run
|
* @return TimelineServer if configured to run
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static Option<EmbeddedTimelineService> createEmbeddedTimelineService(
|
public static synchronized Option<EmbeddedTimelineService> createEmbeddedTimelineService(
|
||||||
HoodieEngineContext context, HoodieWriteConfig config) throws IOException {
|
HoodieEngineContext context, HoodieWriteConfig config) throws IOException {
|
||||||
Option<EmbeddedTimelineService> timelineServer = Option.empty();
|
if (config.isEmbeddedTimelineServerReuseEnabled()) {
|
||||||
if (config.isEmbeddedTimelineServerEnabled()) {
|
if (!TIMELINE_SERVER.isPresent() || !TIMELINE_SERVER.get().canReuseFor(config.getBasePath())) {
|
||||||
// Run Embedded Timeline Server
|
TIMELINE_SERVER = Option.of(startTimelineService(context, config));
|
||||||
LOG.info("Starting Timeline service !!");
|
}
|
||||||
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
|
return TIMELINE_SERVER;
|
||||||
timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null), config.getEmbeddedTimelineServerPort(),
|
|
||||||
config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath(),
|
|
||||||
config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(),
|
|
||||||
config.getEmbeddedTimelineServerUseAsync()));
|
|
||||||
timelineServer.get().startServer();
|
|
||||||
updateWriteConfigWithTimelineServer(timelineServer.get(), config);
|
|
||||||
}
|
}
|
||||||
return timelineServer;
|
if (config.isEmbeddedTimelineServerEnabled()) {
|
||||||
|
return Option.of(startTimelineService(context, config));
|
||||||
|
} else {
|
||||||
|
return Option.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static EmbeddedTimelineService startTimelineService(
|
||||||
|
HoodieEngineContext context, HoodieWriteConfig config) throws IOException {
|
||||||
|
// Run Embedded Timeline Server
|
||||||
|
LOG.info("Starting Timeline service !!");
|
||||||
|
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
|
||||||
|
EmbeddedTimelineService timelineService = new EmbeddedTimelineService(
|
||||||
|
context, hostAddr.orElse(null),config.getEmbeddedTimelineServerPort(),
|
||||||
|
config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath(),
|
||||||
|
config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(),
|
||||||
|
config.getEmbeddedTimelineServerUseAsync());
|
||||||
|
timelineService.startServer();
|
||||||
|
updateWriteConfigWithTimelineServer(timelineService, config);
|
||||||
|
return timelineService;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -113,6 +113,12 @@ public class EmbeddedTimelineService {
|
|||||||
return viewManager;
|
return viewManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean canReuseFor(String basePath) {
|
||||||
|
return this.server != null
|
||||||
|
&& this.viewManager != null
|
||||||
|
&& this.basePath.equals(basePath);
|
||||||
|
}
|
||||||
|
|
||||||
public void stop() {
|
public void stop() {
|
||||||
if (null != server) {
|
if (null != server) {
|
||||||
LOG.info("Closing Timeline server");
|
LOG.info("Closing Timeline server");
|
||||||
|
|||||||
@@ -115,6 +115,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
|
|
||||||
public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
|
public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
|
||||||
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
|
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
|
||||||
|
public static final String EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = "hoodie.embed.timeline.server.reuse.enabled";
|
||||||
|
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = "false";
|
||||||
public static final String EMBEDDED_TIMELINE_SERVER_PORT = "hoodie.embed.timeline.server.port";
|
public static final String EMBEDDED_TIMELINE_SERVER_PORT = "hoodie.embed.timeline.server.port";
|
||||||
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT = "0";
|
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT = "0";
|
||||||
public static final String EMBEDDED_TIMELINE_SERVER_THREADS = "hoodie.embed.timeline.server.threads";
|
public static final String EMBEDDED_TIMELINE_SERVER_THREADS = "hoodie.embed.timeline.server.threads";
|
||||||
@@ -332,6 +334,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
|
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isEmbeddedTimelineServerReuseEnabled() {
|
||||||
|
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED));
|
||||||
|
}
|
||||||
|
|
||||||
public int getEmbeddedTimelineServerPort() {
|
public int getEmbeddedTimelineServerPort() {
|
||||||
return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_PORT, DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT));
|
return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_PORT, DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT));
|
||||||
}
|
}
|
||||||
@@ -1271,6 +1277,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withEmbeddedTimelineServerReuseEnabled(boolean enabled) {
|
||||||
|
props.setProperty(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED, String.valueOf(enabled));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withEmbeddedTimelineServerPort(int port) {
|
public Builder withEmbeddedTimelineServerPort(int port) {
|
||||||
props.setProperty(EMBEDDED_TIMELINE_SERVER_PORT, String.valueOf(port));
|
props.setProperty(EMBEDDED_TIMELINE_SERVER_PORT, String.valueOf(port));
|
||||||
return this;
|
return this;
|
||||||
@@ -1362,6 +1373,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
DEFAULT_MARKERS_DELETE_PARALLELISM);
|
DEFAULT_MARKERS_DELETE_PARALLELISM);
|
||||||
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
|
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
|
||||||
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
|
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED),
|
||||||
|
EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);
|
||||||
setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
|
setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
|
||||||
INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS));
|
INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS));
|
||||||
setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
|
setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
|
||||||
|
|||||||
@@ -219,6 +219,7 @@ public class StreamerUtil {
|
|||||||
.logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
|
.logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
|
||||||
.logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
|
.logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
|
||||||
.build())
|
.build())
|
||||||
|
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
|
||||||
.withAutoCommit(false)
|
.withAutoCommit(false)
|
||||||
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
|
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user