1
0

[HUDI-416] Improve hint information for cli (#1110)

This commit is contained in:
hongdd
2019-12-25 20:19:12 +08:00
committed by leesf
parent 9c4217a3e1
commit 8affdf8bcb
14 changed files with 224 additions and 317 deletions

View File

@@ -40,7 +40,7 @@ public class HoodieCLI {
public static FileSystem fs;
public static CLIState state = CLIState.INIT;
public static String basePath;
public static HoodieTableMetaClient tableMetadata;
protected static HoodieTableMetaClient tableMetadata;
public static HoodieTableMetaClient syncTableMetadata;
public static TimelineLayoutVersion layoutVersion;
@@ -92,4 +92,17 @@ public class HoodieCLI {
setLayoutVersion(layoutVersion);
refreshTableMetadata();
}
/**
* Get tableMetadata, throw NullPointerException when it is null.
*
* @return tableMetadata which is instance of HoodieTableMetaClient
*/
public static HoodieTableMetaClient getTableMetaClient() {
if (tableMetadata == null) {
throw new NullPointerException("There is no hudi dataset. Please use connect command to set dataset first");
}
return tableMetadata;
}
}

View File

@@ -36,7 +36,6 @@ import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -53,11 +52,6 @@ import java.util.stream.Collectors;
@Component
public class ArchivedCommitsCommand implements CommandMarker {
@CliAvailabilityIndicator({"show archived commits"})
public boolean isShowArchivedCommitAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "show archived commit stats", help = "Read commits from archived files and show details")
public String showArchivedCommits(
@CliOption(key = {"archiveFolderPattern"}, help = "Archive Folder", unspecifiedDefaultValue = "") String folder,
@@ -68,7 +62,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
System.out.println("===============> Showing only " + limit + " archived commits <===============");
String basePath = HoodieCLI.tableMetadata.getBasePath();
String basePath = HoodieCLI.getTableMetaClient().getBasePath();
Path archivePath = new Path(basePath + "/.hoodie/.commits_.archive*");
if (folder != null && !folder.isEmpty()) {
archivePath = new Path(basePath + "/.hoodie/" + folder);
@@ -146,7 +140,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
throws IOException {
System.out.println("===============> Showing only " + limit + " archived commits <===============");
String basePath = HoodieCLI.tableMetadata.getBasePath();
String basePath = HoodieCLI.getTableMetaClient().getBasePath();
FileStatus[] fsStatuses =
FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(new Path(basePath + "/.hoodie/.commits_.archive*"));
List<Comparable[]> allCommits = new ArrayList<>();

View File

@@ -29,7 +29,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.AvroUtils;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -47,21 +46,6 @@ import java.util.stream.Collectors;
@Component
public class CleansCommand implements CommandMarker {
@CliAvailabilityIndicator({"cleans show"})
public boolean isShowAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"cleans refresh"})
public boolean isRefreshAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"clean showpartitions"})
public boolean isCommitShowAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "cleans show", help = "Show the cleans")
public String showCleans(
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@@ -71,7 +55,7 @@ public class CleansCommand implements CommandMarker {
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
List<HoodieInstant> cleans = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
@@ -92,7 +76,7 @@ public class CleansCommand implements CommandMarker {
@CliCommand(value = "cleans refresh", help = "Refresh the commits")
public String refreshCleans() throws IOException {
HoodieCLI.refreshTableMetadata();
return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed.";
return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed.";
}
@CliCommand(value = "clean showpartitions", help = "Show partition level details of a clean")
@@ -104,7 +88,7 @@ public class CleansCommand implements CommandMarker {
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
HoodieInstant cleanInstant = new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, commitTime);

View File

@@ -33,7 +33,6 @@ import org.apache.hudi.common.util.NumericUtils;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -52,26 +51,6 @@ import java.util.stream.Collectors;
@Component
public class CommitsCommand implements CommandMarker {
@CliAvailabilityIndicator({"commits show"})
public boolean isShowAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"commits refresh"})
public boolean isRefreshAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"commit rollback"})
public boolean isRollbackAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"commit show"})
public boolean isCommitShowAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "commits show", help = "Show the commits")
public String showCommits(
@CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
@@ -82,7 +61,7 @@ public class CommitsCommand implements CommandMarker {
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
@@ -111,14 +90,14 @@ public class CommitsCommand implements CommandMarker {
@CliCommand(value = "commits refresh", help = "Refresh the commits")
public String refreshCommits() throws IOException {
HoodieCLI.refreshTableMetadata();
return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed.";
return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed.";
}
@CliCommand(value = "commit rollback", help = "Rollback a commit")
public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String commitTime,
@CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
@@ -128,7 +107,7 @@ public class CommitsCommand implements CommandMarker {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), commitTime,
HoodieCLI.tableMetadata.getBasePath());
HoodieCLI.getTableMetaClient().getBasePath());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
@@ -149,7 +128,7 @@ public class CommitsCommand implements CommandMarker {
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
@@ -205,7 +184,7 @@ public class CommitsCommand implements CommandMarker {
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
@@ -232,18 +211,13 @@ public class CommitsCommand implements CommandMarker {
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
}
@CliAvailabilityIndicator({"commits compare"})
public boolean isCompareCommitsAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "commits compare", help = "Compare commits with another Hoodie dataset")
public String compareCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path)
throws Exception {
HoodieTableMetaClient source = HoodieCLI.getTableMetaClient();
HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.conf, path);
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieTableMetaClient source = HoodieCLI.tableMetadata;
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String targetLatestCommit =
targetTimeline.getInstants().iterator().hasNext() ? "0" : targetTimeline.lastInstant().get().getTimestamp();
@@ -265,17 +239,12 @@ public class CommitsCommand implements CommandMarker {
}
}
@CliAvailabilityIndicator({"commits sync"})
public boolean isSyncCommitsAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "commits sync", help = "Compare commits with another Hoodie dataset")
public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path)
throws Exception {
HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
HoodieCLI.state = HoodieCLI.CLIState.SYNC;
return "Load sync state between " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " and "
return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
+ HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
}

View File

@@ -29,12 +29,14 @@ import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -45,7 +47,6 @@ import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -71,10 +72,12 @@ public class CompactionCommand implements CommandMarker {
private static final String TMP_DIR = "/tmp/";
@CliAvailabilityIndicator({"compactions show all", "compaction show", "compaction run", "compaction schedule"})
public boolean isAvailable() {
return (HoodieCLI.tableMetadata != null)
&& (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ);
private HoodieTableMetaClient checkAndGetMetaClient() {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
if (client.getTableType() != HoodieTableType.MERGE_ON_READ) {
throw new HoodieException("Compactions can only be run for table type : MERGE_ON_READ");
}
return client;
}
@CliCommand(value = "compactions show all", help = "Shows all compactions that are in active timeline")
@@ -88,7 +91,8 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline();
HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
Set<String> committed = commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
@@ -148,7 +152,8 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
activeTimeline.readPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
@@ -171,28 +176,25 @@ public class CompactionCommand implements CommandMarker {
@CliCommand(value = "compaction schedule", help = "Schedule Compaction")
public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G",
help = "Spark executor memory") final String sparkMemory) throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
// First get a compaction instant time and pass it to spark launcher for scheduling compaction
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), HoodieCLI.tableMetadata.getBasePath(),
HoodieCLI.tableMetadata.getTableConfig().getTableName(), compactionInstantTime, sparkMemory);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to run compaction for " + compactionInstantTime;
}
return "Compaction successfully completed for " + compactionInstantTime;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), client.getBasePath(),
client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to run compaction for " + compactionInstantTime;
}
return "Compaction successfully completed for " + compactionInstantTime;
}
@CliCommand(value = "compaction run", help = "Run Compaction for given instant time")
@@ -207,38 +209,35 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = "compactionInstant", mandatory = false,
help = "Base path for the target hoodie dataset") String compactionInstantTime)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
if (null == compactionInstantTime) {
// pick outstanding one with lowest timestamp
Option<String> firstPendingInstant =
HoodieCLI.tableMetadata.reloadActiveTimeline().filterCompletedAndCompactionInstants()
.filter(instant -> instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)).firstInstant()
.map(HoodieInstant::getTimestamp);
if (!firstPendingInstant.isPresent()) {
return "NO PENDING COMPACTION TO RUN";
}
compactionInstantTime = firstPendingInstant.get();
if (null == compactionInstantTime) {
// pick outstanding one with lowest timestamp
Option<String> firstPendingInstant =
client.reloadActiveTimeline().filterCompletedAndCompactionInstants()
.filter(instant -> instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)).firstInstant()
.map(HoodieInstant::getTimestamp);
if (!firstPendingInstant.isPresent()) {
return "NO PENDING COMPACTION TO RUN";
}
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), HoodieCLI.tableMetadata.getBasePath(),
HoodieCLI.tableMetadata.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath,
sparkMemory, retry);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to run compaction for " + compactionInstantTime;
}
return "Compaction successfully completed for " + compactionInstantTime;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
compactionInstantTime = firstPendingInstant.get();
}
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), client.getBasePath(),
client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath,
sparkMemory, retry);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to run compaction for " + compactionInstantTime;
}
return "Compaction successfully completed for " + compactionInstantTime;
}
private static String getTmpSerializerFile() {
@@ -271,54 +270,51 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") boolean headerOnly)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = null;
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), HoodieCLI.tableMetadata.getBasePath(),
compactionInstant, outputPathStr, parallelism, master, sparkMemory);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to validate compaction for " + compactionInstant;
}
List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true);
String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
List<Comparable[]> rows = new ArrayList<>();
res.stream().forEach(r -> {
Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(),
r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "",
r.getOperation().getDeltaFileNames().size(), r.isSuccess(),
r.getException().isPresent() ? r.getException().get().getMessage() : ""};
rows.add(row);
});
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Base Instant Time")
.addTableHeaderField("Base Data File").addTableHeaderField("Num Delta Files").addTableHeaderField("Valid")
.addTableHeaderField("Error");
output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit,
headerOnly, rows);
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(), client.getBasePath(),
compactionInstant, outputPathStr, parallelism, master, sparkMemory);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to validate compaction for " + compactionInstant;
}
List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true);
String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
List<Comparable[]> rows = new ArrayList<>();
res.stream().forEach(r -> {
Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(),
r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "",
r.getOperation().getDeltaFileNames().size(), r.isSuccess(),
r.getException().isPresent() ? r.getException().get().getMessage() : ""};
rows.add(row);
});
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Base Instant Time")
.addTableHeaderField("Base Data File").addTableHeaderField("Num Delta Files").addTableHeaderField("Valid")
.addTableHeaderField("Error");
output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit,
headerOnly, rows);
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
return output;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
}
return output;
}
@CliCommand(value = "compaction unschedule", help = "Unschedule Compaction")
@@ -335,39 +331,36 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") boolean headerOnly)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = "";
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), HoodieCLI.tableMetadata.getBasePath(),
compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
output =
getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "unschedule pending compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(), client.getBasePath(),
compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
output =
getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "unschedule pending compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
return output;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
}
return output;
}
@CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId")
@@ -382,39 +375,36 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") boolean descending,
@CliOption(key = {"headeronly"}, help = "Header Only", unspecifiedDefaultValue = "false") boolean headerOnly)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = "";
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), HoodieCLI.tableMetadata.getBasePath(),
fileId, outputPathStr, "1", master, sparkMemory, Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to unschedule compaction for file " + fileId;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly,
"unschedule file from pending compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), client.getBasePath(),
fileId, outputPathStr, "1", master, sparkMemory, Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to unschedule compaction for file " + fileId;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly,
"unschedule file from pending compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
return output;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
}
return output;
}
@CliCommand(value = "compaction repair", help = "Renames the files to make them consistent with the timeline as "
@@ -431,36 +421,34 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") boolean headerOnly)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = "";
if (HoodieCLI.tableMetadata.getTableType() == HoodieTableType.MERGE_ON_READ) {
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), HoodieCLI.tableMetadata.getBasePath(),
compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), client.getBasePath(),
compactionInstant, outputPathStr, parallelism, master, sparkMemory, Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
if (exitCode != 0) {
return "Failed to unschedule compaction for " + compactionInstant;
}
List<RenameOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
output = getRenamesToBePrinted(res, limit, sortByField, descending, headerOnly, "repair compaction");
} finally {
// Delete tmp file used to serialize result
if (HoodieCLI.fs.exists(outputPath)) {
HoodieCLI.fs.delete(outputPath, false);
}
return output;
} else {
throw new Exception("Compactions can only be run for table type : MERGE_ON_READ");
}
return output;
}
private String getRenamesToBePrinted(List<RenameOpResult> res, Integer limit, String sortByField, boolean descending,

View File

@@ -27,7 +27,6 @@ import org.apache.hudi.common.util.ConsistencyGuardConfig;
import org.apache.hudi.exception.DatasetNotFoundException;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -69,7 +68,7 @@ public class DatasetsCommand implements CommandMarker {
HoodieCLI.connectTo(path, layoutVersion);
HoodieCLI.initFS(true);
HoodieCLI.state = HoodieCLI.CLIState.DATASET;
return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " loaded";
return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " loaded";
}
/**
@@ -116,22 +115,18 @@ public class DatasetsCommand implements CommandMarker {
return connect(path, layoutVersion, false, 0, 0, 0);
}
@CliAvailabilityIndicator({"desc"})
public boolean isDescAvailable() {
return HoodieCLI.tableMetadata != null;
}
/**
* Describes table properties.
*/
@CliCommand(value = "desc", help = "Describle Hoodie Table properties")
public String descTable() {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
TableHeader header = new TableHeader().addTableHeaderField("Property").addTableHeaderField("Value");
List<Comparable[]> rows = new ArrayList<>();
rows.add(new Comparable[] {"basePath", HoodieCLI.tableMetadata.getBasePath()});
rows.add(new Comparable[] {"metaPath", HoodieCLI.tableMetadata.getMetaPath()});
rows.add(new Comparable[] {"fileSystem", HoodieCLI.tableMetadata.getFs().getScheme()});
HoodieCLI.tableMetadata.getTableConfig().getProps().entrySet().forEach(e -> {
rows.add(new Comparable[] {"basePath", client.getBasePath()});
rows.add(new Comparable[] {"metaPath", client.getMetaPath()});
rows.add(new Comparable[] {"fileSystem", client.getFs().getScheme()});
client.getTableConfig().getProps().entrySet().forEach(e -> {
rows.add(new Comparable[] {e.getKey(), e.getValue()});
});
return HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);

View File

@@ -141,7 +141,7 @@ public class FileSystemViewCommand implements CommandMarker {
fileSliceStream = fsView.getLatestFileSlices(partition);
} else {
if (maxInstant.isEmpty()) {
maxInstant = HoodieCLI.tableMetadata.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant()
maxInstant = HoodieCLI.getTableMetaClient().getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant()
.get().getTimestamp();
}
fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, maxInstant);
@@ -224,10 +224,11 @@ public class FileSystemViewCommand implements CommandMarker {
*/
private HoodieTableFileSystemView buildFileSystemView(String globRegex, String maxInstant, boolean readOptimizedOnly,
boolean includeMaxInstant, boolean includeInflight, boolean excludeCompaction) throws IOException {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
HoodieTableMetaClient metaClient =
new HoodieTableMetaClient(HoodieCLI.tableMetadata.getHadoopConf(), HoodieCLI.tableMetadata.getBasePath(), true);
new HoodieTableMetaClient(client.getHadoopConf(), client.getBasePath(), true);
FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", HoodieCLI.tableMetadata.getBasePath(), globRegex);
String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
FileStatus[] statuses = fs.globStatus(new Path(globPath));
Stream<HoodieInstant> instantsStream = null;

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -46,7 +47,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -68,11 +68,6 @@ import scala.Tuple3;
@Component
public class HoodieLogFileCommand implements CommandMarker {
@CliAvailabilityIndicator({"show logfiles"})
public boolean isShowArchivedLogFileAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "show logfile metadata", help = "Read commit metadata from log files")
public String showLogFileCommits(
@CliOption(key = "logFilePathPattern", mandatory = true,
@@ -84,7 +79,7 @@ public class HoodieLogFileCommand implements CommandMarker {
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
FileSystem fs = HoodieCLI.tableMetadata.getFs();
FileSystem fs = HoodieCLI.getTableMetaClient().getFs();
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
.map(status -> status.getPath().toString()).collect(Collectors.toList());
Map<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata =
@@ -96,7 +91,7 @@ public class HoodieLogFileCommand implements CommandMarker {
for (String logFilePath : logFilePaths) {
FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
Schema writerSchema = new AvroSchemaConverter()
.convert(SchemaUtil.readSchemaFromLogFile(HoodieCLI.tableMetadata.getFs(), new Path(logFilePath)));
.convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath)));
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
// read the avro blocks
@@ -178,7 +173,8 @@ public class HoodieLogFileCommand implements CommandMarker {
System.out.println("===============> Showing only " + limit + " records <===============");
FileSystem fs = HoodieCLI.tableMetadata.getFs();
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
FileSystem fs = client.getFs();
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
.map(status -> status.getPath().toString()).collect(Collectors.toList());
@@ -193,8 +189,8 @@ public class HoodieLogFileCommand implements CommandMarker {
if (shouldMerge) {
System.out.println("===========================> MERGING RECORDS <===================");
HoodieMergedLogRecordScanner scanner =
new HoodieMergedLogRecordScanner(fs, HoodieCLI.tableMetadata.getBasePath(), logFilePaths, readerSchema,
HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
new HoodieMergedLogRecordScanner(fs, client.getBasePath(), logFilePaths, readerSchema,
client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES),
Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
@@ -210,7 +206,7 @@ public class HoodieLogFileCommand implements CommandMarker {
} else {
for (String logFile : logFilePaths) {
Schema writerSchema = new AvroSchemaConverter()
.convert(SchemaUtil.readSchemaFromLogFile(HoodieCLI.tableMetadata.getFs(), new Path(logFile)));
.convert(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile)));
HoodieLogFormat.Reader reader =
HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema);
// read the avro blocks

View File

@@ -24,9 +24,9 @@ import org.apache.hudi.cli.utils.HiveUtil;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.exception.HoodieException;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -40,11 +40,6 @@ import java.util.stream.Collectors;
@Component
public class HoodieSyncCommand implements CommandMarker {
@CliAvailabilityIndicator({"sync validate"})
public boolean isSyncVerificationAvailable() {
return HoodieCLI.tableMetadata != null && HoodieCLI.syncTableMetadata != null;
}
@CliCommand(value = "sync validate", help = "Validate the sync by counting the number of records")
public String validateSync(
@CliOption(key = {"mode"}, unspecifiedDefaultValue = "complete", help = "Check mode") final String mode,
@@ -60,9 +55,12 @@ public class HoodieSyncCommand implements CommandMarker {
@CliOption(key = {"hivePass"}, mandatory = true, unspecifiedDefaultValue = "",
help = "hive password to connect to") final String hivePass)
throws Exception {
if (HoodieCLI.syncTableMetadata == null) {
throw new HoodieException("Sync validate request target table not null.");
}
HoodieTableMetaClient target = HoodieCLI.syncTableMetadata;
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline();
HoodieTableMetaClient source = HoodieCLI.tableMetadata;
HoodieTableMetaClient source = HoodieCLI.getTableMetaClient();
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline();
long sourceCount = 0;
long targetCount = 0;

View File

@@ -23,12 +23,12 @@ import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hadoop.fs.Path;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -42,16 +42,6 @@ import java.util.List;
@Component
public class RepairsCommand implements CommandMarker {
@CliAvailabilityIndicator({"repair deduplicate"})
public boolean isRepairDeduplicateAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"repair addpartitionmeta"})
public boolean isRepairAddPartitionMetaAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "repair deduplicate",
help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
public String deduplicate(
@@ -64,7 +54,7 @@ public class RepairsCommand implements CommandMarker {
throws Exception {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkMain.SparkCommand.DEDUPLICATE.toString(), duplicatedPartitionPath, repairedOutputPath,
HoodieCLI.tableMetadata.getBasePath());
HoodieCLI.getTableMetaClient().getBasePath());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
@@ -81,11 +71,12 @@ public class RepairsCommand implements CommandMarker {
unspecifiedDefaultValue = "true") final boolean dryRun)
throws IOException {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
String latestCommit =
HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp();
client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp();
List<String> partitionPaths =
FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.fs, HoodieCLI.tableMetadata.getBasePath());
Path basePath = new Path(HoodieCLI.tableMetadata.getBasePath());
FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.fs, client.getBasePath());
Path basePath = new Path(client.getBasePath());
String[][] rows = new String[partitionPaths.size() + 1][];
int ind = 0;

View File

@@ -58,7 +58,7 @@ public class RollbacksCommand implements CommandMarker {
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.tableMetadata);
HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.getTableMetaClient());
HoodieTimeline rollback = activeTimeline.getRollbackTimeline().filterCompletedInstants();
final List<Comparable[]> rows = new ArrayList<>();
@@ -94,7 +94,7 @@ public class RollbacksCommand implements CommandMarker {
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.tableMetadata);
HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.getTableMetaClient());
final List<Comparable[]> rows = new ArrayList<>();
HoodieRollbackMetadata metadata = AvroUtils.deserializeAvroMetadata(
activeTimeline.getInstantDetails(new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, rollbackInstant)).get(),

View File

@@ -23,17 +23,18 @@ import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -48,30 +49,9 @@ import java.util.stream.Collectors;
@Component
public class SavepointsCommand implements CommandMarker {
@CliAvailabilityIndicator({"savepoints show"})
public boolean isShowAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"savepoints refresh"})
public boolean isRefreshAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"savepoint create"})
public boolean isCreateSavepointAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliAvailabilityIndicator({"savepoint rollback"})
public boolean isRollbackToSavepointAvailable() {
return HoodieCLI.tableMetadata != null
&& !HoodieCLI.tableMetadata.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty();
}
@CliCommand(value = "savepoints show", help = "Show the savepoints")
public String showSavepoints() throws IOException {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getSavePointTimeline().filterCompletedInstants();
List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
String[][] rows = new String[commits.size()][];
@@ -87,7 +67,8 @@ public class SavepointsCommand implements CommandMarker {
@CliOption(key = {"user"}, unspecifiedDefaultValue = "default", help = "User who is creating the savepoint") final String user,
@CliOption(key = {"comments"}, unspecifiedDefaultValue = "default", help = "Comments for creating the savepoint") final String comments)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
@@ -96,7 +77,7 @@ public class SavepointsCommand implements CommandMarker {
}
JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Create Savepoint");
HoodieWriteClient client = createHoodieClient(jsc, HoodieCLI.tableMetadata.getBasePath());
HoodieWriteClient client = createHoodieClient(jsc, metaClient.getBasePath());
String result;
if (client.savepoint(commitTime, user, comments)) {
// Refresh the current
@@ -114,7 +95,11 @@ public class SavepointsCommand implements CommandMarker {
@CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String commitTime,
@CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
if (metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty()) {
throw new HoodieException("There are no completed instants to run rollback");
}
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
@@ -124,7 +109,7 @@ public class SavepointsCommand implements CommandMarker {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(), commitTime,
HoodieCLI.tableMetadata.getBasePath());
metaClient.getBasePath());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
@@ -139,7 +124,7 @@ public class SavepointsCommand implements CommandMarker {
@CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
public String refreshMetaClient() throws IOException {
HoodieCLI.refreshTableMetadata();
return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed.";
return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed.";
}
private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {

View File

@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
@@ -57,11 +56,6 @@ public class StatsCommand implements CommandMarker {
private static final int MAX_FILES = 1000000;
@CliAvailabilityIndicator({"stats wa"})
public boolean isWriteAmpAvailable() {
return HoodieCLI.tableMetadata != null;
}
@CliCommand(value = "stats wa", help = "Write Amplification. Ratio of how many records were upserted to how many "
+ "records were actually written")
public String writeAmplificationStats(
@@ -75,7 +69,7 @@ public class StatsCommand implements CommandMarker {
long totalRecordsUpserted = 0;
long totalRecordsWritten = 0;
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
List<Comparable[]> rows = new ArrayList<>();
@@ -121,7 +115,7 @@ public class StatsCommand implements CommandMarker {
throws IOException {
FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", HoodieCLI.tableMetadata.getBasePath(), globRegex);
String globPath = String.format("%s/%s/*", HoodieCLI.getTableMetaClient().getBasePath(), globRegex);
FileStatus[] statuses = fs.globStatus(new Path(globPath));
// max, min, #small files < 10MB, 50th, avg, 95th