From 538ec44fa8a23926b584c3bcdd24feb9894d4c51 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Tue, 15 Feb 2022 06:49:53 -0800 Subject: [PATCH] [HUDI-2931] Add config to disable table services (#4777) --- .../hudi/async/AsyncArchiveService.java | 3 +- .../hudi/async/AsyncCleanerService.java | 3 +- .../hudi/async/AsyncClusteringService.java | 4 +- .../hudi/async/AsyncCompactService.java | 4 +- .../apache/hudi/async/HoodieAsyncService.java | 6 ++ .../hudi/async/HoodieAsyncTableService.java | 50 ++++++++++++++++ .../hudi/client/BaseHoodieWriteClient.java | 18 +++++- .../apache/hudi/client/RunsTableService.java | 37 ++++++++++++ .../apache/hudi/config/HoodieWriteConfig.java | 17 +++++- .../async/TestHoodieAsyncTableService.java | 58 +++++++++++++++++++ .../hudi/common/model/TableServiceType.java | 5 +- .../sink/compact/HoodieFlinkCompactor.java | 5 +- 12 files changed, 199 insertions(+), 11 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/async/TestHoodieAsyncTableService.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java index 980239f33..3fdc21dd2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java @@ -34,7 +34,7 @@ import java.util.concurrent.Executors; /** * Async archive service to run concurrently with write operation. */ -public class AsyncArchiveService extends HoodieAsyncService { +public class AsyncArchiveService extends HoodieAsyncTableService { private static final Logger LOG = LogManager.getLogger(AsyncArchiveService.class); @@ -42,6 +42,7 @@ public class AsyncArchiveService extends HoodieAsyncService { private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); protected AsyncArchiveService(BaseHoodieWriteClient writeClient) { + super(writeClient.getConfig()); this.writeClient = writeClient; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java index e316f5dca..72907e6d3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java @@ -35,7 +35,7 @@ import java.util.concurrent.Executors; /** * Async clean service to run concurrently with write operation. */ -public class AsyncCleanerService extends HoodieAsyncService { +public class AsyncCleanerService extends HoodieAsyncTableService { private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class); @@ -43,6 +43,7 @@ public class AsyncCleanerService extends HoodieAsyncService { private final transient ExecutorService executor = Executors.newSingleThreadExecutor(); protected AsyncCleanerService(BaseHoodieWriteClient writeClient) { + super(writeClient.getConfig()); this.writeClient = writeClient; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java index ca2faebfb..cce2ff565 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java @@ -38,7 +38,7 @@ import java.util.stream.IntStream; * Async clustering service that runs in a separate thread. * Currently, only one clustering thread is allowed to run at any time. */ -public abstract class AsyncClusteringService extends HoodieAsyncService { +public abstract class AsyncClusteringService extends HoodieAsyncTableService { private static final long serialVersionUID = 1L; private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class); @@ -51,7 +51,7 @@ public abstract class AsyncClusteringService extends HoodieAsyncService { } public AsyncClusteringService(BaseHoodieWriteClient writeClient, boolean runInDaemonMode) { - super(runInDaemonMode); + super(writeClient.getConfig(), runInDaemonMode); this.clusteringClient = createClusteringClient(writeClient); this.maxConcurrentClustering = 1; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java index 3ddd88768..6bfaa2f5c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java @@ -37,7 +37,7 @@ import java.util.stream.IntStream; /** * Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time. */ -public abstract class AsyncCompactService extends HoodieAsyncService { +public abstract class AsyncCompactService extends HoodieAsyncTableService { private static final long serialVersionUID = 1L; private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class); @@ -56,7 +56,7 @@ public abstract class AsyncCompactService extends HoodieAsyncService { } public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client, boolean runInDaemonMode) { - super(runInDaemonMode); + super(client.getConfig(), runInDaemonMode); this.context = context; this.compactor = createCompactor(client); this.maxConcurrentCompaction = 1; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java index a85635a7f..a1665c7fb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java @@ -89,6 +89,9 @@ public abstract class HoodieAsyncService implements Serializable { * @throws InterruptedException */ public void waitForShutdown() throws ExecutionException, InterruptedException { + if (future == null) { + return; + } try { future.get(); } catch (ExecutionException ex) { @@ -152,6 +155,9 @@ public abstract class HoodieAsyncService implements Serializable { */ @SuppressWarnings("unchecked") private void shutdownCallback(Function callback) { + if (future == null) { + return; + } future.whenComplete((resp, error) -> { if (null != callback) { callback.apply(null != error); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java new file mode 100644 index 000000000..6a53d3006 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.client.RunsTableService; +import org.apache.hudi.config.HoodieWriteConfig; + +import java.util.function.Function; + +public abstract class HoodieAsyncTableService extends HoodieAsyncService implements RunsTableService { + + protected HoodieWriteConfig writeConfig; + + protected HoodieAsyncTableService() { + } + + protected HoodieAsyncTableService(HoodieWriteConfig writeConfig) { + this.writeConfig = writeConfig; + } + + protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, boolean runInDaemonMode) { + super(runInDaemonMode); + this.writeConfig = writeConfig; + } + + @Override + public void start(Function onShutdownCallback) { + if (!tableServicesEnabled(writeConfig)) { + return; + } + super.start(onShutdownCallback); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 55ccb540f..f3dc53b0f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -100,7 +100,8 @@ import java.util.stream.Stream; * @param Type of keys * @param Type of outputs */ -public abstract class BaseHoodieWriteClient extends BaseHoodieClient { +public abstract class BaseHoodieWriteClient extends BaseHoodieClient + implements RunsTableService { protected static final String LOOKUP_STR = "lookup"; private static final long serialVersionUID = 1L; @@ -470,6 +471,9 @@ public abstract class BaseHoodieWriteClient table, HoodieCommitMetadata metadata, Option> extraMetadata) { + if (!tableServicesEnabled(config)) { + return; + } if (config.areAnyTableServicesExecutedInline() || config.areAnyTableServicesScheduledInline()) { if (config.isMetadataTableEnabled()) { table.getHoodieView().sync(); @@ -760,6 +764,9 @@ public abstract class BaseHoodieWriteClient table) { + if (!tableServicesEnabled(config)) { + return; + } try { // We cannot have unbounded commit files. Archive commits if we have to archive HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table); @@ -1141,7 +1151,13 @@ public abstract class BaseHoodieWriteClient scheduleTableServiceInternal(String instantTime, Option> extraMetadata, TableServiceType tableServiceType) { + if (!tableServicesEnabled(config)) { + return Option.empty(); + } switch (tableServiceType) { + case ARCHIVE: + LOG.info("Scheduling archiving is not supported. Skipping."); + return Option.empty(); case CLUSTER: LOG.info("Scheduling clustering at instant time :" + instantTime); Option clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled()) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java new file mode 100644 index 000000000..64e540568 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +public interface RunsTableService { + + Logger LOG = LogManager.getLogger(RunsTableService.class); + + default boolean tableServicesEnabled(HoodieWriteConfig config) { + boolean enabled = config.areTableServicesEnabled(); + if (!enabled) { + LOG.warn(String.format("Table services are disabled. Set `%s` to enable.", HoodieWriteConfig.TABLE_SERVICES_ENABLED)); + } + return enabled; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index dbc62494e..e3efe418f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -61,9 +61,9 @@ import org.apache.hudi.table.RandomFileIdPrefixProvider; import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; +import org.apache.hudi.table.storage.HoodieStorageLayout; import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hudi.table.storage.HoodieStorageLayout; import org.apache.orc.CompressionKind; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -440,6 +440,12 @@ public class HoodieWriteConfig extends HoodieConfig { .sinceVersion("0.10.0") .withDocumentation("File Id Prefix provider class, that implements `org.apache.hudi.fileid.FileIdPrefixProvider`"); + public static final ConfigProperty TABLE_SERVICES_ENABLED = ConfigProperty + .key("hoodie.table.services.enabled") + .defaultValue(true) + .sinceVersion("0.11.0") + .withDocumentation("Master control to disable all table services including archive, clean, compact, cluster, etc."); + private ConsistencyGuardConfig consistencyGuardConfig; // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled @@ -1920,6 +1926,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getString(FILEID_PREFIX_PROVIDER_CLASS); } + public boolean areTableServicesEnabled() { + return getBooleanOrDefault(TABLE_SERVICES_ENABLED); + } + /** * Layout configs. */ @@ -2285,6 +2295,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withTableServicesEnabled(boolean enabled) { + writeConfig.setValue(TABLE_SERVICES_ENABLED, Boolean.toString(enabled)); + return this; + } + public Builder withProperties(Properties properties) { this.writeConfig.getProps().putAll(properties); return this; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/async/TestHoodieAsyncTableService.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/async/TestHoodieAsyncTableService.java new file mode 100644 index 000000000..0c19576d0 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/async/TestHoodieAsyncTableService.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.async; + +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class TestHoodieAsyncTableService { + + @Test + void tableServiceShouldNotStartIfDisabled(@Mock HoodieWriteConfig config) { + when(config.areTableServicesEnabled()).thenReturn(false); + HoodieAsyncTableService service = new DummyAsyncTableService(config); + service.start(null); + assertFalse(service.isStarted()); + } + + private static class DummyAsyncTableService extends HoodieAsyncTableService { + + protected DummyAsyncTableService(HoodieWriteConfig writeConfig) { + super(writeConfig); + } + + @Override + protected Pair startService() { + return null; + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/TableServiceType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/TableServiceType.java index 90444a3d6..69dd30782 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/TableServiceType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/TableServiceType.java @@ -24,10 +24,13 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; * Supported runtime table services. */ public enum TableServiceType { - COMPACT, CLUSTER, CLEAN; + ARCHIVE, COMPACT, CLUSTER, CLEAN; public String getAction() { switch (this) { + case ARCHIVE: + // for table service type completeness; there is no timeline action associated with archive + return "NONE"; case COMPACT: return HoodieTimeline.COMPACTION_ACTION; case CLEAN: diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index a6161f2c8..546136e41 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -18,7 +18,7 @@ package org.apache.hudi.sink.compact; -import org.apache.hudi.async.HoodieAsyncService; +import org.apache.hudi.async.HoodieAsyncTableService; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -116,7 +116,7 @@ public class HoodieFlinkCompactor { /** * Schedules compaction in service. */ - public static class AsyncCompactionService extends HoodieAsyncService { + public static class AsyncCompactionService extends HoodieAsyncTableService { private static final long serialVersionUID = 1L; /** @@ -173,6 +173,7 @@ public class HoodieFlinkCompactor { CompactionUtil.inferChangelogMode(conf, metaClient); this.writeClient = StreamerUtil.createWriteClient(conf); + this.writeConfig = writeClient.getConfig(); this.table = writeClient.getHoodieTable(); }