[HUDI-1203] add port configuration for EmbeddedTimelineService (#2142)
This commit is contained in:
@@ -103,7 +103,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
|
|||||||
LOG.info("Starting Timeline service !!");
|
LOG.info("Starting Timeline service !!");
|
||||||
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
|
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
|
||||||
timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null),
|
timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null),
|
||||||
config.getClientSpecifiedViewStorageConfig()));
|
config.getEmbeddedTimelineServerPort(), config.getClientSpecifiedViewStorageConfig()));
|
||||||
try {
|
try {
|
||||||
timelineServer.get().startServer();
|
timelineServer.get().startServer();
|
||||||
// Allow executor to find this newly instantiated timeline service
|
// Allow executor to find this newly instantiated timeline service
|
||||||
|
|||||||
@@ -39,17 +39,19 @@ public class EmbeddedTimelineService {
|
|||||||
private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class);
|
private static final Logger LOG = LogManager.getLogger(EmbeddedTimelineService.class);
|
||||||
|
|
||||||
private int serverPort;
|
private int serverPort;
|
||||||
|
private int preferredPort;
|
||||||
private String hostAddr;
|
private String hostAddr;
|
||||||
private final SerializableConfiguration hadoopConf;
|
private final SerializableConfiguration hadoopConf;
|
||||||
private final FileSystemViewStorageConfig config;
|
private final FileSystemViewStorageConfig config;
|
||||||
private transient FileSystemViewManager viewManager;
|
private transient FileSystemViewManager viewManager;
|
||||||
private transient TimelineService server;
|
private transient TimelineService server;
|
||||||
|
|
||||||
public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, FileSystemViewStorageConfig config) {
|
public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort, FileSystemViewStorageConfig config) {
|
||||||
setHostAddr(embeddedTimelineServiceHostAddr);
|
setHostAddr(embeddedTimelineServiceHostAddr);
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.hadoopConf = context.getHadoopConf();
|
this.hadoopConf = context.getHadoopConf();
|
||||||
this.viewManager = createViewManager();
|
this.viewManager = createViewManager();
|
||||||
|
this.preferredPort = embeddedTimelineServerPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
private FileSystemViewManager createViewManager() {
|
private FileSystemViewManager createViewManager() {
|
||||||
@@ -66,7 +68,7 @@ public class EmbeddedTimelineService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void startServer() throws IOException {
|
public void startServer() throws IOException {
|
||||||
server = new TimelineService(0, viewManager, hadoopConf.newCopy());
|
server = new TimelineService(preferredPort, viewManager, hadoopConf.newCopy());
|
||||||
serverPort = server.startService();
|
serverPort = server.startService();
|
||||||
LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
|
LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -99,6 +99,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
|
|
||||||
public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
|
public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
|
||||||
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
|
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
|
||||||
|
public static final String EMBEDDED_TIMELINE_SERVER_PORT = "hoodie.embed.timeline.server.port";
|
||||||
|
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT = "0";
|
||||||
|
|
||||||
public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
|
public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
|
||||||
public static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
|
public static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
|
||||||
@@ -255,6 +257,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
|
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getEmbeddedTimelineServerPort() {
|
||||||
|
return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_PORT, DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT));
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isFailOnTimelineArchivingEnabled() {
|
public boolean isFailOnTimelineArchivingEnabled() {
|
||||||
return Boolean.parseBoolean(props.getProperty(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP));
|
return Boolean.parseBoolean(props.getProperty(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP));
|
||||||
}
|
}
|
||||||
@@ -953,6 +959,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withEmbeddedTimelineServerPort(int port) {
|
||||||
|
props.setProperty(EMBEDDED_TIMELINE_SERVER_PORT, String.valueOf(port));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withBulkInsertSortMode(String mode) {
|
public Builder withBulkInsertSortMode(String mode) {
|
||||||
props.setProperty(BULKINSERT_SORT_MODE, mode);
|
props.setProperty(BULKINSERT_SORT_MODE, mode);
|
||||||
return this;
|
return this;
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ import java.io.Serializable;
|
|||||||
public class TimelineService {
|
public class TimelineService {
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(TimelineService.class);
|
private static final Logger LOG = LogManager.getLogger(TimelineService.class);
|
||||||
|
private static final int START_SERVICE_MAX_RETRIES = 16;
|
||||||
|
|
||||||
private int serverPort;
|
private int serverPort;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
@@ -98,16 +99,42 @@ public class TimelineService {
|
|||||||
public Boolean help = false;
|
public Boolean help = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int startServiceOnPort(int port) throws IOException {
|
||||||
|
if (!(port == 0 || (1024 <= port && port < 65536))) {
|
||||||
|
throw new IllegalArgumentException(String.format("startPort should be between 1024 and 65535 (inclusive), "
|
||||||
|
+ "or 0 for a random free port. but now is %s.", port));
|
||||||
|
}
|
||||||
|
for (int attempt = 0; attempt < START_SERVICE_MAX_RETRIES; attempt++) {
|
||||||
|
// Returns port to try when trying to bind a service. Handles wrapping and skipping privileged ports.
|
||||||
|
int tryPort = port == 0 ? port : (port + attempt - 1024) % (65536 - 1024) + 1024;
|
||||||
|
try {
|
||||||
|
app.start(tryPort);
|
||||||
|
return app.port();
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (e.getMessage() != null && e.getMessage().contains("Failed to bind to")) {
|
||||||
|
if (tryPort == 0) {
|
||||||
|
LOG.warn("Timeline server could not bind on a random free port.");
|
||||||
|
} else {
|
||||||
|
LOG.warn(String.format("Timeline server could not bind on port %d. "
|
||||||
|
+ "Attempting port %d + 1.",tryPort, tryPort));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.warn(String.format("Timeline server start failed on port %d. Attempting port %d + 1.",tryPort, tryPort), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IOException(String.format("Timeline server start failed on port %d, after retry %d times", port, START_SERVICE_MAX_RETRIES));
|
||||||
|
}
|
||||||
|
|
||||||
public int startService() throws IOException {
|
public int startService() throws IOException {
|
||||||
app = Javalin.create();
|
app = Javalin.create();
|
||||||
FileSystemViewHandler router = new FileSystemViewHandler(app, conf, fsViewsManager);
|
FileSystemViewHandler router = new FileSystemViewHandler(app, conf, fsViewsManager);
|
||||||
app.get("/", ctx -> ctx.result("Hello World"));
|
app.get("/", ctx -> ctx.result("Hello World"));
|
||||||
router.register();
|
router.register();
|
||||||
app.start(serverPort);
|
int realServerPort = startServiceOnPort(serverPort);
|
||||||
// If port = 0, a dynamic port is assigned. Store it.
|
LOG.info("Starting Timeline server on port :" + realServerPort);
|
||||||
serverPort = app.port();
|
this.serverPort = realServerPort;
|
||||||
LOG.info("Starting Timeline server on port :" + serverPort);
|
return realServerPort;
|
||||||
return serverPort;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void run() throws IOException {
|
public void run() throws IOException {
|
||||||
|
|||||||
Reference in New Issue
Block a user