[HUDI-1138] Add timeline-server-based marker file strategy for improving marker-related latency (#3233)
- Can be enabled for cloud stores like S3. Not supported for hdfs yet, due to partial write failures.
This commit is contained in:
@@ -18,7 +18,9 @@
|
||||
|
||||
package org.apache.hudi.timeline.service;
|
||||
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.metrics.Registry;
|
||||
import org.apache.hudi.common.table.marker.MarkerOperation;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
|
||||
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
|
||||
@@ -35,6 +37,7 @@ import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
|
||||
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
|
||||
import org.apache.hudi.timeline.service.handlers.MarkerHandler;
|
||||
import org.apache.hudi.timeline.service.handlers.TimelineHandler;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
@@ -43,6 +46,7 @@ import io.javalin.Context;
|
||||
import io.javalin.Handler;
|
||||
import io.javalin.Javalin;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
@@ -50,6 +54,7 @@ import org.jetbrains.annotations.NotNull;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
@@ -63,35 +68,49 @@ public class RequestHandler {
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final Logger LOG = LogManager.getLogger(RequestHandler.class);
|
||||
|
||||
private final TimelineService.Config timelineServiceConfig;
|
||||
private final FileSystemViewManager viewManager;
|
||||
private final Javalin app;
|
||||
private final TimelineHandler instantHandler;
|
||||
private final FileSliceHandler sliceHandler;
|
||||
private final BaseFileHandler dataFileHandler;
|
||||
private final MarkerHandler markerHandler;
|
||||
private Registry metricsRegistry = Registry.getRegistry("TimelineService");
|
||||
private ScheduledExecutorService asyncResultService = Executors.newSingleThreadScheduledExecutor();
|
||||
private final boolean useAsync;
|
||||
|
||||
public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager viewManager, boolean useAsync) throws IOException {
|
||||
public RequestHandler(Javalin app, Configuration conf, TimelineService.Config timelineServiceConfig,
|
||||
HoodieEngineContext hoodieEngineContext, FileSystem fileSystem,
|
||||
FileSystemViewManager viewManager) throws IOException {
|
||||
this.timelineServiceConfig = timelineServiceConfig;
|
||||
this.viewManager = viewManager;
|
||||
this.app = app;
|
||||
this.instantHandler = new TimelineHandler(conf, viewManager);
|
||||
this.sliceHandler = new FileSliceHandler(conf, viewManager);
|
||||
this.dataFileHandler = new BaseFileHandler(conf, viewManager);
|
||||
this.useAsync = useAsync;
|
||||
if (useAsync) {
|
||||
this.instantHandler = new TimelineHandler(conf, timelineServiceConfig, fileSystem, viewManager);
|
||||
this.sliceHandler = new FileSliceHandler(conf, timelineServiceConfig, fileSystem, viewManager);
|
||||
this.dataFileHandler = new BaseFileHandler(conf, timelineServiceConfig, fileSystem, viewManager);
|
||||
if (timelineServiceConfig.enableMarkerRequests) {
|
||||
this.markerHandler = new MarkerHandler(
|
||||
conf, timelineServiceConfig, hoodieEngineContext, fileSystem, viewManager, metricsRegistry);
|
||||
} else {
|
||||
this.markerHandler = null;
|
||||
}
|
||||
if (timelineServiceConfig.async) {
|
||||
asyncResultService = Executors.newSingleThreadScheduledExecutor();
|
||||
}
|
||||
}
|
||||
|
||||
public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager viewManager) throws IOException {
|
||||
this(app, conf, viewManager, false);
|
||||
}
|
||||
|
||||
public void register() {
|
||||
registerDataFilesAPI();
|
||||
registerFileSlicesAPI();
|
||||
registerTimelineAPI();
|
||||
if (markerHandler != null) {
|
||||
registerMarkerAPI();
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (markerHandler != null) {
|
||||
markerHandler.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -147,40 +166,50 @@ public class RequestHandler {
|
||||
}
|
||||
|
||||
private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException {
|
||||
if (useAsync) {
|
||||
if (timelineServiceConfig.async) {
|
||||
writeValueAsStringAsync(ctx, obj);
|
||||
} else {
|
||||
writeValueAsStringSync(ctx, obj);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeValueAsStringSync(Context ctx, Object obj) throws JsonProcessingException {
|
||||
/**
|
||||
* Serializes the result into JSON String.
|
||||
*
|
||||
* @param ctx Javalin context
|
||||
* @param obj object to serialize
|
||||
* @param metricsRegistry {@code Registry} instance for storing metrics
|
||||
* @param objectMapper JSON object mapper
|
||||
* @param logger {@code Logger} instance
|
||||
* @return JSON String from the input object
|
||||
* @throws JsonProcessingException
|
||||
*/
|
||||
public static String jsonifyResult(
|
||||
Context ctx, Object obj, Registry metricsRegistry, ObjectMapper objectMapper, Logger logger)
|
||||
throws JsonProcessingException {
|
||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||
boolean prettyPrint = ctx.queryParam("pretty") != null;
|
||||
String result =
|
||||
prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj);
|
||||
prettyPrint ? objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj)
|
||||
: objectMapper.writeValueAsString(obj);
|
||||
final long jsonifyTime = timer.endTimer();
|
||||
ctx.result(result);
|
||||
metricsRegistry.add("WRITE_VALUE_CNT", 1);
|
||||
metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Jsonify TimeTaken=" + jsonifyTime);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Jsonify TimeTaken=" + jsonifyTime);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void writeValueAsStringAsync(Context ctx, Object obj) throws JsonProcessingException {
|
||||
private void writeValueAsStringSync(Context ctx, Object obj) throws JsonProcessingException {
|
||||
String result = jsonifyResult(ctx, obj, metricsRegistry, OBJECT_MAPPER, LOG);
|
||||
ctx.result(result);
|
||||
}
|
||||
|
||||
private void writeValueAsStringAsync(Context ctx, Object obj) {
|
||||
ctx.result(CompletableFuture.supplyAsync(() -> {
|
||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||
boolean prettyPrint = ctx.queryParam("pretty") != null;
|
||||
try {
|
||||
String result = prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj);
|
||||
final long jsonifyTime = timer.endTimer();
|
||||
metricsRegistry.add("WRITE_VALUE_CNT", 1);
|
||||
metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Jsonify TimeTaken=" + jsonifyTime);
|
||||
}
|
||||
return result;
|
||||
return jsonifyResult(ctx, obj, metricsRegistry, OBJECT_MAPPER, LOG);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new HoodieException("Failed to JSON encode the value", e);
|
||||
}
|
||||
@@ -392,6 +421,44 @@ public class RequestHandler {
|
||||
}, true));
|
||||
}
|
||||
|
||||
private void registerMarkerAPI() {
|
||||
app.get(MarkerOperation.ALL_MARKERS_URL, new ViewHandler(ctx -> {
|
||||
metricsRegistry.add("ALL_MARKERS", 1);
|
||||
Set<String> markers = markerHandler.getAllMarkers(
|
||||
ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, ""));
|
||||
writeValueAsString(ctx, markers);
|
||||
}, false));
|
||||
|
||||
app.get(MarkerOperation.CREATE_AND_MERGE_MARKERS_URL, new ViewHandler(ctx -> {
|
||||
metricsRegistry.add("CREATE_AND_MERGE_MARKERS", 1);
|
||||
Set<String> markers = markerHandler.getCreateAndMergeMarkers(
|
||||
ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, ""));
|
||||
writeValueAsString(ctx, markers);
|
||||
}, false));
|
||||
|
||||
app.get(MarkerOperation.MARKERS_DIR_EXISTS_URL, new ViewHandler(ctx -> {
|
||||
metricsRegistry.add("MARKERS_DIR_EXISTS", 1);
|
||||
boolean exist = markerHandler.doesMarkerDirExist(
|
||||
ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, ""));
|
||||
writeValueAsString(ctx, exist);
|
||||
}, false));
|
||||
|
||||
app.post(MarkerOperation.CREATE_MARKER_URL, new ViewHandler(ctx -> {
|
||||
metricsRegistry.add("CREATE_MARKER", 1);
|
||||
ctx.result(markerHandler.createMarker(
|
||||
ctx,
|
||||
ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, ""),
|
||||
ctx.queryParam(MarkerOperation.MARKER_NAME_PARAM, "")));
|
||||
}, false));
|
||||
|
||||
app.post(MarkerOperation.DELETE_MARKER_DIR_URL, new ViewHandler(ctx -> {
|
||||
metricsRegistry.add("DELETE_MARKER_DIR", 1);
|
||||
boolean success = markerHandler.deleteMarkers(
|
||||
ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, ""));
|
||||
writeValueAsString(ctx, success);
|
||||
}, false));
|
||||
}
|
||||
|
||||
private static boolean isRefreshCheckDisabledInQuery(Context ctxt) {
|
||||
return Boolean.parseBoolean(ctxt.queryParam(RemoteHoodieTableFileSystemView.REFRESH_OFF));
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ package org.apache.hudi.timeline.service;
|
||||
import org.apache.hudi.common.config.HoodieCommonConfig;
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
@@ -31,7 +32,6 @@ import com.beust.jcommander.JCommander;
|
||||
import com.beust.jcommander.Parameter;
|
||||
import io.javalin.Javalin;
|
||||
import io.javalin.core.util.JettyServerUtil;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -52,39 +52,31 @@ public class TimelineService {
|
||||
private static final int DEFAULT_NUM_THREADS = -1;
|
||||
|
||||
private int serverPort;
|
||||
private Config timelineServerConf;
|
||||
private Configuration conf;
|
||||
private transient HoodieEngineContext context;
|
||||
private transient FileSystem fs;
|
||||
private transient Javalin app = null;
|
||||
private transient FileSystemViewManager fsViewsManager;
|
||||
private final int numThreads;
|
||||
private final boolean shouldCompressOutput;
|
||||
private final boolean useAsync;
|
||||
private transient RequestHandler requestHandler;
|
||||
|
||||
public int getServerPort() {
|
||||
return serverPort;
|
||||
}
|
||||
|
||||
public TimelineService(int serverPort, FileSystemViewManager globalFileSystemViewManager, Configuration conf,
|
||||
int numThreads, boolean compressOutput, boolean useAsync) throws IOException {
|
||||
this.conf = FSUtils.prepareHadoopConf(conf);
|
||||
this.fs = FileSystem.get(conf);
|
||||
this.serverPort = serverPort;
|
||||
public TimelineService(HoodieEngineContext context, Configuration hadoopConf, Config timelineServerConf,
|
||||
FileSystem fileSystem, FileSystemViewManager globalFileSystemViewManager) throws IOException {
|
||||
this.conf = FSUtils.prepareHadoopConf(hadoopConf);
|
||||
this.timelineServerConf = timelineServerConf;
|
||||
this.serverPort = timelineServerConf.serverPort;
|
||||
this.context = context;
|
||||
this.fs = fileSystem;
|
||||
this.fsViewsManager = globalFileSystemViewManager;
|
||||
this.numThreads = numThreads;
|
||||
this.shouldCompressOutput = compressOutput;
|
||||
this.useAsync = useAsync;
|
||||
}
|
||||
|
||||
public TimelineService(int serverPort, FileSystemViewManager globalFileSystemViewManager) throws IOException {
|
||||
this(serverPort, globalFileSystemViewManager, new Configuration(), DEFAULT_NUM_THREADS, true, false);
|
||||
}
|
||||
|
||||
public TimelineService(Config config) throws IOException {
|
||||
this(config.serverPort, buildFileSystemViewManager(config,
|
||||
new SerializableConfiguration(FSUtils.prepareHadoopConf(new Configuration()))), new Configuration(),
|
||||
config.numThreads, config.compress, config.async);
|
||||
}
|
||||
|
||||
/**
|
||||
* Config for {@code TimelineService} class.
|
||||
*/
|
||||
public static class Config implements Serializable {
|
||||
|
||||
@Parameter(names = {"--server-port", "-p"}, description = " Server Port")
|
||||
@@ -119,8 +111,128 @@ public class TimelineService {
|
||||
@Parameter(names = {"--compress"}, description = "Compress output using gzip")
|
||||
public boolean compress = true;
|
||||
|
||||
@Parameter(names = {"--enable-marker-requests", "-em"}, description = "Enable handling of marker-related requests")
|
||||
public boolean enableMarkerRequests = false;
|
||||
|
||||
@Parameter(names = {"--marker-batch-threads", "-mbt"}, description = "Number of threads to use for batch processing marker creation requests")
|
||||
public int markerBatchNumThreads = 20;
|
||||
|
||||
@Parameter(names = {"--marker-batch-interval-ms", "-mbi"}, description = "The interval in milliseconds between two batch processing of marker creation requests")
|
||||
public long markerBatchIntervalMs = 50;
|
||||
|
||||
@Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files")
|
||||
public int markerParallelism = 100;
|
||||
|
||||
@Parameter(names = {"--help", "-h"})
|
||||
public Boolean help = false;
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder of Config class.
|
||||
*/
|
||||
public static class Builder {
|
||||
private Integer serverPort = 26754;
|
||||
private FileSystemViewStorageType viewStorageType = FileSystemViewStorageType.SPILLABLE_DISK;
|
||||
private Integer maxViewMemPerTableInMB = 2048;
|
||||
private Double memFractionForCompactionPerTable = 0.001;
|
||||
private String baseStorePathForFileGroups = FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue();
|
||||
private String rocksDBPath = FileSystemViewStorageConfig.ROCKSDB_BASE_PATH_PROP.defaultValue();
|
||||
private int numThreads = DEFAULT_NUM_THREADS;
|
||||
private boolean async = false;
|
||||
private boolean compress = true;
|
||||
private boolean enableMarkerRequests = false;
|
||||
private int markerBatchNumThreads = 20;
|
||||
private long markerBatchIntervalMs = 50L;
|
||||
private int markerParallelism = 100;
|
||||
|
||||
public Builder() {}
|
||||
|
||||
public Builder serverPort(int serverPort) {
|
||||
this.serverPort = serverPort;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder viewStorageType(FileSystemViewStorageType viewStorageType) {
|
||||
this.viewStorageType = viewStorageType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder maxViewMemPerTableInMB(int maxViewMemPerTableInMB) {
|
||||
this.maxViewMemPerTableInMB = maxViewMemPerTableInMB;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder memFractionForCompactionPerTable(double memFractionForCompactionPerTable) {
|
||||
this.memFractionForCompactionPerTable = memFractionForCompactionPerTable;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder baseStorePathForFileGroups(String baseStorePathForFileGroups) {
|
||||
this.baseStorePathForFileGroups = baseStorePathForFileGroups;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder rocksDBPath(String rocksDBPath) {
|
||||
this.rocksDBPath = rocksDBPath;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder numThreads(int numThreads) {
|
||||
this.numThreads = numThreads;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder async(boolean async) {
|
||||
this.async = async;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder compress(boolean compress) {
|
||||
this.compress = compress;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder enableMarkerRequests(boolean enableMarkerRequests) {
|
||||
this.enableMarkerRequests = enableMarkerRequests;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder markerBatchNumThreads(int markerBatchNumThreads) {
|
||||
this.markerBatchNumThreads = markerBatchNumThreads;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder markerBatchIntervalMs(long markerBatchIntervalMs) {
|
||||
this.markerBatchIntervalMs = markerBatchIntervalMs;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder markerParallelism(int markerParallelism) {
|
||||
this.markerParallelism = markerParallelism;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Config build() {
|
||||
Config config = new Config();
|
||||
config.serverPort = this.serverPort;
|
||||
config.viewStorageType = this.viewStorageType;
|
||||
config.maxViewMemPerTableInMB = this.maxViewMemPerTableInMB;
|
||||
config.memFractionForCompactionPerTable = this.memFractionForCompactionPerTable;
|
||||
config.baseStorePathForFileGroups = this.baseStorePathForFileGroups;
|
||||
config.rocksDBPath = this.rocksDBPath;
|
||||
config.numThreads = this.numThreads;
|
||||
config.async = this.async;
|
||||
config.compress = this.compress;
|
||||
config.enableMarkerRequests = this.enableMarkerRequests;
|
||||
config.markerBatchNumThreads = this.markerBatchNumThreads;
|
||||
config.markerBatchIntervalMs = this.markerBatchIntervalMs;
|
||||
config.markerParallelism = this.markerParallelism;
|
||||
return config;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int startServiceOnPort(int port) throws IOException {
|
||||
@@ -151,16 +263,17 @@ public class TimelineService {
|
||||
}
|
||||
|
||||
public int startService() throws IOException {
|
||||
final Server server = numThreads == DEFAULT_NUM_THREADS ? JettyServerUtil.defaultServer()
|
||||
: new Server(new QueuedThreadPool(numThreads));
|
||||
final Server server = timelineServerConf.numThreads == DEFAULT_NUM_THREADS ? JettyServerUtil.defaultServer()
|
||||
: new Server(new QueuedThreadPool(timelineServerConf.numThreads));
|
||||
|
||||
app = Javalin.create().server(() -> server);
|
||||
if (!shouldCompressOutput) {
|
||||
if (!timelineServerConf.compress) {
|
||||
app.disableDynamicGzip();
|
||||
}
|
||||
|
||||
RequestHandler requestHandler = new RequestHandler(app, conf, fsViewsManager, useAsync);
|
||||
app.get("/", ctx -> ctx.result("Hello World"));
|
||||
requestHandler = new RequestHandler(
|
||||
app, conf, timelineServerConf, context, fs, fsViewsManager);
|
||||
app.get("/", ctx -> ctx.result("Hello Hudi"));
|
||||
requestHandler.register();
|
||||
int realServerPort = startServiceOnPort(serverPort);
|
||||
LOG.info("Starting Timeline server on port :" + realServerPort);
|
||||
@@ -204,6 +317,9 @@ public class TimelineService {
|
||||
|
||||
public void close() {
|
||||
LOG.info("Closing Timeline Service");
|
||||
if (requestHandler != null) {
|
||||
this.requestHandler.stop();
|
||||
}
|
||||
this.app.stop();
|
||||
this.app = null;
|
||||
this.fsViewsManager.close();
|
||||
@@ -228,7 +344,9 @@ public class TimelineService {
|
||||
|
||||
Configuration conf = FSUtils.prepareHadoopConf(new Configuration());
|
||||
FileSystemViewManager viewManager = buildFileSystemViewManager(cfg, new SerializableConfiguration(conf));
|
||||
TimelineService service = new TimelineService(cfg.serverPort, viewManager);
|
||||
TimelineService service = new TimelineService(
|
||||
new HoodieLocalEngineContext(FSUtils.prepareHadoopConf(new Configuration())),
|
||||
new Configuration(), cfg, FileSystem.get(new Configuration()), viewManager);
|
||||
service.run();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,8 +20,10 @@ package org.apache.hudi.timeline.service.handlers;
|
||||
|
||||
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
import org.apache.hudi.timeline.service.TimelineService;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@@ -34,8 +36,9 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class BaseFileHandler extends Handler {
|
||||
|
||||
public BaseFileHandler(Configuration conf, FileSystemViewManager viewManager) throws IOException {
|
||||
super(conf, viewManager);
|
||||
public BaseFileHandler(Configuration conf, TimelineService.Config timelineServiceConfig,
|
||||
FileSystem fileSystem, FileSystemViewManager viewManager) throws IOException {
|
||||
super(conf, timelineServiceConfig, fileSystem, viewManager);
|
||||
}
|
||||
|
||||
public List<BaseFileDTO> getLatestDataFiles(String basePath, String partitionPath) {
|
||||
|
||||
@@ -19,11 +19,14 @@
|
||||
package org.apache.hudi.timeline.service.handlers;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
|
||||
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
|
||||
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
|
||||
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
import org.apache.hudi.timeline.service.TimelineService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@@ -36,8 +39,9 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class FileSliceHandler extends Handler {
|
||||
|
||||
public FileSliceHandler(Configuration conf, FileSystemViewManager viewManager) throws IOException {
|
||||
super(conf, viewManager);
|
||||
public FileSliceHandler(Configuration conf, TimelineService.Config timelineServiceConfig,
|
||||
FileSystem fileSystem, FileSystemViewManager viewManager) throws IOException {
|
||||
super(conf, timelineServiceConfig, fileSystem, viewManager);
|
||||
}
|
||||
|
||||
public List<FileSliceDTO> getAllFileSlices(String basePath, String partitionPath) {
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
package org.apache.hudi.timeline.service.handlers;
|
||||
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
import org.apache.hudi.timeline.service.TimelineService;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
@@ -28,12 +29,15 @@ import java.io.IOException;
|
||||
public abstract class Handler {
|
||||
|
||||
protected final Configuration conf;
|
||||
protected final TimelineService.Config timelineServiceConfig;
|
||||
protected final FileSystem fileSystem;
|
||||
protected final FileSystemViewManager viewManager;
|
||||
|
||||
public Handler(Configuration conf, FileSystemViewManager viewManager) throws IOException {
|
||||
public Handler(Configuration conf, TimelineService.Config timelineServiceConfig,
|
||||
FileSystem fileSystem, FileSystemViewManager viewManager) throws IOException {
|
||||
this.conf = conf;
|
||||
this.fileSystem = FileSystem.get(conf);
|
||||
this.timelineServiceConfig = timelineServiceConfig;
|
||||
this.fileSystem = fileSystem;
|
||||
this.viewManager = viewManager;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,198 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.timeline.service.handlers;
|
||||
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.metrics.Registry;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
import org.apache.hudi.timeline.service.TimelineService;
|
||||
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture;
|
||||
import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationDispatchingRunnable;
|
||||
import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState;
|
||||
|
||||
import io.javalin.Context;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* REST Handler servicing marker requests.
|
||||
*
|
||||
* The marker creation requests are handled asynchronous, while other types of requests
|
||||
* are handled synchronous.
|
||||
*
|
||||
* Marker creation requests are batch processed periodically by a thread. Each batch
|
||||
* processing thread adds new markers to a marker file. Given that marker file operation
|
||||
* can take time, multiple concurrent threads can run at the same, while they operate
|
||||
* on different marker files storing mutually exclusive marker entries. At any given
|
||||
* time, a marker file is touched by at most one thread to guarantee consistency.
|
||||
* Below is an example of running batch processing threads.
|
||||
*
|
||||
* |-----| batch interval
|
||||
* Worker Thread 1 |-------------------------->| writing to MARKERS0
|
||||
* Worker Thread 2 |-------------------------->| writing to MARKERS1
|
||||
* Worker Thread 3 |-------------------------->| writing to MARKERS2
|
||||
*/
|
||||
public class MarkerHandler extends Handler {
|
||||
private static final Logger LOG = LogManager.getLogger(MarkerHandler.class);
|
||||
|
||||
private final Registry metricsRegistry;
|
||||
// a scheduled executor service to schedule dispatching of marker creation requests
|
||||
private final ScheduledExecutorService dispatchingExecutorService;
|
||||
// an executor service to schedule the worker threads of batch processing marker creation requests
|
||||
private final ExecutorService batchingExecutorService;
|
||||
// Parallelism for reading and deleting marker files
|
||||
private final int parallelism;
|
||||
// Marker directory states, {markerDirPath -> MarkerDirState instance}
|
||||
private final Map<String, MarkerDirState> markerDirStateMap = new HashMap<>();
|
||||
// A thread to dispatch marker creation requests to batch processing threads
|
||||
private final MarkerCreationDispatchingRunnable markerCreationDispatchingRunnable;
|
||||
private final Object firstCreationRequestSeenLock = new Object();
|
||||
private transient HoodieEngineContext hoodieEngineContext;
|
||||
private ScheduledFuture<?> dispatchingThreadFuture;
|
||||
private boolean firstCreationRequestSeen;
|
||||
|
||||
public MarkerHandler(Configuration conf, TimelineService.Config timelineServiceConfig,
|
||||
HoodieEngineContext hoodieEngineContext, FileSystem fileSystem,
|
||||
FileSystemViewManager viewManager, Registry metricsRegistry) throws IOException {
|
||||
super(conf, timelineServiceConfig, fileSystem, viewManager);
|
||||
LOG.debug("MarkerHandler FileSystem: " + this.fileSystem.getScheme());
|
||||
LOG.debug("MarkerHandler batching params: batchNumThreads=" + timelineServiceConfig.markerBatchNumThreads
|
||||
+ " batchIntervalMs=" + timelineServiceConfig.markerBatchIntervalMs + "ms");
|
||||
this.hoodieEngineContext = hoodieEngineContext;
|
||||
this.metricsRegistry = metricsRegistry;
|
||||
this.parallelism = timelineServiceConfig.markerParallelism;
|
||||
this.dispatchingExecutorService = Executors.newSingleThreadScheduledExecutor();
|
||||
this.batchingExecutorService = Executors.newFixedThreadPool(timelineServiceConfig.markerBatchNumThreads);
|
||||
this.markerCreationDispatchingRunnable =
|
||||
new MarkerCreationDispatchingRunnable(markerDirStateMap, batchingExecutorService);
|
||||
this.firstCreationRequestSeen = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the dispatching of marker creation requests.
|
||||
*/
|
||||
public void stop() {
|
||||
if (dispatchingThreadFuture != null) {
|
||||
dispatchingThreadFuture.cancel(true);
|
||||
}
|
||||
dispatchingExecutorService.shutdown();
|
||||
batchingExecutorService.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param markerDir marker directory path
|
||||
* @return all marker paths in the marker directory
|
||||
*/
|
||||
public Set<String> getAllMarkers(String markerDir) {
|
||||
MarkerDirState markerDirState = getMarkerDirState(markerDir);
|
||||
return markerDirState.getAllMarkers();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param markerDir marker directory path
|
||||
* @return all marker paths of write IO type "CREATE" and "MERGE"
|
||||
*/
|
||||
public Set<String> getCreateAndMergeMarkers(String markerDir) {
|
||||
return getAllMarkers(markerDir).stream()
|
||||
.filter(markerName -> !markerName.endsWith(IOType.APPEND.name()))
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param markerDir marker directory path
|
||||
* @return {@code true} if the marker directory exists; {@code false} otherwise.
|
||||
*/
|
||||
public boolean doesMarkerDirExist(String markerDir) {
|
||||
MarkerDirState markerDirState = getMarkerDirState(markerDir);
|
||||
return markerDirState.exists();
|
||||
}
|
||||
|
||||
/**
 * Generates a future for an async marker creation request
 *
 * The future is added to the marker creation future list and waits for the next batch processing
 * of marker creation requests.
 *
 * @param context Javalin app context
 * @param markerDir marker directory path
 * @param markerName marker name
 * @return the {@code CompletableFuture} instance for the request
 */
public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName) {
LOG.info("Request: create marker " + markerDir + " " + markerName);
MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName);
// Add the future to the list
MarkerDirState markerDirState = getMarkerDirState(markerDir);
markerDirState.addMarkerCreationFuture(future);
// Lazily schedule the periodic dispatching task on the very first creation request,
// using double-checked locking so subsequent requests skip the synchronized block.
// NOTE(review): double-checked locking is only safe if firstCreationRequestSeen is
// declared volatile — its declaration is outside this view; confirm.
if (!firstCreationRequestSeen) {
synchronized (firstCreationRequestSeenLock) {
if (!firstCreationRequestSeen) {
// Fires every markerBatchIntervalMs to batch-process queued creation requests.
dispatchingThreadFuture = dispatchingExecutorService.scheduleAtFixedRate(markerCreationDispatchingRunnable,
timelineServiceConfig.markerBatchIntervalMs, timelineServiceConfig.markerBatchIntervalMs,
TimeUnit.MILLISECONDS);
firstCreationRequestSeen = true;
}
}
}
// The future is completed later by MarkerDirState.processMarkerCreationRequests.
return future;
}
|
||||
|
||||
/**
|
||||
* Deletes markers in the directory.
|
||||
*
|
||||
* @param markerDir marker directory path
|
||||
* @return {@code true} if successful; {@code false} otherwise.
|
||||
*/
|
||||
public Boolean deleteMarkers(String markerDir) {
|
||||
boolean result = getMarkerDirState(markerDir).deleteAllMarkers();
|
||||
markerDirStateMap.remove(markerDir);
|
||||
return result;
|
||||
}
|
||||
|
||||
private MarkerDirState getMarkerDirState(String markerDir) {
|
||||
MarkerDirState markerDirState = markerDirStateMap.get(markerDir);
|
||||
if (markerDirState == null) {
|
||||
synchronized (markerDirStateMap) {
|
||||
if (markerDirStateMap.get(markerDir) == null) {
|
||||
markerDirState = new MarkerDirState(markerDir, timelineServiceConfig.markerBatchNumThreads,
|
||||
fileSystem, metricsRegistry, hoodieEngineContext, parallelism);
|
||||
markerDirStateMap.put(markerDir, markerDirState);
|
||||
} else {
|
||||
markerDirState = markerDirStateMap.get(markerDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
return markerDirState;
|
||||
}
|
||||
}
|
||||
@@ -21,8 +21,10 @@ package org.apache.hudi.timeline.service.handlers;
|
||||
import org.apache.hudi.common.table.timeline.dto.InstantDTO;
|
||||
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewManager;
|
||||
import org.apache.hudi.timeline.service.TimelineService;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@@ -34,8 +36,9 @@ import java.util.List;
|
||||
*/
|
||||
public class TimelineHandler extends Handler {
|
||||
|
||||
public TimelineHandler(Configuration conf, FileSystemViewManager viewManager) throws IOException {
|
||||
super(conf, viewManager);
|
||||
public TimelineHandler(Configuration conf, TimelineService.Config timelineServiceConfig,
|
||||
FileSystem fileSystem, FileSystemViewManager viewManager) throws IOException {
|
||||
super(conf, timelineServiceConfig, fileSystem, viewManager);
|
||||
}
|
||||
|
||||
public List<InstantDTO> getLastInstant(String basePath) {
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.timeline.service.handlers.marker;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Input of batch processing of marker creation requests for a single marker directory.
|
||||
*/
|
||||
public class BatchedMarkerCreationContext {
|
||||
private final String markerDir;
|
||||
private final MarkerDirState markerDirState;
|
||||
// List of marker creation futures to process
|
||||
private final List<MarkerCreationFuture> futures;
|
||||
// File index to use to write markers
|
||||
private final int fileIndex;
|
||||
|
||||
public BatchedMarkerCreationContext(String markerDir, MarkerDirState markerDirState,
|
||||
List<MarkerCreationFuture> futures, int fileIndex) {
|
||||
this.markerDir = markerDir;
|
||||
this.markerDirState = markerDirState;
|
||||
this.futures = futures;
|
||||
this.fileIndex = fileIndex;
|
||||
}
|
||||
|
||||
public String getMarkerDir() {
|
||||
return markerDir;
|
||||
}
|
||||
|
||||
public MarkerDirState getMarkerDirState() {
|
||||
return markerDirState;
|
||||
}
|
||||
|
||||
public List<MarkerCreationFuture> getFutures() {
|
||||
return futures;
|
||||
}
|
||||
|
||||
public int getFileIndex() {
|
||||
return fileIndex;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.timeline.service.handlers.marker;
|
||||
|
||||
import org.apache.hudi.common.util.HoodieTimer;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A runnable for batch processing marker creation requests.
|
||||
*/
|
||||
public class BatchedMarkerCreationRunnable implements Runnable {
|
||||
private static final Logger LOG = LogManager.getLogger(BatchedMarkerCreationRunnable.class);
|
||||
|
||||
private final List<BatchedMarkerCreationContext> requestContextList;
|
||||
|
||||
public BatchedMarkerCreationRunnable(List<BatchedMarkerCreationContext> requestContextList) {
|
||||
this.requestContextList = requestContextList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.debug("Start processing create marker requests");
|
||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||
|
||||
for (BatchedMarkerCreationContext requestContext : requestContextList) {
|
||||
requestContext.getMarkerDirState().processMarkerCreationRequests(
|
||||
requestContext.getFutures(), requestContext.getFileIndex());
|
||||
}
|
||||
LOG.debug("Finish batch processing of create marker requests in " + timer.endTimer() + " ms");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.timeline.service.handlers.marker;
|
||||
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
* A runnable that performs periodic, batched creation of markers for write operations.
|
||||
*/
|
||||
public class MarkerCreationDispatchingRunnable implements Runnable {
|
||||
public static final Logger LOG = LogManager.getLogger(MarkerCreationDispatchingRunnable.class);
|
||||
|
||||
// Marker directory states, {markerDirPath -> MarkerDirState instance}
|
||||
private final Map<String, MarkerDirState> markerDirStateMap;
|
||||
private final ExecutorService executorService;
|
||||
|
||||
public MarkerCreationDispatchingRunnable(
|
||||
Map<String, MarkerDirState> markerDirStateMap, ExecutorService executorService) {
|
||||
this.markerDirStateMap = markerDirStateMap;
|
||||
this.executorService = executorService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatches the marker creation requests that can be process to a worker thread of batch
|
||||
* processing the requests.
|
||||
*
|
||||
* For each marker directory, goes through the following steps:
|
||||
* (1) find the next available file index for writing. If no file index is available,
|
||||
* skip the processing of this marker directory;
|
||||
* (2) fetch the pending marker creation requests for this marker directory. If there is
|
||||
* no request, skip this marker directory;
|
||||
* (3) put the marker directory, marker dir state, list of requests futures, and the file index
|
||||
* to a {@code MarkerDirRequestContext} instance and add the instance to the request context list.
|
||||
*
|
||||
* If the request context list is not empty, spins up a worker thread, {@code MarkerCreationBatchingRunnable},
|
||||
* and pass all the request context to the thread for batch processing. The thread is responsible
|
||||
* for responding to the request futures directly.
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
List<BatchedMarkerCreationContext> requestContextList = new ArrayList<>();
|
||||
|
||||
// Only fetch pending marker creation requests that can be processed,
|
||||
// i.e., that markers can be written to a underlying file
|
||||
for (String markerDir : markerDirStateMap.keySet()) {
|
||||
MarkerDirState markerDirState = markerDirStateMap.get(markerDir);
|
||||
Option<Integer> fileIndex = markerDirState.getNextFileIndexToUse();
|
||||
if (!fileIndex.isPresent()) {
|
||||
LOG.debug("All marker files are busy, skip batch processing of create marker requests in " + markerDir);
|
||||
continue;
|
||||
}
|
||||
List<MarkerCreationFuture> futures = markerDirState.fetchPendingMarkerCreationRequests();
|
||||
if (futures.isEmpty()) {
|
||||
markerDirState.markFileAsAvailable(fileIndex.get());
|
||||
continue;
|
||||
}
|
||||
requestContextList.add(
|
||||
new BatchedMarkerCreationContext(markerDir, markerDirState, futures, fileIndex.get()));
|
||||
}
|
||||
|
||||
if (requestContextList.size() > 0) {
|
||||
executorService.execute(
|
||||
new BatchedMarkerCreationRunnable(requestContextList));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.timeline.service.handlers.marker;
|
||||
|
||||
import org.apache.hudi.common.util.HoodieTimer;
|
||||
|
||||
import io.javalin.Context;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Future for async marker creation request.
|
||||
*/
|
||||
public class MarkerCreationFuture extends CompletableFuture<String> {
|
||||
private static final Logger LOG = LogManager.getLogger(MarkerCreationFuture.class);
|
||||
private final Context context;
|
||||
private final String markerDirPath;
|
||||
private final String markerName;
|
||||
private boolean result;
|
||||
private final HoodieTimer timer;
|
||||
|
||||
public MarkerCreationFuture(Context context, String markerDirPath, String markerName) {
|
||||
super();
|
||||
this.timer = new HoodieTimer().startTimer();
|
||||
this.context = context;
|
||||
this.markerDirPath = markerDirPath;
|
||||
this.markerName = markerName;
|
||||
this.result = false;
|
||||
}
|
||||
|
||||
public Context getContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
public String getMarkerDirPath() {
|
||||
return markerDirPath;
|
||||
}
|
||||
|
||||
public String getMarkerName() {
|
||||
return markerName;
|
||||
}
|
||||
|
||||
public boolean isSuccessful() {
|
||||
return result;
|
||||
}
|
||||
|
||||
public void setResult(boolean result) {
|
||||
LOG.debug("Request queued for " + timer.endTimer() + " ms");
|
||||
this.result = result;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,375 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.timeline.service.handlers.marker;
|
||||
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.metrics.Registry;
|
||||
import org.apache.hudi.common.util.HoodieTimer;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.ImmutablePair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Serializable;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.util.FileIOUtils.closeQuietly;
|
||||
import static org.apache.hudi.timeline.service.RequestHandler.jsonifyResult;
|
||||
|
||||
/**
|
||||
* Stores the state of a marker directory.
|
||||
*
|
||||
* The operations inside this class is designed to be thread-safe.
|
||||
*/
|
||||
public class MarkerDirState implements Serializable {
|
||||
public static final String MARKERS_FILENAME_PREFIX = "MARKERS";
|
||||
private static final Logger LOG = LogManager.getLogger(MarkerDirState.class);
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
// Marker directory
|
||||
private final String markerDirPath;
|
||||
private final FileSystem fileSystem;
|
||||
private final Registry metricsRegistry;
|
||||
// A cached copy of all markers in memory
|
||||
private final Set<String> allMarkers = new HashSet<>();
|
||||
// A cached copy of marker entries in each marker file, stored in StringBuilder
|
||||
// for efficient appending
|
||||
// Mapping: {markerFileIndex -> markers}
|
||||
private final Map<Integer, StringBuilder> fileMarkersMap = new HashMap<>();
|
||||
// A list of use status of underlying files storing markers by a thread.
|
||||
// {@code true} means the file is in use by a {@code BatchCreateMarkerRunnable}.
|
||||
// Index of the list is used for the filename, i.e., "1" -> "MARKERS1"
|
||||
private final List<Boolean> threadUseStatus;
|
||||
// A list of pending futures from async marker creation requests
|
||||
private final List<MarkerCreationFuture> markerCreationFutures = new ArrayList<>();
|
||||
private final int parallelism;
|
||||
private final Object markerCreationProcessingLock = new Object();
|
||||
private transient HoodieEngineContext hoodieEngineContext;
|
||||
// Last underlying file index used, for finding the next file index
|
||||
// in a round-robin fashion
|
||||
private int lastFileIndexUsed = -1;
|
||||
|
||||
public MarkerDirState(String markerDirPath, int markerBatchNumThreads, FileSystem fileSystem,
|
||||
Registry metricsRegistry, HoodieEngineContext hoodieEngineContext, int parallelism) {
|
||||
this.markerDirPath = markerDirPath;
|
||||
this.fileSystem = fileSystem;
|
||||
this.metricsRegistry = metricsRegistry;
|
||||
this.hoodieEngineContext = hoodieEngineContext;
|
||||
this.parallelism = parallelism;
|
||||
this.threadUseStatus =
|
||||
Stream.generate(() -> false).limit(markerBatchNumThreads).collect(Collectors.toList());
|
||||
// Lazy initialization of markers by reading MARKERS* files on the file system
|
||||
syncMarkersFromFileSystem();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@code true} if the marker directory exists in the system.
|
||||
*/
|
||||
public boolean exists() {
|
||||
try {
|
||||
return fileSystem.exists(new Path(markerDirPath));
|
||||
} catch (IOException ioe) {
|
||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return all markers in the marker directory.
|
||||
*/
|
||||
public Set<String> getAllMarkers() {
|
||||
return allMarkers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a {@code MarkerCreationCompletableFuture} instance from a marker
|
||||
* creation request to the queue.
|
||||
*
|
||||
* @param future {@code MarkerCreationCompletableFuture} instance.
|
||||
*/
|
||||
public void addMarkerCreationFuture(MarkerCreationFuture future) {
|
||||
synchronized (markerCreationFutures) {
|
||||
markerCreationFutures.add(future);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the next file index to use in a round-robin fashion,
|
||||
* or empty if no file is available.
|
||||
*/
|
||||
public Option<Integer> getNextFileIndexToUse() {
|
||||
int fileIndex = -1;
|
||||
synchronized (markerCreationProcessingLock) {
|
||||
// Scans for the next free file index to use after {@code lastFileIndexUsed}
|
||||
for (int i = 0; i < threadUseStatus.size(); i++) {
|
||||
int index = (lastFileIndexUsed + 1 + i) % threadUseStatus.size();
|
||||
if (!threadUseStatus.get(index)) {
|
||||
fileIndex = index;
|
||||
threadUseStatus.set(index, true);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (fileIndex >= 0) {
|
||||
lastFileIndexUsed = fileIndex;
|
||||
return Option.of(fileIndex);
|
||||
}
|
||||
}
|
||||
return Option.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks the file as available to use again.
|
||||
*
|
||||
* @param fileIndex file index
|
||||
*/
|
||||
public void markFileAsAvailable(int fileIndex) {
|
||||
synchronized (markerCreationProcessingLock) {
|
||||
threadUseStatus.set(fileIndex, false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return futures of pending marker creation requests and removes them from the list.
|
||||
*/
|
||||
public List<MarkerCreationFuture> fetchPendingMarkerCreationRequests() {
|
||||
List<MarkerCreationFuture> pendingFutures;
|
||||
synchronized (markerCreationFutures) {
|
||||
if (markerCreationFutures.isEmpty()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
pendingFutures = new ArrayList<>(markerCreationFutures);
|
||||
markerCreationFutures.clear();
|
||||
}
|
||||
return pendingFutures;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes pending marker creation requests.
|
||||
*
|
||||
* @param pendingMarkerCreationFutures futures of pending marker creation requests
|
||||
* @param fileIndex file index to use to write markers
|
||||
*/
|
||||
public void processMarkerCreationRequests(
|
||||
final List<MarkerCreationFuture> pendingMarkerCreationFutures, int fileIndex) {
|
||||
if (pendingMarkerCreationFutures.isEmpty()) {
|
||||
markFileAsAvailable(fileIndex);
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.debug("timeMs=" + System.currentTimeMillis() + " markerDirPath=" + markerDirPath
|
||||
+ " numRequests=" + pendingMarkerCreationFutures.size() + " fileIndex=" + fileIndex);
|
||||
|
||||
synchronized (markerCreationProcessingLock) {
|
||||
for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
|
||||
String markerName = future.getMarkerName();
|
||||
boolean exists = allMarkers.contains(markerName);
|
||||
if (!exists) {
|
||||
allMarkers.add(markerName);
|
||||
StringBuilder stringBuilder = fileMarkersMap.computeIfAbsent(fileIndex, k -> new StringBuilder(16384));
|
||||
stringBuilder.append(markerName);
|
||||
stringBuilder.append('\n');
|
||||
}
|
||||
future.setResult(!exists);
|
||||
}
|
||||
}
|
||||
flushMarkersToFile(fileIndex);
|
||||
markFileAsAvailable(fileIndex);
|
||||
|
||||
for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
|
||||
try {
|
||||
future.complete(jsonifyResult(
|
||||
future.getContext(), future.isSuccessful(), metricsRegistry, OBJECT_MAPPER, LOG));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new HoodieException("Failed to JSON encode the value", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes markers in the directory.
|
||||
*
|
||||
* @return {@code true} if successful; {@code false} otherwise.
|
||||
*/
|
||||
public boolean deleteAllMarkers() {
|
||||
Path dirPath = new Path(markerDirPath);
|
||||
boolean result = false;
|
||||
try {
|
||||
if (fileSystem.exists(dirPath)) {
|
||||
FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
|
||||
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
|
||||
.map(fileStatus -> fileStatus.getPath().toString())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (markerDirSubPaths.size() > 0) {
|
||||
SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf());
|
||||
int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism);
|
||||
hoodieEngineContext.foreach(markerDirSubPaths, subPathStr -> {
|
||||
Path subPath = new Path(subPathStr);
|
||||
FileSystem fileSystem = subPath.getFileSystem(conf.get());
|
||||
fileSystem.delete(subPath, true);
|
||||
}, actualParallelism);
|
||||
}
|
||||
|
||||
result = fileSystem.delete(dirPath, false);
|
||||
LOG.info("Removing marker directory at " + dirPath);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||
}
|
||||
allMarkers.clear();
|
||||
fileMarkersMap.clear();
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Syncs all markers maintained in the underlying files under the marker directory in the file system.
|
||||
*/
|
||||
private void syncMarkersFromFileSystem() {
|
||||
Path dirPath = new Path(markerDirPath);
|
||||
try {
|
||||
if (fileSystem.exists(dirPath)) {
|
||||
FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
|
||||
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
|
||||
.map(fileStatus -> fileStatus.getPath().toString())
|
||||
.filter(pathStr -> pathStr.contains(MARKERS_FILENAME_PREFIX))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (markerDirSubPaths.size() > 0) {
|
||||
SerializableConfiguration conf = new SerializableConfiguration(fileSystem.getConf());
|
||||
int actualParallelism = Math.min(markerDirSubPaths.size(), parallelism);
|
||||
Map<String, Set<String>> fileMarkersSetMap =
|
||||
hoodieEngineContext.mapToPair(markerDirSubPaths, markersFilePathStr -> {
|
||||
Path markersFilePath = new Path(markersFilePathStr);
|
||||
FileSystem fileSystem = markersFilePath.getFileSystem(conf.get());
|
||||
FSDataInputStream fsDataInputStream = null;
|
||||
BufferedReader bufferedReader = null;
|
||||
Set<String> markers = new HashSet<>();
|
||||
try {
|
||||
LOG.debug("Read marker file: " + markersFilePathStr);
|
||||
fsDataInputStream = fileSystem.open(markersFilePath);
|
||||
bufferedReader = new BufferedReader(new InputStreamReader(fsDataInputStream, StandardCharsets.UTF_8));
|
||||
markers = bufferedReader.lines().collect(Collectors.toSet());
|
||||
bufferedReader.close();
|
||||
fsDataInputStream.close();
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Failed to read MARKERS file " + markerDirPath, e);
|
||||
} finally {
|
||||
closeQuietly(bufferedReader);
|
||||
closeQuietly(fsDataInputStream);
|
||||
}
|
||||
return new ImmutablePair<>(markersFilePathStr, markers);
|
||||
}, actualParallelism);
|
||||
|
||||
for (String markersFilePathStr: fileMarkersSetMap.keySet()) {
|
||||
Set<String> fileMarkers = fileMarkersSetMap.get(markersFilePathStr);
|
||||
if (!fileMarkers.isEmpty()) {
|
||||
int index = parseMarkerFileIndex(markersFilePathStr);
|
||||
|
||||
if (index >= 0) {
|
||||
fileMarkersMap.put(index, new StringBuilder(StringUtils.join(",", fileMarkers)));
|
||||
allMarkers.addAll(fileMarkers);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the marker file index from the marker file path.
|
||||
*
|
||||
* E.g., if the marker file path is /tmp/table/.hoodie/.temp/000/MARKERS3, the index returned is 3.
|
||||
*
|
||||
* @param markerFilePathStr full path of marker file
|
||||
* @return the marker file index
|
||||
*/
|
||||
private int parseMarkerFileIndex(String markerFilePathStr) {
|
||||
String markerFileName = new Path(markerFilePathStr).getName();
|
||||
int prefixIndex = markerFileName.indexOf(MARKERS_FILENAME_PREFIX);
|
||||
if (prefixIndex < 0) {
|
||||
return -1;
|
||||
}
|
||||
try {
|
||||
return Integer.parseInt(markerFileName.substring(prefixIndex + MARKERS_FILENAME_PREFIX.length()));
|
||||
} catch (NumberFormatException nfe) {
|
||||
LOG.error("Failed to parse marker file index from " + markerFilePathStr);
|
||||
throw new HoodieException(nfe.getMessage(), nfe);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flushes markers to the underlying file.
|
||||
*
|
||||
* @param markerFileIndex file index to use.
|
||||
*/
|
||||
private void flushMarkersToFile(int markerFileIndex) {
|
||||
LOG.debug("Write to " + markerDirPath + "/" + MARKERS_FILENAME_PREFIX + markerFileIndex);
|
||||
HoodieTimer timer = new HoodieTimer().startTimer();
|
||||
Path markersFilePath = new Path(markerDirPath, MARKERS_FILENAME_PREFIX + markerFileIndex);
|
||||
Path dirPath = markersFilePath.getParent();
|
||||
try {
|
||||
if (!fileSystem.exists(dirPath)) {
|
||||
fileSystem.mkdirs(dirPath);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Failed to make dir " + dirPath, e);
|
||||
}
|
||||
FSDataOutputStream fsDataOutputStream = null;
|
||||
BufferedWriter bufferedWriter = null;
|
||||
try {
|
||||
fsDataOutputStream = fileSystem.create(markersFilePath);
|
||||
bufferedWriter = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
|
||||
bufferedWriter.write(fileMarkersMap.get(markerFileIndex).toString());
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Failed to overwrite marker file " + markersFilePath, e);
|
||||
} finally {
|
||||
closeQuietly(bufferedWriter);
|
||||
closeQuietly(fsDataOutputStream);
|
||||
}
|
||||
LOG.debug(markersFilePath.toString() + " written in " + timer.endTimer() + " ms");
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,8 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
|
||||
import org.apache.hudi.common.table.view.TestHoodieTableFileSystemView;
|
||||
import org.apache.hudi.timeline.service.TimelineService;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
@@ -51,7 +53,8 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSyst
|
||||
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
|
||||
|
||||
try {
|
||||
server = new TimelineService(0,
|
||||
server = new TimelineService(localEngineContext, new Configuration(),
|
||||
TimelineService.Config.builder().serverPort(0).build(), FileSystem.get(new Configuration()),
|
||||
FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, sConf, commonConfig));
|
||||
server.startService();
|
||||
} catch (Exception ex) {
|
||||
|
||||
Reference in New Issue
Block a user