diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java index e63807c62..857fb0d84 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java @@ -23,17 +23,24 @@ import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.cli.utils.InputStreamConsumer; +import org.apache.hudi.cli.utils.SparkUtil; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.AvroUtils; - +import org.apache.hudi.utilities.UtilHelpers; +import org.apache.spark.launcher.SparkLauncher; +import org.apache.spark.util.Utils; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; +import scala.collection.JavaConverters; import java.io.IOException; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -113,4 +120,32 @@ public class CleansCommand implements CommandMarker { return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows); } + + @CliCommand(value = "cleans run", help = "run clean") + public String runClean(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", + help = "Spark executor memory") final String sparkMemory, + @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for 
cleaning", + unspecifiedDefaultValue = "") final String propsFilePath, + @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array", + unspecifiedDefaultValue = "") final String[] configs, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master) throws IOException, InterruptedException, URISyntaxException { + boolean initialized = HoodieCLI.initConf(); + HoodieCLI.initFS(initialized); + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + + String sparkPropertiesPath = + Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala()); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + + String cmd = SparkMain.SparkCommand.CLEAN.toString(); + sparkLauncher.addAppArgs(cmd, metaClient.getBasePath(), master, propsFilePath, sparkMemory); + UtilHelpers.validateAndAddProperties(configs, sparkLauncher); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to clean hoodie dataset"; + } + return "Cleaned hoodie dataset"; + } } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index 716b8104c..6a188c182 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -42,6 +42,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hudi.utilities.UtilHelpers; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.launcher.SparkLauncher; @@ -175,7 +176,11 @@ public class 
CompactionCommand implements CommandMarker { @CliCommand(value = "compaction schedule", help = "Schedule Compaction") public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G", - help = "Spark executor memory") final String sparkMemory) throws Exception { + help = "Spark executor memory") final String sparkMemory, + @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting", + unspecifiedDefaultValue = "") final String propsFilePath, + @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array", + unspecifiedDefaultValue = "") final String[] configs) throws Exception { HoodieTableMetaClient client = checkAndGetMetaClient(); boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); @@ -187,7 +192,8 @@ public class CompactionCommand implements CommandMarker { Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), client.getBasePath(), - client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory); + client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory, propsFilePath); + UtilHelpers.validateAndAddProperties(configs, sparkLauncher); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); @@ -207,7 +213,11 @@ public class CompactionCommand implements CommandMarker { help = "Spark executor memory") final String sparkMemory, @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry, @CliOption(key = "compactionInstant", mandatory = false, - help = "Base path for the target hoodie dataset") String compactionInstantTime) + 
help = "Base path for the target hoodie dataset") String compactionInstantTime, + @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting", + unspecifiedDefaultValue = "") final String propsFilePath, + @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array", + unspecifiedDefaultValue = "") final String[] configs) throws Exception { HoodieTableMetaClient client = checkAndGetMetaClient(); boolean initialized = HoodieCLI.initConf(); @@ -224,13 +234,13 @@ public class CompactionCommand implements CommandMarker { } compactionInstantTime = firstPendingInstant.get(); } - String sparkPropertiesPath = Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), client.getBasePath(), client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath, - sparkMemory, retry); + sparkMemory, retry, propsFilePath); + UtilHelpers.validateAndAddProperties(configs, sparkLauncher); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java index e892ca939..4ba9d3d6f 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java @@ -24,8 +24,7 @@ import org.apache.hudi.cli.utils.InputStreamConsumer; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.utilities.HDFSParquetImporter.FormatValidator; -import org.apache.log4j.LogManager; -import 
org.apache.log4j.Logger; +import org.apache.hudi.utilities.UtilHelpers; import org.apache.spark.launcher.SparkLauncher; import org.apache.spark.util.Utils; import org.springframework.shell.core.CommandMarker; @@ -41,8 +40,6 @@ import scala.collection.JavaConverters; @Component public class HDFSParquetImportCommand implements CommandMarker { - private static final Logger LOG = LogManager.getLogger(HDFSParquetImportCommand.class); - @CliCommand(value = "hdfsparquetimport", help = "Imports Parquet dataset to a hoodie dataset") public String convert( @CliOption(key = "upsert", mandatory = false, unspecifiedDefaultValue = "false", @@ -61,7 +58,11 @@ public class HDFSParquetImportCommand implements CommandMarker { help = "Path for Avro schema file") final String schemaFilePath, @CliOption(key = "format", mandatory = true, help = "Format for the input data") final String format, @CliOption(key = "sparkMemory", mandatory = true, help = "Spark executor memory") final String sparkMemory, - @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry) throws Exception { + @CliOption(key = "retry", mandatory = true, help = "Number of retries") final String retry, + @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for importing", + unspecifiedDefaultValue = "") final String propsFilePath, + @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array", + unspecifiedDefaultValue = "") final String[] configs) throws Exception { (new FormatValidator()).validate("format", format); @@ -78,7 +79,8 @@ public class HDFSParquetImportCommand implements CommandMarker { } sparkLauncher.addAppArgs(cmd, srcPath, targetPath, tableName, tableType, rowKeyField, partitionPathField, - parallelism, schemaFilePath, sparkMemory, retry); + parallelism, schemaFilePath, sparkMemory, retry, propsFilePath); + 
UtilHelpers.validateAndAddProperties(configs, sparkLauncher); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 5d0892118..8ff52fad9 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -18,6 +18,7 @@ package org.apache.hudi.cli.commands; +import com.google.common.base.Strings; import org.apache.hudi.HoodieWriteClient; import org.apache.hudi.cli.DedupeSparkJob; import org.apache.hudi.cli.utils.SparkUtil; @@ -28,6 +29,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.io.compact.strategy.UnBoundedCompactionStrategy; import org.apache.hudi.utilities.HDFSParquetImporter; import org.apache.hudi.utilities.HDFSParquetImporter.Config; +import org.apache.hudi.utilities.HoodieCleaner; import org.apache.hudi.utilities.HoodieCompactionAdminTool; import org.apache.hudi.utilities.HoodieCompactionAdminTool.Operation; import org.apache.hudi.utilities.HoodieCompactor; @@ -36,6 +38,10 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + /** * This class deals with initializing spark context based on command entered to hudi-cli. */ @@ -47,7 +53,7 @@ public class SparkMain { * Commands. 
/**
 * Commands that can be dispatched to {@code SparkMain}; the first application argument names one of
 * these constants.
 */
enum SparkCommand {
  ROLLBACK,
  DEDUPLICATE,
  ROLLBACK_TO_SAVEPOINT,
  SAVEPOINT,
  IMPORT,
  UPSERT,
  COMPACT_SCHEDULE,
  COMPACT_RUN,
  COMPACT_UNSCHEDULE_PLAN,
  COMPACT_UNSCHEDULE_FILE,
  COMPACT_VALIDATE,
  COMPACT_REPAIR,
  CLEAN
}
configs.addAll(Arrays.asList(args).subList(6, args.length)); + } + returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs); break; case COMPACT_VALIDATE: assert (args.length == 7); @@ -109,15 +139,40 @@ public class SparkMain { Boolean.valueOf(args[7]), Boolean.valueOf(args[8])); returnCode = 0; break; + case CLEAN: + assert (args.length >= 5); + propsFilePath = null; + if (!Strings.isNullOrEmpty(args[3])) { + propsFilePath = args[3]; + } + configs = new ArrayList<>(); + if (args.length > 5) { + configs.addAll(Arrays.asList(args).subList(5, args.length)); + } + clean(jsc, args[1], args[2], propsFilePath, args[4], configs); + break; default: break; } System.exit(returnCode); } + private static void clean(JavaSparkContext jsc, String basePath, String sparkMaster, String propsFilePath, + String sparkMemory, List configs) throws Exception { + HoodieCleaner.Config cfg = new HoodieCleaner.Config(); + cfg.basePath = basePath; + if ((null != sparkMaster) && (!sparkMaster.isEmpty())) { + jsc.getConf().setMaster(sparkMaster); + } + jsc.getConf().set("spark.executor.memory", sparkMemory); + cfg.propsFilePath = propsFilePath; + cfg.configs = configs; + new HoodieCleaner(cfg, jsc).run(); + } + private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName, - String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMaster, - String sparkMemory, int retry) throws Exception { + String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile, String sparkMemory, + int retry, String propsFilePath, List configs) { Config cfg = new Config(); cfg.command = command; cfg.srcPath = srcPath; @@ -128,6 +183,8 @@ public class SparkMain { cfg.partitionKey = partitionKey; cfg.parallelism = parallelism; cfg.schemaFile = schemaFile; + cfg.propsFilePath = propsFilePath; + cfg.configs = configs; 
jsc.getConf().set("spark.executor.memory", sparkMemory); return new HDFSParquetImporter(cfg).dataImport(jsc, retry); } @@ -200,7 +257,8 @@ public class SparkMain { } private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant, - int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule) throws Exception { + int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule, String propsFilePath, + List configs) { HoodieCompactor.Config cfg = new HoodieCompactor.Config(); cfg.basePath = basePath; cfg.tableName = tableName; @@ -210,12 +268,14 @@ public class SparkMain { cfg.parallelism = parallelism; cfg.schemaFile = schemaFile; cfg.runSchedule = schedule; + cfg.propsFilePath = propsFilePath; + cfg.configs = configs; jsc.getConf().set("spark.executor.memory", sparkMemory); return new HoodieCompactor(cfg).compact(jsc, retry); } private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplicatedPartitionPath, - String repairedOutputPath, String basePath) throws Exception { + String repairedOutputPath, String basePath) { DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc), FSUtils.getFs(basePath, jsc.hadoopConfiguration())); job.fixDuplicates(true); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 4ace07c24..21fafa5c5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -69,7 +69,7 @@ public class HoodieCompactor { public int retry = 0; @Parameter(names = {"--schedule", "-sc"}, description = "Schedule compaction", required = false) public Boolean runSchedule = false; - @Parameter(names = {"--strategy", "-st"}, description = "Stratgey Class", required = false) + 
@Parameter(names = {"--strategy", "-st"}, description = "Strategy Class", required = false) public String strategyClassName = null; @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 4cb56e9aa..996173ae9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -43,6 +43,7 @@ import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.launcher.SparkLauncher; import org.apache.spark.sql.SparkSession; import java.io.BufferedReader; @@ -50,6 +51,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.StringReader; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -114,7 +116,7 @@ public class UtilHelpers { public static TypedProperties buildProperties(List props) { TypedProperties properties = new TypedProperties(); - props.stream().forEach(x -> { + props.forEach(x -> { String[] kv = x.split("="); Preconditions.checkArgument(kv.length == 2); properties.setProperty(kv[0], kv[1]); @@ -122,6 +124,10 @@ public class UtilHelpers { return properties; } + public static void validateAndAddProperties(String[] configs, SparkLauncher sparkLauncher) { + Arrays.stream(configs).filter(config -> config.contains("=") && config.split("=").length == 2).forEach(sparkLauncher::addAppArgs); + } + /** * Parse Schema from file. *