1
0

Simplify and fix CLI to schedule and run compactions

This commit is contained in:
Balaji Varadarajan
2018-09-06 12:02:09 -07:00
committed by vinoth chandar
parent fad4b513ea
commit e2dee68ccd
4 changed files with 27 additions and 40 deletions

View File

@@ -13,4 +13,4 @@ fi
if [ -z "$CLIENT_JAR" ]; then if [ -z "$CLIENT_JAR" ]; then
echo "client jar location not set" echo "client jar location not set"
fi fi
java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} org.springframework.shell.Bootstrap java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap

View File

@@ -162,14 +162,8 @@ public class CompactionCommand implements CommandMarker {
@CliCommand(value = "compaction schedule", help = "Schedule Compaction") @CliCommand(value = "compaction schedule", help = "Schedule Compaction")
public String scheduleCompact( public String scheduleCompact(
@CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G", help = "Spark executor memory")
@CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField, final String sparkMemory) throws Exception {
@CliOption(key = {
"parallelism"}, mandatory = true, help = "Parallelism for hoodie compaction") final String parallelism,
@CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") final String
schemaFilePath,
@CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry) throws Exception {
boolean initialized = HoodieCLI.initConf(); boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized); HoodieCLI.initFS(initialized);
@@ -181,7 +175,7 @@ public class CompactionCommand implements CommandMarker {
scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), HoodieCLI.tableMetadata.getBasePath(), sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), HoodieCLI.tableMetadata.getBasePath(),
tableName, compactionInstantTime, rowKeyField, parallelism, schemaFilePath, sparkMemory, retry); HoodieCLI.tableMetadata.getTableConfig().getTableName(), compactionInstantTime, sparkMemory);
Process process = sparkLauncher.launch(); Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process); InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor(); int exitCode = process.waitFor();
@@ -196,16 +190,15 @@ public class CompactionCommand implements CommandMarker {
@CliCommand(value = "compaction run", help = "Run Compaction for given instant time") @CliCommand(value = "compaction run", help = "Run Compaction for given instant time")
public String compact( public String compact(
@CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName, @CliOption(key = {"parallelism"}, mandatory = true, help = "Parallelism for hoodie compaction")
@CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField, final String parallelism,
@CliOption(key = { @CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file")
"parallelism"}, mandatory = true, help = "Parallelism for hoodie compaction") final String parallelism, final String schemaFilePath,
@CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") final String @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", help = "Spark executor memory")
schemaFilePath, final String sparkMemory,
@CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory, @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
@CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry, @CliOption(key = "compactionInstant", mandatory = true, help = "Base path for the target hoodie dataset")
@CliOption(key = "compactionInstant", mandatory = true, help = "Base path for the target hoodie dataset") final final String compactionInstantTime) throws Exception {
String compactionInstantTime) throws Exception {
boolean initialized = HoodieCLI.initConf(); boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized); HoodieCLI.initFS(initialized);
@@ -214,7 +207,8 @@ public class CompactionCommand implements CommandMarker {
scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), HoodieCLI.tableMetadata.getBasePath(), sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), HoodieCLI.tableMetadata.getBasePath(),
tableName, compactionInstantTime, rowKeyField, parallelism, schemaFilePath, sparkMemory, retry); HoodieCLI.tableMetadata.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath,
sparkMemory, retry);
Process process = sparkLauncher.launch(); Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process); InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor(); int exitCode = process.waitFor();

View File

@@ -23,6 +23,7 @@ import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy;
import com.uber.hoodie.utilities.HDFSParquetImporter; import com.uber.hoodie.utilities.HDFSParquetImporter;
import com.uber.hoodie.utilities.HoodieCompactor; import com.uber.hoodie.utilities.HoodieCompactor;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@@ -68,14 +69,14 @@ public class SparkMain {
Integer.parseInt(args[7]), args[8], SparkUtil.DEFUALT_SPARK_MASTER, args[9], Integer.parseInt(args[10])); Integer.parseInt(args[7]), args[8], SparkUtil.DEFUALT_SPARK_MASTER, args[9], Integer.parseInt(args[10]));
break; break;
case COMPACT_RUN: case COMPACT_RUN:
assert (args.length == 9); assert (args.length == 8);
returnCode = compact(jsc, args[1], args[2], args[3], args[4], args[5], Integer.parseInt(args[6]), returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]),
args[7], args[8], Integer.parseInt(args[9]), false); args[5], args[6], Integer.parseInt(args[7]), false);
break; break;
case COMPACT_SCHEDULE: case COMPACT_SCHEDULE:
assert (args.length == 10); assert (args.length == 5);
returnCode = compact(jsc, args[1], args[2], args[3], args[4], args[5], Integer.parseInt(args[6]), returnCode = compact(jsc, args[1], args[2], args[3], 1,
args[7], args[8], Integer.parseInt(args[9]), true); "", args[4], 0, true);
break; break;
default: default:
break; break;
@@ -103,14 +104,13 @@ public class SparkMain {
} }
private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant, private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
String rowKey, String partitionKey, int parallelism, String schemaFile, int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule) throws Exception {
String sparkMemory, int retry, boolean schedule) throws Exception {
HoodieCompactor.Config cfg = new HoodieCompactor.Config(); HoodieCompactor.Config cfg = new HoodieCompactor.Config();
cfg.basePath = basePath; cfg.basePath = basePath;
cfg.tableName = tableName; cfg.tableName = tableName;
cfg.compactionInstantTime = compactionInstant; cfg.compactionInstantTime = compactionInstant;
cfg.rowKey = rowKey; // TODO: Make this configurable along with strategy specific config - For now, this is a generic enough strategy
cfg.partitionKey = partitionKey; cfg.strategyClassName = UnBoundedCompactionStrategy.class.getCanonicalName();
cfg.parallelism = parallelism; cfg.parallelism = parallelism;
cfg.schemaFile = schemaFile; cfg.schemaFile = schemaFile;
cfg.runSchedule = schedule; cfg.runSchedule = schedule;

View File

@@ -34,7 +34,7 @@ import org.apache.spark.api.java.JavaSparkContext;
public class HoodieCompactor { public class HoodieCompactor {
private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class); private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
private final Config cfg; private final Config cfg;
private transient FileSystem fs; private transient FileSystem fs;
@@ -51,12 +51,6 @@ public class HoodieCompactor {
@Parameter(names = {"--instant-time", @Parameter(names = {"--instant-time",
"-sp"}, description = "Compaction Instant time", required = true) "-sp"}, description = "Compaction Instant time", required = true)
public String compactionInstantTime = null; public String compactionInstantTime = null;
@Parameter(names = {"--row-key-field",
"-rk"}, description = "Row key field name", required = true)
public String rowKey = null;
@Parameter(names = {"--partition-key-field",
"-pk"}, description = "Partition key field name", required = true)
public String partitionKey = null;
@Parameter(names = {"--parallelism", @Parameter(names = {"--parallelism",
"-pl"}, description = "Parallelism for hoodie insert", required = true) "-pl"}, description = "Parallelism for hoodie insert", required = true)
public int parallelism = 1; public int parallelism = 1;
@@ -120,8 +114,7 @@ public class HoodieCompactor {
private int doSchedule(JavaSparkContext jsc) throws Exception { private int doSchedule(JavaSparkContext jsc) throws Exception {
//Get schema. //Get schema.
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile); HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Optional.of(cfg.strategyClassName)); Optional.of(cfg.strategyClassName));
client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty()); client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty());
return 0; return 0;