[HUDI-2164] Let users build cluster plan and execute this plan at once using HoodieClusteringJob for async clustering (#3259)
* add --mode schedule/execute/scheduleandexecute * fix checkstyle * add UT testHoodieAsyncClusteringJobWithScheduleAndExecute * log changed * try to make ut success * try to fix ut * modify ut * review changed * code review * code review * code review * code review Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -31,7 +31,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
@@ -49,6 +51,9 @@ public class HoodieClusteringJob {
|
||||
private transient FileSystem fs;
|
||||
private TypedProperties props;
|
||||
private final JavaSparkContext jsc;
|
||||
public static final String EXECUTE = "execute";
|
||||
public static final String SCHEDULE = "schedule";
|
||||
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
|
||||
|
||||
public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
|
||||
this.cfg = cfg;
|
||||
@@ -71,8 +76,8 @@ public class HoodieClusteringJob {
|
||||
public String basePath = null;
|
||||
@Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
|
||||
public String tableName = null;
|
||||
@Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only need when cluster. "
|
||||
+ "And schedule clustering can generate it.", required = false)
|
||||
@Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only need when set --mode execute. "
|
||||
+ "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required = false)
|
||||
public String clusteringInstantTime = null;
|
||||
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
|
||||
public int parallelism = 1;
|
||||
@@ -83,8 +88,14 @@ public class HoodieClusteringJob {
|
||||
@Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
|
||||
public int retry = 0;
|
||||
|
||||
@Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering")
|
||||
@Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering @desperate soon please use \"--mode schedule\" instead")
|
||||
public Boolean runSchedule = false;
|
||||
|
||||
@Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a cluster plan; "
|
||||
+ "Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; "
|
||||
+ "Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately", required = false)
|
||||
public String runningMode = null;
|
||||
|
||||
@Parameter(names = {"--help", "-h"}, help = true)
|
||||
public Boolean help = false;
|
||||
|
||||
@@ -101,15 +112,17 @@ public class HoodieClusteringJob {
|
||||
public static void main(String[] args) {
|
||||
final Config cfg = new Config();
|
||||
JCommander cmd = new JCommander(cfg, null, args);
|
||||
if (cfg.help || args.length == 0 || (!cfg.runSchedule && cfg.clusteringInstantTime == null)) {
|
||||
|
||||
if (cfg.help || args.length == 0) {
|
||||
cmd.usage();
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
final JavaSparkContext jsc = UtilHelpers.buildSparkContext("clustering-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
|
||||
HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, cfg);
|
||||
int result = clusteringJob.cluster(cfg.retry);
|
||||
String resultMsg = String.format("Clustering with basePath: %s, tableName: %s, runSchedule: %s",
|
||||
cfg.basePath, cfg.tableName, cfg.runSchedule);
|
||||
String resultMsg = String.format("Clustering with basePath: %s, tableName: %s, runningMode: %s",
|
||||
cfg.basePath, cfg.tableName, cfg.runningMode);
|
||||
if (result == -1) {
|
||||
LOG.error(resultMsg + " failed");
|
||||
} else {
|
||||
@@ -118,20 +131,46 @@ public class HoodieClusteringJob {
|
||||
jsc.stop();
|
||||
}
|
||||
|
||||
// make sure that cfg.runningMode couldn't be null
|
||||
private static void validateRunningMode(Config cfg) {
|
||||
// --mode has a higher priority than --schedule
|
||||
// If we remove --schedule option in the future we need to change runningMode default value to EXECUTE
|
||||
if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
|
||||
cfg.runningMode = cfg.runSchedule ? SCHEDULE : EXECUTE;
|
||||
}
|
||||
|
||||
if (cfg.runningMode.equalsIgnoreCase(EXECUTE) && cfg.clusteringInstantTime == null) {
|
||||
throw new RuntimeException("--instant-time couldn't be null when executing clustering plan.");
|
||||
}
|
||||
}
|
||||
|
||||
public int cluster(int retry) {
|
||||
this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
|
||||
// need to do validate in case that users call cluster() directly without setting cfg.runningMode
|
||||
validateRunningMode(cfg);
|
||||
int ret = UtilHelpers.retry(retry, () -> {
|
||||
if (cfg.runSchedule) {
|
||||
LOG.info("Do schedule");
|
||||
Option<String> instantTime = doSchedule(jsc);
|
||||
int result = instantTime.isPresent() ? 0 : -1;
|
||||
if (result == 0) {
|
||||
LOG.info("The schedule instant time is " + instantTime.get());
|
||||
switch (cfg.runningMode.toLowerCase()) {
|
||||
case SCHEDULE: {
|
||||
LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
|
||||
Option<String> instantTime = doSchedule(jsc);
|
||||
int result = instantTime.isPresent() ? 0 : -1;
|
||||
if (result == 0) {
|
||||
LOG.info("The schedule instant time is " + instantTime.get());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
case SCHEDULE_AND_EXECUTE: {
|
||||
LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
|
||||
return doScheduleAndCluster(jsc);
|
||||
}
|
||||
case EXECUTE: {
|
||||
LOG.info("Running Mode: [" + EXECUTE + "]; Do cluster");
|
||||
return doCluster(jsc);
|
||||
}
|
||||
default: {
|
||||
LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
|
||||
return -1;
|
||||
}
|
||||
return result;
|
||||
} else {
|
||||
LOG.info("Do cluster");
|
||||
return doCluster(jsc);
|
||||
}
|
||||
}, "Cluster failed");
|
||||
return ret;
|
||||
@@ -164,11 +203,37 @@ public class HoodieClusteringJob {
|
||||
private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
|
||||
String schemaStr = getSchemaFromLatestInstant();
|
||||
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
|
||||
if (cfg.clusteringInstantTime != null) {
|
||||
client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
|
||||
return Option.of(cfg.clusteringInstantTime);
|
||||
}
|
||||
return client.scheduleClustering(Option.empty());
|
||||
return doSchedule(client);
|
||||
}
|
||||
}
|
||||
|
||||
private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
|
||||
if (cfg.clusteringInstantTime != null) {
|
||||
client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, Option.empty());
|
||||
return Option.of(cfg.clusteringInstantTime);
|
||||
}
|
||||
return client.scheduleClustering(Option.empty());
|
||||
}
|
||||
|
||||
public int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
|
||||
LOG.info("Step 1: Do schedule");
|
||||
String schemaStr = getSchemaFromLatestInstant();
|
||||
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
|
||||
|
||||
Option<String> instantTime = doSchedule(client);
|
||||
int result = instantTime.isPresent() ? 0 : -1;
|
||||
|
||||
if (result == -1) {
|
||||
LOG.info("Couldn't generate cluster plan");
|
||||
return result;
|
||||
}
|
||||
|
||||
LOG.info("The schedule instant time is " + instantTime.get());
|
||||
LOG.info("Step 2: Do cluster");
|
||||
JavaRDD<WriteStatus> writeResponse =
|
||||
(JavaRDD<WriteStatus>) client.cluster(instantTime.get(), true).getWriteStatuses();
|
||||
return UtilHelpers.handleErrors(jsc, instantTime.get(), writeResponse);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -18,6 +18,10 @@
|
||||
|
||||
package org.apache.hudi.utilities.functional;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
|
||||
import org.apache.hudi.common.config.HoodieConfig;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
@@ -77,22 +81,22 @@ import org.apache.spark.sql.api.java.UDF4;
|
||||
import org.apache.spark.sql.functions;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructField;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.ConcurrentModificationException;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -103,6 +107,7 @@ import java.util.stream.Stream;
|
||||
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
@@ -142,6 +147,38 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
|
||||
return props;
|
||||
}
|
||||
|
||||
protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster) throws IOException {
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
|
||||
cfg.continuousMode = true;
|
||||
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
|
||||
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", asyncCluster, ""));
|
||||
return new HoodieDeltaStreamer(cfg, jsc);
|
||||
}
|
||||
|
||||
protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) {
|
||||
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
|
||||
clusteringInstantTime, runSchedule, scheduleAndExecute);
|
||||
return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void cleanupClass() {
|
||||
UtilitiesTestBase.cleanupClass();
|
||||
if (testUtils != null) {
|
||||
testUtils.teardown();
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void teardown() throws Exception {
|
||||
super.teardown();
|
||||
}
|
||||
|
||||
static class TestHelpers {
|
||||
|
||||
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, WriteOperationType op) {
|
||||
@@ -317,6 +354,22 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
|
||||
int numDeltaCommits = (int) timeline.getInstants().count();
|
||||
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
|
||||
}
|
||||
|
||||
static void assertNoReplaceCommits(int expected, String tablePath, FileSystem fs) {
|
||||
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
|
||||
HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
|
||||
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
||||
int numDeltaCommits = (int) timeline.getInstants().count();
|
||||
assertEquals(expected, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + expected);
|
||||
}
|
||||
|
||||
static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) {
|
||||
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
|
||||
HoodieTimeline timeline = meta.getActiveTimeline().filterPendingReplaceTimeline();
|
||||
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
||||
int numDeltaCommits = (int) timeline.getInstants().count();
|
||||
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -794,6 +847,10 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
|
||||
dsFuture.get();
|
||||
}
|
||||
|
||||
private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function<Boolean, Boolean> condition) throws Exception {
|
||||
deltaStreamerTestRunner(ds, null, condition);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInlineClustering() throws Exception {
|
||||
String tableBasePath = dfsBasePath + "/inlineClustering";
|
||||
@@ -836,32 +893,34 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
|
||||
}
|
||||
|
||||
private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
|
||||
String clusteringInstantTime, boolean runSchedule) {
|
||||
String clusteringInstantTime,
|
||||
boolean runSchedule) {
|
||||
return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null);
|
||||
}
|
||||
|
||||
private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
|
||||
String clusteringInstantTime,
|
||||
boolean runSchedule,
|
||||
String runningMode) {
|
||||
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
|
||||
config.basePath = basePath;
|
||||
config.clusteringInstantTime = clusteringInstantTime;
|
||||
config.runSchedule = runSchedule;
|
||||
config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
|
||||
config.runningMode = runningMode;
|
||||
return config;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHoodieAsyncClusteringJob() throws Exception {
|
||||
String tableBasePath = dfsBasePath + "/asyncClustering";
|
||||
// Keep it higher than batch-size to test continuous mode
|
||||
int totalRecords = 3000;
|
||||
|
||||
// Initial bulk insert
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
|
||||
cfg.continuousMode = true;
|
||||
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
|
||||
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", ""));
|
||||
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
|
||||
deltaStreamerTestRunner(ds, cfg, (r) -> {
|
||||
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
|
||||
HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, null);
|
||||
|
||||
deltaStreamerTestRunner(ds, (r) -> {
|
||||
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
|
||||
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
|
||||
null, true);
|
||||
HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, scheduleClusteringConfig);
|
||||
|
||||
Option<String> scheduleClusteringInstantTime = Option.empty();
|
||||
try {
|
||||
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
|
||||
@@ -923,6 +982,52 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
|
||||
});
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
|
||||
public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {
|
||||
String tableBasePath = dfsBasePath + "/asyncClustering2";
|
||||
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
|
||||
HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, runningMode);
|
||||
|
||||
deltaStreamerTestRunner(ds, (r) -> {
|
||||
Exception exception = null;
|
||||
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
|
||||
try {
|
||||
int result = scheduleClusteringJob.cluster(0);
|
||||
if (result == 0) {
|
||||
LOG.info("Cluster success");
|
||||
} else {
|
||||
LOG.warn("Import failed");
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("ScheduleAndExecute clustering failed", e);
|
||||
exception = e;
|
||||
if (!runningMode.equalsIgnoreCase(HoodieClusteringJob.EXECUTE)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
switch (runningMode.toLowerCase()) {
|
||||
case HoodieClusteringJob.SCHEDULE_AND_EXECUTE: {
|
||||
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
|
||||
return true;
|
||||
}
|
||||
case HoodieClusteringJob.SCHEDULE: {
|
||||
TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs);
|
||||
TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
|
||||
return true;
|
||||
}
|
||||
case HoodieClusteringJob.EXECUTE: {
|
||||
assertNotNull(exception);
|
||||
assertEquals(exception.getMessage(), "--instant-time couldn't be null when executing clustering plan.");
|
||||
return true;
|
||||
}
|
||||
default:
|
||||
throw new IllegalStateException("Unexpected value: " + runningMode);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline The first
|
||||
* step involves using a SQL template to transform a source TEST-DATA-SOURCE ============================> HUDI TABLE
|
||||
|
||||
Reference in New Issue
Block a user