diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java index a4dc74127..8f7489249 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java @@ -31,7 +31,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -49,6 +51,9 @@ public class HoodieClusteringJob { private transient FileSystem fs; private TypedProperties props; private final JavaSparkContext jsc; + public static final String EXECUTE = "execute"; + public static final String SCHEDULE = "schedule"; + public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute"; public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) { this.cfg = cfg; @@ -71,8 +76,8 @@ public class HoodieClusteringJob { public String basePath = null; @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true) public String tableName = null; - @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only need when cluster. " - + "And schedule clustering can generate it.", required = false) + @Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only need when set --mode execute. 
" + + "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required = false) public String clusteringInstantTime = null; @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false) public int parallelism = 1; @@ -83,8 +88,14 @@ public class HoodieClusteringJob { @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false) public int retry = 0; - @Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering") + @Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering @desperate soon please use \"--mode schedule\" instead") public Boolean runSchedule = false; + + @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a cluster plan; " + + "Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; " + + "Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately", required = false) + public String runningMode = null; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; @@ -101,15 +112,17 @@ public class HoodieClusteringJob { public static void main(String[] args) { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, null, args); - if (cfg.help || args.length == 0 || (!cfg.runSchedule && cfg.clusteringInstantTime == null)) { + + if (cfg.help || args.length == 0) { cmd.usage(); System.exit(1); } + final JavaSparkContext jsc = UtilHelpers.buildSparkContext("clustering-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory); HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, cfg); int result = clusteringJob.cluster(cfg.retry); - String resultMsg = String.format("Clustering with basePath: %s, tableName: %s, runSchedule: %s", - cfg.basePath, cfg.tableName, cfg.runSchedule); + String resultMsg = String.format("Clustering with basePath: %s, tableName: %s, 
runningMode: %s", + cfg.basePath, cfg.tableName, cfg.runningMode); if (result == -1) { LOG.error(resultMsg + " failed"); } else { @@ -118,20 +131,46 @@ public class HoodieClusteringJob { jsc.stop(); } + // make sure that cfg.runningMode couldn't be null + private static void validateRunningMode(Config cfg) { + // --mode has a higher priority than --schedule + // If we remove --schedule option in the future we need to change runningMode default value to EXECUTE + if (StringUtils.isNullOrEmpty(cfg.runningMode)) { + cfg.runningMode = cfg.runSchedule ? SCHEDULE : EXECUTE; + } + + if (cfg.runningMode.equalsIgnoreCase(EXECUTE) && cfg.clusteringInstantTime == null) { + throw new RuntimeException("--instant-time couldn't be null when executing clustering plan."); + } + } + public int cluster(int retry) { this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration()); + // need to do validate in case that users call cluster() directly without setting cfg.runningMode + validateRunningMode(cfg); int ret = UtilHelpers.retry(retry, () -> { - if (cfg.runSchedule) { - LOG.info("Do schedule"); - Option instantTime = doSchedule(jsc); - int result = instantTime.isPresent() ? 0 : -1; - if (result == 0) { - LOG.info("The schedule instant time is " + instantTime.get()); + switch (cfg.runningMode.toLowerCase()) { + case SCHEDULE: { + LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule"); + Option instantTime = doSchedule(jsc); + int result = instantTime.isPresent() ? 
0 : -1; + if (result == 0) { + LOG.info("The schedule instant time is " + instantTime.get()); + } + return result; + } + case SCHEDULE_AND_EXECUTE: { + LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]"); + return doScheduleAndCluster(jsc); + } + case EXECUTE: { + LOG.info("Running Mode: [" + EXECUTE + "]; Do cluster"); + return doCluster(jsc); + } + default: { + LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly"); + return -1; } - return result; - } else { - LOG.info("Do cluster"); - return doCluster(jsc); } }, "Cluster failed"); return ret; @@ -164,11 +203,37 @@ public class HoodieClusteringJob { private Option doSchedule(JavaSparkContext jsc) throws Exception { String schemaStr = getSchemaFromLatestInstant(); try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) { - if (cfg.clusteringInstantTime != null) { - client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty()); - return Option.of(cfg.clusteringInstantTime); - } - return client.scheduleClustering(Option.empty()); + return doSchedule(client); } } + + private Option doSchedule(SparkRDDWriteClient client) { + if (cfg.clusteringInstantTime != null) { + client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty()); + return Option.of(cfg.clusteringInstantTime); + } + return client.scheduleClustering(Option.empty()); + } + + public int doScheduleAndCluster(JavaSparkContext jsc) throws Exception { + LOG.info("Step 1: Do schedule"); + String schemaStr = getSchemaFromLatestInstant(); + try (SparkRDDWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) { + + Option instantTime = doSchedule(client); + int result = instantTime.isPresent() ? 
0 : -1; + + if (result == -1) { + LOG.info("Couldn't generate cluster plan"); + return result; + } + + LOG.info("The schedule instant time is " + instantTime.get()); + LOG.info("Step 2: Do cluster"); + JavaRDD writeResponse = + (JavaRDD) client.cluster(instantTime.get(), true).getWriteStatuses(); + return UtilHelpers.handleErrors(jsc, instantTime.get(), writeResponse); + } + } + } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 1c68476f7..0ae0aeb8b 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -18,6 +18,10 @@ package org.apache.hudi.utilities.functional; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.ConcurrentModificationException; +import java.util.concurrent.ExecutorService; import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; @@ -77,22 +81,22 @@ import org.apache.spark.sql.api.java.UDF4; import org.apache.spark.sql.functions; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; 
-import java.util.ConcurrentModificationException; import java.util.List; import java.util.Properties; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -103,6 +107,7 @@ import java.util.stream.Stream; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -142,6 +147,38 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { return props; } + protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster) throws IOException { + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); + cfg.continuousMode = true; + cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", asyncCluster, "")); + return new HoodieDeltaStreamer(cfg, jsc); + } + + protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) { + HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, + clusteringInstantTime, runSchedule, scheduleAndExecute); + return new HoodieClusteringJob(jsc, scheduleClusteringConfig); + } + + @AfterAll + public static void cleanupClass() { + UtilitiesTestBase.cleanupClass(); + if (testUtils != null) { + testUtils.teardown(); + } + } + + @BeforeEach + public void setup() throws Exception { + super.setup(); + } + + @AfterEach + public void 
teardown() throws Exception { + super.teardown(); + } + static class TestHelpers { static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) { @@ -317,6 +354,22 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { int numDeltaCommits = (int) timeline.getInstants().count(); assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); } + + static void assertNoReplaceCommits(int expected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); + int numDeltaCommits = (int) timeline.getInstants().count(); + assertEquals(expected, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + expected); + } + + static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); + HoodieTimeline timeline = meta.getActiveTimeline().filterPendingReplaceTimeline(); + LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList())); + int numDeltaCommits = (int) timeline.getInstants().count(); + assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); + } } @Test @@ -794,6 +847,10 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { dsFuture.get(); } + private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function condition) throws Exception { + deltaStreamerTestRunner(ds, null, condition); + } + @Test public void testInlineClustering() throws Exception { String tableBasePath = dfsBasePath + 
"/inlineClustering"; @@ -836,32 +893,34 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { } private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, - String clusteringInstantTime, boolean runSchedule) { + String clusteringInstantTime, + boolean runSchedule) { + return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null); + } + + private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath, + String clusteringInstantTime, + boolean runSchedule, + String runningMode) { HoodieClusteringJob.Config config = new HoodieClusteringJob.Config(); config.basePath = basePath; config.clusteringInstantTime = clusteringInstantTime; config.runSchedule = runSchedule; config.propsFilePath = dfsBasePath + "/clusteringjob.properties"; + config.runningMode = runningMode; return config; } @Test public void testHoodieAsyncClusteringJob() throws Exception { String tableBasePath = dfsBasePath + "/asyncClustering"; - // Keep it higher than batch-size to test continuous mode - int totalRecords = 3000; - // Initial bulk insert - HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); - cfg.continuousMode = true; - cfg.tableType = HoodieTableType.COPY_ON_WRITE.name(); - cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "")); - HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); - deltaStreamerTestRunner(ds, cfg, (r) -> { + HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true"); + HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, null); + + deltaStreamerTestRunner(ds, (r) -> { TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); - HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath, - null, true); - HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, 
scheduleClusteringConfig); + Option<String> scheduleClusteringInstantTime = Option.empty(); try { scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule(); @@ -923,6 +982,52 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase { }); } + @ParameterizedTest + @ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"}) + public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception { + String tableBasePath = dfsBasePath + "/asyncClustering2"; + HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false"); + HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, runningMode); + + deltaStreamerTestRunner(ds, (r) -> { + Exception exception = null; + TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + try { + int result = scheduleClusteringJob.cluster(0); + if (result == 0) { + LOG.info("Cluster success"); + } else { + LOG.warn("Cluster failed"); + return false; + } + } catch (Exception e) { + LOG.warn("ScheduleAndExecute clustering failed", e); + exception = e; + if (!runningMode.equalsIgnoreCase(HoodieClusteringJob.EXECUTE)) { + return false; + } + } + switch (runningMode.toLowerCase()) { + case HoodieClusteringJob.SCHEDULE_AND_EXECUTE: { + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); + return true; + } + case HoodieClusteringJob.SCHEDULE: { + TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs); + TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs); + return true; + } + case HoodieClusteringJob.EXECUTE: { + assertNotNull(exception); + assertEquals("--instant-time couldn't be null when executing clustering plan.", exception.getMessage()); + return true; + } + default: + throw new IllegalStateException("Unexpected value: " + runningMode); + } + }); + } + /** * Test Bulk Insert and upserts with hive syncing. 
Tests Hudi incremental processing using a 2 step pipeline The first * step involves using a SQL template to transform a source TEST-DATA-SOURCE ============================> HUDI TABLE