diff --git a/hoodie-cli/hoodie-cli.sh b/hoodie-cli/hoodie-cli.sh index 73ffe333f..b67a06741 100755 --- a/hoodie-cli/hoodie-cli.sh +++ b/hoodie-cli/hoodie-cli.sh @@ -13,4 +13,4 @@ fi if [ -z "$CLIENT_JAR" ]; then echo "client jar location not set" fi -java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} org.springframework.shell.Bootstrap +java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java index b5ed2ada5..b5f344e13 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CompactionCommand.java @@ -162,14 +162,8 @@ public class CompactionCommand implements CommandMarker { @CliCommand(value = "compaction schedule", help = "Schedule Compaction") public String scheduleCompact( - @CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName, - @CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField, - @CliOption(key = { - "parallelism"}, mandatory = true, help = "Parallelism for hoodie compaction") final String parallelism, - @CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") final String - schemaFilePath, - @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory, - @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry) throws Exception { + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G", help = "Spark executor memory") + final String sparkMemory) throws Exception { boolean initialized = HoodieCLI.initConf(); 
HoodieCLI.initFS(initialized); @@ -181,7 +175,7 @@ public class CompactionCommand implements CommandMarker { scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), HoodieCLI.tableMetadata.getBasePath(), - tableName, compactionInstantTime, rowKeyField, parallelism, schemaFilePath, sparkMemory, retry); + HoodieCLI.tableMetadata.getTableConfig().getTableName(), compactionInstantTime, sparkMemory); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); @@ -196,16 +190,15 @@ public class CompactionCommand implements CommandMarker { @CliCommand(value = "compaction run", help = "Run Compaction for given instant time") public String compact( - @CliOption(key = "tableName", mandatory = true, help = "Table name") final String tableName, - @CliOption(key = "rowKeyField", mandatory = true, help = "Row key field name") final String rowKeyField, - @CliOption(key = { - "parallelism"}, mandatory = true, help = "Parallelism for hoodie compaction") final String parallelism, - @CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") final String - schemaFilePath, - @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory, - @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry, - @CliOption(key = "compactionInstant", mandatory = true, help = "Base path for the target hoodie dataset") final - String compactionInstantTime) throws Exception { + @CliOption(key = {"parallelism"}, mandatory = true, help = "Parallelism for hoodie compaction") + final String parallelism, + @CliOption(key = "schemaFilePath", mandatory = true, help = "Path for Avro schema file") + final String schemaFilePath, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue 
= "4G", help = "Spark executor memory") + final String sparkMemory, + @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry, + @CliOption(key = "compactionInstant", mandatory = true, help = "Instant time of the compaction to run") + final String compactionInstantTime) throws Exception { boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); @@ -214,7 +207,8 @@ public class CompactionCommand implements CommandMarker { scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), HoodieCLI.tableMetadata.getBasePath(), - tableName, compactionInstantTime, rowKeyField, parallelism, schemaFilePath, sparkMemory, retry); + HoodieCLI.tableMetadata.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath, + sparkMemory, retry); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java index 526e64620..bb9189ccb 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java @@ -23,6 +23,7 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; +import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy; import com.uber.hoodie.utilities.HDFSParquetImporter; import com.uber.hoodie.utilities.HoodieCompactor; import org.apache.log4j.Logger; @@ -68,14 +69,14 @@ public class SparkMain { Integer.parseInt(args[7]), args[8], SparkUtil.DEFUALT_SPARK_MASTER, args[9], 
Integer.parseInt(args[10])); break; case COMPACT_RUN: - assert (args.length == 9); - returnCode = compact(jsc, args[1], args[2], args[3], args[4], args[5], Integer.parseInt(args[6]), - args[7], args[8], Integer.parseInt(args[9]), false); + assert (args.length == 8); + returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), + args[5], args[6], Integer.parseInt(args[7]), false); break; case COMPACT_SCHEDULE: - assert (args.length == 10); - returnCode = compact(jsc, args[1], args[2], args[3], args[4], args[5], Integer.parseInt(args[6]), - args[7], args[8], Integer.parseInt(args[9]), true); + assert (args.length == 5); + returnCode = compact(jsc, args[1], args[2], args[3], 1, + "", args[4], 0, true); break; default: break; @@ -103,14 +104,13 @@ public class SparkMain { } private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant, - String rowKey, String partitionKey, int parallelism, String schemaFile, - String sparkMemory, int retry, boolean schedule) throws Exception { + int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule) throws Exception { HoodieCompactor.Config cfg = new HoodieCompactor.Config(); cfg.basePath = basePath; cfg.tableName = tableName; cfg.compactionInstantTime = compactionInstant; - cfg.rowKey = rowKey; - cfg.partitionKey = partitionKey; + // TODO: Make this configurable along with strategy specific config - For now, this is a generic enough strategy + cfg.strategyClassName = UnBoundedCompactionStrategy.class.getCanonicalName(); cfg.parallelism = parallelism; cfg.schemaFile = schemaFile; cfg.runSchedule = schedule; diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java index d986a6ac4..d327e6d0d 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java +++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java @@ -34,7 +34,7 @@ import org.apache.spark.api.java.JavaSparkContext; public class HoodieCompactor { - private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class); + private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class); private final Config cfg; private transient FileSystem fs; @@ -51,12 +51,6 @@ public class HoodieCompactor { @Parameter(names = {"--instant-time", "-sp"}, description = "Compaction Instant time", required = true) public String compactionInstantTime = null; - @Parameter(names = {"--row-key-field", - "-rk"}, description = "Row key field name", required = true) - public String rowKey = null; - @Parameter(names = {"--partition-key-field", - "-pk"}, description = "Partition key field name", required = true) - public String partitionKey = null; @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true) public int parallelism = 1; @@ -120,8 +114,7 @@ public class HoodieCompactor { private int doSchedule(JavaSparkContext jsc) throws Exception { //Get schema. - String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile); - HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, + HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Optional.of(cfg.strategyClassName)); client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty()); return 0;