[HUDI-3369] New ScheduleAndExecute mode for HoodieCompactor and hudi-cli (#4750)

Schedule and execute a compaction plan in one single mode.
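For context, the new mode can be invoked from hudi-cli as in the integration test below (the schema path is illustrative):

    compaction scheduleAndExecute --parallelism 2 --schemaFilePath /path/to/compaction.schema --sparkMaster local

The standalone HoodieCompactor gains an equivalent --mode/-m flag accepting "schedule", "execute", or "scheduleAndExecute"; --mode takes precedence over the older --schedule flag.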
YueZhang
2022-02-07 17:31:34 +08:00
committed by GitHub
parent 0880a8a5e4
commit de206acbae
4 changed files with 249 additions and 48 deletions


@@ -264,6 +264,41 @@ public class CompactionCommand implements CommandMarker {
     return "Compaction successfully completed for " + compactionInstantTime;
   }
 
+  @CliCommand(value = "compaction scheduleAndExecute", help = "Schedule compaction plan and execute this plan")
+  public String compact(
+      @CliOption(key = {"parallelism"}, mandatory = true,
+          help = "Parallelism for hoodie compaction") final String parallelism,
+      @CliOption(key = "schemaFilePath", mandatory = true,
+          help = "Path for Avro schema file") final String schemaFilePath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local",
+          help = "Spark Master") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory,
+      @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
+          unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
+          unspecifiedDefaultValue = "") final String[] configs)
+      throws Exception {
+    HoodieTableMetaClient client = checkAndGetMetaClient();
+    boolean initialized = HoodieCLI.initConf();
+    HoodieCLI.initFS(initialized);
+    String sparkPropertiesPath =
+        Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory, client.getBasePath(),
+        client.getTableConfig().getTableName(), parallelism, schemaFilePath, retry, propsFilePath);
+    UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      return "Failed to schedule and execute compaction ";
+    }
+    return "Schedule and execute compaction successfully completed";
+  }
+
   /**
    * Prints all compaction details.
    */


@@ -74,7 +74,7 @@ public class SparkMain {
    * Commands.
    */
   enum SparkCommand {
-    BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
+    BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
     COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
     CLUSTERING_RUN, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
   }
@@ -128,7 +128,21 @@ public class SparkMain {
         configs.addAll(Arrays.asList(args).subList(9, args.length));
       }
       returnCode = compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7],
-          Integer.parseInt(args[8]), false, propsFilePath, configs);
+          Integer.parseInt(args[8]), HoodieCompactor.EXECUTE, propsFilePath, configs);
+      break;
+    case COMPACT_SCHEDULE_AND_EXECUTE:
+      assert (args.length >= 9);
+      propsFilePath = null;
+      if (!StringUtils.isNullOrEmpty(args[8])) {
+        propsFilePath = args[8];
+      }
+      configs = new ArrayList<>();
+      if (args.length > 9) {
+        configs.addAll(Arrays.asList(args).subList(9, args.length));
+      }
+      returnCode = compact(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[6],
+          Integer.parseInt(args[7]), HoodieCompactor.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
       break;
     case COMPACT_SCHEDULE:
       assert (args.length >= 7);
@@ -140,7 +154,7 @@ public class SparkMain {
       if (args.length > 7) {
         configs.addAll(Arrays.asList(args).subList(7, args.length));
       }
-      returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, true, propsFilePath, configs);
+      returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, HoodieCompactor.SCHEDULE, propsFilePath, configs);
       break;
     case COMPACT_VALIDATE:
       assert (args.length == 7);
@@ -320,7 +334,7 @@ public class SparkMain {
   }
 
   private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
-      int parallelism, String schemaFile, int retry, boolean schedule, String propsFilePath,
+      int parallelism, String schemaFile, int retry, String mode, String propsFilePath,
       List<String> configs) {
     HoodieCompactor.Config cfg = new HoodieCompactor.Config();
     cfg.basePath = basePath;
@@ -330,7 +344,7 @@ public class SparkMain {
     cfg.strategyClassName = UnBoundedCompactionStrategy.class.getCanonicalName();
     cfg.parallelism = parallelism;
     cfg.schemaFile = schemaFile;
-    cfg.runSchedule = schedule;
+    cfg.runningMode = mode;
     cfg.propsFilePath = propsFilePath;
     cfg.configs = configs;
     return new HoodieCompactor(jsc, cfg).compact(retry);
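For reference, the positional contract between CompactionCommand's sparkLauncher.addAppArgs(...) and the COMPACT_SCHEDULE_AND_EXECUTE case above, as read off this diff (a sketch, not an authoritative spec):

    // args[0] = COMPACT_SCHEDULE_AND_EXECUTE   args[1] = sparkMaster   args[2] = sparkMemory
    // args[3] = basePath                       args[4] = tableName     args[5] = parallelism
    // args[6] = schemaFilePath                 args[7] = retry         args[8] = propsFilePath
    // args[9..] = optional hoodie configs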


@@ -140,6 +140,33 @@ public class ITTestCompactionCommand extends AbstractShellIntegrationTest {
         "Pending compaction must be completed");
   }
 
+  /**
+   * Test case for command 'compaction scheduleAndExecute'.
+   */
+  @Test
+  public void testCompactScheduleAndExecute() throws IOException {
+    // generate commits
+    generateCommits();
+
+    String schemaPath = Paths.get(basePath, "compaction.schema").toString();
+    writeSchemaToTmpFile(schemaPath);
+
+    CommandResult cr2 = getShell().executeCommand(
+        String.format("compaction scheduleAndExecute --parallelism %s --schemaFilePath %s --sparkMaster %s",
+            2, schemaPath, "local"));
+
+    assertAll("Command run failed",
+        () -> assertTrue(cr2.isSuccess()),
+        () -> assertTrue(
+            cr2.getResult().toString().startsWith("Schedule and execute compaction successfully completed")));
+
+    // assert compaction complete
+    assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+            .filterCompletedInstants().getInstants()
+            .map(HoodieInstant::getTimestamp).count() > 0,
+        "Completed compaction couldn't be 0");
+  }
+
   /**
    * Test case for command 'compaction validate'.
    */


@@ -18,13 +18,14 @@
 package org.apache.hudi.utilities;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
@@ -35,6 +36,9 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -43,15 +47,19 @@ import org.apache.spark.api.java.JavaSparkContext;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 public class HoodieCompactor {
 
   private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
-  private static ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
+  public static final String EXECUTE = "execute";
+  public static final String SCHEDULE = "schedule";
+  public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
   private final Config cfg;
   private transient FileSystem fs;
   private TypedProperties props;
   private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
 
   public HoodieCompactor(JavaSparkContext jsc, Config cfg) {
     this.cfg = cfg;
@@ -59,6 +67,7 @@ public class HoodieCompactor {
     this.props = cfg.propsFilePath == null
         ? UtilHelpers.buildProperties(cfg.configs)
         : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
   }
 
   private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
@@ -73,9 +82,9 @@ public class HoodieCompactor {
     public String tableName = null;
     @Parameter(names = {"--instant-time", "-it"}, description = "Compaction Instant time", required = false)
     public String compactionInstantTime = null;
-    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
-    public int parallelism = 1;
-    @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = true)
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
+    public int parallelism = 200;
+    @Parameter(names = {"--schema-file", "-sf"}, description = "path for Avro schema file", required = false)
     public String schemaFile = null;
     @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
     public String sparkMaster = null;
@@ -85,8 +94,12 @@ public class HoodieCompactor {
     public int retry = 0;
     @Parameter(names = {"--schedule", "-sc"}, description = "Schedule compaction", required = false)
     public Boolean runSchedule = false;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a compact plan; "
+        + "Set \"execute\" means execute a compact plan at given instant which means --instant-time is needed here; "
+        + "Set \"scheduleAndExecute\" means make a compact plan first and execute that plan immediately", required = false)
+    public String runningMode = null;
     @Parameter(names = {"--strategy", "-st"}, description = "Strategy Class", required = false)
-    public String strategyClassName = null;
+    public String strategyClassName = LogFileSizeBasedCompactionStrategy.class.getName();
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
@@ -98,6 +111,55 @@ public class HoodieCompactor {
         + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
         splitter = IdentitySplitter.class)
     public List<String> configs = new ArrayList<>();
+
+    @Override
+    public String toString() {
+      return "HoodieCompactorConfig {\n"
+          + " --base-path " + basePath + ", \n"
+          + " --table-name " + tableName + ", \n"
+          + " --instant-time " + compactionInstantTime + ", \n"
+          + " --parallelism " + parallelism + ", \n"
+          + " --schema-file " + schemaFile + ", \n"
+          + " --spark-master " + sparkMaster + ", \n"
+          + " --spark-memory " + sparkMemory + ", \n"
+          + " --retry " + retry + ", \n"
+          + " --schedule " + runSchedule + ", \n"
+          + " --mode " + runningMode + ", \n"
+          + " --strategy " + strategyClassName + ", \n"
+          + " --props " + propsFilePath + ", \n"
+          + " --hoodie-conf " + configs
+          + "\n}";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Config config = (Config) o;
+      return basePath.equals(config.basePath)
+          && Objects.equals(tableName, config.tableName)
+          && Objects.equals(compactionInstantTime, config.compactionInstantTime)
+          && Objects.equals(parallelism, config.parallelism)
+          && Objects.equals(schemaFile, config.schemaFile)
+          && Objects.equals(sparkMaster, config.sparkMaster)
+          && Objects.equals(sparkMemory, config.sparkMemory)
+          && Objects.equals(retry, config.retry)
+          && Objects.equals(runSchedule, config.runSchedule)
+          && Objects.equals(runningMode, config.runningMode)
+          && Objects.equals(strategyClassName, config.strategyClassName)
+          && Objects.equals(propsFilePath, config.propsFilePath)
+          && Objects.equals(configs, config.configs);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(basePath, tableName, compactionInstantTime, schemaFile,
+          sparkMaster, parallelism, sparkMemory, retry, runSchedule, runningMode, strategyClassName, propsFilePath, configs, help);
+    }
   }
 
   public static void main(String[] args) {
@@ -120,24 +182,75 @@ public class HoodieCompactor {
   public int compact(int retry) {
     this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
+    // need to do validate in case that users call compact() directly without setting cfg.runningMode
+    validateRunningMode(cfg);
+    LOG.info(cfg);
     int ret = UtilHelpers.retry(retry, () -> {
-      if (cfg.runSchedule) {
-        if (null == cfg.strategyClassName) {
-          throw new IllegalArgumentException("Missing Strategy class name for running compaction");
-        }
-        return doSchedule(jsc);
-      } else {
-        return doCompact(jsc);
+      switch (cfg.runningMode.toLowerCase()) {
+        case SCHEDULE: {
+          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          Option<String> instantTime = doSchedule(jsc);
+          int result = instantTime.isPresent() ? 0 : -1;
+          if (result == 0) {
+            LOG.info("The schedule instant time is " + instantTime.get());
+          }
+          return result;
+        }
+        case SCHEDULE_AND_EXECUTE: {
+          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          return doScheduleAndCompact(jsc);
+        }
+        case EXECUTE: {
+          LOG.info("Running Mode: [" + EXECUTE + "]; Do compaction");
+          return doCompact(jsc);
+        }
+        default: {
+          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
+          return -1;
+        }
       }
     }, "Compact failed");
     return ret;
   }
 
+  private Integer doScheduleAndCompact(JavaSparkContext jsc) throws Exception {
+    LOG.info("Step 1: Do schedule");
+    Option<String> instantTime = doSchedule(jsc);
+    if (!instantTime.isPresent()) {
+      LOG.warn("Couldn't do schedule");
+      return -1;
+    } else {
+      cfg.compactionInstantTime = instantTime.get();
+    }
+    LOG.info("The schedule instant time is " + instantTime.get());
+    LOG.info("Step 2: Do compaction");
+    return doCompact(jsc);
+  }
+
+  // make sure that cfg.runningMode couldn't be null
+  private static void validateRunningMode(Config cfg) {
+    // --mode has a higher priority than --schedule
+    // If we remove --schedule option in the future we need to change runningMode default value to EXECUTE
+    if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
+      cfg.runningMode = cfg.runSchedule ? SCHEDULE : EXECUTE;
+    }
+  }
+
   private int doCompact(JavaSparkContext jsc) throws Exception {
     // Get schema.
-    String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
-    SparkRDDWriteClient<HoodieRecordPayload> client =
-        UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props);
+    String schemaStr;
+    if (StringUtils.isNullOrEmpty(cfg.schemaFile)) {
+      schemaStr = getSchemaFromLatestInstant();
+    } else {
+      schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
+    }
+    LOG.info("Schema --> : " + schemaStr);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client =
+             UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
       // If no compaction instant is provided by --instant-time, find the earliest scheduled compaction
       // instant from the active timeline
       if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
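A minimal sketch of driving the compactor programmatically in the new mode, assuming an existing JavaSparkContext jsc and a hypothetical table path:

    HoodieCompactor.Config cfg = new HoodieCompactor.Config();
    cfg.basePath = "/tmp/hudi/trips";   // hypothetical base path
    cfg.tableName = "trips";            // hypothetical table name
    cfg.runningMode = HoodieCompactor.SCHEDULE_AND_EXECUTE;
    // no --instant-time needed: doSchedule() creates the instant and doCompact() reuses it
    int exitCode = new HoodieCompactor(jsc, cfg).compact(cfg.retry);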
@@ -156,16 +269,28 @@ public class HoodieCompactor {
       JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
       return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
     }
+  }
 
-  private int doSchedule(JavaSparkContext jsc) throws Exception {
-    // Get schema.
-    SparkRDDWriteClient client =
-        UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props);
-    if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
-      throw new IllegalArgumentException("No instant time is provided for scheduling compaction. "
-          + "Please specify the compaction instant time by using --instant-time.");
+  private Option<String> doSchedule(JavaSparkContext jsc) {
+    try (SparkRDDWriteClient client =
+             UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props)) {
+      if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
+        LOG.warn("No instant time is provided for scheduling compaction.");
+        return client.scheduleCompaction(Option.empty());
+      }
+      client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Option.empty());
+      return Option.of(cfg.compactionInstantTime);
     }
-    client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Option.empty());
-    return 0;
+  }
+
+  private String getSchemaFromLatestInstant() throws Exception {
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
+      throw new HoodieException("Cannot run compaction without any completed commits");
+    }
+    Schema schema = schemaUtil.getTableAvroSchema(false);
+    return schema.toString();
   }
 }
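The same mode is reachable through spark-submit (bundle jar path illustrative):

    spark-submit --class org.apache.hudi.utilities.HoodieCompactor <hudi-utilities-bundle.jar> \
      --base-path /tmp/hudi/trips --table-name trips --mode scheduleAndExecute

With --mode scheduleAndExecute, both --instant-time and --schema-file may be omitted: the instant comes from the newly scheduled plan, and the schema is resolved from the latest completed commit via getSchemaFromLatestInstant().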