From c7e430bb46ab721e4716ac89e1067d75a79b3179 Mon Sep 17 00:00:00 2001 From: Zhaojing Yu Date: Tue, 21 Jun 2022 16:58:50 +0800 Subject: [PATCH] Revert master (#5925) * Revert "udate" This reverts commit 092e35c1e300f1eb1a7474136826fed26bc10ccd. * Revert "[HUDI-3475] Initialize hudi table management module." This reverts commit 4640a3bbb8e212030f94848a0112784d98772de8. --- .../cluster/ClusteringPlanActionExecutor.java | 20 -- .../hudi/client/SparkRDDWriteClient.java | 4 - hudi-table-management-service/pom.xml | 333 ------------------ .../hudi/table/management/RequestHandler.java | 194 ---------- .../management/TableManagementServer.java | 153 -------- .../table/management/common/EnvConstant.java | 29 -- .../management/common/ServiceConfig.java | 117 ------ .../management/common/ServiceContext.java | 78 ---- .../common/TableManagementServiceConfig.java | 29 -- .../hudi/table/management/entity/Action.java | 52 --- .../management/entity/AssistQueryEntity.java | 47 --- .../hudi/table/management/entity/Engine.java | 44 --- .../table/management/entity/Instance.java | 92 ----- .../management/entity/InstanceStatus.java | 61 ---- .../HoodieTableManagementException.java | 32 -- .../executor/BaseActionExecutor.java | 102 ------ .../executor/CompactionExecutor.java | 59 ---- .../executor/submitter/ExecutionEngine.java | 82 ----- .../executor/submitter/SparkEngine.java | 220 ------------ .../management/handlers/ActionHandler.java | 71 ---- .../handlers/ClusteringHandler.java | 51 --- .../handlers/CompactionHandler.java | 66 ---- .../table/management/service/BaseService.java | 29 -- .../management/service/CleanService.java | 78 ---- .../management/service/ExecutorService.java | 102 ------ .../management/service/MonitorService.java | 65 ---- .../management/service/RetryService.java | 81 ----- .../management/service/ScheduleService.java | 116 ------ .../table/management/store/MetadataStore.java | 41 --- .../store/impl/RelationDBBasedStore.java | 70 ---- .../store/jdbc/HikariDataSourceFactory.java | 38 -- .../management/store/jdbc/InstanceDao.java | 156 -------- .../store/jdbc/SqlSessionFactoryUtil.java | 82 ----- .../table/management/util/DateTimeUtils.java | 32 -- .../table/management/util/InstanceUtil.java | 34 -- .../src/main/resources/hikariPool.properties | 20 -- .../src/main/resources/logback.xml | 41 --- .../src/main/resources/mybatis-config.xml | 42 --- .../src/main/resources/mybatis/Instance.xml | 165 --------- .../resources/table-management-service.sql | 46 --- .../resources/log4j-surefire-quiet.properties | 29 -- .../test/resources/log4j-surefire.properties | 30 -- pom.xml | 1 - 43 files changed, 3234 deletions(-) delete mode 100644 hudi-table-management-service/pom.xml delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java delete mode 100644 hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java delete mode 100644 hudi-table-management-service/src/main/resources/hikariPool.properties delete mode 100644 hudi-table-management-service/src/main/resources/logback.xml delete mode 100644 hudi-table-management-service/src/main/resources/mybatis-config.xml delete mode 100644 hudi-table-management-service/src/main/resources/mybatis/Instance.xml delete mode 100644 hudi-table-management-service/src/main/resources/table-management-service.sql delete mode 100644 hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties delete mode 100644 hudi-table-management-service/src/test/resources/log4j-surefire.properties diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java index 94ba014c4..15ead5efb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java @@ -21,10 +21,8 @@ package org.apache.hudi.table.action.cluster; import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; @@ -41,9 +39,7 @@ import org.apache.log4j.Logger; import java.io.IOException; import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class ClusteringPlanActionExecutor extends BaseActionExecutor> { @@ -106,22 +102,6 @@ public class ClusteringPlanActionExecutor instantsToSubmit = metaClient.getActiveTimeline() - .filterPendingReplaceTimeline() - .getInstants() - .map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(metaClient, config.getTableManagerConfig()); - tableManagerClient.submitClustering(instantsToSubmit); - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index df38ba4a1..bdf478a8f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -333,10 +333,6 @@ public class SparkRDDWriteClient extends protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { HoodieSparkTable table = HoodieSparkTable.create(config, context); preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); - // do not compact a complete instant. - if (table.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) { - return null; - } HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); if (pendingCompactionTimeline.containsInstant(inflightInstant)) { diff --git a/hudi-table-management-service/pom.xml b/hudi-table-management-service/pom.xml deleted file mode 100644 index d5251abc4..000000000 --- a/hudi-table-management-service/pom.xml +++ /dev/null @@ -1,333 +0,0 @@ - - - - - hudi - org.apache.hudi - 0.12.0-SNAPSHOT - - 4.0.0 - - hudi-table-management-service - - - 8 - 8 - 3.4.6 - - - - - - org.apache.hudi - hudi-common - ${project.version} - - - org.apache.hudi - hudi-cli - ${project.version} - - - org.apache.hudi - hudi-client-common - ${project.version} - - - - - org.apache.spark - spark-core_${scala.binary.version} - - - org.apache.spark - spark-sql_${scala.binary.version} - - - - - com.fasterxml.jackson.core - jackson-annotations - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-databind - - - - - org.apache.httpcomponents - fluent-hc - - - - io.javalin - javalin - 2.8.0 - - - - com.beust - jcommander - - - - org.rocksdb - rocksdbjni - - - - - org.apache.hadoop - hadoop-hdfs - - - - tools - com.sun - - - org.mortbay.jetty - * - - - javax.servlet.jsp - * - - - javax.servlet - * - - - - - org.apache.hadoop - hadoop-common - - - org.mortbay.jetty - * - - - javax.servlet.jsp - * - - - javax.servlet - * - - - tools - com.sun - - - - - org.apache.hadoop - hadoop-client - - - tools - com.sun - - - javax.servlet - * - - - - - org.apache.hadoop - hadoop-auth - - - slf4j-log4j12 - org.slf4j - - - - - - org.apache.hudi - hudi-java-client - ${project.version} - - - - org.mybatis - mybatis - ${mybatis.version} - - - - org.projectlombok - lombok - 1.18.24 - - - - org.apache.avro - avro - ${avro.version} - - - - org.slf4j - slf4j-api - 1.7.25 - - - - com.zaxxer - HikariCP - 4.0.3 - - - - mysql - mysql-connector-java - 8.0.23 - - - - com.google.code.gson - gson - 2.8.2 - - - - - com.h2database - h2 - 1.4.200 - - - org.junit.jupiter - junit-jupiter-api - test - - - org.apache.hudi - hudi-common - ${project.version} - tests - test-jar - test - - - org.apache.hudi - hudi-client-common - ${project.version} - tests - test-jar - test - - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - - - - org.jacoco - jacoco-maven-plugin - - - org.apache.maven.plugins - maven-jar-plugin - ${maven-jar-plugin.version} - - - - test-jar - - test-compile - - - - false - - - - org.apache.rat - apache-rat-plugin - - - - org.apache.maven.plugins - maven-shade-plugin - 3.2.4 - - - package - - shade - - - - - org.apache.hudi.compaction.service.TableManagerServer - - - true - jar-with-dependencies - - - - org.slf4j:slf4j-log4j12 - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - META-INF/services/javax.* - - - - - - - - - - - - src/main/resources - - - src/test/resources - - - - - \ No newline at end of file diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java deleted file mode 100644 index 32d2ebbe7..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management; - -import org.apache.hudi.table.management.entity.Action; -import org.apache.hudi.table.management.entity.Engine; -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.entity.InstanceStatus; -import org.apache.hudi.table.management.handlers.ActionHandler; -import org.apache.hudi.table.management.store.MetadataStore; -import org.apache.hudi.table.management.util.InstanceUtil; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.javalin.Context; -import io.javalin.Handler; -import io.javalin.Javalin; -import org.apache.hadoop.conf.Configuration; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Main REST Handler class that handles and delegates calls to timeline relevant handlers. - */ -public class RequestHandler { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class); - - private final Javalin app; - private final ActionHandler actionHandler; - - public RequestHandler(Javalin app, - Configuration conf, - MetadataStore metadataStore) throws IOException { - this.app = app; - this.actionHandler = new ActionHandler(conf, metadataStore); - } - - public void register() { - registerCommonAPI(); - registerCompactionAPI(); - registerClusteringAPI(); - } - - private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException { - boolean prettyPrint = ctx.queryParam("pretty") != null; - long beginJsonTs = System.currentTimeMillis(); - String result = - prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj); - long endJsonTs = System.currentTimeMillis(); - LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs)); - ctx.result(result); - } - - /** - * Register Compaction API calls. - */ - private void registerCommonAPI() { - app.get(HoodieTableManagerClient.REGISTER, new ViewHandler(ctx -> { - })); - } - - /** - * Register Compaction API calls. - */ - private void registerCompactionAPI() { - app.get(HoodieTableManagerClient.SUBMIT_COMPACTION, new ViewHandler(ctx -> { - for (String instant : ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow().split(",")) { - Instance instance = Instance.builder() - .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow()) - .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow()) - .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow()) - .action(Action.COMPACTION.getValue()) - .instant(instant) - .executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableManagerClient.EXECUTION_ENGINE).getOrThrow())) - .owner(ctx.validatedQueryParam(HoodieTableManagerClient.USERNAME).getOrThrow()) - .queue(ctx.validatedQueryParam(HoodieTableManagerClient.QUEUE).getOrThrow()) - .resource(ctx.validatedQueryParam(HoodieTableManagerClient.RESOURCE).getOrThrow()) - .parallelism(ctx.validatedQueryParam(HoodieTableManagerClient.PARALLELISM).getOrThrow()) - .status(InstanceStatus.SCHEDULED.getStatus()) - .build(); - InstanceUtil.checkArgument(instance); - actionHandler.scheduleCompaction(instance); - } - })); - - app.get(HoodieTableManagerClient.REMOVE_COMPACTION, new ViewHandler(ctx -> { - Instance instance = Instance.builder() - .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow()) - .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow()) - .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow()) - .instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow()) - .status(InstanceStatus.INVALID.getStatus()) - .isDeleted(true) - .build(); - actionHandler.removeCompaction(instance); - })); - } - - /** - * Register Compaction API calls. - */ - private void registerClusteringAPI() { - app.get(HoodieTableManagerClient.SUBMIT_CLUSTERING, new ViewHandler(ctx -> { - Instance instance = Instance.builder() - .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow()) - .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow()) - .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow()) - .action(Action.CLUSTERING.getValue()) - .instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow()) - .executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableManagerClient.EXECUTION_ENGINE).getOrThrow())) - .owner(ctx.validatedQueryParam(HoodieTableManagerClient.USERNAME).getOrThrow()) - .queue(ctx.validatedQueryParam(HoodieTableManagerClient.QUEUE).getOrThrow()) - .resource(ctx.validatedQueryParam(HoodieTableManagerClient.RESOURCE).getOrThrow()) - .parallelism(ctx.validatedQueryParam(HoodieTableManagerClient.PARALLELISM).getOrThrow()) - .status(InstanceStatus.SCHEDULED.getStatus()) - .build(); - InstanceUtil.checkArgument(instance); - actionHandler.scheduleClustering(instance); - })); - - app.get(HoodieTableManagerClient.REMOVE_CLUSTERING, new ViewHandler(ctx -> { - Instance instance = Instance.builder() - .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow()) - .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow()) - .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow()) - .instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow()) - .status(InstanceStatus.INVALID.getStatus()) - .isDeleted(true) - .build(); - actionHandler.removeClustering(instance); - })); - } - - /** - * Used for logging and performing refresh check. - */ - private class ViewHandler implements Handler { - - private final Handler handler; - - ViewHandler(Handler handler) { - this.handler = handler; - } - - @Override - public void handle(@NotNull Context context) throws Exception { - boolean success = true; - long beginTs = System.currentTimeMillis(); - boolean synced = false; - long refreshCheckTimeTaken = 0; - long handleTimeTaken = 0; - long finalCheckTimeTaken = 0; - try { - long handleBeginMs = System.currentTimeMillis(); - handler.handle(context); - long handleEndMs = System.currentTimeMillis(); - handleTimeTaken = handleEndMs - handleBeginMs; - } catch (RuntimeException re) { - success = false; - LOG.error("Got runtime exception servicing request " + context.queryString(), re); - throw re; - } finally { - long endTs = System.currentTimeMillis(); - long timeTakenMillis = endTs - beginTs; - LOG.info(String.format( - "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], " - + "Success=%s, Query=%s, Host=%s, synced=%s", - timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success, - context.queryString(), context.host(), synced)); - } - } - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java deleted file mode 100644 index 3fbf6b2db..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management; - -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.util.ReflectionUtils; -import org.apache.hudi.table.management.common.ServiceConfig; -import org.apache.hudi.table.management.common.TableManagementServiceConfig; -import org.apache.hudi.table.management.service.BaseService; -import org.apache.hudi.table.management.service.CleanService; -import org.apache.hudi.table.management.service.ExecutorService; -import org.apache.hudi.table.management.service.MonitorService; -import org.apache.hudi.table.management.service.RetryService; -import org.apache.hudi.table.management.service.ScheduleService; -import org.apache.hudi.table.management.store.MetadataStore; - -import com.beust.jcommander.JCommander; -import io.javalin.Javalin; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -/** - * A standalone table management service. - */ -public class TableManagementServer { - - private static final Logger LOG = LoggerFactory.getLogger(TableManagementServer.class); - - private int serverPort; - private final Configuration conf; - private final TableManagementServiceConfig config; - private transient Javalin app = null; - private List services; - private MetadataStore metadataStore; - - public TableManagementServer(int serverPort, Configuration conf, TableManagementServiceConfig config) - throws IOException { - this.config = config; - this.conf = FSUtils.prepareHadoopConf(conf); - this.fs = FileSystem.get(conf); - this.serverPort = serverPort; - this.metadataStore = initMetadataStore(); - } - - public TableManagementServer(TableManagementServiceConfig config) throws IOException { - this(config.serverPort, new Configuration(), config); - } - - public int startService() throws IOException { - app = Javalin.create(); - RequestHandler requestHandler = new RequestHandler(app, conf, metadataStore); - app.get("/", ctx -> ctx.result("Hello World")); - requestHandler.register(); - app.start(serverPort); - registerService(); - initAndStartRegisterService(); - return serverPort; - } - - private MetadataStore initMetadataStore() { - String className = ServiceConfig.getInstance() - .getString(ServiceConfig.ServiceConfVars.MetadataStoreClass); - MetadataStore metadataStore = ReflectionUtils.loadClass(className); - metadataStore.init(); - LOG.info("Finish init metastore: " + className); - return metadataStore; - } - - private void registerService() { - services = new ArrayList<>(); - ExecutorService executorService = new ExecutorService(); - services.add(executorService); - services.add(new ScheduleService(executorService, metadataStore)); - services.add(new RetryService(metadataStore)); - services.add(new MonitorService()); - services.add(new CleanService()); - } - - private void initAndStartRegisterService() { - for (BaseService service : services) { - service.init(); - service.startService(); - } - } - - private void stopRegisterService() { - for (BaseService service : services) { - service.stop(); - } - } - - public void run() throws IOException { - startService(); - Runtime.getRuntime() - .addShutdownHook( - new Thread( - () -> { - // Use stderr here since the logger may have been reset by its JVM shutdown hook. - System.err.println( - "*** shutting down Table management service since JVM is shutting down"); - try { - TableManagementServer.this.stop(); - } catch (InterruptedException e) { - e.printStackTrace(System.err); - } - System.err.println("*** Table management service shut down"); - })); - } - - /** - * Stop serving requests and shutdown resources. - */ - public void stop() throws InterruptedException { - LOG.info("Stopping Table management Service"); - this.app.stop(); - this.app = null; - stopRegisterService(); - LOG.info("Stopped Table management Service"); - } - - public static void main(String[] args) throws Exception { - final TableManagementServiceConfig cfg = new TableManagementServiceConfig(); - JCommander cmd = new JCommander(cfg, null, args); - if (cfg.help) { - cmd.usage(); - System.exit(1); - } - TableManagementServer service = new TableManagementServer(cfg); - service.run(); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java deleted file mode 100644 index c33e88b10..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.common; - -public class EnvConstant { - public static final String JAVA_HOME = "JAVA_HOME"; - public static final String YARN_CONF_DIR = "YARN_CONF_DIR"; - - public static final String HADOOP_USER_NAME = "HADOOP_USER_NAME"; - public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR"; - - public static final String SPARK_HOME = "SPARK_HOME"; -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java deleted file mode 100644 index d60cf05be..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.common; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.Properties; - -public class ServiceConfig extends Properties { - - private static Logger LOG = LoggerFactory.getLogger(ServiceConfig.class); - private static final String HOODIE_ENV_PROPS_PREFIX = "hoodie_"; - - private static ServiceConfig CONFIG = new ServiceConfig(); - - /** - * Constructor. - */ - private ServiceConfig() { - LOG.info("Start init ServiceConfig"); - Map envs = System.getenv(); - for (Map.Entry env : envs.entrySet()) { - if (env.getKey().toLowerCase().startsWith(HOODIE_ENV_PROPS_PREFIX)) { - String key = env.getKey().toLowerCase().replace("_", ".table.management."); - String value = env.getValue().trim(); - setProperty(key, value); - LOG.info("Set property " + key + " to " + value); - } - } - LOG.info("Finish init ServiceConfig"); - } - - public String getString(ServiceConfVars confVars) { - return this.getProperty(confVars.key(), confVars.defVal()); - } - - public void setString(ServiceConfVars confVars, String value) { - this.setProperty(confVars.key(), value); - } - - public Boolean getBool(ServiceConfVars confVars) { - return Boolean.valueOf(this.getProperty(confVars.key(), confVars.defVal())); - } - - public int getInt(ServiceConfVars confVars) { - return Integer.parseInt(this.getProperty(confVars.key(), confVars.defVal())); - } - - public static ServiceConfig getInstance() { - return CONFIG; - } - - public enum ServiceConfVars { - JavaHome("hoodie.table.management.java.home", ""), - SparkHome("hoodie.table.management.spark.home", ""), - YarnConfDir("hoodie.table.management.yarn.conf.dir", ""), - HadoopConfDir("hoodie.table.management.hadoop.conf.dir", ""), - CompactionMainClass("hoodie.table.management.compaction.main.class", "org.apache.hudi.utilities.HoodieCompactor"), - CompactionScheduleWaitInterval("hoodie.table.management.schedule.wait.interval", "30000"), - IntraMaxFailTolerance("hoodie.table.management.max.fail.tolerance", "5"), - MaxRetryNum("hoodie.table.management.instance.max.retry", "3"), - MetadataStoreClass("hoodie.table.management.metadata.store.class", - "org.apache.hudi.table.management.store.impl.RelationDBBasedStore"), - CompactionCacheEnable("hoodie.table.management.compaction.cache.enable", "true"), - RetryTimes("hoodie.table.management.retry.times", "5"), - SparkSubmitJarPath("hoodie.table.management.submit.jar.path", "/tmp/hoodie_submit_jar/spark/"), - SparkShuffleHdfsEnabled("hoodie.table.management.spark.shuffle.hdfs.enabled", "true"), - SparkParallelism("hoodie.table.management.spark.parallelism", "1"), - SparkMaster("hoodie.table.management.spark.parallelism", "local[1]"), - SparkVcoreBoost("hoodie.table.management.spark.vcore.boost", "1"), - SparkVcoreBoostRatio("hoodie.table.management.spark.vcore.boost.ratio", "1"), - SparkSpeculation("hoodie.table.management.spark.speculation", "false"), - ExecutorMemory("hoodie.table.management.executor.memory", "20g"), - DriverMemory("hoodie.table.management.driver.memory", "20g"), - ExecutorMemoryOverhead("hoodie.table.management.executor.memory.overhead", "5g"), - ExecutorCores("hoodie.table.management.executor.cores", "1"), - MinExecutors("hoodie.table.management.min.executors", "5"), - MaxExecutors("hoodie.table.management.max.executors", "1000"), - CoreExecuteSize("hoodie.table.management.core.executor.pool.size", "300"), - MaxExecuteSize("hoodie.table.management.max.executor.pool.size", "1000"); - - private final String key; - private final String defaultVal; - - ServiceConfVars(String key, String defaultVal) { - this.key = key; - this.defaultVal = defaultVal; - } - - public String key() { - return this.key; - } - - public String defVal() { - return this.defaultVal; - } - } - -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java deleted file mode 100644 index cd5240da3..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.common; - -import org.apache.hudi.table.management.store.jdbc.InstanceDao; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class ServiceContext { - - private static ConcurrentHashMap runningInstance = new ConcurrentHashMap<>(); - - public static void addRunningInstance(String instanceIdentifier, String threadIdentifier) { - runningInstance.put(instanceIdentifier, threadIdentifier); - } - - public static void removeRunningInstance(String instanceIdentifier) { - runningInstance.remove(instanceIdentifier); - } - - public static int getRunningInstanceNum() { - return runningInstance.size(); - } - - public static List getRunningInstanceInfo() { - List runningInfos = new ArrayList<>(); - for (Map.Entry instance : runningInstance.entrySet()) { - runningInfos.add("instance " + instance.getKey() + " execution on " + instance.getValue()); - } - return runningInfos; - } - - private static ConcurrentHashMap pendingInstances = new ConcurrentHashMap<>(); - - public static boolean containsPendingInstant(String key) { - return pendingInstances.containsKey(key); - } - - public static void refreshPendingInstant(String key) { - pendingInstances.put(key, System.currentTimeMillis()); - } - - public static void removePendingInstant(String key) { - pendingInstances.remove(key); - } - - public static ConcurrentHashMap getPendingInstances() { - return pendingInstances; - } - - public static InstanceDao getInstanceDao() { - return ServiceContextHolder.INSTANCE_DAO; - } - - private static class ServiceContextHolder { - private static final InstanceDao INSTANCE_DAO = new InstanceDao(); - } - -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java deleted file mode 100644 index c756631f0..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.common; - -import com.beust.jcommander.Parameter; - -public class TableManagementServiceConfig { - @Parameter(names = {"--server-port", "-p"}, description = " Server Port") - public Integer serverPort = 26755; - - @Parameter(names = {"--help", "-h"}) - public Boolean help = false; -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java deleted file mode 100644 index e84f75666..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.entity; - -public enum Action { - COMPACTION(0), - CLUSTERING(1); - - private final int value; - - Action(int value) { - this.value = value; - } - - public int getValue() { - return this.value; - } - - public static void checkActionType(Instance instance) { - for (Action action : Action.values()) { - if (action.getValue() == instance.getAction()) { - return; - } - } - throw new RuntimeException("Invalid action type: " + instance); - } - - public static Action getAction(int actionValue) { - for (Action action : Action.values()) { - if (action.getValue() == actionValue) { - return action; - } - } - throw new RuntimeException("Invalid instance action: " + actionValue); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java deleted file mode 100644 index d7e8b9fe9..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.entity; - -import org.apache.hudi.table.management.common.ServiceConfig; -import org.apache.hudi.table.management.util.DateTimeUtils; - -import lombok.Getter; - -import java.util.Date; - -@Getter -public class AssistQueryEntity { - - private int maxRetry = ServiceConfig.getInstance() - .getInt(ServiceConfig.ServiceConfVars.MaxRetryNum); - - private Date queryStartTime = DateTimeUtils.addDay(-3); - - private int status; - - public AssistQueryEntity() { - - } - - public AssistQueryEntity(int status, Date queryStartTime) { - this.status = status; - this.queryStartTime = queryStartTime; - } - -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java deleted file mode 100644 index edcb45c0a..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.entity; - -public enum Engine { - - SPARK(0), - FLINK(1); - - private final int value; - - Engine(int value) { - this.value = value; - } - - public int getValue() { - return this.value; - } - - public static void checkEngineType(Instance instance) { - for (Engine engine : Engine.values()) { - if (engine.equals(instance.getExecutionEngine())) { - return; - } - } - throw new RuntimeException("Invalid engine type: " + instance); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java deleted file mode 100644 index 91cfeb6ce..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.entity; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; -import lombok.ToString; - -import java.util.Date; - -@Builder -@Getter -@Setter -@ToString -@NoArgsConstructor -@AllArgsConstructor -public class Instance { - - private long id; - - private String dbName; - - private String tableName; - - private String basePath; - - private Engine executionEngine; - - private String owner; - - private String queue; - - private String resource; - - private String parallelism; - - private String instant; - - private int action; - - private int status; - - private int runTimes; - - private String applicationId; - - private String doradoJobId; - - private Date scheduleTime; - - private Date createTime; - - private Date updateTime; - - private boolean isDeleted; - - public String getFullTableName() { - return dbName + "." + tableName; - } - - public String getIdentifier() { - return dbName + "." + tableName + "." + instant + "." + status; - } - - public String getInstanceRunStatus() { - return dbName + "." + tableName + "." + instant + "." + status + "." - + runTimes + "." + updateTime; - } - - public String getRecordKey() { - return dbName + "." + tableName + "." + instant; - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java deleted file mode 100644 index 4577cd236..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.entity; - -public enum InstanceStatus { - - SCHEDULED(0, "scheduled"), - RUNNING(1, "running"), - FAILED(2, "failed"), - INVALID(3, "invalid"), - COMPLETED(4, "completed"); - - private int status; - private String desc; - - InstanceStatus(int status, String desc) { - this.status = status; - this.desc = desc; - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public String getDesc() { - return desc; - } - - public void setDesc(String desc) { - this.desc = desc; - } - - public static InstanceStatus getInstance(int status) { - for (InstanceStatus instanceStatus : InstanceStatus.values()) { - if (instanceStatus.getStatus() == status) { - return instanceStatus; - } - } - throw new RuntimeException("Invalid instance status: " + status); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java deleted file mode 100644 index 84db00785..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.exception; - -import org.apache.hudi.exception.HoodieException; - -public class HoodieTableManagementException extends HoodieException { - - public HoodieTableManagementException(String msg) { - super(msg); - } - - public HoodieTableManagementException(String msg, Throwable e) { - super(msg, e); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java deleted file mode 100644 index e3ac83749..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.executor; - -import org.apache.hudi.table.management.common.ServiceConfig; -import org.apache.hudi.table.management.common.ServiceContext; -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.entity.InstanceStatus; -import org.apache.hudi.table.management.executor.submitter.ExecutionEngine; -import org.apache.hudi.table.management.executor.submitter.SparkEngine; -import org.apache.hudi.table.management.store.jdbc.InstanceDao; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class BaseActionExecutor implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(BaseActionExecutor.class); - - protected InstanceDao instanceDao; - protected Instance instance; - public int maxFailTolerance; - protected ExecutionEngine engine; - - public BaseActionExecutor(Instance instance) { - this.instance = instance; - this.instanceDao = ServiceContext.getInstanceDao(); - this.maxFailTolerance = ServiceConfig.getInstance() - .getInt(ServiceConfig.ServiceConfVars.IntraMaxFailTolerance); - String mainClass = ServiceConfig.getInstance() - .getString(ServiceConfig.ServiceConfVars.CompactionMainClass); - switch (instance.getExecutionEngine()) { - case SPARK: - engine = new SparkEngine(getJobName(instance), instance, mainClass); - break; - case FLINK: - default: - throw new IllegalStateException("Unexpected value: " + instance.getExecutionEngine()); - } - } - - @Override - public void run() { - ServiceContext.addRunningInstance(instance.getRecordKey(), getThreadIdentifier()); - try { - execute(); - } finally { - ServiceContext.removeRunningInstance(instance.getRecordKey()); - if (ServiceConfig.getInstance() - .getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable)) { - ServiceContext.removePendingInstant(instance.getRecordKey()); - } - } - } - - public abstract boolean doExecute(); - - public abstract String getJobName(Instance instance); - - public void execute() { - try { - boolean success = doExecute(); - if (success) { - instance.setStatus(InstanceStatus.COMPLETED.getStatus()); - LOG.info("Success exec instance: " + instance.getIdentifier()); - } else { - instance.setStatus(InstanceStatus.FAILED.getStatus()); - LOG.info("Fail exec instance: " + instance.getIdentifier()); - } - } catch (Exception e) { - instance.setStatus(InstanceStatus.FAILED.getStatus()); - LOG.error("Fail exec instance: " + instance.getIdentifier() + ", errMsg: ", e); - } - instanceDao.updateStatus(instance); - } - - public String getThreadIdentifier() { - return Thread.currentThread().getId() + "." + Thread.currentThread().getName() + "." - + Thread.currentThread().getState(); - } - - @Override - public String toString() { - return this.getClass().getName() + ", instance: " + instance.getIdentifier(); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java deleted file mode 100644 index 9e2166365..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.executor; - -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.entity.InstanceStatus; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CompactionExecutor extends BaseActionExecutor { - - private static final Logger LOG = LoggerFactory.getLogger(CompactionExecutor.class); - - public static final String COMPACT_JOB_NAME = "Hoodie compact %s.%s %s"; - - public CompactionExecutor(Instance instance) { - super(instance); - } - - @Override - public boolean doExecute() { - String jobName = getJobName(instance); - LOG.info("Start exec : " + jobName); - instance.setStatus(InstanceStatus.RUNNING.getStatus()); - instanceDao.saveInstance(instance); - String applicationId = engine.execute(jobName, instance); - if (StringUtils.isNullOrEmpty(applicationId)) { - LOG.warn("Failed to run compaction for " + jobName); - return false; - } - - LOG.info("Compaction successfully completed for " + jobName); - return true; - } - - @Override - public String getJobName(Instance instance) { - return String.format(COMPACT_JOB_NAME, instance.getDbName(), instance.getTableName(), - instance.getInstant()); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java deleted file mode 100644 index c4fdb98fa..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.executor.submitter; - -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.exception.HoodieTableManagementException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -public abstract class ExecutionEngine { - - private static final Logger LOG = LoggerFactory.getLogger(ExecutionEngine.class); - - protected static final String YARN_SUBMITTED = "Submitted application"; - - public String execute(String jobName, Instance instance) throws HoodieTableManagementException { - try { - LOG.info("Submitting instance {}:{}", jobName, instance.getIdentifier()); - beforeExecuteCommand(); - return executeCommand(jobName, instance); - } catch (Exception e) { - throw new HoodieTableManagementException("Failed submit instance " + instance, e); - } - } - - protected String executeCommand(String jobName, Instance instance) { - String command = ""; - try { - command = getCommand(); - LOG.info("Execute command: {}", command); - Map env = setProcessEnv(); - LOG.info("Execute env: {}", env); - - return "-1"; - -// ExecuteHelper executeHelper = new ExecuteHelper(command, jobName, env); -// CompletableFuture executeFuture = executeHelper.getExecuteThread(); -// executeFuture.whenComplete((Void ignored, Throwable throwable) -> executeHelper.closeProcess()); -// while (!executeFuture.isDone()) { -// LOG.info("Waiting for execute job " + jobName); -// TimeUnit.SECONDS.sleep(5); -// } -// if (executeHelper.isSuccess) { -// LOG.info("Execute job {} command success", jobName); -// } else { -// LOG.info("Execute job {} command failed", jobName); -// } -// return executeHelper.applicationId; - } catch (Exception e) { - LOG.error("Execute command error with exception: ", e); - throw new HoodieTableManagementException("Execute " + command + " command error", e); - } - } - - protected abstract String getCommand() throws IOException; - - protected abstract void beforeExecuteCommand(); - - public abstract Map setProcessEnv(); -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java deleted file mode 100644 index 99babcd2f..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.executor.submitter; - -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.table.management.common.EnvConstant; -import org.apache.hudi.table.management.common.ServiceConfig; -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.exception.HoodieTableManagementException; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.launcher.SparkLauncher; -import org.apache.spark.util.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.hudi.cli.utils.SparkUtil.initLauncher; - -public class SparkEngine extends ExecutionEngine { - - private static final Logger LOG = LoggerFactory.getLogger(SparkEngine.class); - - private String jobName; - private Instance instance; - private String mainClass; - - public SparkEngine(String jobName, Instance instance, String mainClass) { - this.jobName = jobName; - this.instance = instance; - this.mainClass = mainClass; - } - - @Override - protected String getCommand() throws IOException { - String format = "%s/bin/spark-submit --class %s --master yarn --deploy-mode cluster %s %s %s"; - return String.format(format, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome), - mainClass, getSparkArgs(), getSubmitJar(), getJobArgs()); - } - - @Override - protected void beforeExecuteCommand() { - - } - - @Override - public Map setProcessEnv() { - Map env = new HashMap<>(16); - env.put(EnvConstant.JAVA_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.JavaHome)); - env.put(EnvConstant.YARN_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.YarnConfDir)); - env.put(EnvConstant.SPARK_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome)); - env.put(EnvConstant.HADOOP_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.HadoopConfDir)); - env.put(EnvConstant.HADOOP_USER_NAME, instance.getOwner()); - return env; - } - - private String getJobArgs() throws IOException { - return null; - } - - private String getSubmitJar() throws IOException { - File sparkSubmitJarPath = new File(ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSubmitJarPath)); - if (!sparkSubmitJarPath.isDirectory()) { - throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath + " should be be a directory"); - } - File[] jars = sparkSubmitJarPath.listFiles(file -> !file.getName().endsWith(".jar")); - if (jars == null || jars.length != 1) { - throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath - + " should only have one jar, jars = " + Arrays.toString(jars)); - } - return jars[0].getCanonicalPath(); - } - - private String getSparkArgs() { - StringBuilder sparkArgs = new StringBuilder(); - sparkArgs.append("--queue ").append(instance.getQueue()); - sparkArgs.append(" --name ").append(jobName); - - Map sparkParams = new HashMap<>(); - sparkParams.put("mapreduce.job.queuename", instance.getQueue()); - sparkParams.put("spark.shuffle.hdfs.enabled", ServiceConfig.getInstance() - .getString(ServiceConfig.ServiceConfVars.SparkShuffleHdfsEnabled)); - String parallelism = StringUtils.isNullOrEmpty(instance.getParallelism()) - ? ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MaxExecutors) - : instance.getParallelism(); - sparkParams.put("spark.dynamicAllocation.maxExecutors", parallelism); - sparkParams.put("spark.dynamicAllocation.minExecutors", - ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MinExecutors)); - sparkParams.put("spark.vcore.boost", - ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoost)); - sparkParams.put("spark.vcore.boost.ratio", - ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoostRatio)); - sparkParams.put("spark.speculation", - ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSpeculation)); - String driverResource; - String executorResource; - String resource = instance.getResource().trim(); - if (StringUtils.isNullOrEmpty(resource)) { - driverResource = ServiceConfig.getInstance() - .getString(ServiceConfig.ServiceConfVars.DriverMemory); - executorResource = ServiceConfig.getInstance() - .getString(ServiceConfig.ServiceConfVars.ExecutorMemory); - } else { - String[] resourceArray = resource.split(":"); - if (resourceArray.length == 1) { - driverResource = resourceArray[0]; - executorResource = resourceArray[0]; - } else if (resourceArray.length == 2) { - driverResource = resourceArray[0]; - executorResource = resourceArray[1]; - } else { - throw new RuntimeException( - "Invalid conf: " + instance.getIdentifier() + ", resource: " + resource); - } - } - sparkParams.put("spark.executor.cores", - ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorCores)); - sparkParams.put("spark.executor.memory", executorResource); - sparkParams.put("spark.driver.memory", driverResource); - sparkParams.put("spark.executor.memoryOverhead", ServiceConfig.getInstance() - .getString(ServiceConfig.ServiceConfVars.ExecutorMemoryOverhead)); - - for (Map.Entry entry : sparkParams.entrySet()) { - sparkArgs - .append(" --conf ") - .append(entry.getKey()) - .append("=") - .append(entry.getValue()); - } - - return sparkArgs.toString(); - } - - @Override - public String executeCommand(String jobName, Instance instance) throws HoodieTableManagementException { - String sparkPropertiesPath = - Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); - SparkLauncher sparkLauncher; - try { - sparkLauncher = initLauncher(sparkPropertiesPath); - } catch (URISyntaxException e) { - LOG.error("Failed to init spark launcher"); - throw new HoodieTableManagementException("Failed to init spark launcher", e); - } - - String master = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkMaster); - String sparkMemory = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorMemory); - String parallelism = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkParallelism); - String retry = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.RetryTimes); - - sparkLauncher.addAppArgs("COMPACT_RUN", master, sparkMemory, instance.getBasePath(), - instance.getTableName(), instance.getInstant(), parallelism, "", retry, ""); - - Process process; - try { - process = sparkLauncher.launch(); - } catch (IOException e) { - LOG.error("Failed to launcher spark process"); - throw new HoodieTableManagementException("Failed to init spark launcher", e); - } - - InputStream inputStream = null; - BufferedReader bufferedReader = null; - String applicationId = null; - try { - inputStream = process.getInputStream(); - bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); - String line; - while ((line = bufferedReader.readLine()) != null) { - LOG.info(line); - if (line.contains(YARN_SUBMITTED)) { - String[] split = line.split(YARN_SUBMITTED); - applicationId = split[1].trim(); - LOG.info("Execute job {} get application id {}", jobName, applicationId); - break; - } - } - } catch (Exception e) { - LOG.error("execute {} process get application id error", jobName, e); - throw new HoodieTableManagementException("execute " + jobName + " process get application id error", e); - } finally { - if (process != null) { - process.destroyForcibly(); - } - if (inputStream != null) { - IOUtils.closeQuietly(inputStream); - } - if (bufferedReader != null) { - IOUtils.closeQuietly(bufferedReader); - } - } - - return applicationId; - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java deleted file mode 100644 index 62e8ca45d..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.handlers; - -import org.apache.hudi.table.management.common.ServiceConfig; -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.store.MetadataStore; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class ActionHandler implements AutoCloseable { - private static Logger LOG = LoggerFactory.getLogger(ActionHandler.class); - - protected final Configuration conf; - protected final FileSystem fileSystem; - protected final MetadataStore metadataStore; - - private final CompactionHandler compactionHandler; - - public ActionHandler(Configuration conf, - MetadataStore metadataStore) throws IOException { - this.conf = conf; - this.fileSystem = FileSystem.get(conf); - this.metadataStore = metadataStore; - boolean cacheEnable = ServiceConfig.getInstance().getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable); - this.compactionHandler = new CompactionHandler(cacheEnable); - } - - public void scheduleCompaction(Instance instance) { - compactionHandler.scheduleCompaction(metadataStore, instance); - } - - public void removeCompaction(Instance instance) throws IOException { - compactionHandler.removeCompaction(metadataStore, instance); - } - - // TODO: support clustering - public void scheduleClustering(Instance instance) { - - } - - public void removeClustering(Instance instance) { - - } - - @Override - public void close() throws Exception { - this.fileSystem.close(); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java deleted file mode 100644 index 1afc97390..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.handlers; - -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.store.MetadataStore; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * REST Handler servicing clustering requests. - */ -public class ClusteringHandler { - - private static Logger LOG = LoggerFactory.getLogger(ClusteringHandler.class); - - public void scheduleClustering(MetadataStore metadataStore, - Instance instance) { - LOG.info("Start register compaction instance: " + instance.getIdentifier()); - metadataStore.saveInstance(instance); - } - - public void removeClustering(MetadataStore metadataStore, - Instance instance) { - LOG.info("Start remove clustering instance: " + instance.getIdentifier()); - // 1. check instance exist - Instance result = metadataStore.getInstance(instance); - if (result == null) { - throw new RuntimeException("Instance not exist: " + instance); - } - // 2. update status - metadataStore.updateStatus(instance); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java deleted file mode 100644 index 4edf96261..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.handlers; - -import org.apache.hudi.table.management.common.ServiceContext; -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.store.MetadataStore; - -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * REST Handler servicing compaction requests. - */ -public class CompactionHandler { - private static Logger LOG = LoggerFactory.getLogger(CompactionHandler.class); - protected boolean cacheEnable; - - public CompactionHandler(boolean cacheEnable) { - this.cacheEnable = cacheEnable; - } - - public void scheduleCompaction(MetadataStore metadataStore, - Instance instance) { - String recordKey = instance.getRecordKey(); - LOG.info("Start register compaction instance: " + recordKey); - if ((cacheEnable && ServiceContext.containsPendingInstant(recordKey)) - || metadataStore.getInstance(instance) != null) { - LOG.warn("Instance has existed, instance: " + instance); - } else { - metadataStore.saveInstance(instance); - } - if (cacheEnable) { - ServiceContext.refreshPendingInstant(recordKey); - } - } - - public void removeCompaction(@NotNull MetadataStore metadataStore, - Instance instance) { - LOG.info("Start remove compaction instance: " + instance.getIdentifier()); - // 1. check instance exist - Instance result = metadataStore.getInstance(instance); - if (result == null) { - throw new RuntimeException("Instance not exist: " + instance); - } - // 2. update status - metadataStore.updateStatus(instance); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java deleted file mode 100644 index 5855535e3..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.service; - -public interface BaseService { - - void init(); - - void startService(); - - void stop(); - -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java deleted file mode 100644 index b792dfee7..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.service; - -import org.apache.hudi.table.management.common.ServiceContext; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class CleanService implements BaseService { - - private static final Logger LOG = LoggerFactory.getLogger(CleanService.class); - private ScheduledExecutorService service; - private long cacheInterval = 3600 * 1000; //ms - - @Override - public void init() { - LOG.info("Init service: " + CleanService.class.getName()); - //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Clean-Service-%d").build(); - this.service = Executors.newSingleThreadScheduledExecutor(); - } - - @Override - public void startService() { - LOG.info("Start service: " + CleanService.class.getName()); - service.scheduleAtFixedRate(new RetryRunnable(), 30, 300, TimeUnit.SECONDS); - } - - @Override - public void stop() { - LOG.info("Stop service: " + CleanService.class.getName()); - if (service != null && !service.isShutdown()) { - service.shutdown(); - } - } - - private class RetryRunnable implements Runnable { - - @Override - public void run() { - cleanCache(); - } - } - - private void cleanCache() { - long currentTime = System.currentTimeMillis(); - ConcurrentHashMap pendingInstances = ServiceContext.getPendingInstances(); - for (Map.Entry instance : pendingInstances.entrySet()) { - if (currentTime - instance.getValue() > cacheInterval) { - LOG.info("Instance has expired: " + instance.getKey()); - pendingInstances.remove(instance.getKey()); - } - } - } - -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java deleted file mode 100644 index 919199a51..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.service; - -import org.apache.hudi.table.management.common.ServiceConfig; -import org.apache.hudi.table.management.common.ServiceContext; -import org.apache.hudi.table.management.executor.BaseActionExecutor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -public class ExecutorService implements BaseService { - - private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class); - - private ThreadPoolExecutor executorService; - private ScheduledExecutorService service; - private BlockingQueue taskQueue; - private int coreExecuteSize; - private int maxExecuteSize; - - public void init() { - service = Executors.newSingleThreadScheduledExecutor(); - coreExecuteSize = ServiceConfig.getInstance() - .getInt(ServiceConfig.ServiceConfVars.CoreExecuteSize); - maxExecuteSize = ServiceConfig.getInstance() - .getInt(ServiceConfig.ServiceConfVars.MaxExecuteSize); - //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Executor-Service-%d").build(); - executorService = new ThreadPoolExecutor(coreExecuteSize, maxExecuteSize, 60, - TimeUnit.SECONDS, new SynchronousQueue<>()); - taskQueue = new LinkedBlockingQueue<>(); - LOG.info("Init service: " + ExecutorService.class.getName() + ", coreExecuteSize: " - + coreExecuteSize + ", maxExecuteSize: " + maxExecuteSize); - } - - @Override - public void startService() { - LOG.info("Start service: " + ExecutorService.class.getName()); - service.submit(new ExecutionTask()); - } - - @Override - public void stop() { - LOG.info("Stop service: " + ExecutorService.class.getName()); - if (executorService != null && !executorService.isShutdown()) { - executorService.shutdown(); - } - if (service != null && service.isShutdown()) { - service.shutdown(); - } - LOG.info("Finish stop service: " + ExecutorService.class.getName()); - } - - private class ExecutionTask implements Runnable { - - @Override - public void run() { - while (true) { - try { - BaseActionExecutor executor = taskQueue.take(); - LOG.info("Start execute: " + executor); - executorService.execute(executor); - } catch (InterruptedException interruptedException) { - LOG.error("Occur exception when exec job: " + interruptedException); - } - } - } - } - - public void submitTask(BaseActionExecutor task) { - taskQueue.add(task); - } - - public int getFreeSize() { - return maxExecuteSize - ServiceContext.getRunningInstanceNum(); - } - -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java deleted file mode 100644 index 164549a38..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.service; - -import org.apache.hudi.table.management.common.ServiceContext; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class MonitorService implements BaseService { - - private static final Logger LOG = LoggerFactory.getLogger(MonitorService.class); - - private ScheduledExecutorService service; - - @Override - public void init() { - LOG.info("Init service: " + MonitorService.class); - this.service = Executors.newSingleThreadScheduledExecutor(); - } - - @Override - public void startService() { - LOG.info("Start service: " + MonitorService.class.getName()); - service.scheduleAtFixedRate(new MonitorRunnable(), 30, 180, TimeUnit.SECONDS); - } - - @Override - public void stop() { - LOG.info("Stop service: " + MonitorService.class.getName()); - if (service != null && !service.isShutdown()) { - service.shutdown(); - } - } - - private class MonitorRunnable implements Runnable { - - @Override - public void run() { - for (String info : ServiceContext.getRunningInstanceInfo()) { - LOG.info(info); - } - } - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java deleted file mode 100644 index 17bc3bb5d..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.service; - -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.entity.InstanceStatus; -import org.apache.hudi.table.management.store.MetadataStore; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class RetryService implements BaseService { - - private static final Logger LOG = LoggerFactory.getLogger(RetryService.class); - - private MetadataStore metadataStore; - private ScheduledExecutorService service; - - public RetryService(MetadataStore metadataStore) { - this.metadataStore = metadataStore; - } - - @Override - public void init() { - LOG.info("Init service: " + RetryService.class.getName()); - //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Retry-Service-%d").build(); - this.service = Executors.newSingleThreadScheduledExecutor(); - } - - @Override - public void startService() { - LOG.info("Start service: " + RetryService.class.getName()); - service.scheduleAtFixedRate(new RetryRunnable(), 30, 180, TimeUnit.SECONDS); - } - - @Override - public void stop() { - LOG.info("Stop service: " + RetryService.class.getName()); - if (service != null && !service.isShutdown()) { - service.shutdown(); - } - } - - private class RetryRunnable implements Runnable { - - @Override - public void run() { - submitFailTask(); - } - } - - public void submitFailTask() { - List failInstances = metadataStore.getRetryInstances(); - for (Instance instance : failInstances) { - LOG.info("Start retry instance: " + instance.getIdentifier()); - instance.setStatus(InstanceStatus.SCHEDULED.getStatus()); - metadataStore.updateStatus(instance); - } - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java deleted file mode 100644 index a7e1d708b..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.service; - -import org.apache.hudi.table.management.common.ServiceConfig; -import org.apache.hudi.table.management.entity.Action; -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.entity.InstanceStatus; -import org.apache.hudi.table.management.exception.HoodieTableManagementException; -import org.apache.hudi.table.management.executor.BaseActionExecutor; -import org.apache.hudi.table.management.executor.CompactionExecutor; -import org.apache.hudi.table.management.store.MetadataStore; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ScheduleService implements BaseService { - - private static final Logger LOG = LoggerFactory.getLogger(ScheduleService.class); - - private ScheduledExecutorService service; - private ExecutorService executionService; - private MetadataStore metadataStore; - private int compactionWaitInterval; - - public ScheduleService(ExecutorService executionService, - MetadataStore metadataStore) { - this.executionService = executionService; - this.metadataStore = metadataStore; - this.compactionWaitInterval = ServiceConfig.getInstance() - .getInt(ServiceConfig.ServiceConfVars.CompactionScheduleWaitInterval); - } - - @Override - public void init() { - LOG.info("Finish init schedule service, compactionWaitInterval: " + compactionWaitInterval); - //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Schedule-Service-%d").build(); - this.service = Executors.newSingleThreadScheduledExecutor(); - } - - @Override - public void startService() { - LOG.info("Start service: " + ScheduleService.class.getName()); - service.scheduleAtFixedRate(new ScheduleRunnable(), 30, 60, TimeUnit.SECONDS); - } - - @Override - public void stop() { - LOG.info("Stop service: " + ScheduleService.class.getName()); - if (service != null && !service.isShutdown()) { - service.shutdown(); - } - } - - private class ScheduleRunnable implements Runnable { - - @Override - public void run() { - submitReadyTask(); - } - } - - public void submitReadyTask() { - int limitSize = executionService.getFreeSize(); - LOG.info("Start get ready instances, limitSize: " + limitSize); - if (limitSize > 0) { - List readyInstances = metadataStore.getInstances( - InstanceStatus.SCHEDULED.getStatus(), limitSize); - for (Instance readyInstance : readyInstances) { - if (waitSchedule(readyInstance)) { - LOG.info("Instance should wait schedule: " + readyInstance.getInstanceRunStatus()); - continue; - } - LOG.info("Schedule ready instances: " + readyInstance.getInstanceRunStatus()); - BaseActionExecutor executor = getActionExecutor(readyInstance); - executionService.submitTask(executor); - } - } - } - - private boolean waitSchedule(Instance instance) { - return instance.getAction() == Action.COMPACTION.getValue() - && instance.getUpdateTime().getTime() + compactionWaitInterval - > System.currentTimeMillis(); - } - - protected BaseActionExecutor getActionExecutor(Instance instance) { - if (instance.getAction() == Action.COMPACTION.getValue()) { - return new CompactionExecutor(instance); - } else { - throw new HoodieTableManagementException("Unsupported action " + instance.getAction()); - } - } - -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java deleted file mode 100644 index 8d730212c..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.store; - -import org.apache.hudi.table.management.entity.AssistQueryEntity; -import org.apache.hudi.table.management.entity.Instance; - -import java.util.List; - -public interface MetadataStore { - - void saveInstance(Instance instance); - - void updateStatus(Instance instance); - - void init(); - - Instance getInstance(Instance instance); - - List getInstances(int status, int limit); - - List getRetryInstances(); - - List getAlertInstances(AssistQueryEntity queryEntity); -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java deleted file mode 100644 index ac42fe92a..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.store.impl; - -import org.apache.hudi.table.management.entity.AssistQueryEntity; -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.store.MetadataStore; -import org.apache.hudi.table.management.store.jdbc.InstanceDao; - -import java.util.List; - -public class RelationDBBasedStore implements MetadataStore { - - private final InstanceDao instanceDao; - - public RelationDBBasedStore() { - this.instanceDao = new InstanceDao(); - } - - @Override - public void saveInstance(Instance instance) { - instanceDao.saveInstance(instance); - } - - @Override - public void updateStatus(Instance instance) { - instanceDao.updateStatus(instance); - } - - @Override - public void init() { - // do nothing - } - - @Override - public Instance getInstance(Instance instance) { - return instanceDao.getInstance(instance); - } - - @Override - public List getInstances(int status, int limit) { - return instanceDao.getInstances(status, limit); - } - - @Override - public List getRetryInstances() { - return instanceDao.getRetryInstances(); - } - - @Override - public List getAlertInstances(AssistQueryEntity queryEntity) { - return instanceDao.getAlertInstances(queryEntity); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java deleted file mode 100644 index e43fc21b8..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.store.jdbc; - -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; -import org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory; -import org.apache.ibatis.io.Resources; - -import java.io.IOException; -import java.util.Properties; - -public class HikariDataSourceFactory extends UnpooledDataSourceFactory { - private static final String PROPERTIES_PATH = "hikariPool.properties"; - - public HikariDataSourceFactory() throws IOException { - Properties properties = new Properties(); - properties.load(Resources.getResourceAsStream(PROPERTIES_PATH)); - HikariConfig config = new HikariConfig(properties); - this.dataSource = new HikariDataSource(config); - } -} \ No newline at end of file diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java deleted file mode 100644 index 4c106138f..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.store.jdbc; - -import org.apache.hudi.table.management.entity.AssistQueryEntity; -import org.apache.hudi.table.management.entity.Instance; -import org.apache.hudi.table.management.entity.InstanceStatus; - -import org.apache.ibatis.session.RowBounds; -import org.apache.ibatis.session.SqlSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class InstanceDao { - - private static Logger LOG = LoggerFactory.getLogger(InstanceDao.class); - - private static final String NAMESPACE = "Instance"; - - public void saveInstance(Instance instance) { - try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { - sqlSession.insert(statement(NAMESPACE, "saveInstance"), instance); - sqlSession.commit(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void updateStatus(Instance instance) { - try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { - int ret = sqlSession.update(statement(NAMESPACE, getUpdateStatusSqlId(instance)), instance); - sqlSession.commit(); - if (ret != 1) { - LOG.error("Fail update status instance: " + instance); - throw new RuntimeException("Fail update status instance: " + instance.getIdentifier()); - } - LOG.info("Success update status instance: " + instance.getIdentifier()); - } catch (Exception e) { - LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e); - throw new RuntimeException(e); - } - } - - public void updateExecutionInfo(Instance instance) { - int retryNum = 0; - try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { - while (retryNum++ < 3) { - int ret = sqlSession.update(statement(NAMESPACE, "updateExecutionInfo"), instance); - sqlSession.commit(); - if (ret != 1) { - LOG.warn("Fail update execution info instance: " + instance); - TimeUnit.SECONDS.sleep(5); - } else { - LOG.info("Success update execution info, instance: " + instance.getIdentifier()); - return; - } - } - throw new RuntimeException("Fail update execution info: " + instance.getIdentifier()); - } catch (Exception e) { - LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e); - throw new RuntimeException(e); - } - } - - public Instance getInstance(Instance instance) { - try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { - return sqlSession.selectOne(statement(NAMESPACE, "getInstance"), instance); - } catch (Exception e) { - LOG.error("Fail get Instance: " + instance.getIdentifier() + ", errMsg: ", e); - throw new RuntimeException(e); - } - } - - private String getUpdateStatusSqlId(Instance instance) { - switch (InstanceStatus.getInstance(instance.getStatus())) { - case SCHEDULED: - return "retryInstance"; - case RUNNING: - return "runningInstance"; - case COMPLETED: - return "successInstance"; - case FAILED: - return "failInstance"; - case INVALID: - return "invalidInstance"; - default: - throw new RuntimeException("Invalid instance: " + instance.getIdentifier()); - } - } - - public List getInstances(int status, int limit) { - try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { - if (limit > 0) { - return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status, - new RowBounds(0, limit)); - } else { - return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status); - } - } catch (Exception e) { - LOG.error("Fail get instances, status: " + status + ", errMsg: ", e); - throw new RuntimeException("Fail get instances, status: " + status); - } - } - - public List getRetryInstances() { - try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { - return sqlSession.selectList(statement(NAMESPACE, "getRetryInstances"), - new AssistQueryEntity()); - } catch (Exception e) { - LOG.error("Fail get retry instances, errMsg: ", e); - throw new RuntimeException("Fail get retry instances"); - } - } - - public List getAlertInstances(AssistQueryEntity queryEntity) { - try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { - return sqlSession.selectList(statement(NAMESPACE, "getAlertInstances"), - queryEntity); - } catch (Exception e) { - LOG.error("Fail get alert instances, errMsg: ", e); - throw new RuntimeException("Fail get alert instances"); - } - } - - public List getInstanceAfterTime(AssistQueryEntity queryEntity) { - try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { - return sqlSession.selectList(statement(NAMESPACE, "getInstanceAfterTime"), queryEntity); - } catch (Exception e) { - LOG.error("Fail get instances after time, errMsg: ", e); - throw new RuntimeException("Fail get alert instances"); - } - } - - private String statement(String namespace, String sqlID) { - return namespace + "." + sqlID; - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java deleted file mode 100644 index 4dea08332..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.store.jdbc; - -import org.apache.hudi.table.management.exception.HoodieTableManagementException; - -import org.apache.ibatis.io.Resources; -import org.apache.ibatis.session.SqlSession; -import org.apache.ibatis.session.SqlSessionFactory; -import org.apache.ibatis.session.SqlSessionFactoryBuilder; - -import java.io.IOException; -import java.io.InputStream; -import java.sql.PreparedStatement; -import java.util.stream.Collectors; - -public class SqlSessionFactoryUtil { - - private static final String CONFIG_PATH = "mybatis-config.xml"; - - private static SqlSessionFactory sqlSessionFactory; - private static final Class CLASS_LOCK = SqlSessionFactoryUtil.class; - - private SqlSessionFactoryUtil() { - - } - - public static void initSqlSessionFactory() { - try (InputStream inputStream = Resources.getResourceAsStream(CONFIG_PATH)) { - synchronized (CLASS_LOCK) { - if (sqlSessionFactory == null) { - sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream); - } - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - public static SqlSession openSqlSession() { - if (sqlSessionFactory == null) { - initSqlSessionFactory(); - init(); - } - return sqlSessionFactory.openSession(); - } - - public static void init() { - try { - String[] ddls = org.apache.commons.io.IOUtils.readLines( - SqlSessionFactoryUtil.class.getResourceAsStream("/table-management-service.sql")) - .stream().filter(e -> !e.startsWith("--")) - .collect(Collectors.joining("")) - .split(";"); - for (String ddl : ddls) { - try (PreparedStatement statement = SqlSessionFactoryUtil.openSqlSession().getConnection() - .prepareStatement(ddl)) { - statement.execute(); - } - } - } catch (Exception e) { - throw new HoodieTableManagementException("Unable to read init ddl file", e); - } - } - -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java deleted file mode 100644 index 763047a26..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.util; - -import java.util.Calendar; -import java.util.Date; - -public class DateTimeUtils { - - public static Date addDay(int amount) { - Calendar c = Calendar.getInstance(); - c.setTime(new Date()); - c.add(Calendar.DATE, amount); - return c.getTime(); - } -} diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java deleted file mode 100644 index 27139bf6f..000000000 --- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.table.management.util; - -import org.apache.hudi.table.management.entity.Action; -import org.apache.hudi.table.management.entity.Engine; -import org.apache.hudi.table.management.entity.Instance; - -public class InstanceUtil { - - public static void checkArgument(Instance instance) { - if (instance.getExecutionEngine() == null) { - instance.setExecutionEngine(Engine.SPARK); - } - Engine.checkEngineType(instance); - Action.checkActionType(instance); - } -} diff --git a/hudi-table-management-service/src/main/resources/hikariPool.properties b/hudi-table-management-service/src/main/resources/hikariPool.properties deleted file mode 100644 index a14104a12..000000000 --- a/hudi-table-management-service/src/main/resources/hikariPool.properties +++ /dev/null @@ -1,20 +0,0 @@ -### -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -### -jdbcUrl=jdbc:h2:mem:tms;MODE=MYSQL -dataSource.user=root -dataSource.password=password \ No newline at end of file diff --git a/hudi-table-management-service/src/main/resources/logback.xml b/hudi-table-management-service/src/main/resources/logback.xml deleted file mode 100644 index f4d55b0ab..000000000 --- a/hudi-table-management-service/src/main/resources/logback.xml +++ /dev/null @@ -1,41 +0,0 @@ - - - - - - - - - - - - %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n - - - - - - - - - - - - - - \ No newline at end of file diff --git a/hudi-table-management-service/src/main/resources/mybatis-config.xml b/hudi-table-management-service/src/main/resources/mybatis-config.xml deleted file mode 100644 index d9b6fc581..000000000 --- a/hudi-table-management-service/src/main/resources/mybatis-config.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/hudi-table-management-service/src/main/resources/mybatis/Instance.xml b/hudi-table-management-service/src/main/resources/mybatis/Instance.xml deleted file mode 100644 index c0d5d86d7..000000000 --- a/hudi-table-management-service/src/main/resources/mybatis/Instance.xml +++ /dev/null @@ -1,165 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - id, db_name, table_name, base_path, execution_engine, owner, cluster, queue, resource, parallelism, auto_clean, - instant, action, status, run_times, application_id, dorado_job_id, schedule_time, create_time, update_time - - - - INSERT INTO instance (db_name, table_name, base_path, execution_engine, owner, cluster, - queue, resource, parallelism, auto_clean, instant, action, status, run_times) - VALUES (#{dbName}, #{tableName}, #{basePath}, #{executionEngine}, #{owner}, #{cluster}, - #{queue},#{resource}, #{parallelism}, #{autoClean}, #{instant}, #{action}, #{status}, 0) - - - - UPDATE instance - SET status = #{status}, - schedule_time = now(), - run_times = run_times + 1 - WHERE db_name = #{dbName} - and table_name = #{tableName} - and instant = #{instant} - and status = 0 - - - - UPDATE instance - SET status = #{status} - WHERE db_name = #{dbName} - and table_name = #{tableName} - and instant = #{instant} - - - - UPDATE instance - SET dorado_job_id = #{doradoJobId}, - application_id = #{applicationId} - WHERE db_name = #{dbName} - and table_name = #{tableName} - and instant = #{instant} - - - - UPDATE instance - SET status = #{status} - WHERE db_name = #{dbName} - and table_name = #{tableName} - and instant = #{instant} - and status = 1 - - - - UPDATE instance - SET status = #{status} - WHERE db_name = #{dbName} - and table_name = #{tableName} - and instant = #{instant} - and status = 1 - - - - UPDATE instance - SET status = #{status} - WHERE db_name = #{dbName} - and table_name = #{tableName} - and instant = #{instant} - - - - - - - - - - - - - diff --git a/hudi-table-management-service/src/main/resources/table-management-service.sql b/hudi-table-management-service/src/main/resources/table-management-service.sql deleted file mode 100644 index 243880b2d..000000000 --- a/hudi-table-management-service/src/main/resources/table-management-service.sql +++ /dev/null @@ -1,46 +0,0 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one --- or more contributor license agreements. See the NOTICE file --- distributed with this work for additional information --- regarding copyright ownership. The ASF licenses this file --- to you under the Apache License, Version 2.0 (the --- "License"); you may not use this file except in compliance --- with the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- - -CREATE TABLE if not exists `instance` -( - `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key', - `db_name` varchar(128) NOT NULL COMMENT 'db name', - `table_name` varchar(128) NOT NULL COMMENT 'table name', - `base_path` varchar(128) NOT NULL COMMENT 'base path', - `execution_engine` varchar(128) NOT NULL COMMENT 'execution engine', - `owner` varchar(128) NOT NULL COMMENT 'owner', - `cluster` varchar(128) NOT NULL COMMENT 'cluster', - `queue` varchar(128) NOT NULL COMMENT 'queue', - `resource` varchar(128) NOT NULL COMMENT 'resource', - `parallelism` varchar(128) NOT NULL COMMENT 'parallelism', - `auto_clean` int NOT NULL DEFAULT '0' COMMENT 'auto_clean', - `instant` varchar(128) NOT NULL COMMENT 'instant', - `action` int NOT NULL COMMENT 'action', - `status` int NOT NULL COMMENT 'status', - `run_times` int NOT NULL DEFAULT '0' COMMENT 'run times', - `application_id` varchar(128) DEFAULT NULL COMMENT 'application id', - `dorado_job_id` varchar(128) DEFAULT NULL COMMENT 'job id', - `schedule_time` timestamp NULL DEFAULT NULL COMMENT 'schedule time', - `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', - `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', - PRIMARY KEY (`id`), - UNIQUE KEY `uniq_table_instant` (`db_name`,`table_name`,`instant`), - KEY `idx_status` (`status`), - KEY `idx_update_time_status` (`update_time`,`status`) -) COMMENT='Table Management Service instance'; - diff --git a/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties b/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties deleted file mode 100644 index b21b5d407..000000000 --- a/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties +++ /dev/null @@ -1,29 +0,0 @@ -### -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -### -log4j.rootLogger=WARN, CONSOLE -log4j.logger.org.apache.hudi=DEBUG - -# CONSOLE is set to be a ConsoleAppender. -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -# CONSOLE uses PatternLayout. -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n -log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter -log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true -log4j.appender.CONSOLE.filter.a.LevelMin=WARN -log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-table-management-service/src/test/resources/log4j-surefire.properties b/hudi-table-management-service/src/test/resources/log4j-surefire.properties deleted file mode 100644 index c03e808cc..000000000 --- a/hudi-table-management-service/src/test/resources/log4j-surefire.properties +++ /dev/null @@ -1,30 +0,0 @@ -### -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -### -log4j.rootLogger=WARN, CONSOLE -log4j.logger.org.apache=INFO -log4j.logger.org.apache.hudi=DEBUG - -# A1 is set to be a ConsoleAppender. -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -# A1 uses PatternLayout. -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n -log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter -log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true -log4j.appender.CONSOLE.filter.a.LevelMin=WARN -log4j.appender.CONSOLE.filter.a.LevelMax=FATAL \ No newline at end of file diff --git a/pom.xml b/pom.xml index 96a86a0bb..e3c8b3e8c 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,6 @@ hudi-hadoop-mr hudi-spark-datasource hudi-timeline-service - hudi-table-management-service hudi-utilities hudi-sync packaging/hudi-hadoop-mr-bundle