1
0

Revert master (#5925)

* Revert "udate"

This reverts commit 092e35c1e3.

* Revert "[HUDI-3475] Initialize hudi table management module."

This reverts commit 4640a3bbb8.
This commit is contained in:
Zhaojing Yu
2022-06-21 16:58:50 +08:00
committed by GitHub
parent 092e35c1e3
commit c7e430bb46
43 changed files with 0 additions and 3234 deletions

View File

@@ -21,10 +21,8 @@ package org.apache.hudi.table.action.cluster;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -41,9 +39,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {
@@ -106,22 +102,6 @@ public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O
throw new HoodieIOException("Exception scheduling clustering", ioe);
}
}
if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.replacecommit.name())) {
submitClusteringToService();
}
return planOption;
}
/**
 * Submits every pending replacecommit (clustering) instant on the active timeline to the
 * external table manager service for execution.
 *
 * <p>Relies on the enclosing executor's {@code table} and {@code config} fields; assumes the
 * table manager endpoint is reachable — TODO confirm error handling is done by the client.
 */
private void submitClusteringToService() {
HoodieTableMetaClient metaClient = table.getMetaClient();
// Collect the timestamps of all pending replacecommit instants.
List<String> instantsToSubmit = metaClient.getActiveTimeline()
.filterPendingReplaceTimeline()
.getInstants()
.map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(metaClient, config.getTableManagerConfig());
tableManagerClient.submitClustering(instantsToSubmit);
}
}

View File

@@ -333,10 +333,6 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
// do not compact a complete instant.
if (table.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
return null;
}
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {

View File

@@ -1,333 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
<version>0.12.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-table-management-service</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<mybatis.version>3.4.6</mybatis.version>
</properties>
<dependencies>
<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<!-- Fasterxml -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Httpcomponents -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>fluent-hc</artifactId>
</dependency>
<dependency>
<groupId>io.javalin</groupId>
<artifactId>javalin</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<!-- Need these exclusions to make sure JavaSparkContext can be setup. https://issues.apache.org/jira/browse/SPARK-1693 -->
<exclusions>
<exclusion>
<artifactId>tools</artifactId>
<groupId>com.sun</groupId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<artifactId>tools</artifactId>
<groupId>com.sun</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<artifactId>tools</artifactId>
<groupId>com.sun</groupId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-java-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>4.0.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.23</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.2</version>
</dependency>
<!-- Test -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client-common</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
</configuration>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
<configuration>
<skip>false</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Fixed: previous value org.apache.hudi.compaction.service.TableManagerServer does not
     exist in this module; the entry point is TableManagementServer in
     org.apache.hudi.table.management (the class declaring main()). -->
<mainClass>org.apache.hudi.table.management.TableManagementServer</mainClass>
</transformer>
</transformers>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>jar-with-dependencies</shadedClassifierName>
<artifactSet>
<excludes>
<exclude>
org.slf4j:slf4j-log4j12
</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/services/javax.*</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/test/resources</directory>
</resource>
</resources>
</build>
</project>

View File

@@ -1,194 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management;
import org.apache.hudi.table.management.entity.Action;
import org.apache.hudi.table.management.entity.Engine;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.entity.InstanceStatus;
import org.apache.hudi.table.management.handlers.ActionHandler;
import org.apache.hudi.table.management.store.MetadataStore;
import org.apache.hudi.table.management.util.InstanceUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.javalin.Context;
import io.javalin.Handler;
import io.javalin.Javalin;
import org.apache.hadoop.conf.Configuration;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * Main REST Handler class that handles and delegates calls to timeline relevant handlers.
 *
 * <p>Registers its routes on the supplied {@link Javalin} app; the actual scheduling and
 * removal work is delegated to an {@link ActionHandler}.
 */
public class RequestHandler {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class);
private final Javalin app;
private final ActionHandler actionHandler;
/**
 * @param app Javalin app the endpoints are registered on (not started/stopped here)
 * @param conf Hadoop configuration passed through to the action handler
 * @param metadataStore store used by the action handler to persist instance state
 * @throws IOException if the action handler fails to initialize
 */
public RequestHandler(Javalin app,
Configuration conf,
MetadataStore metadataStore) throws IOException {
this.app = app;
this.actionHandler = new ActionHandler(conf, metadataStore);
}
/** Registers all endpoints: common, compaction and clustering APIs. */
public void register() {
registerCommonAPI();
registerCompactionAPI();
registerClusteringAPI();
}
// Serializes obj as JSON into the response; pretty-prints when a "pretty" query param is set.
// NOTE(review): not called by any handler registered below.
private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException {
boolean prettyPrint = ctx.queryParam("pretty") != null;
long beginJsonTs = System.currentTimeMillis();
String result =
prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj);
long endJsonTs = System.currentTimeMillis();
LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs));
ctx.result(result);
}
/**
 * Register common API calls (service registration endpoint; currently a no-op handler).
 */
private void registerCommonAPI() {
app.get(HoodieTableManagerClient.REGISTER, new ViewHandler(ctx -> {
}));
}
/**
 * Register Compaction API calls: submit one instance per comma-separated instant,
 * and mark an instance deleted on removal.
 */
private void registerCompactionAPI() {
app.get(HoodieTableManagerClient.SUBMIT_COMPACTION, new ViewHandler(ctx -> {
// The instant param may carry several instants separated by commas; schedule one each.
for (String instant : ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow().split(",")) {
Instance instance = Instance.builder()
.basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow())
.dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow())
.tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow())
.action(Action.COMPACTION.getValue())
.instant(instant)
.executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableManagerClient.EXECUTION_ENGINE).getOrThrow()))
.owner(ctx.validatedQueryParam(HoodieTableManagerClient.USERNAME).getOrThrow())
.queue(ctx.validatedQueryParam(HoodieTableManagerClient.QUEUE).getOrThrow())
.resource(ctx.validatedQueryParam(HoodieTableManagerClient.RESOURCE).getOrThrow())
.parallelism(ctx.validatedQueryParam(HoodieTableManagerClient.PARALLELISM).getOrThrow())
.status(InstanceStatus.SCHEDULED.getStatus())
.build();
InstanceUtil.checkArgument(instance);
actionHandler.scheduleCompaction(instance);
}
}));
app.get(HoodieTableManagerClient.REMOVE_COMPACTION, new ViewHandler(ctx -> {
// Removal marks the instance invalid and deleted; no argument check is performed here.
Instance instance = Instance.builder()
.basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow())
.dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow())
.tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow())
.instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow())
.status(InstanceStatus.INVALID.getStatus())
.isDeleted(true)
.build();
actionHandler.removeCompaction(instance);
}));
}
/**
 * Register Clustering API calls: submit a single clustering instance, and mark an
 * instance deleted on removal. (Javadoc fixed: previously said "Compaction".)
 */
private void registerClusteringAPI() {
app.get(HoodieTableManagerClient.SUBMIT_CLUSTERING, new ViewHandler(ctx -> {
Instance instance = Instance.builder()
.basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow())
.dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow())
.tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow())
.action(Action.CLUSTERING.getValue())
.instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow())
.executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableManagerClient.EXECUTION_ENGINE).getOrThrow()))
.owner(ctx.validatedQueryParam(HoodieTableManagerClient.USERNAME).getOrThrow())
.queue(ctx.validatedQueryParam(HoodieTableManagerClient.QUEUE).getOrThrow())
.resource(ctx.validatedQueryParam(HoodieTableManagerClient.RESOURCE).getOrThrow())
.parallelism(ctx.validatedQueryParam(HoodieTableManagerClient.PARALLELISM).getOrThrow())
.status(InstanceStatus.SCHEDULED.getStatus())
.build();
InstanceUtil.checkArgument(instance);
actionHandler.scheduleClustering(instance);
}));
app.get(HoodieTableManagerClient.REMOVE_CLUSTERING, new ViewHandler(ctx -> {
Instance instance = Instance.builder()
.basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow())
.dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow())
.tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow())
.instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow())
.status(InstanceStatus.INVALID.getStatus())
.isDeleted(true)
.build();
actionHandler.removeClustering(instance);
}));
}
/**
 * Wraps a handler to time its execution and log the outcome of each request.
 * NOTE(review): refreshCheckTimeTaken/finalCheckTimeTaken/synced are never updated here
 * and always log their initial values.
 */
private class ViewHandler implements Handler {
private final Handler handler;
ViewHandler(Handler handler) {
this.handler = handler;
}
@Override
public void handle(@NotNull Context context) throws Exception {
boolean success = true;
long beginTs = System.currentTimeMillis();
boolean synced = false;
long refreshCheckTimeTaken = 0;
long handleTimeTaken = 0;
long finalCheckTimeTaken = 0;
try {
long handleBeginMs = System.currentTimeMillis();
handler.handle(context);
long handleEndMs = System.currentTimeMillis();
handleTimeTaken = handleEndMs - handleBeginMs;
} catch (RuntimeException re) {
// Propagate after logging so Javalin can translate it into an error response.
success = false;
LOG.error("Got runtime exception servicing request " + context.queryString(), re);
throw re;
} finally {
long endTs = System.currentTimeMillis();
long timeTakenMillis = endTs - beginTs;
LOG.info(String.format(
"TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
+ "Success=%s, Query=%s, Host=%s, synced=%s",
timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success,
context.queryString(), context.host(), synced));
}
}
}
}

View File

@@ -1,153 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.table.management.common.ServiceConfig;
import org.apache.hudi.table.management.common.TableManagementServiceConfig;
import org.apache.hudi.table.management.service.BaseService;
import org.apache.hudi.table.management.service.CleanService;
import org.apache.hudi.table.management.service.ExecutorService;
import org.apache.hudi.table.management.service.MonitorService;
import org.apache.hudi.table.management.service.RetryService;
import org.apache.hudi.table.management.service.ScheduleService;
import org.apache.hudi.table.management.store.MetadataStore;
import com.beust.jcommander.JCommander;
import io.javalin.Javalin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * A standalone table management service: runs a Javalin HTTP endpoint that accepts
 * scheduling requests plus a set of background services (executor, schedule, retry,
 * monitor, clean).
 */
public class TableManagementServer {
private static final Logger LOG = LoggerFactory.getLogger(TableManagementServer.class);
private int serverPort;
private final Configuration conf;
private final TableManagementServiceConfig config;
private transient Javalin app = null;
private List<BaseService> services;
private MetadataStore metadataStore;
// Fix: this field was assigned in the constructor but never declared, which does not compile.
private FileSystem fs;
/**
 * @param serverPort port the HTTP endpoint listens on
 * @param conf Hadoop configuration (prepared via FSUtils before use)
 * @param config command-line service configuration
 * @throws IOException if the filesystem cannot be obtained
 */
public TableManagementServer(int serverPort, Configuration conf, TableManagementServiceConfig config)
throws IOException {
this.config = config;
this.conf = FSUtils.prepareHadoopConf(conf);
this.fs = FileSystem.get(conf);
this.serverPort = serverPort;
this.metadataStore = initMetadataStore();
}
public TableManagementServer(TableManagementServiceConfig config) throws IOException {
this(config.serverPort, new Configuration(), config);
}
/**
 * Starts the HTTP endpoint and all background services.
 *
 * @return the port the server is listening on
 */
public int startService() throws IOException {
app = Javalin.create();
RequestHandler requestHandler = new RequestHandler(app, conf, metadataStore);
app.get("/", ctx -> ctx.result("Hello World"));
requestHandler.register();
app.start(serverPort);
registerService();
initAndStartRegisterService();
return serverPort;
}
// Instantiates the configured MetadataStore implementation reflectively and initializes it.
private MetadataStore initMetadataStore() {
String className = ServiceConfig.getInstance()
.getString(ServiceConfig.ServiceConfVars.MetadataStoreClass);
MetadataStore metadataStore = ReflectionUtils.loadClass(className);
metadataStore.init();
LOG.info("Finish init metastore: " + className);
return metadataStore;
}
// Builds the list of background services; the ExecutorService is shared with ScheduleService.
private void registerService() {
services = new ArrayList<>();
ExecutorService executorService = new ExecutorService();
services.add(executorService);
services.add(new ScheduleService(executorService, metadataStore));
services.add(new RetryService(metadataStore));
services.add(new MonitorService());
services.add(new CleanService());
}
private void initAndStartRegisterService() {
for (BaseService service : services) {
service.init();
service.startService();
}
}
private void stopRegisterService() {
for (BaseService service : services) {
service.stop();
}
}
/**
 * Starts the service and installs a JVM shutdown hook that stops it cleanly.
 */
public void run() throws IOException {
startService();
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println(
"*** shutting down Table management service since JVM is shutting down");
try {
TableManagementServer.this.stop();
} catch (InterruptedException e) {
// Restore the interrupt status before logging (Java best practice).
Thread.currentThread().interrupt();
e.printStackTrace(System.err);
}
System.err.println("*** Table management service shut down");
}));
}
/**
 * Stop serving requests and shutdown resources.
 */
public void stop() throws InterruptedException {
LOG.info("Stopping Table management Service");
this.app.stop();
this.app = null;
stopRegisterService();
LOG.info("Stopped Table management Service");
}
public static void main(String[] args) throws Exception {
final TableManagementServiceConfig cfg = new TableManagementServiceConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help) {
cmd.usage();
System.exit(1);
}
TableManagementServer service = new TableManagementServer(cfg);
service.run();
}
}

View File

@@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.common;
/**
 * Environment variable names consulted when launching external execution engines.
 */
public final class EnvConstant {

  public static final String JAVA_HOME = "JAVA_HOME";
  public static final String YARN_CONF_DIR = "YARN_CONF_DIR";
  public static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";
  public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
  public static final String SPARK_HOME = "SPARK_HOME";

  // Utility class holding only constants: prevent instantiation.
  private EnvConstant() {
  }
}

View File

@@ -1,117 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.common;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
/**
 * Service-wide configuration, populated at startup from environment variables prefixed
 * with {@code hoodie_}; each env key maps to a {@code hoodie.table.management.*} property.
 *
 * <p>Accessed via the eager singleton {@link #getInstance()}.
 */
public class ServiceConfig extends Properties {

  private static final Logger LOG = LoggerFactory.getLogger(ServiceConfig.class);

  private static final String HOODIE_ENV_PROPS_PREFIX = "hoodie_";

  // Eagerly-initialized singleton; the constructor reads the process environment once.
  private static final ServiceConfig CONFIG = new ServiceConfig();

  /**
   * Reads every environment variable starting with the hoodie prefix and stores it as a
   * property, mapping "_" separators to ".table.management." segments
   * (e.g. env "hoodie_spark_home" becomes "hoodie.table.management.spark.home").
   */
  private ServiceConfig() {
    LOG.info("Start init ServiceConfig");
    Map<String, String> envs = System.getenv();
    for (Map.Entry<String, String> env : envs.entrySet()) {
      if (env.getKey().toLowerCase().startsWith(HOODIE_ENV_PROPS_PREFIX)) {
        String key = env.getKey().toLowerCase().replace("_", ".table.management.");
        String value = env.getValue().trim();
        setProperty(key, value);
        LOG.info("Set property " + key + " to " + value);
      }
    }
    LOG.info("Finish init ServiceConfig");
  }

  /** Returns the string value of the given config var, falling back to its default. */
  public String getString(ServiceConfVars confVars) {
    return this.getProperty(confVars.key(), confVars.defVal());
  }

  public void setString(ServiceConfVars confVars, String value) {
    this.setProperty(confVars.key(), value);
  }

  public Boolean getBool(ServiceConfVars confVars) {
    return Boolean.valueOf(this.getProperty(confVars.key(), confVars.defVal()));
  }

  public int getInt(ServiceConfVars confVars) {
    return Integer.parseInt(this.getProperty(confVars.key(), confVars.defVal()));
  }

  public static ServiceConfig getInstance() {
    return CONFIG;
  }

  /** Known configuration keys together with their default values. */
  public enum ServiceConfVars {
    JavaHome("hoodie.table.management.java.home", ""),
    SparkHome("hoodie.table.management.spark.home", ""),
    YarnConfDir("hoodie.table.management.yarn.conf.dir", ""),
    HadoopConfDir("hoodie.table.management.hadoop.conf.dir", ""),
    CompactionMainClass("hoodie.table.management.compaction.main.class", "org.apache.hudi.utilities.HoodieCompactor"),
    CompactionScheduleWaitInterval("hoodie.table.management.schedule.wait.interval", "30000"),
    IntraMaxFailTolerance("hoodie.table.management.max.fail.tolerance", "5"),
    MaxRetryNum("hoodie.table.management.instance.max.retry", "3"),
    MetadataStoreClass("hoodie.table.management.metadata.store.class",
        "org.apache.hudi.table.management.store.impl.RelationDBBasedStore"),
    CompactionCacheEnable("hoodie.table.management.compaction.cache.enable", "true"),
    RetryTimes("hoodie.table.management.retry.times", "5"),
    SparkSubmitJarPath("hoodie.table.management.submit.jar.path", "/tmp/hoodie_submit_jar/spark/"),
    SparkShuffleHdfsEnabled("hoodie.table.management.spark.shuffle.hdfs.enabled", "true"),
    SparkParallelism("hoodie.table.management.spark.parallelism", "1"),
    // Fixed: previously duplicated the spark.parallelism key (copy-paste), so the
    // Spark master could never be configured independently.
    SparkMaster("hoodie.table.management.spark.master", "local[1]"),
    SparkVcoreBoost("hoodie.table.management.spark.vcore.boost", "1"),
    SparkVcoreBoostRatio("hoodie.table.management.spark.vcore.boost.ratio", "1"),
    SparkSpeculation("hoodie.table.management.spark.speculation", "false"),
    ExecutorMemory("hoodie.table.management.executor.memory", "20g"),
    DriverMemory("hoodie.table.management.driver.memory", "20g"),
    ExecutorMemoryOverhead("hoodie.table.management.executor.memory.overhead", "5g"),
    ExecutorCores("hoodie.table.management.executor.cores", "1"),
    MinExecutors("hoodie.table.management.min.executors", "5"),
    MaxExecutors("hoodie.table.management.max.executors", "1000"),
    CoreExecuteSize("hoodie.table.management.core.executor.pool.size", "300"),
    MaxExecuteSize("hoodie.table.management.max.executor.pool.size", "1000");

    private final String key;
    private final String defaultVal;

    ServiceConfVars(String key, String defaultVal) {
      this.key = key;
      this.defaultVal = defaultVal;
    }

    public String key() {
      return this.key;
    }

    public String defVal() {
      return this.defaultVal;
    }
  }
}

View File

@@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.common;
import org.apache.hudi.table.management.store.jdbc.InstanceDao;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Process-wide registry tracking running and pending instances, and providing access to
 * the shared {@link InstanceDao}.
 */
public class ServiceContext {

  // instanceIdentifier -> identifier of the thread currently executing it.
  // Made final: the map reference is never reassigned.
  private static final ConcurrentHashMap<String, String> runningInstance = new ConcurrentHashMap<>();

  // pending instant key -> last refresh timestamp (epoch millis). Made final as above.
  private static final ConcurrentHashMap<String, Long> pendingInstances = new ConcurrentHashMap<>();

  public static void addRunningInstance(String instanceIdentifier, String threadIdentifier) {
    runningInstance.put(instanceIdentifier, threadIdentifier);
  }

  public static void removeRunningInstance(String instanceIdentifier) {
    runningInstance.remove(instanceIdentifier);
  }

  public static int getRunningInstanceNum() {
    return runningInstance.size();
  }

  /** Returns one human-readable line per running instance, for logging/monitoring. */
  public static List<String> getRunningInstanceInfo() {
    List<String> runningInfos = new ArrayList<>();
    for (Map.Entry<String, String> instance : runningInstance.entrySet()) {
      runningInfos.add("instance " + instance.getKey() + " execution on " + instance.getValue());
    }
    return runningInfos;
  }

  public static boolean containsPendingInstant(String key) {
    return pendingInstances.containsKey(key);
  }

  public static void refreshPendingInstant(String key) {
    pendingInstances.put(key, System.currentTimeMillis());
  }

  public static void removePendingInstant(String key) {
    pendingInstances.remove(key);
  }

  public static ConcurrentHashMap<String, Long> getPendingInstances() {
    return pendingInstances;
  }

  public static InstanceDao getInstanceDao() {
    return ServiceContextHolder.INSTANCE_DAO;
  }

  // Lazy-holder idiom: the DAO is created only on first call to getInstanceDao().
  private static class ServiceContextHolder {
    private static final InstanceDao INSTANCE_DAO = new InstanceDao();
  }
}

View File

@@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.common;
import com.beust.jcommander.Parameter;
/**
 * Command-line configuration for the table management service, parsed by JCommander.
 */
public class TableManagementServiceConfig {

  // Port the service listens on; defaults to 26755.
  @Parameter(names = {"--server-port", "-p"}, description = " Server Port")
  public Integer serverPort = 26755;

  // When set, print usage and exit instead of starting the service.
  @Parameter(names = {"--help", "-h"})
  public Boolean help = false;
}

View File

@@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.entity;
/**
 * Table service action type, persisted as an int code:
 * compaction (0) or clustering (1).
 */
public enum Action {
  COMPACTION(0),
  CLUSTERING(1);

  private final int value;

  Action(int value) {
    this.value = value;
  }

  public int getValue() {
    return this.value;
  }

  /**
   * Validates that the given instance carries a known action code.
   *
   * @throws RuntimeException if the instance's action matches no {@link Action}
   */
  public static void checkActionType(Instance instance) {
    boolean known = false;
    for (Action candidate : values()) {
      if (candidate.getValue() == instance.getAction()) {
        known = true;
        break;
      }
    }
    if (!known) {
      throw new RuntimeException("Invalid action type: " + instance);
    }
  }

  /**
   * Resolves a persisted int code back to its {@link Action}.
   *
   * @throws RuntimeException if the code matches no {@link Action}
   */
  public static Action getAction(int actionValue) {
    for (Action candidate : values()) {
      if (candidate.getValue() == actionValue) {
        return candidate;
      }
    }
    throw new RuntimeException("Invalid instance action: " + actionValue);
  }
}

View File

@@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.entity;
import org.apache.hudi.table.management.common.ServiceConfig;
import org.apache.hudi.table.management.util.DateTimeUtils;
import lombok.Getter;
import java.util.Date;
// Query parameters for assist/maintenance lookups against the instance store.
// Lombok @Getter generates the accessors.
@Getter
public class AssistQueryEntity {

  // Maximum retry count, read from service configuration (ServiceConfVars.MaxRetryNum).
  private int maxRetry = ServiceConfig.getInstance()
      .getInt(ServiceConfig.ServiceConfVars.MaxRetryNum);

  // Lower bound of the query time window; presumably three days in the past —
  // TODO confirm DateTimeUtils.addDay(-3) semantics.
  private Date queryStartTime = DateTimeUtils.addDay(-3);

  // Instance status code to filter by; defaults to 0 when the no-arg constructor is used.
  private int status;

  public AssistQueryEntity() {
  }

  public AssistQueryEntity(int status, Date queryStartTime) {
    this.status = status;
    this.queryStartTime = queryStartTime;
  }
}

View File

@@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.entity;
/**
 * Execution engine used to run a table service instance:
 * Spark (0) or Flink (1).
 */
public enum Engine {
  SPARK(0),
  FLINK(1);

  private final int value;

  Engine(int value) {
    this.value = value;
  }

  public int getValue() {
    return this.value;
  }

  /**
   * Validates that the given instance references a known engine.
   *
   * @throws RuntimeException if the instance's engine is null or unknown
   */
  public static void checkEngineType(Instance instance) {
    for (Engine engine : Engine.values()) {
      // Enum constants are singletons, so identity comparison is the idiomatic
      // (and null-safe) check; behavior is identical to the previous equals() call.
      if (engine == instance.getExecutionEngine()) {
        return;
      }
    }
    throw new RuntimeException("Invalid engine type: " + instance);
  }
}

View File

@@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import java.util.Date;
/**
 * A single table-management work item (one compaction or clustering run) as
 * persisted in the instance store. Lombok generates the builder, accessors and
 * constructors; only the composite-key helpers below are hand-written.
 */
@Builder
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Instance {

  private long id;
  private String dbName;
  private String tableName;
  private String basePath;
  private Engine executionEngine;
  private String owner;
  private String queue;
  private String resource;
  private String parallelism;
  private String instant;
  private int action;
  private int status;
  private int runTimes;
  private String applicationId;
  private String doradoJobId;
  private Date scheduleTime;
  private Date createTime;
  private Date updateTime;
  private boolean isDeleted;

  /** @return the "db.table" qualified table name. */
  public String getFullTableName() {
    return String.join(".", dbName, tableName);
  }

  /** @return "db.table.instant.status" — identifies one attempt state of this work item. */
  public String getIdentifier() {
    return String.join(".", dbName, tableName, instant, String.valueOf(status));
  }

  /** @return the identifier extended with run count and last update time, for status logging. */
  public String getInstanceRunStatus() {
    return String.join(".", dbName, tableName, instant, String.valueOf(status),
        String.valueOf(runTimes), String.valueOf(updateTime));
  }

  /** @return "db.table.instant" — the stable record key, independent of status. */
  public String getRecordKey() {
    return String.join(".", dbName, tableName, instant);
  }
}

View File

@@ -1,61 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.entity;
/**
 * Lifecycle states of a table-management instance, persisted as int codes.
 */
public enum InstanceStatus {
  SCHEDULED(0, "scheduled"),
  RUNNING(1, "running"),
  FAILED(2, "failed"),
  INVALID(3, "invalid"),
  COMPLETED(4, "completed");

  // Intentionally mutable in the original design; the setters below are public API.
  private int status;
  private String desc;

  InstanceStatus(int status, String desc) {
    this.status = status;
    this.desc = desc;
  }

  public int getStatus() {
    return status;
  }

  public void setStatus(int status) {
    this.status = status;
  }

  public String getDesc() {
    return desc;
  }

  public void setDesc(String desc) {
    this.desc = desc;
  }

  /**
   * Maps a persisted int code back to its enum constant.
   *
   * @throws RuntimeException if the code is unknown
   */
  public static InstanceStatus getInstance(int status) {
    for (InstanceStatus candidate : values()) {
      if (status == candidate.getStatus()) {
        return candidate;
      }
    }
    throw new RuntimeException("Invalid instance status: " + status);
  }
}

View File

@@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.exception;
import org.apache.hudi.exception.HoodieException;
/**
 * Root runtime exception for the table management service; wraps failures from
 * scheduling, job submission and metadata-store operations.
 */
public class HoodieTableManagementException extends HoodieException {

  public HoodieTableManagementException(String msg) {
    super(msg);
  }

  // Preserves the underlying cause for diagnostics.
  public HoodieTableManagementException(String msg, Throwable e) {
    super(msg, e);
  }
}

View File

@@ -1,102 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.executor;
import org.apache.hudi.table.management.common.ServiceConfig;
import org.apache.hudi.table.management.common.ServiceContext;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.entity.InstanceStatus;
import org.apache.hudi.table.management.executor.submitter.ExecutionEngine;
import org.apache.hudi.table.management.executor.submitter.SparkEngine;
import org.apache.hudi.table.management.store.jdbc.InstanceDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base runnable that executes one table-management instance: registers itself in
 * {@link ServiceContext} for the duration of the run, delegates the actual work to
 * {@link #doExecute()}, and persists the resulting status through {@link InstanceDao}.
 */
public abstract class BaseActionExecutor implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(BaseActionExecutor.class);

  protected InstanceDao instanceDao;
  protected Instance instance;
  public int maxFailTolerance;
  protected ExecutionEngine engine;

  public BaseActionExecutor(Instance instance) {
    this.instance = instance;
    this.instanceDao = ServiceContext.getInstanceDao();
    this.maxFailTolerance = ServiceConfig.getInstance()
        .getInt(ServiceConfig.ServiceConfVars.IntraMaxFailTolerance);
    String mainClass = ServiceConfig.getInstance()
        .getString(ServiceConfig.ServiceConfVars.CompactionMainClass);
    // Only Spark is supported today; FLINK (and anything else) is rejected.
    switch (instance.getExecutionEngine()) {
      case SPARK:
        this.engine = new SparkEngine(getJobName(instance), instance, mainClass);
        break;
      default:
        throw new IllegalStateException("Unexpected value: " + instance.getExecutionEngine());
    }
  }

  @Override
  public void run() {
    ServiceContext.addRunningInstance(instance.getRecordKey(), getThreadIdentifier());
    try {
      execute();
    } finally {
      // Always deregister, even on unexpected errors, so the registry stays accurate.
      ServiceContext.removeRunningInstance(instance.getRecordKey());
      boolean cacheEnabled = ServiceConfig.getInstance()
          .getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable);
      if (cacheEnabled) {
        ServiceContext.removePendingInstant(instance.getRecordKey());
      }
    }
  }

  /** @return true when the action completed successfully. */
  public abstract boolean doExecute();

  /** @return the display name for the job launched for this instance. */
  public abstract String getJobName(Instance instance);

  /**
   * Runs {@link #doExecute()}, maps the outcome (or any exception) to an
   * {@link InstanceStatus}, and persists it.
   */
  public void execute() {
    try {
      if (doExecute()) {
        instance.setStatus(InstanceStatus.COMPLETED.getStatus());
        LOG.info("Success exec instance: " + instance.getIdentifier());
      } else {
        instance.setStatus(InstanceStatus.FAILED.getStatus());
        LOG.info("Fail exec instance: " + instance.getIdentifier());
      }
    } catch (Exception e) {
      instance.setStatus(InstanceStatus.FAILED.getStatus());
      LOG.error("Fail exec instance: " + instance.getIdentifier() + ", errMsg: ", e);
    }
    instanceDao.updateStatus(instance);
  }

  /** @return "id.name.state" of the current thread, used as the registry value. */
  public String getThreadIdentifier() {
    Thread current = Thread.currentThread();
    return current.getId() + "." + current.getName() + "." + current.getState();
  }

  @Override
  public String toString() {
    return this.getClass().getName() + ", instance: " + instance.getIdentifier();
  }
}

View File

@@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.executor;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.entity.InstanceStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Executor that submits one compaction job and reports whether submission yielded
 * a cluster application id.
 */
public class CompactionExecutor extends BaseActionExecutor {

  private static final Logger LOG = LoggerFactory.getLogger(CompactionExecutor.class);

  public static final String COMPACT_JOB_NAME = "Hoodie compact %s.%s %s";

  public CompactionExecutor(Instance instance) {
    super(instance);
  }

  @Override
  public boolean doExecute() {
    final String jobName = getJobName(instance);
    LOG.info("Start exec : " + jobName);
    // Mark RUNNING and persist before submission so the store reflects in-flight work.
    instance.setStatus(InstanceStatus.RUNNING.getStatus());
    instanceDao.saveInstance(instance);
    String applicationId = engine.execute(jobName, instance);
    // A missing application id means the engine never got the job onto the cluster.
    boolean submitted = !StringUtils.isNullOrEmpty(applicationId);
    if (submitted) {
      LOG.info("Compaction successfully completed for " + jobName);
    } else {
      LOG.warn("Failed to run compaction for " + jobName);
    }
    return submitted;
  }

  @Override
  public String getJobName(Instance instance) {
    return String.format(COMPACT_JOB_NAME, instance.getDbName(), instance.getTableName(),
        instance.getInstant());
  }
}

View File

@@ -1,82 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.executor.submitter;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.exception.HoodieTableManagementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
 * Base class for engines (e.g. Spark) that launch a table-management job and
 * return the resulting cluster application id.
 */
public abstract class ExecutionEngine {

  private static final Logger LOG = LoggerFactory.getLogger(ExecutionEngine.class);

  // Marker line emitted by YARN clients; the application id follows it on the same line.
  protected static final String YARN_SUBMITTED = "Submitted application";

  /**
   * Submits the given instance and returns the application id reported by the cluster.
   *
   * @throws HoodieTableManagementException if preparation or submission fails
   */
  public String execute(String jobName, Instance instance) throws HoodieTableManagementException {
    try {
      LOG.info("Submitting instance {}:{}", jobName, instance.getIdentifier());
      beforeExecuteCommand();
      return executeCommand(jobName, instance);
    } catch (Exception e) {
      throw new HoodieTableManagementException("Failed submit instance " + instance, e);
    }
  }

  /**
   * Default command execution: logs the command and process environment, then
   * returns the placeholder id "-1".
   *
   * <p>NOTE(review): this is an unfinished stub — the real process-spawning logic
   * was left commented out upstream (removed here as dead code). Subclasses such
   * as SparkEngine override this method with a working implementation.
   */
  protected String executeCommand(String jobName, Instance instance) {
    String command = "";
    try {
      command = getCommand();
      LOG.info("Execute command: {}", command);
      Map<String, String> env = setProcessEnv();
      LOG.info("Execute env: {}", env);
      return "-1";
    } catch (Exception e) {
      LOG.error("Execute command error with exception: ", e);
      throw new HoodieTableManagementException("Execute " + command + " command error", e);
    }
  }

  /** @return the full shell command used to launch the job. */
  protected abstract String getCommand() throws IOException;

  /** Hook invoked before the command is built and executed. */
  protected abstract void beforeExecuteCommand();

  /** @return environment variables for the launched process. */
  public abstract Map<String, String> setProcessEnv();
}

View File

@@ -1,220 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.executor.submitter;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.table.management.common.EnvConstant;
import org.apache.hudi.table.management.common.ServiceConfig;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.exception.HoodieTableManagementException;
import org.apache.commons.io.IOUtils;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hudi.cli.utils.SparkUtil.initLauncher;
/**
 * Spark implementation of ExecutionEngine: builds a spark-submit invocation for
 * one instance, launches it via SparkLauncher, and parses the YARN application id
 * out of the launcher's output.
 */
public class SparkEngine extends ExecutionEngine {

  private static final Logger LOG = LoggerFactory.getLogger(SparkEngine.class);

  private String jobName;
  private Instance instance;
  // Fully-qualified main class handed to spark-submit.
  private String mainClass;

  public SparkEngine(String jobName, Instance instance, String mainClass) {
    this.jobName = jobName;
    this.instance = instance;
    this.mainClass = mainClass;
  }

  /** @return the full spark-submit command line for this instance. */
  @Override
  protected String getCommand() throws IOException {
    String format = "%s/bin/spark-submit --class %s --master yarn --deploy-mode cluster %s %s %s";
    return String.format(format, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome),
        mainClass, getSparkArgs(), getSubmitJar(), getJobArgs());
  }

  @Override
  protected void beforeExecuteCommand() {
    // No preparation needed before spark-submit.
  }

  /** Builds the process environment: Java/Spark/Hadoop homes plus the owner as the Hadoop user. */
  @Override
  public Map<String, String> setProcessEnv() {
    Map<String, String> env = new HashMap<>(16);
    env.put(EnvConstant.JAVA_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.JavaHome));
    env.put(EnvConstant.YARN_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.YarnConfDir));
    env.put(EnvConstant.SPARK_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome));
    env.put(EnvConstant.HADOOP_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.HadoopConfDir));
    // The job runs on the cluster as the instance owner.
    env.put(EnvConstant.HADOOP_USER_NAME, instance.getOwner());
    return env;
  }

  // NOTE(review): unfinished stub — job arguments are never assembled.
  private String getJobArgs() throws IOException {
    return null;
  }

  /**
   * Locates the single jar to submit inside the configured submit-jar directory.
   *
   * @throws HoodieTableManagementException if the path is not a directory or does
   *     not contain exactly one jar
   */
  private String getSubmitJar() throws IOException {
    File sparkSubmitJarPath = new File(ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSubmitJarPath));
    if (!sparkSubmitJarPath.isDirectory()) {
      throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath + " should be a directory");
    }
    // Bug fix: select files that ARE jars. The previous predicate was inverted
    // (!endsWith(".jar")), which contradicted the "should only have one jar" check.
    File[] jars = sparkSubmitJarPath.listFiles(file -> file.getName().endsWith(".jar"));
    if (jars == null || jars.length != 1) {
      throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath
          + " should only have one jar, jars = " + Arrays.toString(jars));
    }
    return jars[0].getCanonicalPath();
  }

  /**
   * Assembles the spark-submit arguments: queue, job name, and a set of --conf
   * entries built from service configuration and per-instance overrides.
   */
  private String getSparkArgs() {
    StringBuilder sparkArgs = new StringBuilder();
    sparkArgs.append("--queue ").append(instance.getQueue());
    sparkArgs.append(" --name ").append(jobName);
    Map<String, String> sparkParams = new HashMap<>();
    sparkParams.put("mapreduce.job.queuename", instance.getQueue());
    sparkParams.put("spark.shuffle.hdfs.enabled", ServiceConfig.getInstance()
        .getString(ServiceConfig.ServiceConfVars.SparkShuffleHdfsEnabled));
    // Per-instance parallelism wins over the configured default.
    String parallelism = StringUtils.isNullOrEmpty(instance.getParallelism())
        ? ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MaxExecutors)
        : instance.getParallelism();
    sparkParams.put("spark.dynamicAllocation.maxExecutors", parallelism);
    sparkParams.put("spark.dynamicAllocation.minExecutors",
        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MinExecutors));
    sparkParams.put("spark.vcore.boost",
        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoost));
    sparkParams.put("spark.vcore.boost.ratio",
        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoostRatio));
    sparkParams.put("spark.speculation",
        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSpeculation));
    // Resource spec: "", "mem" (both driver and executor), or "driverMem:executorMem".
    String driverResource;
    String executorResource;
    String resource = instance.getResource().trim();
    if (StringUtils.isNullOrEmpty(resource)) {
      driverResource = ServiceConfig.getInstance()
          .getString(ServiceConfig.ServiceConfVars.DriverMemory);
      executorResource = ServiceConfig.getInstance()
          .getString(ServiceConfig.ServiceConfVars.ExecutorMemory);
    } else {
      String[] resourceArray = resource.split(":");
      if (resourceArray.length == 1) {
        driverResource = resourceArray[0];
        executorResource = resourceArray[0];
      } else if (resourceArray.length == 2) {
        driverResource = resourceArray[0];
        executorResource = resourceArray[1];
      } else {
        throw new RuntimeException(
            "Invalid conf: " + instance.getIdentifier() + ", resource: " + resource);
      }
    }
    sparkParams.put("spark.executor.cores",
        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorCores));
    sparkParams.put("spark.executor.memory", executorResource);
    sparkParams.put("spark.driver.memory", driverResource);
    sparkParams.put("spark.executor.memoryOverhead", ServiceConfig.getInstance()
        .getString(ServiceConfig.ServiceConfVars.ExecutorMemoryOverhead));
    for (Map.Entry<String, String> entry : sparkParams.entrySet()) {
      sparkArgs
          .append(" --conf ")
          .append(entry.getKey())
          .append("=")
          .append(entry.getValue());
    }
    return sparkArgs.toString();
  }

  /**
   * Launches the job via SparkLauncher and scans its output for the YARN
   * "Submitted application" marker to extract the application id.
   *
   * @return the YARN application id, or null if the marker never appeared
   * @throws HoodieTableManagementException if the launcher cannot be initialized,
   *     the process cannot be started, or reading its output fails
   */
  @Override
  public String executeCommand(String jobName, Instance instance) throws HoodieTableManagementException {
    String sparkPropertiesPath =
        Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
    SparkLauncher sparkLauncher;
    try {
      sparkLauncher = initLauncher(sparkPropertiesPath);
    } catch (URISyntaxException e) {
      LOG.error("Failed to init spark launcher");
      throw new HoodieTableManagementException("Failed to init spark launcher", e);
    }
    String master = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkMaster);
    String sparkMemory = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorMemory);
    String parallelism = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkParallelism);
    String retry = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.RetryTimes);
    sparkLauncher.addAppArgs("COMPACT_RUN", master, sparkMemory, instance.getBasePath(),
        instance.getTableName(), instance.getInstant(), parallelism, "", retry, "");
    Process process;
    try {
      process = sparkLauncher.launch();
    } catch (IOException e) {
      LOG.error("Failed to launcher spark process");
      throw new HoodieTableManagementException("Failed to init spark launcher", e);
    }
    InputStream inputStream = null;
    BufferedReader bufferedReader = null;
    String applicationId = null;
    try {
      inputStream = process.getInputStream();
      bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
      String line;
      while ((line = bufferedReader.readLine()) != null) {
        LOG.info(line);
        if (line.contains(YARN_SUBMITTED)) {
          // The id is the text following the marker, e.g.
          // "... Submitted application application_1234_0001".
          String[] split = line.split(YARN_SUBMITTED);
          applicationId = split[1].trim();
          LOG.info("Execute job {} get application id {}", jobName, applicationId);
          break;
        }
      }
    } catch (Exception e) {
      LOG.error("execute {} process get application id error", jobName, e);
      throw new HoodieTableManagementException("execute " + jobName + " process get application id error", e);
    } finally {
      if (process != null) {
        process.destroyForcibly();
      }
      if (inputStream != null) {
        IOUtils.closeQuietly(inputStream);
      }
      if (bufferedReader != null) {
        IOUtils.closeQuietly(bufferedReader);
      }
    }
    return applicationId;
  }
}

View File

@@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.handlers;
import org.apache.hudi.table.management.common.ServiceConfig;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.store.MetadataStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
 * Dispatches schedule/remove requests for table service actions to the
 * appropriate per-action handler.
 */
public class ActionHandler implements AutoCloseable {

  private static Logger LOG = LoggerFactory.getLogger(ActionHandler.class);

  protected final Configuration conf;
  protected final FileSystem fileSystem;
  protected final MetadataStore metadataStore;

  private final CompactionHandler compactionHandler;

  public ActionHandler(Configuration conf,
                       MetadataStore metadataStore) throws IOException {
    this.conf = conf;
    this.metadataStore = metadataStore;
    this.fileSystem = FileSystem.get(conf);
    // The compaction handler consults an in-memory pending-instant cache when enabled.
    this.compactionHandler = new CompactionHandler(
        ServiceConfig.getInstance().getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable));
  }

  public void scheduleCompaction(Instance instance) {
    compactionHandler.scheduleCompaction(metadataStore, instance);
  }

  public void removeCompaction(Instance instance) throws IOException {
    compactionHandler.removeCompaction(metadataStore, instance);
  }

  // TODO: support clustering
  public void scheduleClustering(Instance instance) {
  }

  public void removeClustering(Instance instance) {
  }

  @Override
  public void close() throws Exception {
    this.fileSystem.close();
  }
}

View File

@@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.handlers;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.store.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * REST Handler servicing clustering requests.
 */
public class ClusteringHandler {

  private static Logger LOG = LoggerFactory.getLogger(ClusteringHandler.class);

  /**
   * Persists a new clustering instance in the metadata store.
   */
  public void scheduleClustering(MetadataStore metadataStore,
                                 Instance instance) {
    // Fixed copy-paste from CompactionHandler: this handler registers clustering.
    LOG.info("Start register clustering instance: " + instance.getIdentifier());
    metadataStore.saveInstance(instance);
  }

  /**
   * Marks an existing clustering instance with the status carried by {@code instance}.
   *
   * @throws RuntimeException if no matching instance exists in the store
   */
  public void removeClustering(MetadataStore metadataStore,
                               Instance instance) {
    LOG.info("Start remove clustering instance: " + instance.getIdentifier());
    // 1. check instance exist
    Instance result = metadataStore.getInstance(instance);
    if (result == null) {
      throw new RuntimeException("Instance not exist: " + instance);
    }
    // 2. update status
    metadataStore.updateStatus(instance);
  }
}

View File

@@ -1,66 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.handlers;
import org.apache.hudi.table.management.common.ServiceContext;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.store.MetadataStore;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * REST handler servicing compaction requests.
 */
public class CompactionHandler {

  private static Logger LOG = LoggerFactory.getLogger(CompactionHandler.class);

  // When true, the shared pending-instance cache in ServiceContext is consulted
  // before the store and refreshed after every schedule attempt.
  protected boolean cacheEnable;

  public CompactionHandler(boolean cacheEnable) {
    this.cacheEnable = cacheEnable;
  }

  /**
   * Registers a compaction instance unless an equivalent one is already pending,
   * then refreshes the pending cache when caching is enabled.
   */
  public void scheduleCompaction(MetadataStore metadataStore,
                                 Instance instance) {
    final String recordKey = instance.getRecordKey();
    LOG.info("Start register compaction instance: " + recordKey);
    // Short-circuits: the store is only queried when the cache did not flag the key.
    boolean alreadyKnown = (cacheEnable && ServiceContext.containsPendingInstant(recordKey))
        || metadataStore.getInstance(instance) != null;
    if (alreadyKnown) {
      LOG.warn("Instance has existed, instance: " + instance);
    } else {
      metadataStore.saveInstance(instance);
    }
    if (cacheEnable) {
      ServiceContext.refreshPendingInstant(recordKey);
    }
  }

  /**
   * Marks an existing compaction instance with the status carried by {@code instance}.
   *
   * @throws RuntimeException if no matching instance exists in the store
   */
  public void removeCompaction(@NotNull MetadataStore metadataStore,
                               Instance instance) {
    LOG.info("Start remove compaction instance: " + instance.getIdentifier());
    Instance stored = metadataStore.getInstance(instance);
    if (stored == null) {
      throw new RuntimeException("Instance not exist: " + instance);
    }
    metadataStore.updateStatus(instance);
  }
}

View File

@@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.service;
/**
 * Lifecycle contract shared by all table management background services.
 */
public interface BaseService {

  /** Prepares internal resources (e.g. thread pools) before the service starts. */
  void init();

  /** Begins the service's background work; called once after {@link #init()}. */
  void startService();

  /** Stops the background work and releases resources. */
  void stop();
}

View File

@@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.service;
import org.apache.hudi.table.management.common.ServiceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Background service that periodically evicts expired entries from the shared
 * pending-instance cache held by {@code ServiceContext}.
 */
public class CleanService implements BaseService {

  private static final Logger LOG = LoggerFactory.getLogger(CleanService.class);

  /** Cache entries older than this many milliseconds are evicted. */
  private static final long CACHE_INTERVAL_MS = 3600 * 1000L;

  private ScheduledExecutorService service;

  @Override
  public void init() {
    LOG.info("Init service: " + CleanService.class.getName());
    this.service = Executors.newSingleThreadScheduledExecutor();
  }

  @Override
  public void startService() {
    LOG.info("Start service: " + CleanService.class.getName());
    // First run after 30s, then every 5 minutes.
    service.scheduleAtFixedRate(this::cleanCache, 30, 300, TimeUnit.SECONDS);
  }

  @Override
  public void stop() {
    LOG.info("Stop service: " + CleanService.class.getName());
    if (service != null && !service.isShutdown()) {
      service.shutdown();
    }
  }

  /**
   * Removes every cached pending instance whose timestamp is older than
   * {@link #CACHE_INTERVAL_MS}. Removal during traversal is safe on
   * ConcurrentHashMap, whose iterators tolerate concurrent modification.
   */
  private void cleanCache() {
    long now = System.currentTimeMillis();
    ConcurrentHashMap<String, Long> pendingInstances = ServiceContext.getPendingInstances();
    pendingInstances.entrySet().removeIf(entry -> {
      if (now - entry.getValue() > CACHE_INTERVAL_MS) {
        LOG.info("Instance has expired: " + entry.getKey());
        return true;
      }
      return false;
    });
  }
}

View File

@@ -1,102 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.service;
import org.apache.hudi.table.management.common.ServiceConfig;
import org.apache.hudi.table.management.common.ServiceContext;
import org.apache.hudi.table.management.executor.BaseActionExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Dispatches queued {@code BaseActionExecutor} tasks onto a bounded worker pool.
 * A single dispatcher thread blocks on {@link #taskQueue} and hands each task
 * to the pool as it arrives.
 */
public class ExecutorService implements BaseService {

  private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);

  /** Worker pool that actually runs the action executors. */
  private ThreadPoolExecutor executorService;
  /** Hosts the single dispatcher thread. */
  private ScheduledExecutorService service;
  /** Unbounded hand-off queue between submitters and the dispatcher. */
  private BlockingQueue<BaseActionExecutor> taskQueue;
  private int coreExecuteSize;
  private int maxExecuteSize;

  @Override
  public void init() {
    service = Executors.newSingleThreadScheduledExecutor();
    coreExecuteSize = ServiceConfig.getInstance()
        .getInt(ServiceConfig.ServiceConfVars.CoreExecuteSize);
    maxExecuteSize = ServiceConfig.getInstance()
        .getInt(ServiceConfig.ServiceConfVars.MaxExecuteSize);
    // NOTE(review): with a SynchronousQueue, execute() rejects tasks once
    // maxExecuteSize workers are busy; submissions are gated by getFreeSize(),
    // but a rejection here would be unhandled — confirm this is intended.
    executorService = new ThreadPoolExecutor(coreExecuteSize, maxExecuteSize, 60,
        TimeUnit.SECONDS, new SynchronousQueue<>());
    taskQueue = new LinkedBlockingQueue<>();
    LOG.info("Init service: " + ExecutorService.class.getName() + ", coreExecuteSize: "
        + coreExecuteSize + ", maxExecuteSize: " + maxExecuteSize);
  }

  @Override
  public void startService() {
    LOG.info("Start service: " + ExecutorService.class.getName());
    service.submit(new ExecutionTask());
  }

  @Override
  public void stop() {
    LOG.info("Stop service: " + ExecutorService.class.getName());
    if (executorService != null && !executorService.isShutdown()) {
      executorService.shutdown();
    }
    // Fix: the original tested service.isShutdown() without negation, so the
    // dispatcher executor was never actually shut down.
    if (service != null && !service.isShutdown()) {
      service.shutdown();
    }
    LOG.info("Finish stop service: " + ExecutorService.class.getName());
  }

  /** Forever pulls tasks off the queue and dispatches them to the worker pool. */
  private class ExecutionTask implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          BaseActionExecutor executor = taskQueue.take();
          LOG.info("Start execute: " + executor);
          executorService.execute(executor);
        } catch (InterruptedException interruptedException) {
          // Restore the interrupt flag and exit so the dispatcher can terminate;
          // the original swallowed the interrupt and looped forever.
          LOG.warn("Dispatcher interrupted, exiting execution loop");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  /** Enqueues a task for asynchronous execution. */
  public void submitTask(BaseActionExecutor task) {
    taskQueue.add(task);
  }

  /** Remaining capacity: max pool size minus the number of running instances. */
  public int getFreeSize() {
    return maxExecuteSize - ServiceContext.getRunningInstanceNum();
  }
}

View File

@@ -1,65 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.service;
import org.apache.hudi.table.management.common.ServiceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Background service that periodically logs one line per running instance.
 */
public class MonitorService implements BaseService {

  private static final Logger LOG = LoggerFactory.getLogger(MonitorService.class);

  private ScheduledExecutorService service;

  @Override
  public void init() {
    LOG.info("Init service: " + MonitorService.class);
    this.service = Executors.newSingleThreadScheduledExecutor();
  }

  @Override
  public void startService() {
    LOG.info("Start service: " + MonitorService.class.getName());
    // First run after 30s, then every 3 minutes.
    service.scheduleAtFixedRate(this::reportRunningInstances, 30, 180, TimeUnit.SECONDS);
  }

  @Override
  public void stop() {
    LOG.info("Stop service: " + MonitorService.class.getName());
    if (service != null && !service.isShutdown()) {
      service.shutdown();
    }
  }

  /** Emits the current running-instance info lines to the log. */
  private void reportRunningInstances() {
    for (String info : ServiceContext.getRunningInstanceInfo()) {
      LOG.info(info);
    }
  }
}

View File

@@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.service;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.entity.InstanceStatus;
import org.apache.hudi.table.management.store.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Background service that re-queues failed instances by flipping their
 * status back to SCHEDULED so the schedule service picks them up again.
 */
public class RetryService implements BaseService {

  private static final Logger LOG = LoggerFactory.getLogger(RetryService.class);

  private final MetadataStore metadataStore;
  private ScheduledExecutorService service;

  public RetryService(MetadataStore metadataStore) {
    this.metadataStore = metadataStore;
  }

  @Override
  public void init() {
    LOG.info("Init service: " + RetryService.class.getName());
    this.service = Executors.newSingleThreadScheduledExecutor();
  }

  @Override
  public void startService() {
    LOG.info("Start service: " + RetryService.class.getName());
    // First run after 30s, then every 3 minutes.
    service.scheduleAtFixedRate(this::submitFailTask, 30, 180, TimeUnit.SECONDS);
  }

  @Override
  public void stop() {
    LOG.info("Stop service: " + RetryService.class.getName());
    if (service != null && !service.isShutdown()) {
      service.shutdown();
    }
  }

  /** Marks each retryable instance SCHEDULED and writes the status back. */
  public void submitFailTask() {
    for (Instance failed : metadataStore.getRetryInstances()) {
      LOG.info("Start retry instance: " + failed.getIdentifier());
      failed.setStatus(InstanceStatus.SCHEDULED.getStatus());
      metadataStore.updateStatus(failed);
    }
  }
}

View File

@@ -1,116 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.service;
import org.apache.hudi.table.management.common.ServiceConfig;
import org.apache.hudi.table.management.entity.Action;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.entity.InstanceStatus;
import org.apache.hudi.table.management.exception.HoodieTableManagementException;
import org.apache.hudi.table.management.executor.BaseActionExecutor;
import org.apache.hudi.table.management.executor.CompactionExecutor;
import org.apache.hudi.table.management.store.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Background service that periodically pulls SCHEDULED instances from the
 * metadata store and hands them to the execution service, capped by the
 * executor's currently free capacity.
 */
public class ScheduleService implements BaseService {

  private static final Logger LOG = LoggerFactory.getLogger(ScheduleService.class);

  private final ExecutorService executionService;
  private final MetadataStore metadataStore;
  // Minimum age (ms since last update) a compaction instance must reach before dispatch.
  private final int compactionWaitInterval;
  private ScheduledExecutorService service;

  public ScheduleService(ExecutorService executionService,
                         MetadataStore metadataStore) {
    this.executionService = executionService;
    this.metadataStore = metadataStore;
    this.compactionWaitInterval = ServiceConfig.getInstance()
        .getInt(ServiceConfig.ServiceConfVars.CompactionScheduleWaitInterval);
  }

  @Override
  public void init() {
    LOG.info("Finish init schedule service, compactionWaitInterval: " + compactionWaitInterval);
    this.service = Executors.newSingleThreadScheduledExecutor();
  }

  @Override
  public void startService() {
    LOG.info("Start service: " + ScheduleService.class.getName());
    // First run after 30s, then every minute.
    service.scheduleAtFixedRate(this::submitReadyTask, 30, 60, TimeUnit.SECONDS);
  }

  @Override
  public void stop() {
    LOG.info("Stop service: " + ScheduleService.class.getName());
    if (service != null && !service.isShutdown()) {
      service.shutdown();
    }
  }

  /** Dispatches up to free-capacity ready instances, skipping those still in their wait window. */
  public void submitReadyTask() {
    int capacity = executionService.getFreeSize();
    LOG.info("Start get ready instances, limitSize: " + capacity);
    if (capacity <= 0) {
      return;
    }
    List<Instance> candidates =
        metadataStore.getInstances(InstanceStatus.SCHEDULED.getStatus(), capacity);
    for (Instance candidate : candidates) {
      if (waitSchedule(candidate)) {
        LOG.info("Instance should wait schedule: " + candidate.getInstanceRunStatus());
        continue;
      }
      LOG.info("Schedule ready instances: " + candidate.getInstanceRunStatus());
      executionService.submitTask(getActionExecutor(candidate));
    }
  }

  /** True when a compaction instance has not yet aged past the configured wait interval. */
  private boolean waitSchedule(Instance instance) {
    if (instance.getAction() != Action.COMPACTION.getValue()) {
      return false;
    }
    long earliestRunTime = instance.getUpdateTime().getTime() + compactionWaitInterval;
    return earliestRunTime > System.currentTimeMillis();
  }

  /**
   * Builds the executor for the instance's action type.
   *
   * @throws HoodieTableManagementException for actions other than compaction
   */
  protected BaseActionExecutor getActionExecutor(Instance instance) {
    if (instance.getAction() != Action.COMPACTION.getValue()) {
      throw new HoodieTableManagementException("Unsupported action " + instance.getAction());
    }
    return new CompactionExecutor(instance);
  }
}

View File

@@ -1,41 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.store;
import org.apache.hudi.table.management.entity.AssistQueryEntity;
import org.apache.hudi.table.management.entity.Instance;
import java.util.List;
/**
 * Persistence abstraction for table management {@code Instance} records.
 */
public interface MetadataStore {

  /** Persists a brand-new instance. */
  void saveInstance(Instance instance);

  /** Writes the status carried by {@code instance} back to the store. */
  void updateStatus(Instance instance);

  /** One-time initialization hook (schema creation, connections, ...). */
  void init();

  /** Looks up the stored instance matching the given one; null when absent. */
  Instance getInstance(Instance instance);

  /** Returns up to {@code limit} instances in the given status. */
  List<Instance> getInstances(int status, int limit);

  /** Returns instances eligible for retry (e.g. previously failed runs). */
  List<Instance> getRetryInstances();

  /** Returns instances matching the alerting query. */
  List<Instance> getAlertInstances(AssistQueryEntity queryEntity);
}

View File

@@ -1,70 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.store.impl;
import org.apache.hudi.table.management.entity.AssistQueryEntity;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.store.MetadataStore;
import org.apache.hudi.table.management.store.jdbc.InstanceDao;
import java.util.List;
/**
 * {@code MetadataStore} implementation backed by a relational database;
 * every operation delegates straight to {@code InstanceDao}.
 */
public class RelationDBBasedStore implements MetadataStore {

  private final InstanceDao instanceDao;

  public RelationDBBasedStore() {
    this.instanceDao = new InstanceDao();
  }

  @Override
  public void saveInstance(Instance instance) {
    instanceDao.saveInstance(instance);
  }

  @Override
  public void updateStatus(Instance instance) {
    instanceDao.updateStatus(instance);
  }

  @Override
  public void init() {
    // do nothing — schema bootstrap happens lazily in SqlSessionFactoryUtil
  }

  @Override
  public Instance getInstance(Instance instance) {
    return instanceDao.getInstance(instance);
  }

  @Override
  public List<Instance> getInstances(int status, int limit) {
    return instanceDao.getInstances(status, limit);
  }

  @Override
  public List<Instance> getRetryInstances() {
    return instanceDao.getRetryInstances();
  }

  @Override
  public List<Instance> getAlertInstances(AssistQueryEntity queryEntity) {
    return instanceDao.getAlertInstances(queryEntity);
  }
}

View File

@@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.store.jdbc;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory;
import org.apache.ibatis.io.Resources;
import java.io.IOException;
import java.util.Properties;
/**
 * MyBatis data source factory that supplies a HikariCP connection pool
 * configured from {@code hikariPool.properties} on the classpath.
 */
public class HikariDataSourceFactory extends UnpooledDataSourceFactory {

  // Classpath location of the Hikari pool configuration.
  private static final String PROPERTIES_PATH = "hikariPool.properties";

  /**
   * Loads the pool properties and installs the resulting HikariDataSource
   * into the {@code dataSource} field that MyBatis reads.
   *
   * @throws IOException if the properties resource cannot be read
   */
  public HikariDataSourceFactory() throws IOException {
    Properties properties = new Properties();
    properties.load(Resources.getResourceAsStream(PROPERTIES_PATH));
    HikariConfig config = new HikariConfig(properties);
    this.dataSource = new HikariDataSource(config);
  }
}

View File

@@ -1,156 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.store.jdbc;
import org.apache.hudi.table.management.entity.AssistQueryEntity;
import org.apache.hudi.table.management.entity.Instance;
import org.apache.hudi.table.management.entity.InstanceStatus;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * DAO persisting and querying {@code Instance} rows via MyBatis.
 * All statement ids are resolved against the {@link #NAMESPACE} mapper.
 */
public class InstanceDao {

  private static final Logger LOG = LoggerFactory.getLogger(InstanceDao.class);

  /** MyBatis mapper namespace all statement ids are resolved against. */
  private static final String NAMESPACE = "Instance";

  /** Inserts a new instance row. */
  public void saveInstance(Instance instance) {
    try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
      sqlSession.insert(statement(NAMESPACE, "saveInstance"), instance);
      sqlSession.commit();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Updates the instance's status using the status-specific statement.
   *
   * @throws RuntimeException if the update did not affect exactly one row
   *                          or the underlying SQL failed
   */
  public void updateStatus(Instance instance) {
    int ret;
    try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
      ret = sqlSession.update(statement(NAMESPACE, getUpdateStatusSqlId(instance)), instance);
      sqlSession.commit();
    } catch (Exception e) {
      LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e);
      throw new RuntimeException(e);
    }
    // Fix: this check used to live inside the try block, so its RuntimeException
    // was swallowed by the catch above, logged with the wrong message and re-wrapped.
    if (ret != 1) {
      LOG.error("Fail update status instance: " + instance);
      throw new RuntimeException("Fail update status instance: " + instance.getIdentifier());
    }
    LOG.info("Success update status instance: " + instance.getIdentifier());
  }

  /**
   * Updates the instance's execution info, retrying up to 3 times with a
   * 5 second pause between attempts.
   *
   * @throws RuntimeException when all attempts fail or the SQL errors out
   */
  public void updateExecutionInfo(Instance instance) {
    try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
      for (int attempt = 0; attempt < 3; attempt++) {
        int ret = sqlSession.update(statement(NAMESPACE, "updateExecutionInfo"), instance);
        sqlSession.commit();
        if (ret == 1) {
          LOG.info("Success update execution info, instance: " + instance.getIdentifier());
          return;
        }
        LOG.warn("Fail update execution info instance: " + instance);
        TimeUnit.SECONDS.sleep(5);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can observe the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (Exception e) {
      LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e);
      throw new RuntimeException(e);
    }
    // Fix: the "all retries exhausted" throw used to sit inside the try block,
    // where its own catch (Exception) re-caught and double-wrapped it.
    throw new RuntimeException("Fail update execution info: " + instance.getIdentifier());
  }

  /** Returns the stored instance matching the given one, or null when absent. */
  public Instance getInstance(Instance instance) {
    try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
      return sqlSession.selectOne(statement(NAMESPACE, "getInstance"), instance);
    } catch (Exception e) {
      LOG.error("Fail get Instance: " + instance.getIdentifier() + ", errMsg: ", e);
      throw new RuntimeException(e);
    }
  }

  /** Maps an instance's target status to the mapper statement that performs the transition. */
  private String getUpdateStatusSqlId(Instance instance) {
    switch (InstanceStatus.getInstance(instance.getStatus())) {
      case SCHEDULED:
        return "retryInstance";
      case RUNNING:
        return "runningInstance";
      case COMPLETED:
        return "successInstance";
      case FAILED:
        return "failInstance";
      case INVALID:
        return "invalidInstance";
      default:
        throw new RuntimeException("Invalid instance: " + instance.getIdentifier());
    }
  }

  /** Returns instances in the given status, capped to {@code limit} rows when positive. */
  public List<Instance> getInstances(int status, int limit) {
    try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
      if (limit > 0) {
        return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status,
            new RowBounds(0, limit));
      } else {
        return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status);
      }
    } catch (Exception e) {
      LOG.error("Fail get instances, status: " + status + ", errMsg: ", e);
      throw new RuntimeException("Fail get instances, status: " + status);
    }
  }

  /** Returns instances eligible for retry. */
  public List<Instance> getRetryInstances() {
    try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
      return sqlSession.selectList(statement(NAMESPACE, "getRetryInstances"),
          new AssistQueryEntity());
    } catch (Exception e) {
      LOG.error("Fail get retry instances, errMsg: ", e);
      throw new RuntimeException("Fail get retry instances");
    }
  }

  /** Returns instances matching the alerting query. */
  public List<Instance> getAlertInstances(AssistQueryEntity queryEntity) {
    try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
      return sqlSession.selectList(statement(NAMESPACE, "getAlertInstances"),
          queryEntity);
    } catch (Exception e) {
      LOG.error("Fail get alert instances, errMsg: ", e);
      throw new RuntimeException("Fail get alert instances");
    }
  }

  /** Returns instances updated after the time in the query entity. */
  public List<Instance> getInstanceAfterTime(AssistQueryEntity queryEntity) {
    try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
      return sqlSession.selectList(statement(NAMESPACE, "getInstanceAfterTime"), queryEntity);
    } catch (Exception e) {
      LOG.error("Fail get instances after time, errMsg: ", e);
      // Fix: message previously said "Fail get alert instances" (copy-paste).
      throw new RuntimeException("Fail get instances after time");
    }
  }

  /** Builds the fully-qualified MyBatis statement id. */
  private String statement(String namespace, String sqlID) {
    return namespace + "." + sqlID;
  }
}

View File

@@ -1,82 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.store.jdbc;
import org.apache.hudi.table.management.exception.HoodieTableManagementException;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.util.stream.Collectors;
/**
 * Lazily builds the shared MyBatis {@code SqlSessionFactory} and bootstraps
 * the service schema from {@code table-management-service.sql}.
 */
public class SqlSessionFactoryUtil {

  private static final String CONFIG_PATH = "mybatis-config.xml";

  // volatile: openSqlSession() reads this outside the lock (double-checked init),
  // so the write must be safely published.
  private static volatile SqlSessionFactory sqlSessionFactory;

  private static final Class<?> CLASS_LOCK = SqlSessionFactoryUtil.class;

  private SqlSessionFactoryUtil() {
  }

  /**
   * Builds the shared factory from the MyBatis config on the classpath.
   * Idempotent: only the first successful call creates the factory.
   *
   * @throws HoodieTableManagementException if the config cannot be read;
   *         the original swallowed this with printStackTrace(), leaving a
   *         null factory and a later NPE in openSqlSession().
   */
  public static void initSqlSessionFactory() {
    try (InputStream inputStream = Resources.getResourceAsStream(CONFIG_PATH)) {
      synchronized (CLASS_LOCK) {
        if (sqlSessionFactory == null) {
          sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
        }
      }
    } catch (IOException e) {
      throw new HoodieTableManagementException("Unable to load mybatis config " + CONFIG_PATH, e);
    }
  }

  /** Opens a new session, lazily initializing the factory and schema on first use. */
  public static SqlSession openSqlSession() {
    if (sqlSessionFactory == null) {
      initSqlSessionFactory();
      init();
    }
    return sqlSessionFactory.openSession();
  }

  /**
   * Executes the bundled DDL script statement-by-statement (lines starting
   * with "--" are comments and skipped).
   */
  public static void init() {
    try {
      String[] ddls = org.apache.commons.io.IOUtils.readLines(
          SqlSessionFactoryUtil.class.getResourceAsStream("/table-management-service.sql"))
          .stream().filter(e -> !e.startsWith("--"))
          .collect(Collectors.joining(""))
          .split(";");
      // Fix: the original opened a fresh SqlSession per DDL and never closed any,
      // leaking one session (and connection) per statement.
      try (SqlSession session = openSqlSession()) {
        for (String ddl : ddls) {
          try (PreparedStatement statement = session.getConnection().prepareStatement(ddl)) {
            statement.execute();
          }
        }
      }
    } catch (Exception e) {
      throw new HoodieTableManagementException("Unable to read init ddl file", e);
    }
  }
}

View File

@@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.util;
import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.Date;
/**
 * Small date/time helpers for the table management service.
 */
public class DateTimeUtils {

  /**
   * Returns the current wall-clock time shifted by {@code amount} calendar days
   * (negative values go backwards).
   *
   * @param amount number of days to add; may be negative
   * @return the shifted point in time as a {@link Date}
   */
  public static Date addDay(int amount) {
    // ZonedDateTime.plusDays follows calendar-day semantics (DST-aware),
    // matching the old Calendar.add(Calendar.DATE, amount) behavior.
    return Date.from(ZonedDateTime.now().plusDays(amount).toInstant());
  }
}

View File

@@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.management.util;
import org.apache.hudi.table.management.entity.Action;
import org.apache.hudi.table.management.entity.Engine;
import org.apache.hudi.table.management.entity.Instance;
/**
 * Validation helpers for incoming {@code Instance} requests.
 */
public class InstanceUtil {

  /**
   * Normalizes and validates an instance: defaults a missing execution engine
   * to Spark, then verifies both the engine and action types are supported.
   */
  public static void checkArgument(Instance instance) {
    Engine engine = instance.getExecutionEngine();
    if (engine == null) {
      // Spark is the default execution engine when the request omits one.
      instance.setExecutionEngine(Engine.SPARK);
    }
    Engine.checkEngineType(instance);
    Action.checkActionType(instance);
  }
}

View File

@@ -1,20 +0,0 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
jdbcUrl=jdbc:h2:mem:tms;MODE=MYSQL
dataSource.user=root
dataSource.password=password

View File

@@ -1,41 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<include resource="com/bytedance/logback/agent/base.xml"/>
<include resource="com/bytedance/logback/agent/agent-appender.xml"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</layout>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
<appender-ref ref="AGENT"/>
</root>
<logger name="org.apache.spark" level="WARN"/>
</configuration>

View File

@@ -1,42 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<settings>
<setting name="lazyLoadingEnabled" value="false" />
<setting name="callSettersOnNulls" value="true"/>
<setting name="logImpl" value="STDOUT_LOGGING" />
</settings>
<typeAliases>
</typeAliases>
<environments default="development">
<environment id="development">
<transactionManager type="JDBC"/>
<dataSource type="org.apache.hudi.table.management.store.jdbc.HikariDataSourceFactory"/>
</environment>
</environments>
<mappers>
<mapper resource="mybatis/Instance.xml"/>
</mappers>
</configuration>

View File

@@ -1,165 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="Instance">
<resultMap type="org.apache.hudi.table.management.entity.Instance" id="InstanceMapping">
<result column="id" property="id" javaType="java.lang.Long"/>
<result column="db_name" property="dbName"/>
<result column="table_name" property="tableName"/>
<result column="base_path" property="basePath"/>
<result column="execution_engine" property="executionEngine"/>
<result column="owner" property="owner"/>
<result column="cluster" property="cluster"/>
<result column="queue" property="queue"/>
<result column="resource" property="resource"/>
<result column="parallelism" property="parallelism"/>
<result column="auto_clean" property="autoClean"/>
<result column="instant" property="instant"/>
<result column="action" property="action" javaType="java.lang.Integer"/>
<result column="status" property="status" javaType="java.lang.Integer"/>
<result column="run_times" property="runTimes" javaType="java.lang.Integer"/>
<result column="application_id" property="applicationId"/>
<result column="dorado_job_id" property="doradoJobId"/>
<result column="schedule_time" property="scheduleTime" javaType="java.util.Date"/>
<result column="create_time" property="createTime" javaType="java.util.Date"/>
<result column="update_time" property="updateTime" javaType="java.util.Date"/>
</resultMap>
<sql id="selectColumns">
id, db_name, table_name, base_path, execution_engine, owner, cluster, queue, resource, parallelism, auto_clean,
instant, action, status, run_times, application_id, dorado_job_id, schedule_time, create_time, update_time
</sql>
<insert id="saveInstance"
parameterType="org.apache.hudi.table.management.entity.Instance"
useGeneratedKeys="true" keyProperty="id">
INSERT INTO instance (db_name, table_name, base_path, execution_engine, owner, cluster,
queue, resource, parallelism, auto_clean, instant, action, status, run_times)
VALUES (#{dbName}, #{tableName}, #{basePath}, #{executionEngine}, #{owner}, #{cluster},
#{queue},#{resource}, #{parallelism}, #{autoClean}, #{instant}, #{action}, #{status}, 0)
</insert>
<update id="runningInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
UPDATE instance
SET status = #{status},
schedule_time = now(),
run_times = run_times + 1
WHERE db_name = #{dbName}
and table_name = #{tableName}
and instant = #{instant}
and status = 0
</update>
<update id="retryInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
UPDATE instance
SET status = #{status}
WHERE db_name = #{dbName}
and table_name = #{tableName}
and instant = #{instant}
</update>
<update id="updateExecutionInfo"
parameterType="org.apache.hudi.table.management.entity.Instance">
UPDATE instance
SET dorado_job_id = #{doradoJobId},
application_id = #{applicationId}
WHERE db_name = #{dbName}
and table_name = #{tableName}
and instant = #{instant}
</update>
<update id="successInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
UPDATE instance
SET status = #{status}
WHERE db_name = #{dbName}
and table_name = #{tableName}
and instant = #{instant}
and status = 1
</update>
<update id="failInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
UPDATE instance
SET status = #{status}
WHERE db_name = #{dbName}
and table_name = #{tableName}
and instant = #{instant}
and status = 1
</update>
<update id="invalidInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
UPDATE instance
SET status = #{status}
WHERE db_name = #{dbName}
and table_name = #{tableName}
and instant = #{instant}
</update>
<select id="getInstance" parameterType="org.apache.hudi.table.management.entity.Instance"
resultMap="InstanceMapping">
SELECT <include refid="selectColumns"/>
FROM instance
WHERE db_name = #{dbName}
and table_name = #{tableName}
and instant = #{instant}
</select>
<select id="getInstances" parameterType="java.lang.Integer"
resultMap="InstanceMapping">
SELECT
<include refid="selectColumns"/>
FROM instance
WHERE status = #{status}
order by id
</select>
<select id="getRetryInstances" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity"
resultMap="InstanceMapping">
SELECT
<include refid="selectColumns"/>
FROM instance
WHERE status = 2
and run_times <![CDATA[ <= ]]> #{maxRetry}
and update_time > #{queryStartTime}
order by id
</select>
<select id="getAlertInstances" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity"
resultMap="InstanceMapping">
SELECT
<include refid="selectColumns"/>
FROM instance
WHERE status = #{status}
and run_times > #{maxRetry}
and update_time > #{queryStartTime}
order by id
</select>
<select id="getInstanceAfterTime" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity"
resultMap="InstanceMapping">
SELECT
<include refid="selectColumns"/>
FROM instance
WHERE status = #{status}
and update_time > #{queryStartTime}
order by id
</select>
</mapper>

View File

@@ -1,46 +0,0 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
CREATE TABLE if not exists `instance`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`db_name` varchar(128) NOT NULL COMMENT 'db name',
`table_name` varchar(128) NOT NULL COMMENT 'table name',
`base_path` varchar(128) NOT NULL COMMENT 'base path',
`execution_engine` varchar(128) NOT NULL COMMENT 'execution engine',
`owner` varchar(128) NOT NULL COMMENT 'owner',
`cluster` varchar(128) NOT NULL COMMENT 'cluster',
`queue` varchar(128) NOT NULL COMMENT 'queue',
`resource` varchar(128) NOT NULL COMMENT 'resource',
`parallelism` varchar(128) NOT NULL COMMENT 'parallelism',
`auto_clean` int NOT NULL DEFAULT '0' COMMENT 'auto_clean',
`instant` varchar(128) NOT NULL COMMENT 'instant',
`action` int NOT NULL COMMENT 'action',
`status` int NOT NULL COMMENT 'status',
`run_times` int NOT NULL DEFAULT '0' COMMENT 'run times',
`application_id` varchar(128) DEFAULT NULL COMMENT 'application id',
`dorado_job_id` varchar(128) DEFAULT NULL COMMENT 'job id',
`schedule_time` timestamp NULL DEFAULT NULL COMMENT 'schedule time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_table_instant` (`db_name`,`table_name`,`instant`),
KEY `idx_status` (`status`),
KEY `idx_update_time_status` (`update_time`,`status`)
) COMMENT='Table Management Service instance';

View File

@@ -1,29 +0,0 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, CONSOLE
log4j.logger.org.apache.hudi=DEBUG
# CONSOLE is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
# CONSOLE uses PatternLayout.
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL

View File

@@ -1,30 +0,0 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, CONSOLE
log4j.logger.org.apache=INFO
log4j.logger.org.apache.hudi=DEBUG
# CONSOLE is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
# CONSOLE uses PatternLayout.
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL

View File

@@ -43,7 +43,6 @@
<module>hudi-hadoop-mr</module>
<module>hudi-spark-datasource</module>
<module>hudi-timeline-service</module>
<module>hudi-table-management-service</module>
<module>hudi-utilities</module>
<module>hudi-sync</module>
<module>packaging/hudi-hadoop-mr-bundle</module>