1
0

[HUDI-2533] New option for hoodieClusteringJob to check, rollback and re-execute the last failed clustering job (#3765)

* coding finished and need to do uts

* add uts

* code review

* code review

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
zhangyue19921010
2021-11-22 19:00:33 +08:00
committed by GitHub
parent 02f7ca2b05
commit a2c91a7a9b
2 changed files with 94 additions and 11 deletions

View File

@@ -31,10 +31,15 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
@@ -42,6 +47,7 @@ import org.jetbrains.annotations.TestOnly;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
@@ -92,6 +98,10 @@ public class HoodieClusteringJob {
@Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering @desperate soon please use \"--mode schedule\" instead")
public Boolean runSchedule = false;
@Parameter(names = {"--retry-last-failed-clustering-job", "-rc"}, description = "Take effect when using --mode/-m scheduleAndExecute. Set true means "
+ "check, rollback and execute last failed clustering plan instead of planing a new clustering job directly.", required = false)
public Boolean retryLastFailedClusteringJob = false;
@Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a cluster plan; "
+ "Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; "
+ "Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately", required = false)
@@ -100,6 +110,10 @@ public class HoodieClusteringJob {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@Parameter(names = {"--job-max-processing-time-ms", "-jt"}, description = "Take effect when using --mode/-m scheduleAndExecute and --retry-last-failed-clustering-job/-rc true. "
+ "If maxProcessingTimeMs passed but clustering job is still unfinished, hoodie would consider this job as failed and relaunch.", required = false)
public long maxProcessingTimeMs = 0;
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ "hoodie client for clustering")
public String propsFilePath = null;
@@ -216,17 +230,32 @@ public class HoodieClusteringJob {
return client.scheduleClustering(Option.empty());
}
public int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
LOG.info("Step 1: Do schedule");
String schemaStr = getSchemaFromLatestInstant();
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
Option<String> instantTime = Option.empty();
Option<String> instantTime = doSchedule(client);
int result = instantTime.isPresent() ? 0 : -1;
if (cfg.retryLastFailedClusteringJob) {
HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext());
HoodieTimeline inflightHoodieTimeline = table.getActiveTimeline().filterPendingReplaceTimeline().filterInflights();
if (!inflightHoodieTimeline.empty()) {
HoodieInstant inflightClusteringInstant = inflightHoodieTimeline.lastInstant().get();
Date clusteringStartTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(inflightClusteringInstant.getTimestamp());
if (clusteringStartTime.getTime() + cfg.maxProcessingTimeMs < System.currentTimeMillis()) {
// if there has failed clustering, then we will use the failed clustering instant-time to trigger next clustering action which will rollback and clustering.
LOG.info("Found failed clustering instant at : " + inflightClusteringInstant + "; Will rollback the failed clustering and re-trigger again.");
instantTime = Option.of(inflightHoodieTimeline.lastInstant().get().getTimestamp());
} else {
LOG.info(inflightClusteringInstant + " might still be in progress, will trigger a new clustering job.");
}
}
}
if (result == -1) {
instantTime = instantTime.isPresent() ? instantTime : doSchedule(client);
if (!instantTime.isPresent()) {
LOG.info("Couldn't generate cluster plan");
return result;
return -1;
}
LOG.info("The schedule instant time is " + instantTime.get());

View File

@@ -139,9 +139,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
return new HoodieDeltaStreamer(cfg, jsc);
}
protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) {
protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute) {
return initialHoodieClusteringJob(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute, null);
}
protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, Boolean retryLastFailedClusteringJob) {
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
clusteringInstantTime, runSchedule, scheduleAndExecute);
clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
}
@@ -844,20 +848,24 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
String clusteringInstantTime,
boolean runSchedule) {
return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null);
Boolean runSchedule) {
return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, null, null);
}
private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
String clusteringInstantTime,
boolean runSchedule,
String runningMode) {
Boolean runSchedule,
String runningMode,
Boolean retryLastFailedClusteringJob) {
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
config.basePath = basePath;
config.clusteringInstantTime = clusteringInstantTime;
config.runSchedule = runSchedule;
config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
config.runningMode = runningMode;
if (retryLastFailedClusteringJob != null) {
config.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
}
return config;
}
@@ -933,6 +941,52 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
});
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob) throws Exception {
String tableBasePath = dfsBasePath + "/asyncClustering3";
// ingest data
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
cfg.continuousMode = false;
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false", "0", "false", "0"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
// assert ingest successful
TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);
// schedule a clustering job to build a clustering plan
HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
schedule.cluster(0);
// do another ingestion
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
ds2.sync();
// convert clustering request into inflight, Simulate the last clustering failed scenario
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
List<HoodieInstant> hoodieClusteringInstants = meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
HoodieInstant hoodieInflightInstant = meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest, Option.empty());
// trigger a scheduleAndExecute clustering job
// when retryFailedClustering true => will rollback and re-execute failed clustering plan with same instant timestamp.
// when retryFailedClustering false => will make and execute a new clustering plan with new instant timestamp.
HoodieClusteringJob scheduleAndExecute = initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", retryLastFailedClusteringJob);
scheduleAndExecute.cluster(0);
String completeClusteringTimeStamp = meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
if (retryLastFailedClusteringJob) {
assertEquals(clusteringRequest.getTimestamp(), completeClusteringTimeStamp);
} else {
assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
}
}
@ParameterizedTest
@ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {