1
0

[HUDI-1553] Configuration and metrics for the TimelineService. (#2495)

This commit is contained in:
Prashant Wason
2021-03-02 21:58:41 -08:00
committed by GitHub
parent 4fa43359cb
commit f11a6c7b2d
5 changed files with 147 additions and 15 deletions

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.timeline.service;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
@@ -29,7 +30,9 @@ import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
import org.apache.hudi.timeline.service.handlers.TimelineHandler;
@@ -47,6 +50,9 @@ import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
/**
@@ -62,13 +68,24 @@ public class RequestHandler {
private final TimelineHandler instantHandler;
private final FileSliceHandler sliceHandler;
private final BaseFileHandler dataFileHandler;
private Registry metricsRegistry = Registry.getRegistry("TimelineService");
private ScheduledExecutorService asyncResultService = Executors.newSingleThreadScheduledExecutor();
private final boolean useAsync;
public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager viewManager) throws IOException {
public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager viewManager, boolean useAsync) throws IOException {
this.viewManager = viewManager;
this.app = app;
this.instantHandler = new TimelineHandler(conf, viewManager);
this.sliceHandler = new FileSliceHandler(conf, viewManager);
this.dataFileHandler = new BaseFileHandler(conf, viewManager);
this.useAsync = useAsync;
if (useAsync) {
asyncResultService = Executors.newSingleThreadScheduledExecutor();
}
}
public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager viewManager) throws IOException {
this(app, conf, viewManager, false);
}
public void register() {
@@ -130,13 +147,44 @@ public class RequestHandler {
}
private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException {
if (useAsync) {
writeValueAsStringAsync(ctx, obj);
} else {
writeValueAsStringSync(ctx, obj);
}
}
private void writeValueAsStringSync(Context ctx, Object obj) throws JsonProcessingException {
HoodieTimer timer = new HoodieTimer().startTimer();
boolean prettyPrint = ctx.queryParam("pretty") != null;
long beginJsonTs = System.currentTimeMillis();
String result =
prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj);
long endJsonTs = System.currentTimeMillis();
LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs));
final long jsonifyTime = timer.endTimer();
ctx.result(result);
metricsRegistry.add("WRITE_VALUE_CNT", 1);
metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Jsonify TimeTaken=" + jsonifyTime);
}
}
private void writeValueAsStringAsync(Context ctx, Object obj) throws JsonProcessingException {
ctx.result(CompletableFuture.supplyAsync(() -> {
HoodieTimer timer = new HoodieTimer().startTimer();
boolean prettyPrint = ctx.queryParam("pretty") != null;
try {
String result = prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj);
final long jsonifyTime = timer.endTimer();
metricsRegistry.add("WRITE_VALUE_CNT", 1);
metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Jsonify TimeTaken=" + jsonifyTime);
}
return result;
} catch (JsonProcessingException e) {
throw new HoodieException("Failed to JSON encode the value", e);
}
}, asyncResultService));
}
/**
@@ -144,12 +192,14 @@ public class RequestHandler {
*/
private void registerTimelineAPI() {
app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT, new ViewHandler(ctx -> {
metricsRegistry.add("LAST_INSTANT", 1);
List<InstantDTO> dtos = instantHandler
.getLastInstant(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue());
writeValueAsString(ctx, dtos);
}, false));
app.get(RemoteHoodieTableFileSystemView.TIMELINE, new ViewHandler(ctx -> {
metricsRegistry.add("TIMELINE", 1);
TimelineDTO dto = instantHandler
.getTimeline(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue());
writeValueAsString(ctx, dto);
@@ -161,6 +211,7 @@ public class RequestHandler {
*/
private void registerDataFilesAPI() {
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILES_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_DATA_FILES", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -168,6 +219,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILE_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_DATA_FILE", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFile(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -176,12 +228,14 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_ALL_DATA_FILES", 1);
List<BaseFileDTO> dtos = dataFileHandler
.getLatestDataFiles(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_DATA_FILES_BEFORE_ON_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesBeforeOrOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -190,6 +244,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFileOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -199,6 +254,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES, new ViewHandler(ctx -> {
metricsRegistry.add("ALL_DATA_FILES", 1);
List<BaseFileDTO> dtos = dataFileHandler.getAllDataFiles(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -206,6 +262,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_RANGE_INSTANT_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_DATA_FILES_RANGE_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesInRange(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), Arrays
.asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(",")));
@@ -218,6 +275,7 @@ public class RequestHandler {
*/
private void registerFileSlicesAPI() {
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_SLICES", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlices(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -225,6 +283,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_SLICE", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlice(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -233,6 +292,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_UNCOMPACTED_SLICES_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_UNCOMPACTED_SLICES", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestUnCompactedFileSlices(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -240,6 +300,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_SLICES_URL, new ViewHandler(ctx -> {
metricsRegistry.add("ALL_SLICES", 1);
List<FileSliceDTO> dtos = sliceHandler.getAllFileSlices(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -247,6 +308,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_RANGE_INSTANT_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_SLICE_RANGE_INSTANT", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSliceInRange(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), Arrays
.asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(",")));
@@ -254,6 +316,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_SLICES_MERGED_BEFORE_ON_INSTANT", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestMergedFileSlicesBeforeOrOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -262,6 +325,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_SLICES_BEFORE_ON_INSTANT", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesBeforeOrOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -273,12 +337,14 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new ViewHandler(ctx -> {
metricsRegistry.add("PEDING_COMPACTION_OPS", 1);
List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_URL, new ViewHandler(ctx -> {
metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION", 1);
List<FileGroupDTO> dtos = sliceHandler.getAllFileGroups(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -286,12 +352,14 @@ public class RequestHandler {
}, true));
app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new ViewHandler(ctx -> {
metricsRegistry.add("REFRESH_TABLE", 1);
boolean success = sliceHandler
.refreshTable(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
writeValueAsString(ctx, success);
}, false));
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, new ViewHandler(ctx -> {
metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON", 1);
List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBeforeOrOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""),
@@ -300,6 +368,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, new ViewHandler(ctx -> {
metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE", 1);
List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""),
@@ -308,6 +377,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, new ViewHandler(ctx -> {
metricsRegistry.add("ALL_REPLACED_FILEGROUPS_PARTITION", 1);
List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -315,6 +385,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> {
metricsRegistry.add("PENDING_CLUSTERING_FILEGROUPS", 1);
List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
writeValueAsString(ctx, dtos);
@@ -380,6 +451,12 @@ public class RequestHandler {
} finally {
long endTs = System.currentTimeMillis();
long timeTakenMillis = endTs - beginTs;
metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis);
metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken);
metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken);
metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
metricsRegistry.add("TOTAL_API_CALLS", 1);
LOG.info(String.format(
"TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
+ "Success=%s, Query=%s, Host=%s, synced=%s",

View File

@@ -29,10 +29,14 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import io.javalin.Javalin;
import io.javalin.core.util.JettyServerUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import java.io.IOException;
import java.io.Serializable;
@@ -44,32 +48,40 @@ public class TimelineService {
private static final Logger LOG = LogManager.getLogger(TimelineService.class);
private static final int START_SERVICE_MAX_RETRIES = 16;
private static final int DEFAULT_NUM_THREADS = -1;
private int serverPort;
private Configuration conf;
private transient FileSystem fs;
private transient Javalin app = null;
private transient FileSystemViewManager fsViewsManager;
private final int numThreads;
private final boolean shouldCompressOutput;
private final boolean useAsync;
public int getServerPort() {
return serverPort;
}
public TimelineService(int serverPort, FileSystemViewManager globalFileSystemViewManager, Configuration conf)
throws IOException {
public TimelineService(int serverPort, FileSystemViewManager globalFileSystemViewManager, Configuration conf,
int numThreads, boolean compressOutput, boolean useAsync) throws IOException {
this.conf = FSUtils.prepareHadoopConf(conf);
this.fs = FileSystem.get(conf);
this.serverPort = serverPort;
this.fsViewsManager = globalFileSystemViewManager;
this.numThreads = numThreads;
this.shouldCompressOutput = compressOutput;
this.useAsync = useAsync;
}
public TimelineService(int serverPort, FileSystemViewManager globalFileSystemViewManager) throws IOException {
this(serverPort, globalFileSystemViewManager, new Configuration());
this(serverPort, globalFileSystemViewManager, new Configuration(), DEFAULT_NUM_THREADS, true, false);
}
public TimelineService(Config config) throws IOException {
this(config.serverPort, buildFileSystemViewManager(config,
new SerializableConfiguration(FSUtils.prepareHadoopConf(new Configuration()))));
new SerializableConfiguration(FSUtils.prepareHadoopConf(new Configuration()))), new Configuration(),
config.numThreads, config.compress, config.async);
}
public static class Config implements Serializable {
@@ -97,6 +109,15 @@ public class TimelineService {
@Parameter(names = {"--rocksdb-path", "-rp"}, description = "Root directory for RocksDB")
public String rocksDBPath = FileSystemViewStorageConfig.DEFAULT_ROCKSDB_BASE_PATH;
@Parameter(names = {"--threads", "-t"}, description = "Number of threads to use for serving requests")
public int numThreads = DEFAULT_NUM_THREADS;
@Parameter(names = {"--async"}, description = "Use asyncronous request processing")
public boolean async = false;
@Parameter(names = {"--compress"}, description = "Compress output using gzip")
public boolean compress = true;
@Parameter(names = {"--help", "-h"})
public Boolean help = false;
}
@@ -129,8 +150,15 @@ public class TimelineService {
}
public int startService() throws IOException {
app = Javalin.create();
RequestHandler requestHandler = new RequestHandler(app, conf, fsViewsManager);
final Server server = numThreads == DEFAULT_NUM_THREADS ? JettyServerUtil.defaultServer()
: new Server(new QueuedThreadPool(numThreads));
app = Javalin.create().server(() -> server);
if (!shouldCompressOutput) {
app.disableDynamicGzip();
}
RequestHandler requestHandler = new RequestHandler(app, conf, fsViewsManager, useAsync);
app.get("/", ctx -> ctx.result("Hello World"));
requestHandler.register();
int realServerPort = startServiceOnPort(serverPort);