From 8affdf8bcbb4c7b236283e97c3afad186d5b6a3e Mon Sep 17 00:00:00 2001 From: hongdd Date: Wed, 25 Dec 2019 20:19:12 +0800 Subject: [PATCH] [HUDI-416] Improve hint information for cli (#1110) --- .../java/org/apache/hudi/cli/HoodieCLI.java | 15 +- .../cli/commands/ArchivedCommitsCommand.java | 10 +- .../hudi/cli/commands/CleansCommand.java | 22 +- .../hudi/cli/commands/CommitsCommand.java | 47 +-- .../hudi/cli/commands/CompactionCommand.java | 312 +++++++++--------- .../hudi/cli/commands/DatasetsCommand.java | 17 +- .../cli/commands/FileSystemViewCommand.java | 7 +- .../cli/commands/HoodieLogFileCommand.java | 20 +- .../hudi/cli/commands/HoodieSyncCommand.java | 12 +- .../hudi/cli/commands/RepairsCommand.java | 21 +- .../hudi/cli/commands/RollbacksCommand.java | 4 +- .../hudi/cli/commands/SavepointsCommand.java | 41 +-- .../hudi/cli/commands/StatsCommand.java | 10 +- .../table/timeline/HoodieActiveTimeline.java | 3 +- 14 files changed, 224 insertions(+), 317 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java index b09c25eed..70ab0ed68 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java @@ -40,7 +40,7 @@ public class HoodieCLI { public static FileSystem fs; public static CLIState state = CLIState.INIT; public static String basePath; - public static HoodieTableMetaClient tableMetadata; + protected static HoodieTableMetaClient tableMetadata; public static HoodieTableMetaClient syncTableMetadata; public static TimelineLayoutVersion layoutVersion; @@ -92,4 +92,17 @@ public class HoodieCLI { setLayoutVersion(layoutVersion); refreshTableMetadata(); } + + /** + * Get tableMetadata, throw NullPointerException when it is null. 
+ * + * @return tableMetadata which is instance of HoodieTableMetaClient + */ + public static HoodieTableMetaClient getTableMetaClient() { + if (tableMetadata == null) { + throw new NullPointerException("There is no hudi dataset. Please use connect command to set dataset first"); + } + return tableMetadata; + } + } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java index 2ae827b40..c8f1dc824 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java @@ -36,7 +36,6 @@ import org.apache.avro.specific.SpecificData; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -53,11 +52,6 @@ import java.util.stream.Collectors; @Component public class ArchivedCommitsCommand implements CommandMarker { - @CliAvailabilityIndicator({"show archived commits"}) - public boolean isShowArchivedCommitAvailable() { - return HoodieCLI.tableMetadata != null; - } - @CliCommand(value = "show archived commit stats", help = "Read commits from archived files and show details") public String showArchivedCommits( @CliOption(key = {"archiveFolderPattern"}, help = "Archive Folder", unspecifiedDefaultValue = "") String folder, @@ -68,7 +62,7 @@ public class ArchivedCommitsCommand implements CommandMarker { unspecifiedDefaultValue = "false") final boolean headerOnly) throws IOException { System.out.println("===============> Showing only " + limit + " archived commits <==============="); - String basePath = HoodieCLI.tableMetadata.getBasePath(); 
+ String basePath = HoodieCLI.getTableMetaClient().getBasePath(); Path archivePath = new Path(basePath + "/.hoodie/.commits_.archive*"); if (folder != null && !folder.isEmpty()) { archivePath = new Path(basePath + "/.hoodie/" + folder); @@ -146,7 +140,7 @@ public class ArchivedCommitsCommand implements CommandMarker { throws IOException { System.out.println("===============> Showing only " + limit + " archived commits <==============="); - String basePath = HoodieCLI.tableMetadata.getBasePath(); + String basePath = HoodieCLI.getTableMetaClient().getBasePath(); FileStatus[] fsStatuses = FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(new Path(basePath + "/.hoodie/.commits_.archive*")); List allCommits = new ArrayList<>(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java index 529571e63..e63807c62 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.AvroUtils; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -47,21 +46,6 @@ import java.util.stream.Collectors; @Component public class CleansCommand implements CommandMarker { - @CliAvailabilityIndicator({"cleans show"}) - public boolean isShowAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliAvailabilityIndicator({"cleans refresh"}) - public boolean isRefreshAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliAvailabilityIndicator({"clean showpartitions"}) - public boolean isCommitShowAvailable() { - return 
HoodieCLI.tableMetadata != null; - } - @CliCommand(value = "cleans show", help = "Show the cleans") public String showCleans( @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, @@ -71,7 +55,7 @@ public class CleansCommand implements CommandMarker { unspecifiedDefaultValue = "false") final boolean headerOnly) throws IOException { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants(); List cleans = timeline.getReverseOrderedInstants().collect(Collectors.toList()); List rows = new ArrayList<>(); @@ -92,7 +76,7 @@ public class CleansCommand implements CommandMarker { @CliCommand(value = "cleans refresh", help = "Refresh the commits") public String refreshCleans() throws IOException { HoodieCLI.refreshTableMetadata(); - return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed."; + return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed."; } @CliCommand(value = "clean showpartitions", help = "Show partition level details of a clean") @@ -104,7 +88,7 @@ public class CleansCommand implements CommandMarker { unspecifiedDefaultValue = "false") final boolean headerOnly) throws Exception { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants(); HoodieInstant cleanInstant = new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, commitTime); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java index 
19b5256f5..089f6f462 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java @@ -33,7 +33,6 @@ import org.apache.hudi.common.util.NumericUtils; import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -52,26 +51,6 @@ import java.util.stream.Collectors; @Component public class CommitsCommand implements CommandMarker { - @CliAvailabilityIndicator({"commits show"}) - public boolean isShowAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliAvailabilityIndicator({"commits refresh"}) - public boolean isRefreshAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliAvailabilityIndicator({"commit rollback"}) - public boolean isRollbackAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliAvailabilityIndicator({"commit show"}) - public boolean isCommitShowAvailable() { - return HoodieCLI.tableMetadata != null; - } - @CliCommand(value = "commits show", help = "Show the commits") public String showCommits( @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits", @@ -82,7 +61,7 @@ public class CommitsCommand implements CommandMarker { unspecifiedDefaultValue = "false") final boolean headerOnly) throws IOException { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); List commits = timeline.getReverseOrderedInstants().collect(Collectors.toList()); List rows = new ArrayList<>(); @@ -111,14 +90,14 @@ public class 
CommitsCommand implements CommandMarker { @CliCommand(value = "commits refresh", help = "Refresh the commits") public String refreshCommits() throws IOException { HoodieCLI.refreshTableMetadata(); - return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed."; + return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed."; } @CliCommand(value = "commit rollback", help = "Rollback a commit") public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String commitTime, @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath) throws Exception { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); @@ -128,7 +107,7 @@ public class CommitsCommand implements CommandMarker { SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), commitTime, - HoodieCLI.tableMetadata.getBasePath()); + HoodieCLI.getTableMetaClient().getBasePath()); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); @@ -149,7 +128,7 @@ public class CommitsCommand implements CommandMarker { unspecifiedDefaultValue = "false") final boolean headerOnly) throws Exception { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); HoodieInstant commitInstant = new 
HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); @@ -205,7 +184,7 @@ public class CommitsCommand implements CommandMarker { unspecifiedDefaultValue = "false") final boolean headerOnly) throws Exception { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); @@ -232,18 +211,13 @@ public class CommitsCommand implements CommandMarker { return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows); } - @CliAvailabilityIndicator({"commits compare"}) - public boolean isCompareCommitsAvailable() { - return HoodieCLI.tableMetadata != null; - } - @CliCommand(value = "commits compare", help = "Compare commits with another Hoodie dataset") public String compareCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path) throws Exception { + HoodieTableMetaClient source = HoodieCLI.getTableMetaClient(); HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.conf, path); HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - HoodieTableMetaClient source = HoodieCLI.tableMetadata; HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); String targetLatestCommit = targetTimeline.getInstants().iterator().hasNext() ? 
"0" : targetTimeline.lastInstant().get().getTimestamp(); @@ -265,17 +239,12 @@ public class CommitsCommand implements CommandMarker { } } - @CliAvailabilityIndicator({"commits sync"}) - public boolean isSyncCommitsAvailable() { - return HoodieCLI.tableMetadata != null; - } - @CliCommand(value = "commits sync", help = "Compare commits with another Hoodie dataset") public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path) throws Exception { HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path); HoodieCLI.state = HoodieCLI.CLIState.SYNC; - return "Load sync state between " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " and " + return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and " + HoodieCLI.syncTableMetadata.getTableConfig().getTableName(); } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index 0057e6795..716b8104c 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -29,12 +29,14 @@ import org.apache.hudi.cli.commands.SparkMain.SparkCommand; import org.apache.hudi.cli.utils.InputStreamConsumer; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.AvroUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import 
org.apache.hadoop.fs.FSDataInputStream; @@ -45,7 +47,6 @@ import org.apache.log4j.Logger; import org.apache.spark.launcher.SparkLauncher; import org.apache.spark.util.Utils; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -71,10 +72,12 @@ public class CompactionCommand implements CommandMarker { private static final String TMP_DIR = "/tmp/"; - @CliAvailabilityIndicator({"compactions show all", "compaction show", "compaction run", "compaction schedule"}) - public boolean isAvailable() { - return (HoodieCLI.tableMetadata != null) - && (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ); + private HoodieTableMetaClient checkAndGetMetaClient() { + HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); + if (client.getTableType() != HoodieTableType.MERGE_ON_READ) { + throw new HoodieException("Compactions can only be run for table type : MERGE_ON_READ"); + } + return client; } @CliCommand(value = "compactions show all", help = "Shows all compactions that are in active timeline") @@ -88,7 +91,8 @@ public class CompactionCommand implements CommandMarker { @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly) throws IOException { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTableMetaClient client = checkAndGetMetaClient(); + HoodieActiveTimeline activeTimeline = client.getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline(); HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); Set committed = commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); @@ -148,7 +152,8 @@ 
public class CompactionCommand implements CommandMarker { @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly) throws Exception { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTableMetaClient client = checkAndGetMetaClient(); + HoodieActiveTimeline activeTimeline = client.getActiveTimeline(); HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( activeTimeline.readPlanAsBytes( HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get()); @@ -171,28 +176,25 @@ public class CompactionCommand implements CommandMarker { @CliCommand(value = "compaction schedule", help = "Schedule Compaction") public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G", help = "Spark executor memory") final String sparkMemory) throws Exception { + HoodieTableMetaClient client = checkAndGetMetaClient(); boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); // First get a compaction instant time and pass it to spark launcher for scheduling compaction String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); - if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { - String sparkPropertiesPath = - Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); - SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), HoodieCLI.tableMetadata.getBasePath(), - HoodieCLI.tableMetadata.getTableConfig().getTableName(), compactionInstantTime, sparkMemory); - Process process = sparkLauncher.launch(); - InputStreamConsumer.captureOutput(process); - int exitCode = process.waitFor(); - if (exitCode != 0) { - return "Failed to run compaction for " + compactionInstantTime; - } - return "Compaction successfully completed for " + 
compactionInstantTime; - } else { - throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); + String sparkPropertiesPath = + Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), client.getBasePath(), + client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to run compaction for " + compactionInstantTime; } + return "Compaction successfully completed for " + compactionInstantTime; } @CliCommand(value = "compaction run", help = "Run Compaction for given instant time") @@ -207,38 +209,35 @@ public class CompactionCommand implements CommandMarker { @CliOption(key = "compactionInstant", mandatory = false, help = "Base path for the target hoodie dataset") String compactionInstantTime) throws Exception { + HoodieTableMetaClient client = checkAndGetMetaClient(); boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); - if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { - if (null == compactionInstantTime) { - // pick outstanding one with lowest timestamp - Option firstPendingInstant = - HoodieCLI.tableMetadata.reloadActiveTimeline().filterCompletedAndCompactionInstants() - .filter(instant -> instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)).firstInstant() - .map(HoodieInstant::getTimestamp); - if (!firstPendingInstant.isPresent()) { - return "NO PENDING COMPACTION TO RUN"; - } - compactionInstantTime = firstPendingInstant.get(); + if (null == compactionInstantTime) { + // pick outstanding one with lowest timestamp + Option firstPendingInstant = + 
client.reloadActiveTimeline().filterCompletedAndCompactionInstants() + .filter(instant -> instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)).firstInstant() + .map(HoodieInstant::getTimestamp); + if (!firstPendingInstant.isPresent()) { + return "NO PENDING COMPACTION TO RUN"; } - - String sparkPropertiesPath = - Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); - SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), HoodieCLI.tableMetadata.getBasePath(), - HoodieCLI.tableMetadata.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath, - sparkMemory, retry); - Process process = sparkLauncher.launch(); - InputStreamConsumer.captureOutput(process); - int exitCode = process.waitFor(); - if (exitCode != 0) { - return "Failed to run compaction for " + compactionInstantTime; - } - return "Compaction successfully completed for " + compactionInstantTime; - } else { - throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); + compactionInstantTime = firstPendingInstant.get(); } + + String sparkPropertiesPath = + Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), client.getBasePath(), + client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath, + sparkMemory, retry); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to run compaction for " + compactionInstantTime; + } + return "Compaction successfully completed for " + compactionInstantTime; } private static String getTmpSerializerFile() { @@ -271,54 +270,51 @@ public class 
CompactionCommand implements CommandMarker { @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") boolean headerOnly) throws Exception { + HoodieTableMetaClient client = checkAndGetMetaClient(); boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); String outputPathStr = getTmpSerializerFile(); Path outputPath = new Path(outputPathStr); String output = null; - if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { - try { - String sparkPropertiesPath = Utils - .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); - SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), HoodieCLI.tableMetadata.getBasePath(), - compactionInstant, outputPathStr, parallelism, master, sparkMemory); - Process process = sparkLauncher.launch(); - InputStreamConsumer.captureOutput(process); - int exitCode = process.waitFor(); - if (exitCode != 0) { - return "Failed to validate compaction for " + compactionInstant; - } - List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); - boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true); - String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n"; - List rows = new ArrayList<>(); - res.stream().forEach(r -> { - Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(), - r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "", - r.getOperation().getDeltaFileNames().size(), r.isSuccess(), - r.getException().isPresent() ? 
r.getException().get().getMessage() : ""}; - rows.add(row); - }); - - Map> fieldNameToConverterMap = new HashMap<>(); - TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Base Instant Time") - .addTableHeaderField("Base Data File").addTableHeaderField("Num Delta Files").addTableHeaderField("Valid") - .addTableHeaderField("Error"); - - output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, - headerOnly, rows); - } finally { - // Delete tmp file used to serialize result - if (HoodieCLI.fs.exists(outputPath)) { - HoodieCLI.fs.delete(outputPath, false); - } + try { + String sparkPropertiesPath = Utils + .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), client.getBasePath(), + compactionInstant, outputPathStr, parallelism, master, sparkMemory); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to validate compaction for " + compactionInstant; + } + List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); + boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true); + String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n"; + List rows = new ArrayList<>(); + res.stream().forEach(r -> { + Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(), + r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "", + r.getOperation().getDeltaFileNames().size(), r.isSuccess(), + r.getException().isPresent() ? 
r.getException().get().getMessage() : ""}; + rows.add(row); + }); + + Map> fieldNameToConverterMap = new HashMap<>(); + TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Base Instant Time") + .addTableHeaderField("Base Data File").addTableHeaderField("Num Delta Files").addTableHeaderField("Valid") + .addTableHeaderField("Error"); + + output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, + headerOnly, rows); + } finally { + // Delete tmp file used to serialize result + if (HoodieCLI.fs.exists(outputPath)) { + HoodieCLI.fs.delete(outputPath, false); } - return output; - } else { - throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); } + return output; } @CliCommand(value = "compaction unschedule", help = "Unschedule Compaction") @@ -335,39 +331,36 @@ public class CompactionCommand implements CommandMarker { @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") boolean headerOnly) throws Exception { + HoodieTableMetaClient client = checkAndGetMetaClient(); boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); String outputPathStr = getTmpSerializerFile(); Path outputPath = new Path(outputPathStr); String output = ""; - if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { - try { - String sparkPropertiesPath = Utils - .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); - SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), HoodieCLI.tableMetadata.getBasePath(), - compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(skipV).toString(), - Boolean.valueOf(dryRun).toString()); - Process process = sparkLauncher.launch(); - InputStreamConsumer.captureOutput(process); - int exitCode = 
process.waitFor(); - if (exitCode != 0) { - return "Failed to unschedule compaction for " + compactionInstant; - } - List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); - output = - getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "unschedule pending compaction"); - } finally { - // Delete tmp file used to serialize result - if (HoodieCLI.fs.exists(outputPath)) { - HoodieCLI.fs.delete(outputPath, false); - } + try { + String sparkPropertiesPath = Utils + .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), client.getBasePath(), + compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(skipV).toString(), + Boolean.valueOf(dryRun).toString()); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to unschedule compaction for " + compactionInstant; + } + List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); + output = + getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "unschedule pending compaction"); + } finally { + // Delete tmp file used to serialize result + if (HoodieCLI.fs.exists(outputPath)) { + HoodieCLI.fs.delete(outputPath, false); } - return output; - } else { - throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); } + return output; } @CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId") @@ -382,39 +375,36 @@ public class CompactionCommand implements CommandMarker { @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending, @CliOption(key = {"headeronly"}, help = "Header Only", unspecifiedDefaultValue = "false") boolean headerOnly) 
throws Exception { + HoodieTableMetaClient client = checkAndGetMetaClient(); boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); String outputPathStr = getTmpSerializerFile(); Path outputPath = new Path(outputPathStr); String output = ""; - if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { - try { - String sparkPropertiesPath = Utils - .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); - SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), HoodieCLI.tableMetadata.getBasePath(), - fileId, outputPathStr, "1", master, sparkMemory, Boolean.valueOf(skipV).toString(), - Boolean.valueOf(dryRun).toString()); - Process process = sparkLauncher.launch(); - InputStreamConsumer.captureOutput(process); - int exitCode = process.waitFor(); - if (exitCode != 0) { - return "Failed to unschedule compaction for file " + fileId; - } - List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); - output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, - "unschedule file from pending compaction"); - } finally { - // Delete tmp file used to serialize result - if (HoodieCLI.fs.exists(outputPath)) { - HoodieCLI.fs.delete(outputPath, false); - } + try { + String sparkPropertiesPath = Utils + .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), client.getBasePath(), + fileId, outputPathStr, "1", master, sparkMemory, Boolean.valueOf(skipV).toString(), + Boolean.valueOf(dryRun).toString()); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to 
unschedule compaction for file " + fileId; + } + List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); + output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, + "unschedule file from pending compaction"); + } finally { + // Delete tmp file used to serialize result + if (HoodieCLI.fs.exists(outputPath)) { + HoodieCLI.fs.delete(outputPath, false); } - return output; - } else { - throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); } + return output; } @CliCommand(value = "compaction repair", help = "Renames the files to make them consistent with the timeline as " @@ -431,36 +421,34 @@ public class CompactionCommand implements CommandMarker { @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") boolean headerOnly) throws Exception { + HoodieTableMetaClient client = checkAndGetMetaClient(); boolean initialized = HoodieCLI.initConf(); HoodieCLI.initFS(initialized); + String outputPathStr = getTmpSerializerFile(); Path outputPath = new Path(outputPathStr); String output = ""; - if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) { - try { - String sparkPropertiesPath = Utils - .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); - SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), HoodieCLI.tableMetadata.getBasePath(), - compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(dryRun).toString()); - Process process = sparkLauncher.launch(); - InputStreamConsumer.captureOutput(process); - int exitCode = process.waitFor(); - if (exitCode != 0) { - return "Failed to unschedule compaction for " + compactionInstant; - } - List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); - output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, 
"repair compaction"); - } finally { - // Delete tmp file used to serialize result - if (HoodieCLI.fs.exists(outputPath)) { - HoodieCLI.fs.delete(outputPath, false); - } + try { + String sparkPropertiesPath = Utils + .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), client.getBasePath(), + compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(dryRun).toString()); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + if (exitCode != 0) { + return "Failed to unschedule compaction for " + compactionInstant; + } + List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs); + output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction"); + } finally { + // Delete tmp file used to serialize result + if (HoodieCLI.fs.exists(outputPath)) { + HoodieCLI.fs.delete(outputPath, false); } - return output; - } else { - throw new Exception("Compactions can only be run for table type : MERGE_ON_READ"); } + return output; } private String getRenamesToBePrinted(List res, Integer limit, String sortByField, boolean descending, diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java index e43dc6d73..d5d1e82d6 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.util.ConsistencyGuardConfig; import org.apache.hudi.exception.DatasetNotFoundException; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import 
org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -69,7 +68,7 @@ public class DatasetsCommand implements CommandMarker { HoodieCLI.connectTo(path, layoutVersion); HoodieCLI.initFS(true); HoodieCLI.state = HoodieCLI.CLIState.DATASET; - return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " loaded"; + return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " loaded"; } /** @@ -116,22 +115,18 @@ public class DatasetsCommand implements CommandMarker { return connect(path, layoutVersion, false, 0, 0, 0); } - @CliAvailabilityIndicator({"desc"}) - public boolean isDescAvailable() { - return HoodieCLI.tableMetadata != null; - } - /** * Describes table properties. */ @CliCommand(value = "desc", help = "Describle Hoodie Table properties") public String descTable() { + HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); TableHeader header = new TableHeader().addTableHeaderField("Property").addTableHeaderField("Value"); List rows = new ArrayList<>(); - rows.add(new Comparable[] {"basePath", HoodieCLI.tableMetadata.getBasePath()}); - rows.add(new Comparable[] {"metaPath", HoodieCLI.tableMetadata.getMetaPath()}); - rows.add(new Comparable[] {"fileSystem", HoodieCLI.tableMetadata.getFs().getScheme()}); - HoodieCLI.tableMetadata.getTableConfig().getProps().entrySet().forEach(e -> { + rows.add(new Comparable[] {"basePath", client.getBasePath()}); + rows.add(new Comparable[] {"metaPath", client.getMetaPath()}); + rows.add(new Comparable[] {"fileSystem", client.getFs().getScheme()}); + client.getTableConfig().getProps().entrySet().forEach(e -> { rows.add(new Comparable[] {e.getKey(), e.getValue()}); }); return HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java index a2f2dd000..e94e16a3e 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java @@ -141,7 +141,7 @@ public class FileSystemViewCommand implements CommandMarker { fileSliceStream = fsView.getLatestFileSlices(partition); } else { if (maxInstant.isEmpty()) { - maxInstant = HoodieCLI.tableMetadata.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant() + maxInstant = HoodieCLI.getTableMetaClient().getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant() .get().getTimestamp(); } fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, maxInstant); @@ -224,10 +224,11 @@ public class FileSystemViewCommand implements CommandMarker { */ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String maxInstant, boolean readOptimizedOnly, boolean includeMaxInstant, boolean includeInflight, boolean excludeCompaction) throws IOException { + HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); HoodieTableMetaClient metaClient = - new HoodieTableMetaClient(HoodieCLI.tableMetadata.getHadoopConf(), HoodieCLI.tableMetadata.getBasePath(), true); + new HoodieTableMetaClient(client.getHadoopConf(), client.getBasePath(), true); FileSystem fs = HoodieCLI.fs; - String globPath = String.format("%s/%s/*", HoodieCLI.tableMetadata.getBasePath(), globRegex); + String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex); FileStatus[] statuses = fs.globStatus(new Path(globPath)); Stream instantsStream = null; diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index ff7207ab0..acf550faf 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -24,6 +24,7 @@ import org.apache.hudi.cli.TableHeader; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; @@ -46,7 +47,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroSchemaConverter; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -68,11 +68,6 @@ import scala.Tuple3; @Component public class HoodieLogFileCommand implements CommandMarker { - @CliAvailabilityIndicator({"show logfiles"}) - public boolean isShowArchivedLogFileAvailable() { - return HoodieCLI.tableMetadata != null; - } - @CliCommand(value = "show logfile metadata", help = "Read commit metadata from log files") public String showLogFileCommits( @CliOption(key = "logFilePathPattern", mandatory = true, @@ -84,7 +79,7 @@ public class HoodieLogFileCommand implements CommandMarker { unspecifiedDefaultValue = "false") final boolean headerOnly) throws IOException { - FileSystem fs = HoodieCLI.tableMetadata.getFs(); + FileSystem fs = HoodieCLI.getTableMetaClient().getFs(); List logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern))) .map(status -> status.getPath().toString()).collect(Collectors.toList()); Map, Map>, Integer>>> commitCountAndMetadata = @@ -96,7 +91,7 @@ public class HoodieLogFileCommand implements CommandMarker { for (String 
logFilePath : logFilePaths) { FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath)); Schema writerSchema = new AvroSchemaConverter() - .convert(SchemaUtil.readSchemaFromLogFile(HoodieCLI.tableMetadata.getFs(), new Path(logFilePath))); + .convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath))); Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema); // read the avro blocks @@ -178,7 +173,8 @@ public class HoodieLogFileCommand implements CommandMarker { System.out.println("===============> Showing only " + limit + " records <==============="); - FileSystem fs = HoodieCLI.tableMetadata.getFs(); + HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); + FileSystem fs = client.getFs(); List logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern))) .map(status -> status.getPath().toString()).collect(Collectors.toList()); @@ -193,8 +189,8 @@ public class HoodieLogFileCommand implements CommandMarker { if (shouldMerge) { System.out.println("===========================> MERGING RECORDS <==================="); HoodieMergedLogRecordScanner scanner = - new HoodieMergedLogRecordScanner(fs, HoodieCLI.tableMetadata.getBasePath(), logFilePaths, readerSchema, - HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(), + new HoodieMergedLogRecordScanner(fs, client.getBasePath(), logFilePaths, readerSchema, + client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(), Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES), Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED), Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED), @@ -210,7 +206,7 @@ public class HoodieLogFileCommand implements CommandMarker { } else { for (String logFile : logFilePaths) { Schema writerSchema = new AvroSchemaConverter() - 
.convert(SchemaUtil.readSchemaFromLogFile(HoodieCLI.tableMetadata.getFs(), new Path(logFile))); + .convert(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile))); HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema); // read the avro blocks diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java index 94ce1601f..152e21c01 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java @@ -24,9 +24,9 @@ import org.apache.hudi.cli.utils.HiveUtil; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.exception.HoodieException; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -40,11 +40,6 @@ import java.util.stream.Collectors; @Component public class HoodieSyncCommand implements CommandMarker { - @CliAvailabilityIndicator({"sync validate"}) - public boolean isSyncVerificationAvailable() { - return HoodieCLI.tableMetadata != null && HoodieCLI.syncTableMetadata != null; - } - @CliCommand(value = "sync validate", help = "Validate the sync by counting the number of records") public String validateSync( @CliOption(key = {"mode"}, unspecifiedDefaultValue = "complete", help = "Check mode") final String mode, @@ -60,9 +55,12 @@ public class HoodieSyncCommand implements CommandMarker { @CliOption(key = {"hivePass"}, mandatory = true, unspecifiedDefaultValue = "", help = "hive password to connect to") final 
String hivePass) throws Exception { + if (HoodieCLI.syncTableMetadata == null) { + throw new HoodieException("Sync validate requires a target table. Please use connect command to set target table first."); + } HoodieTableMetaClient target = HoodieCLI.syncTableMetadata; HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline(); - HoodieTableMetaClient source = HoodieCLI.tableMetadata; + HoodieTableMetaClient source = HoodieCLI.getTableMetaClient(); HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline(); long sourceCount = 0; long targetCount = 0; diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 40bd5b583..202965548 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -23,12 +23,12 @@ import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.utils.InputStreamConsumer; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.FSUtils; import org.apache.hadoop.fs.Path; import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -42,16 +42,6 @@ import java.util.List; @Component public class RepairsCommand implements CommandMarker { - @CliAvailabilityIndicator({"repair deduplicate"}) - public boolean isRepairDeduplicateAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliAvailabilityIndicator({"repair addpartitionmeta"}) - public boolean isRepairAddPartitionMetaAvailable() { - return 
HoodieCLI.tableMetadata != null; - } - @CliCommand(value = "repair deduplicate", help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with") public String deduplicate( @@ -64,7 +54,7 @@ public class RepairsCommand implements CommandMarker { throws Exception { SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, repairedOutputPath, - HoodieCLI.tableMetadata.getBasePath()); + HoodieCLI.getTableMetaClient().getBasePath()); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); @@ -81,11 +71,12 @@ public class RepairsCommand implements CommandMarker { unspecifiedDefaultValue = "true") final boolean dryRun) throws IOException { + HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); String latestCommit = - HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); + client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); List partitionPaths = - FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.fs, HoodieCLI.tableMetadata.getBasePath()); - Path basePath = new Path(HoodieCLI.tableMetadata.getBasePath()); + FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.fs, client.getBasePath()); + Path basePath = new Path(client.getBasePath()); String[][] rows = new String[partitionPaths.size() + 1][]; int ind = 0; diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java index 825c335f1..a3eb6a7ba 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java @@ -58,7 +58,7 @@ public class RollbacksCommand implements CommandMarker { @CliOption(key = {"headeronly"}, help = 
"Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly) throws IOException { - HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.tableMetadata); + HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.getTableMetaClient()); HoodieTimeline rollback = activeTimeline.getRollbackTimeline().filterCompletedInstants(); final List rows = new ArrayList<>(); @@ -94,7 +94,7 @@ public class RollbacksCommand implements CommandMarker { @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly) throws IOException { - HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.tableMetadata); + HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.getTableMetaClient()); final List rows = new ArrayList<>(); HoodieRollbackMetadata metadata = AvroUtils.deserializeAvroMetadata( activeTimeline.getInstantDetails(new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, rollbackInstant)).get(), diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java index 363e0a3b4..69a1584b1 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java @@ -23,17 +23,18 @@ import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.utils.InputStreamConsumer; import org.apache.hudi.cli.utils.SparkUtil; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import 
org.apache.hudi.index.HoodieIndex; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -48,30 +49,9 @@ import java.util.stream.Collectors; @Component public class SavepointsCommand implements CommandMarker { - @CliAvailabilityIndicator({"savepoints show"}) - public boolean isShowAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliAvailabilityIndicator({"savepoints refresh"}) - public boolean isRefreshAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliAvailabilityIndicator({"savepoint create"}) - public boolean isCreateSavepointAvailable() { - return HoodieCLI.tableMetadata != null; - } - - @CliAvailabilityIndicator({"savepoint rollback"}) - public boolean isRollbackToSavepointAvailable() { - return HoodieCLI.tableMetadata != null - && !HoodieCLI.tableMetadata.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty(); - } - @CliCommand(value = "savepoints show", help = "Show the savepoints") public String showSavepoints() throws IOException { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getSavePointTimeline().filterCompletedInstants(); List commits = timeline.getReverseOrderedInstants().collect(Collectors.toList()); String[][] rows = new String[commits.size()][]; @@ -87,7 +67,8 @@ public class SavepointsCommand implements CommandMarker { @CliOption(key = {"user"}, unspecifiedDefaultValue = "default", help = "User who is creating the savepoint") final String user, @CliOption(key = {"comments"}, 
unspecifiedDefaultValue = "default", help = "Comments for creating the savepoint") final String comments) throws Exception { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); @@ -96,7 +77,7 @@ public class SavepointsCommand implements CommandMarker { } JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Create Savepoint"); - HoodieWriteClient client = createHoodieClient(jsc, HoodieCLI.tableMetadata.getBasePath()); + HoodieWriteClient client = createHoodieClient(jsc, metaClient.getBasePath()); String result; if (client.savepoint(commitTime, user, comments)) { // Refresh the current @@ -114,7 +95,11 @@ public class SavepointsCommand implements CommandMarker { @CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String commitTime, @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath) throws Exception { - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + if (metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty()) { + throw new HoodieException("There are no completed savepoint instants to run rollback"); + } + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime); @@ -124,7 +109,7 @@ public class SavepointsCommand implements CommandMarker { SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); 
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), commitTime, - HoodieCLI.tableMetadata.getBasePath()); + metaClient.getBasePath()); Process process = sparkLauncher.launch(); InputStreamConsumer.captureOutput(process); int exitCode = process.waitFor(); @@ -139,7 +124,7 @@ public class SavepointsCommand implements CommandMarker { @CliCommand(value = "savepoints refresh", help = "Refresh the savepoints") public String refreshMetaClient() throws IOException { HoodieCLI.refreshTableMetadata(); - return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed."; + return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed."; } private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception { diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java index 0c35a728b..7fc3b25c1 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java @@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.springframework.shell.core.CommandMarker; -import org.springframework.shell.core.annotation.CliAvailabilityIndicator; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; @@ -57,11 +56,6 @@ public class StatsCommand implements CommandMarker { private static final int MAX_FILES = 1000000; - @CliAvailabilityIndicator({"stats wa"}) - public boolean isWriteAmpAvailable() { - return HoodieCLI.tableMetadata != null; - } - @CliCommand(value = "stats wa", help = "Write Amplification. 
Ratio of how many records were upserted to how many " + "records were actually written") public String writeAmplificationStats( @@ -75,7 +69,7 @@ public class StatsCommand implements CommandMarker { long totalRecordsUpserted = 0; long totalRecordsWritten = 0; - HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline(); + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); List rows = new ArrayList<>(); @@ -121,7 +115,7 @@ public class StatsCommand implements CommandMarker { throws IOException { FileSystem fs = HoodieCLI.fs; - String globPath = String.format("%s/%s/*", HoodieCLI.tableMetadata.getBasePath(), globRegex); + String globPath = String.format("%s/%s/*", HoodieCLI.getTableMetaClient().getBasePath(), globRegex); FileStatus[] statuses = fs.globStatus(new Path(globPath)); // max, min, #small files < 10MB, 50th, avg, 95th diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 81d042b3a..a20353b08 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -77,14 +77,13 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { * Ensures each instant time is atleast 1 second apart since we create instant times at second granularity */ public static String createNewInstantTime() { - lastInstantTime.updateAndGet((oldVal) -> { + return lastInstantTime.updateAndGet((oldVal) -> { String newCommitTime = null; do { newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); } while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL)); return newCommitTime; }); - return lastInstantTime.get(); } 
protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set includedExtensions) {