[HUDI-2737] Use earliest instant by default for async compaction and clustering jobs (#3991)
Address review comments. Fix test failures. Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
This commit is contained in:
@@ -274,6 +274,12 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
||||
return Option.fromJavaOptional(instants.stream().findFirst());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<HoodieInstant> firstInstant(String action, State state) {
|
||||
return Option.fromJavaOptional(instants.stream()
|
||||
.filter(s -> action.equals(s.getAction()) && state.equals(s.getState())).findFirst());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<HoodieInstant> nthInstant(int n) {
|
||||
if (empty() || n >= countInstants()) {
|
||||
|
||||
@@ -209,6 +209,13 @@ public interface HoodieTimeline extends Serializable {
|
||||
*/
|
||||
Option<HoodieInstant> firstInstant();
|
||||
|
||||
/**
|
||||
* Returns the first instant in this timeline that has the given action and state.
*
* @param action Instant action String.
|
||||
* @param state Instant State.
|
||||
* @return the first instant with the given action and state, or an empty Option if no such instant exists
|
||||
*/
|
||||
Option<HoodieInstant> firstInstant(String action, State state);
|
||||
|
||||
/**
|
||||
* @return nth completed instant from the first completed instant
|
||||
*/
|
||||
|
||||
@@ -26,6 +26,7 @@ import org.apache.hudi.common.testutils.MockHoodieTimeline;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -176,6 +177,15 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
|
||||
assertFalse(timeline.empty());
|
||||
assertFalse(timeline.getCommitTimeline().filterPendingExcludingCompaction().empty());
|
||||
assertEquals(12, timeline.countInstants());
|
||||
assertEquals("01", timeline.firstInstant(
|
||||
HoodieTimeline.COMMIT_ACTION, State.COMPLETED).get().getTimestamp());
|
||||
assertEquals("21", timeline.firstInstant(
|
||||
HoodieTimeline.COMMIT_ACTION, State.INFLIGHT).get().getTimestamp());
|
||||
assertFalse(timeline.firstInstant(
|
||||
HoodieTimeline.COMMIT_ACTION, State.REQUESTED).isPresent());
|
||||
assertFalse(timeline.firstInstant(
|
||||
HoodieTimeline.REPLACE_COMMIT_ACTION, State.COMPLETED).isPresent());
|
||||
|
||||
HoodieTimeline activeCommitTimeline = timeline.getCommitTimeline().filterCompletedInstants();
|
||||
assertEquals(10, activeCommitTimeline.countInstants());
|
||||
|
||||
|
||||
@@ -18,11 +18,6 @@
|
||||
|
||||
package org.apache.hudi.utilities;
|
||||
|
||||
import com.beust.jcommander.JCommander;
|
||||
import com.beust.jcommander.Parameter;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
@@ -36,10 +31,15 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.exception.HoodieClusteringException;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
|
||||
import com.beust.jcommander.JCommander;
|
||||
import com.beust.jcommander.Parameter;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
@@ -53,14 +53,15 @@ import java.util.stream.Collectors;
|
||||
|
||||
public class HoodieClusteringJob {
|
||||
|
||||
public static final String EXECUTE = "execute";
|
||||
public static final String SCHEDULE = "schedule";
|
||||
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieClusteringJob.class);
|
||||
private final Config cfg;
|
||||
private transient FileSystem fs;
|
||||
private TypedProperties props;
|
||||
private final JavaSparkContext jsc;
|
||||
public static final String EXECUTE = "execute";
|
||||
public static final String SCHEDULE = "schedule";
|
||||
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
|
||||
private final HoodieTableMetaClient metaClient;
|
||||
|
||||
public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
|
||||
this.cfg = cfg;
|
||||
@@ -68,6 +69,7 @@ public class HoodieClusteringJob {
|
||||
this.props = cfg.propsFilePath == null
|
||||
? UtilHelpers.buildProperties(cfg.configs)
|
||||
: readConfigFromFileSystem(jsc, cfg);
|
||||
this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
|
||||
}
|
||||
|
||||
private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
|
||||
@@ -83,8 +85,10 @@ public class HoodieClusteringJob {
|
||||
public String basePath = null;
|
||||
@Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
|
||||
public String tableName = null;
|
||||
@Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only need when set --mode execute. "
|
||||
+ "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required = false)
|
||||
@Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only used when set --mode execute. "
|
||||
+ "If the instant time is not provided with --mode execute, "
|
||||
+ "the earliest scheduled clustering instant time is used by default. "
|
||||
+ "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required = false)
|
||||
public String clusteringInstantTime = null;
|
||||
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
|
||||
public int parallelism = 1;
|
||||
@@ -153,10 +157,6 @@ public class HoodieClusteringJob {
|
||||
if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
|
||||
cfg.runningMode = cfg.runSchedule ? SCHEDULE : EXECUTE;
|
||||
}
|
||||
|
||||
if (cfg.runningMode.equalsIgnoreCase(EXECUTE) && cfg.clusteringInstantTime == null) {
|
||||
throw new RuntimeException("--instant-time couldn't be null when executing clustering plan.");
|
||||
}
|
||||
}
|
||||
|
||||
public int cluster(int retry) {
|
||||
@@ -192,7 +192,6 @@ public class HoodieClusteringJob {
|
||||
}
|
||||
|
||||
private String getSchemaFromLatestInstant() throws Exception {
|
||||
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build();
|
||||
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
|
||||
if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
|
||||
throw new HoodieException("Cannot run clustering without any completed commits");
|
||||
@@ -204,6 +203,20 @@ public class HoodieClusteringJob {
|
||||
private int doCluster(JavaSparkContext jsc) throws Exception {
|
||||
String schemaStr = getSchemaFromLatestInstant();
|
||||
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
|
||||
if (StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
|
||||
// Instant time is not specified
|
||||
// Find the earliest scheduled clustering instant for execution
|
||||
Option<HoodieInstant> firstClusteringInstant =
|
||||
metaClient.getActiveTimeline().firstInstant(
|
||||
HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieInstant.State.REQUESTED);
|
||||
if (firstClusteringInstant.isPresent()) {
|
||||
cfg.clusteringInstantTime = firstClusteringInstant.get().getTimestamp();
|
||||
LOG.info("Found the earliest scheduled clustering instant which will be executed: "
|
||||
+ cfg.clusteringInstantTime);
|
||||
} else {
|
||||
throw new HoodieClusteringException("There is no scheduled clustering in the table.");
|
||||
}
|
||||
}
|
||||
Option<HoodieCommitMetadata> commitMetadata = client.cluster(cfg.clusteringInstantTime, true).getCommitMetadata();
|
||||
|
||||
return handleErrors(commitMetadata.get(), cfg.clusteringInstantTime);
|
||||
@@ -267,7 +280,7 @@ public class HoodieClusteringJob {
|
||||
|
||||
private int handleErrors(HoodieCommitMetadata metadata, String instantTime) {
|
||||
List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
|
||||
e.getValue().stream()).collect(Collectors.toList());
|
||||
e.getValue().stream()).collect(Collectors.toList());
|
||||
long errorsCount = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
|
||||
if (errorsCount == 0) {
|
||||
LOG.info(String.format("Table imported into hoodie with %s instant time.", instantTime));
|
||||
@@ -277,5 +290,4 @@ public class HoodieClusteringJob {
|
||||
LOG.error(String.format("Import failed with %d errors.", errorsCount));
|
||||
return -1;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -21,9 +21,15 @@ package org.apache.hudi.utilities;
|
||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.exception.HoodieCompactionException;
|
||||
|
||||
import com.beust.jcommander.JCommander;
|
||||
import com.beust.jcommander.Parameter;
|
||||
@@ -41,6 +47,7 @@ import java.util.List;
|
||||
public class HoodieCompactor {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
|
||||
private static ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
|
||||
private final Config cfg;
|
||||
private transient FileSystem fs;
|
||||
private TypedProperties props;
|
||||
@@ -67,7 +74,7 @@ public class HoodieCompactor {
|
||||
public String basePath = null;
|
||||
@Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
|
||||
public String tableName = null;
|
||||
@Parameter(names = {"--instant-time", "-it"}, description = "Compaction Instant time", required = true)
|
||||
@Parameter(names = {"--instant-time", "-it"}, description = "Compaction Instant time", required = false)
|
||||
public String compactionInstantTime = null;
|
||||
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
|
||||
public int parallelism = 1;
|
||||
@@ -134,6 +141,21 @@ public class HoodieCompactor {
|
||||
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
|
||||
SparkRDDWriteClient<HoodieRecordPayload> client =
|
||||
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
|
||||
// If no compaction instant is provided by --instant-time, find the earliest scheduled compaction
|
||||
// instant from the active timeline
|
||||
if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
|
||||
HoodieTableMetaClient metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
|
||||
Option<HoodieInstant> firstCompactionInstant =
|
||||
metaClient.getActiveTimeline().firstInstant(
|
||||
HoodieTimeline.COMPACTION_ACTION, HoodieInstant.State.REQUESTED);
|
||||
if (firstCompactionInstant.isPresent()) {
|
||||
cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
|
||||
LOG.info("Found the earliest scheduled compaction instant which will be executed: "
|
||||
+ cfg.compactionInstantTime);
|
||||
} else {
|
||||
throw new HoodieCompactionException("There is no scheduled compaction in the table.");
|
||||
}
|
||||
}
|
||||
JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
|
||||
return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
|
||||
}
|
||||
@@ -142,6 +164,10 @@ public class HoodieCompactor {
|
||||
// Get schema.
|
||||
SparkRDDWriteClient client =
|
||||
UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props);
|
||||
if (cfg.compactionInstantTime == null) {
|
||||
throw new IllegalArgumentException("No instant time is provided for scheduling compaction. "
|
||||
+ "Please specify the compaction instant time by using --instant-time.");
|
||||
}
|
||||
client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Option.empty());
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -469,6 +469,15 @@ public class UtilHelpers {
|
||||
};
|
||||
}
|
||||
|
||||
public static HoodieTableMetaClient createMetaClient(
|
||||
JavaSparkContext jsc, String basePath, boolean shouldLoadActiveTimelineOnLoad) {
|
||||
return HoodieTableMetaClient.builder()
|
||||
.setConf(jsc.hadoopConfiguration())
|
||||
.setBasePath(basePath)
|
||||
.setLoadActiveTimelineOnLoad(shouldLoadActiveTimelineOnLoad)
|
||||
.build();
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface CheckedSupplier<T> {
|
||||
T get() throws Throwable;
|
||||
|
||||
@@ -157,11 +157,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Runs before each test (per {@code @BeforeEach}) and delegates entirely to the
 * superclass {@code setup()}.
 *
 * @throws Exception if the superclass setup fails
 */
@Override
|
||||
@BeforeEach
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
@AfterEach
|
||||
public void teardown() throws Exception {
|
||||
super.teardown();
|
||||
@@ -869,18 +871,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
return config;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHoodieAsyncClusteringJob() throws Exception {
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
|
||||
String tableBasePath = dfsBasePath + "/asyncClustering";
|
||||
|
||||
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
|
||||
HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, null);
|
||||
|
||||
deltaStreamerTestRunner(ds, (r) -> {
|
||||
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
|
||||
|
||||
Option<String> scheduleClusteringInstantTime = Option.empty();
|
||||
try {
|
||||
HoodieClusteringJob scheduleClusteringJob =
|
||||
initialHoodieClusteringJob(tableBasePath, null, true, null);
|
||||
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Schedule clustering failed", e);
|
||||
@@ -889,7 +893,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
if (scheduleClusteringInstantTime.isPresent()) {
|
||||
LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
|
||||
HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
|
||||
scheduleClusteringInstantTime.get(), false);
|
||||
shouldPassInClusteringInstantTime ? scheduleClusteringInstantTime.get() : null, false);
|
||||
HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
|
||||
clusterClusteringJob.cluster(clusterClusteringConfig.retry);
|
||||
LOG.info("Cluster success");
|
||||
@@ -988,7 +992,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
|
||||
@ValueSource(strings = {"execute", "schedule", "scheduleAndExecute"})
|
||||
public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {
|
||||
String tableBasePath = dfsBasePath + "/asyncClustering2";
|
||||
HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
|
||||
@@ -1003,7 +1007,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
LOG.info("Cluster success");
|
||||
} else {
|
||||
LOG.warn("Import failed");
|
||||
return false;
|
||||
if (!runningMode.toLowerCase().equals(HoodieClusteringJob.EXECUTE)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("ScheduleAndExecute clustering failed", e);
|
||||
@@ -1023,8 +1029,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
return true;
|
||||
}
|
||||
case HoodieClusteringJob.EXECUTE: {
|
||||
assertNotNull(exception);
|
||||
assertEquals(exception.getMessage(), "--instant-time couldn't be null when executing clustering plan.");
|
||||
TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
|
||||
return true;
|
||||
}
|
||||
default:
|
||||
|
||||
Reference in New Issue
Block a user