
[MINOR] Optimize hudi-cli module (#1136)

SteNicholas authored 2020-01-05 01:05:50 +08:00
committed by vinoth chandar
parent 290278fc6c
commit a733f4ef72
17 changed files with 142 additions and 176 deletions

View File

@@ -84,7 +84,7 @@ public class HoodiePrintHelper {
     buffer.getFieldNames().toArray(header);
     String[][] rows =
-        buffer.getRenderRows().stream().map(l -> l.stream().toArray(String[]::new)).toArray(String[][]::new);
+        buffer.getRenderRows().stream().map(l -> l.toArray(new String[l.size()])).toArray(String[][]::new);
     return printTextTable(header, rows);
   }
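The hunk above builds each String[] row directly with List.toArray instead of opening a second stream per row. A minimal standalone sketch of that conversion (class and variable names here are illustrative, not from the Hudi codebase):

    import java.util.Arrays;
    import java.util.List;

    public class RowConversionSketch {
      public static void main(String[] args) {
        List<List<String>> renderRows = Arrays.asList(
            Arrays.asList("20200105", "commit"),
            Arrays.asList("20200104", "deltacommit"));

        // Each row List<String> becomes a String[] via toArray; the outer stream
        // then collects everything into the String[][] shape a text-table printer expects.
        String[][] rows = renderRows.stream()
            .map(l -> l.toArray(new String[l.size()]))
            .toArray(String[][]::new);

        System.out.println(Arrays.deepToString(rows));
      }
    }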

View File

@@ -22,7 +22,6 @@ import org.apache.hudi.common.util.Option;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -89,7 +88,7 @@ public class Table implements Iterable<List<String>> {
    * @return
    */
   public Table addAll(List<List<Comparable>> rows) {
-    rows.forEach(r -> add(r));
+    rows.forEach(this::add);
     return this;
   }
@@ -120,16 +119,11 @@ public class Table implements Iterable<List<String>> {
    */
   private List<List<Comparable>> orderRows() {
     return orderingFieldNameOptional.map(orderingColumnName -> {
-      return rawRows.stream().sorted(new Comparator<List<Comparable>>() {
-        @Override
-        public int compare(List<Comparable> row1, List<Comparable> row2) {
+      return rawRows.stream().sorted((row1, row2) -> {
         Comparable fieldForRow1 = row1.get(rowHeader.indexOf(orderingColumnName));
         Comparable fieldForRow2 = row2.get(rowHeader.indexOf(orderingColumnName));
         int cmpRawResult = fieldForRow1.compareTo(fieldForRow2);
-        return isDescendingOptional.map(isDescending -> {
-          return isDescending ? -1 * cmpRawResult : cmpRawResult;
-        }).orElse(cmpRawResult);
-        }
+        return isDescendingOptional.map(isDescending -> isDescending ? -1 * cmpRawResult : cmpRawResult).orElse(cmpRawResult);
       }).collect(Collectors.toList());
     }).orElse(rawRows);
   }
@@ -141,16 +135,14 @@ public class Table implements Iterable<List<String>> {
     this.renderRows = new ArrayList<>();
     final int limit = this.limitOptional.orElse(rawRows.size());
     final List<List<Comparable>> orderedRows = orderRows();
-    renderRows = orderedRows.stream().limit(limit).map(row -> {
-      return IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
+    renderRows = orderedRows.stream().limit(limit).map(row -> IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
       String fieldName = rowHeader.get(idx);
       if (fieldNameToConverterMap.containsKey(fieldName)) {
         return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
       }
       Object v = row.get(idx);
       return v == null ? "null" : v.toString();
-      }).collect(Collectors.toList());
-    }).collect(Collectors.toList());
+    }).collect(Collectors.toList())).collect(Collectors.toList());
   }

   @Override
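The orderRows() change replaces an anonymous Comparator with a lambda, since Comparator is a functional interface. A self-contained sketch of the same pattern (the rows and column index below are made up for illustration):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ComparatorLambdaSketch {
      public static void main(String[] args) {
        List<List<String>> rawRows = Arrays.asList(
            Arrays.asList("20200104", "deltacommit"),
            Arrays.asList("20200105", "commit"));

        int orderingIdx = 0;       // column to sort on
        boolean descending = true; // mirrors the isDescendingOptional flag

        // The anonymous Comparator<List<Comparable>> becomes a two-argument lambda:
        List<List<String>> ordered = rawRows.stream()
            .sorted((row1, row2) -> {
              int cmp = row1.get(orderingIdx).compareTo(row2.get(orderingIdx));
              return descending ? -1 * cmp : cmp;
            })
            .collect(Collectors.toList());

        System.out.println(ordered);
      }
    }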

View File

@@ -89,8 +89,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
           .deepCopy(HoodieCommitMetadata.SCHEMA$, r.get("hoodieCommitMetadata"));
       final String instantTime = r.get("commitTime").toString();
       final String action = r.get("actionType").toString();
-      return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> {
-        return hoodieWriteStats.stream().map(hoodieWriteStat -> {
+      return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> hoodieWriteStats.stream().map(hoodieWriteStat -> {
         List<Comparable> row = new ArrayList<>();
         row.add(action);
         row.add(instantTime);
@@ -110,8 +109,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
         row.add(hoodieWriteStat.getTotalWriteBytes());
         row.add(hoodieWriteStat.getTotalWriteErrors());
         return row;
-        });
-      }).map(rowList -> rowList.toArray(new Comparable[0]));
+      })).map(rowList -> rowList.toArray(new Comparable[0]));
     }).collect(Collectors.toList());
     allStats.addAll(readCommits);
     reader.close();
@@ -183,14 +181,7 @@ public class ArchivedCommitsCommand implements CommandMarker {
         }
         break;
       }
-      case HoodieTimeline.COMMIT_ACTION: {
-        commitDetails.add(record.get("commitTime"));
-        commitDetails.add(record.get("actionType").toString());
-        if (!skipMetadata) {
-          commitDetails.add(record.get("hoodieCommitMetadata").toString());
-        }
-        break;
-      }
+      case HoodieTimeline.COMMIT_ACTION:
       case HoodieTimeline.DELTA_COMMIT_ACTION: {
         commitDetails.add(record.get("commitTime"));
         commitDetails.add(record.get("actionType").toString());

View File

@@ -66,8 +66,7 @@ public class CleansCommand implements CommandMarker {
     HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
     List<HoodieInstant> cleans = timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List<Comparable[]> rows = new ArrayList<>();
-    for (int i = 0; i < cleans.size(); i++) {
-      HoodieInstant clean = cleans.get(i);
+    for (HoodieInstant clean : cleans) {
       HoodieCleanMetadata cleanMetadata =
           AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
       rows.add(new Comparable[]{clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
@@ -110,8 +109,8 @@ public class CleansCommand implements CommandMarker {
       String path = entry.getKey();
       HoodieCleanPartitionMetadata stats = entry.getValue();
       String policy = stats.getPolicy();
-      Integer totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
-      Integer totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
+      int totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
+      int totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
       rows.add(new Comparable[] {path, policy, totalSuccessDeletedFiles, totalFailedDeletedFiles});
     }
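Two small patterns recur in this file: the indexed for loop becomes an enhanced for loop, and the delete-file counters become primitive int instead of boxed Integer. A tiny illustration with made-up data (the instant names and count are not from the codebase):

    import java.util.Arrays;
    import java.util.List;

    public class ForEachLoopSketch {
      public static void main(String[] args) {
        List<String> cleans = Arrays.asList("clean-001", "clean-002", "clean-003");

        // Before: for (int i = 0; i < cleans.size(); i++) { String clean = cleans.get(i); ... }
        // After: the element is bound directly, no index bookkeeping needed.
        for (String clean : cleans) {
          int totalSuccessDeletedFiles = clean.length(); // primitive int, no Integer boxing
          System.out.println(clean + " -> " + totalSuccessDeletedFiles);
        }
      }
    }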

View File

@@ -53,7 +53,7 @@ public class CommitsCommand implements CommandMarker {
   @CliCommand(value = "commits show", help = "Show the commits")
   public String showCommits(
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "-1") final Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
       @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -65,8 +65,7 @@ public class CommitsCommand implements CommandMarker {
     HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
     List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List<Comparable[]> rows = new ArrayList<>();
-    for (int i = 0; i < commits.size(); i++) {
-      HoodieInstant commit = commits.get(i);
+    for (HoodieInstant commit : commits) {
       HoodieCommitMetadata commitMetadata =
           HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
       rows.add(new Comparable[] {commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(),
@@ -76,9 +75,7 @@ public class CommitsCommand implements CommandMarker {
     }
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    fieldNameToConverterMap.put("Total Bytes Written", entry -> {
-      return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
-    });
+    fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString()))));
     TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Bytes Written")
         .addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
@@ -95,7 +92,7 @@ public class CommitsCommand implements CommandMarker {
   @CliCommand(value = "commit rollback", help = "Rollback a commit")
   public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String commitTime,
-      @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
       throws Exception {
     HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
     HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
@@ -163,9 +160,7 @@ public class CommitsCommand implements CommandMarker {
     }
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    fieldNameToConverterMap.put("Total Bytes Written", entry -> {
-      return NumericUtils.humanReadableByteCount((Long.valueOf(entry.toString())));
-    });
+    fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
     TableHeader header = new TableHeader().addTableHeaderField("Partition Path")
         .addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
@@ -240,8 +235,7 @@ public class CommitsCommand implements CommandMarker {
   }
   @CliCommand(value = "commits sync", help = "Compare commits with another Hoodie dataset")
-  public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path)
-      throws Exception {
+  public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path) {
     HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
     HoodieCLI.state = HoodieCLI.CLIState.SYNC;
     return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "

View File

@@ -38,11 +38,12 @@ import org.apache.hudi.common.util.AvroUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.func.OperationResult;
+import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
@@ -85,7 +86,7 @@ public class CompactionCommand implements CommandMarker {
   public String compactionsAll(
       @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
           unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "-1") final Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
       @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -100,10 +101,9 @@ public class CompactionCommand implements CommandMarker {
     List<HoodieInstant> instants = timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List<Comparable[]> rows = new ArrayList<>();
-    for (int i = 0; i < instants.size(); i++) {
-      HoodieInstant instant = instants.get(i);
+    for (HoodieInstant instant : instants) {
       HoodieCompactionPlan compactionPlan = null;
-      if (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
+      if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
         try {
           // This could be a completed compaction. Assume a compaction request file is present but skip if fails
           compactionPlan = AvroUtils.deserializeCompactionPlan(
@@ -118,7 +118,7 @@ public class CompactionCommand implements CommandMarker {
       }
       if (null != compactionPlan) {
-        HoodieInstant.State state = instant.getState();
+        State state = instant.getState();
         if (committed.contains(instant.getTimestamp())) {
           state = State.COMPLETED;
         }
@@ -146,7 +146,7 @@ public class CompactionCommand implements CommandMarker {
   public String compactionShow(
       @CliOption(key = "instant", mandatory = true,
           help = "Base path for the target hoodie dataset") final String compactionInstantTime,
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "-1") final Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
       @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -212,8 +212,7 @@ public class CompactionCommand implements CommandMarker {
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
           help = "Spark executor memory") final String sparkMemory,
       @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
-      @CliOption(key = "compactionInstant", mandatory = false,
-          help = "Base path for the target hoodie dataset") String compactionInstantTime,
+      @CliOption(key = "compactionInstant", help = "Base path for the target hoodie dataset") String compactionInstantTime,
       @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
           unspecifiedDefaultValue = "") final String propsFilePath,
       @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
@@ -286,7 +285,7 @@ public class CompactionCommand implements CommandMarker {
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = null;
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -300,10 +299,10 @@ public class CompactionCommand implements CommandMarker {
       return "Failed to validate compaction for " + compactionInstant;
     }
     List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
-    boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true);
+    boolean valid = res.stream().map(OperationResult::isSuccess).reduce(Boolean::logicalAnd).orElse(true);
     String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
     List<Comparable[]> rows = new ArrayList<>();
-    res.stream().forEach(r -> {
+    res.forEach(r -> {
       Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(),
           r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "",
           r.getOperation().getDeltaFileNames().size(), r.isSuccess(),
@@ -347,7 +346,7 @@ public class CompactionCommand implements CommandMarker {
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = "";
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -391,7 +390,7 @@ public class CompactionCommand implements CommandMarker {
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = "";
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -437,7 +436,7 @@ public class CompactionCommand implements CommandMarker {
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = "";
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -476,7 +475,7 @@ public class CompactionCommand implements CommandMarker {
     }
     List<Comparable[]> rows = new ArrayList<>();
-    res.stream().forEach(r -> {
+    res.forEach(r -> {
       Comparable[] row =
           new Comparable[] {r.getOperation().fileId, r.getOperation().srcPath, r.getOperation().destPath,
               r.isExecuted(), r.isSuccess(), r.getException().isPresent() ? r.getException().get().getMessage() : ""};
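One of the hunks above flips instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION) into HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()), i.e. equals is called on the constant, so a null action can never trigger a NullPointerException. A tiny sketch of the difference (only the constant name mirrors the Hudi one; the rest is illustrative):

    public class ConstantFirstEqualsSketch {
      static final String COMPACTION_ACTION = "compaction";

      public static void main(String[] args) {
        String action = null; // e.g. an instant whose action could not be resolved

        // Constant-first comparison: simply false when action is null.
        System.out.println(COMPACTION_ACTION.equals(action)); // false

        // Variable-first comparison would throw NullPointerException here:
        // System.out.println(action.equals(COMPACTION_ACTION));
      }
    }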

View File

@@ -49,14 +49,14 @@ public class DatasetsCommand implements CommandMarker {
   @CliCommand(value = "connect", help = "Connect to a hoodie dataset")
   public String connect(
       @CliOption(key = {"path"}, mandatory = true, help = "Base Path of the dataset") final String path,
-      @CliOption(key = {"layoutVersion"}, mandatory = false, help = "Timeline Layout version") Integer layoutVersion,
-      @CliOption(key = {"eventuallyConsistent"}, mandatory = false, unspecifiedDefaultValue = "false",
+      @CliOption(key = {"layoutVersion"}, help = "Timeline Layout version") Integer layoutVersion,
+      @CliOption(key = {"eventuallyConsistent"}, unspecifiedDefaultValue = "false",
           help = "Enable eventual consistency") final boolean eventuallyConsistent,
-      @CliOption(key = {"initialCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "2000",
+      @CliOption(key = {"initialCheckIntervalMs"}, unspecifiedDefaultValue = "2000",
           help = "Initial wait time for eventual consistency") final Integer initialConsistencyIntervalMs,
-      @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "300000",
+      @CliOption(key = {"maxCheckIntervalMs"}, unspecifiedDefaultValue = "300000",
           help = "Max wait time for eventual consistency") final Integer maxConsistencyIntervalMs,
-      @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "7",
+      @CliOption(key = {"maxCheckIntervalMs"}, unspecifiedDefaultValue = "7",
           help = "Max checks for eventual consistency") final Integer maxConsistencyChecks)
       throws IOException {
     HoodieCLI
@@ -118,7 +118,7 @@ public class DatasetsCommand implements CommandMarker {
   /**
    * Describes table properties.
    */
-  @CliCommand(value = "desc", help = "Describle Hoodie Table properties")
+  @CliCommand(value = "desc", help = "Describe Hoodie Table properties")
   public String descTable() {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     TableHeader header = new TableHeader().addTableHeaderField("Property").addTableHeaderField("Value");

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -90,13 +91,13 @@ public class FileSystemViewCommand implements CommandMarker {
       row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getFileSize() : -1;
       if (!readOptimizedOnly) {
         row[idx++] = fs.getLogFiles().count();
-        row[idx++] = fs.getLogFiles().mapToLong(lf -> lf.getFileSize()).sum();
+        row[idx++] = fs.getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
         row[idx++] = fs.getLogFiles().collect(Collectors.toList()).toString();
       }
       rows.add(row);
     }));
     Function<Object, String> converterFunction =
-        entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+        entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
     fieldNameToConverterMap.put("Total Delta File Size", converterFunction);
     fieldNameToConverterMap.put("Data-File Size", converterFunction);
@@ -160,15 +161,15 @@ public class FileSystemViewCommand implements CommandMarker {
       if (!readOptimizedOnly) {
         row[idx++] = fs.getLogFiles().count();
-        row[idx++] = fs.getLogFiles().mapToLong(lf -> lf.getFileSize()).sum();
+        row[idx++] = fs.getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
         long logFilesScheduledForCompactionTotalSize =
             fs.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
-                .mapToLong(lf -> lf.getFileSize()).sum();
+                .mapToLong(HoodieLogFile::getFileSize).sum();
         row[idx++] = logFilesScheduledForCompactionTotalSize;
         long logFilesUnscheduledTotalSize =
             fs.getLogFiles().filter(lf -> !lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
-                .mapToLong(lf -> lf.getFileSize()).sum();
+                .mapToLong(HoodieLogFile::getFileSize).sum();
         row[idx++] = logFilesUnscheduledTotalSize;
         double logSelectedForCompactionToBaseRatio =
@@ -186,7 +187,7 @@ public class FileSystemViewCommand implements CommandMarker {
     });
     Function<Object, String> converterFunction =
-        entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+        entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
     fieldNameToConverterMap.put("Data-File Size", converterFunction);
     if (!readOptimizedOnly) {
@@ -230,9 +231,9 @@ public class FileSystemViewCommand implements CommandMarker {
     FileSystem fs = HoodieCLI.fs;
     String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
     FileStatus[] statuses = fs.globStatus(new Path(globPath));
-    Stream<HoodieInstant> instantsStream = null;
-    HoodieTimeline timeline = null;
+    Stream<HoodieInstant> instantsStream;
+    HoodieTimeline timeline;
     if (readOptimizedOnly) {
       timeline = metaClient.getActiveTimeline().getCommitTimeline();
     } else if (excludeCompaction) {
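The log-file size sums above replace lf -> lf.getFileSize() with the method reference HoodieLogFile::getFileSize. A compact sketch of the same mapToLong pattern, using a stand-in type rather than the real HoodieLogFile:

    import java.util.Arrays;
    import java.util.List;

    public class MethodReferenceSketch {
      // Stand-in for HoodieLogFile; only the size accessor matters here.
      static class LogFile {
        private final long fileSize;
        LogFile(long fileSize) { this.fileSize = fileSize; }
        long getFileSize() { return fileSize; }
      }

      public static void main(String[] args) {
        List<LogFile> logFiles = Arrays.asList(new LogFile(1024), new LogFile(2048));

        // Before: .mapToLong(lf -> lf.getFileSize())   After: .mapToLong(LogFile::getFileSize)
        long totalSize = logFiles.stream().mapToLong(LogFile::getFileSize).sum();
        System.out.println(totalSize); // 3072
      }
    }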

View File

@@ -23,8 +23,8 @@ import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.InputStreamConsumer; import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.utilities.HDFSParquetImporter.FormatValidator; import org.apache.hudi.utilities.HDFSParquetImporter.FormatValidator;
import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.launcher.SparkLauncher; import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils; import org.apache.spark.util.Utils;
import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.CommandMarker;
@@ -42,7 +42,7 @@ public class HDFSParquetImportCommand implements CommandMarker {
   @CliCommand(value = "hdfsparquetimport", help = "Imports Parquet dataset to a hoodie dataset")
   public String convert(
-      @CliOption(key = "upsert", mandatory = false, unspecifiedDefaultValue = "false",
+      @CliOption(key = "upsert", unspecifiedDefaultValue = "false",
           help = "Uses upsert API instead of the default insert API of WriteClient") boolean useUpsert,
       @CliOption(key = "srcPath", mandatory = true, help = "Base path for the input dataset") final String srcPath,
       @CliOption(key = "targetPath", mandatory = true,

View File

@@ -38,8 +38,10 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.hive.util.SchemaUtil;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileStatus;
@@ -84,14 +86,13 @@ public class HoodieLogFileCommand implements CommandMarker {
         .map(status -> status.getPath().toString()).collect(Collectors.toList());
     Map<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata =
         Maps.newHashMap();
-    int totalEntries = 0;
     int numCorruptBlocks = 0;
     int dummyInstantTimeCount = 0;
     for (String logFilePath : logFilePaths) {
       FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
       Schema writerSchema = new AvroSchemaConverter()
-          .convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath)));
+          .convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath))));
       Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
       // read the avro blocks
@@ -124,14 +125,12 @@ public class HoodieLogFileCommand implements CommandMarker {
         if (commitCountAndMetadata.containsKey(instantTime)) {
           commitCountAndMetadata.get(instantTime).add(
               new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
-          totalEntries++;
         } else {
           List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>> list =
               new ArrayList<>();
           list.add(
               new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
           commitCountAndMetadata.put(instantTime, list);
-          totalEntries++;
         }
       }
       reader.close();
@@ -141,7 +140,7 @@ public class HoodieLogFileCommand implements CommandMarker {
     ObjectMapper objectMapper = new ObjectMapper();
     for (Map.Entry<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> entry : commitCountAndMetadata
         .entrySet()) {
-      String instantTime = entry.getKey().toString();
+      String instantTime = entry.getKey();
       for (Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer> tuple3 : entry
           .getValue()) {
         Comparable[] output = new Comparable[5];
@@ -163,11 +162,11 @@ public class HoodieLogFileCommand implements CommandMarker {
   @CliCommand(value = "show logfile records", help = "Read records from log files")
   public String showLogFileRecords(
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "10") final Integer limit,
       @CliOption(key = "logFilePathPattern", mandatory = true,
           help = "Fully qualified paths for the log files") final String logFilePathPattern,
-      @CliOption(key = "mergeRecords", mandatory = false, help = "If the records in the log files should be merged",
+      @CliOption(key = "mergeRecords", help = "If the records in the log files should be merged",
           unspecifiedDefaultValue = "false") final Boolean shouldMerge)
       throws IOException {
@@ -182,7 +181,7 @@ public class HoodieLogFileCommand implements CommandMarker {
     AvroSchemaConverter converter = new AvroSchemaConverter();
     // get schema from last log file
     Schema readerSchema =
-        converter.convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))));
+        converter.convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1)))));
     List<IndexedRecord> allRecords = new ArrayList<>();
@@ -191,10 +190,10 @@ public class HoodieLogFileCommand implements CommandMarker {
       HoodieMergedLogRecordScanner scanner =
           new HoodieMergedLogRecordScanner(fs, client.getBasePath(), logFilePaths, readerSchema,
               client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
-              Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES),
-              Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-              Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-              Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
+              HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
+              Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
+              Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
+              HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
               HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
       for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
         Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
@@ -205,7 +204,7 @@ public class HoodieLogFileCommand implements CommandMarker {
     } else {
       for (String logFile : logFilePaths) {
         Schema writerSchema = new AvroSchemaConverter()
-            .convert(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile)));
+            .convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile))));
         HoodieLogFormat.Reader reader =
             HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema);
         // read the avro blocks
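The schema lookups are now wrapped in Guava's Preconditions.checkNotNull, so a missing log-file schema fails fast at the call site instead of surfacing later inside the Avro converter. A minimal usage sketch (loadSchemaText() is a hypothetical stand-in for SchemaUtil.readSchemaFromLogFile):

    import com.google.common.base.Preconditions;

    public class CheckNotNullSketch {
      // Hypothetical stand-in; the real code reads the schema from a Hudi log file.
      static String loadSchemaText(String path) {
        return path.endsWith(".log") ? "{\"type\":\"record\",\"name\":\"r\",\"fields\":[]}" : null;
      }

      public static void main(String[] args) {
        // Passes non-null values through unchanged...
        String schema = Preconditions.checkNotNull(loadSchemaText("file1.log"));
        System.out.println(schema);

        // ...and throws NullPointerException immediately when the value is null.
        Preconditions.checkNotNull(loadSchemaText("file2.parquet"));
      }
    }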

View File

@@ -31,6 +31,7 @@ import org.springframework.shell.core.annotation.CliCommand;
 import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -50,7 +51,7 @@ public class HoodieSyncCommand implements CommandMarker {
           help = "total number of recent partitions to validate") final int partitionCount,
       @CliOption(key = {"hiveServerUrl"}, mandatory = true,
           help = "hiveServerURL to connect to") final String hiveServerUrl,
-      @CliOption(key = {"hiveUser"}, mandatory = false, unspecifiedDefaultValue = "",
+      @CliOption(key = {"hiveUser"}, unspecifiedDefaultValue = "",
           help = "hive username to connect to") final String hiveUser,
       @CliOption(key = {"hivePass"}, mandatory = true, unspecifiedDefaultValue = "",
           help = "hive password to connect to") final String hivePass)
@@ -80,6 +81,15 @@ public class HoodieSyncCommand implements CommandMarker {
     if (sourceLatestCommit != null
         && HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
       // source is behind the target
+      return getString(target, targetTimeline, source, sourceCount, targetCount, sourceLatestCommit);
+    } else {
+      return getString(source, sourceTimeline, target, targetCount, sourceCount, targetLatestCommit);
+    }
+  }
+
+  private String getString(HoodieTableMetaClient target, HoodieTimeline targetTimeline, HoodieTableMetaClient source, long sourceCount, long targetCount, String sourceLatestCommit)
+      throws IOException {
     List<HoodieInstant> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
         .getInstants().collect(Collectors.toList());
     if (commitsToCatchup.isEmpty()) {
@@ -92,21 +102,6 @@ public class HoodieSyncCommand implements CommandMarker {
           + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount) + ". Catch up count is "
           + newInserts;
     }
-    } else {
-      List<HoodieInstant> commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE)
-          .getInstants().collect(Collectors.toList());
-      if (commitsToCatchup.isEmpty()) {
-        return "Count difference now is (count(" + source.getTableConfig().getTableName() + ") - count("
-            + target.getTableConfig().getTableName() + ") == " + (sourceCount - targetCount);
-      } else {
-        long newInserts = CommitUtil.countNewRecords(source,
-            commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
-        return "Count difference now is (count(" + source.getTableConfig().getTableName() + ") - count("
-            + target.getTableConfig().getTableName() + ") == " + (sourceCount - targetCount) + ". Catch up count is "
-            + newInserts;
-      }
-    }
   }
 }
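The sync-validation refactor above pulls two nearly identical if/else branches into a single private helper and calls it with the roles swapped. A toy sketch of that extract-method move (the message-building logic is made up for illustration):

    public class ExtractMethodSketch {
      public static void main(String[] args) {
        System.out.println(compare("source", 120, "target", 100));
        System.out.println(compare("target", 100, "source", 120));
      }

      // One helper replaces two copies of the same branch; callers swap the
      // arguments instead of duplicating the code.
      static String compare(String aheadName, long aheadCount, String behindName, long behindCount) {
        return "Count difference now is (count(" + aheadName + ") - count(" + behindName + ") == "
            + (aheadCount - behindCount) + ")";
      }
    }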

View File

@@ -49,7 +49,7 @@ public class RepairsCommand implements CommandMarker {
           mandatory = true) final String duplicatedPartitionPath,
       @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
           mandatory = true) final String repairedOutputPath,
-      @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path",
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
           mandatory = true) final String sparkPropertiesPath)
       throws Exception {
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);

View File

@@ -99,20 +99,18 @@ public class RollbacksCommand implements CommandMarker {
       HoodieRollbackMetadata metadata = AvroUtils.deserializeAvroMetadata(
           activeTimeline.getInstantDetails(new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, rollbackInstant)).get(),
           HoodieRollbackMetadata.class);
-      metadata.getPartitionMetadata().entrySet().forEach(e -> {
-        Stream
-            .concat(e.getValue().getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
-                e.getValue().getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
+      metadata.getPartitionMetadata().forEach((key, value) -> Stream
+          .concat(value.getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
+              value.getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
           .forEach(fileWithDeleteStatus -> {
             Comparable[] row = new Comparable[5];
             row[0] = metadata.getStartRollbackTime();
             row[1] = metadata.getCommitsRollback().toString();
-            row[2] = e.getKey();
+            row[2] = key;
             row[3] = fileWithDeleteStatus.getLeft();
             row[4] = fileWithDeleteStatus.getRight();
             rows.add(row);
-          });
-      });
+          }));
     TableHeader header = new TableHeader().addTableHeaderField("Instant").addTableHeaderField("Rolledback Instants")
         .addTableHeaderField("Partition").addTableHeaderField("Deleted File").addTableHeaderField("Succeeded");

View File

@@ -93,7 +93,7 @@ public class SavepointsCommand implements CommandMarker {
   @CliCommand(value = "savepoint rollback", help = "Savepoint a commit")
   public String rollbackToSavepoint(
       @CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String commitTime,
-      @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
       throws Exception {
     HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     if (metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty()) {
@@ -122,7 +122,7 @@ public class SavepointsCommand implements CommandMarker {
   }
   @CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
-  public String refreshMetaClient() throws IOException {
+  public String refreshMetaClient() {
     HoodieCLI.refreshTableMetadata();
     return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed.";
   }

View File

@@ -124,19 +124,19 @@ public class SparkMain {
       case COMPACT_REPAIR:
         assert (args.length == 8);
         doCompactRepair(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Boolean.valueOf(args[7]));
+            Boolean.parseBoolean(args[7]));
         returnCode = 0;
         break;
       case COMPACT_UNSCHEDULE_FILE:
         assert (args.length == 9);
         doCompactUnscheduleFile(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
+            Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
       case COMPACT_UNSCHEDULE_PLAN:
         assert (args.length == 9);
         doCompactUnschedule(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
+            Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
       case CLEAN:
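Boolean.valueOf(String) returns a boxed Boolean that is then auto-unboxed into the primitive boolean parameters, while Boolean.parseBoolean(String) returns the primitive directly. A one-screen illustration (doWork() is a made-up stand-in for the SparkMain helpers):

    public class ParseBooleanSketch {
      // Hypothetical stand-in for a helper that takes primitive booleans.
      static void doWork(boolean skipValidation, boolean dryRun) {
        System.out.println("skipValidation=" + skipValidation + ", dryRun=" + dryRun);
      }

      public static void main(String[] args) {
        String[] cliArgs = {"true", "false"};

        // parseBoolean yields a primitive; valueOf would create a Boolean and unbox it again.
        doWork(Boolean.parseBoolean(cliArgs[0]), Boolean.parseBoolean(cliArgs[1]));
      }
    }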

View File

@@ -73,7 +73,6 @@ public class StatsCommand implements CommandMarker {
     HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
     List<Comparable[]> rows = new ArrayList<>();
-    int i = 0;
     DecimalFormat df = new DecimalFormat("#.00");
     for (HoodieInstant commitTime : timeline.getInstants().collect(Collectors.toList())) {
       String waf = "0";
@@ -94,7 +93,7 @@ public class StatsCommand implements CommandMarker {
     rows.add(new Comparable[] {"Total", totalRecordsUpserted, totalRecordsWritten, waf});
     TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Upserted")
-        .addTableHeaderField("Total Written").addTableHeaderField("Write Amplifiation Factor");
+        .addTableHeaderField("Total Written").addTableHeaderField("Write Amplification Factor");
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
@@ -120,7 +119,7 @@ public class StatsCommand implements CommandMarker {
     // max, min, #small files < 10MB, 50th, avg, 95th
     Histogram globalHistogram = new Histogram(new UniformReservoir(MAX_FILES));
-    HashMap<String, Histogram> commitHistoMap = new HashMap<String, Histogram>();
+    HashMap<String, Histogram> commitHistoMap = new HashMap<>();
     for (FileStatus fileStatus : statuses) {
       String commitTime = FSUtils.getCommitTime(fileStatus.getPath().getName());
       long sz = fileStatus.getLen();
@@ -132,7 +131,6 @@ public class StatsCommand implements CommandMarker {
     }
     List<Comparable[]> rows = new ArrayList<>();
-    int ind = 0;
     for (String commitTime : commitHistoMap.keySet()) {
       Snapshot s = commitHistoMap.get(commitTime).getSnapshot();
       rows.add(printFileSizeHistogram(commitTime, s));
@@ -141,7 +139,7 @@ public class StatsCommand implements CommandMarker {
     rows.add(printFileSizeHistogram("ALL", s));
     Function<Object, String> converterFunction =
-        entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+        entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
     fieldNameToConverterMap.put("Min", converterFunction);
     fieldNameToConverterMap.put("10th", converterFunction);

View File

@@ -24,7 +24,8 @@ import org.apache.hudi.cli.commands.SparkMain;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.log4j.Logger;
+import com.google.common.base.Preconditions;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.launcher.SparkLauncher;
@@ -38,8 +39,7 @@ import java.util.Map;
  */
 public class SparkUtil {
-  private static final Logger LOG = Logger.getLogger(SparkUtil.class);
-  public static final String DEFUALT_SPARK_MASTER = "yarn-client";
+  public static final String DEFAULT_SPARK_MASTER = "yarn-client";
   /**
    * TODO: Need to fix a bunch of hardcoded stuff here eg: history server, spark distro.
@@ -55,7 +55,7 @@ public class SparkUtil {
       sparkLauncher.setPropertiesFile(propertiesFile);
     }
     File libDirectory = new File(new File(currentJar).getParent(), "lib");
-    for (String library : libDirectory.list()) {
+    for (String library : Preconditions.checkNotNull(libDirectory.list())) {
       sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath());
     }
     return sparkLauncher;
@@ -66,7 +66,7 @@ public class SparkUtil {
     String defMasterFromEnv = sparkConf.getenv("SPARK_MASTER");
     if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) {
-      sparkConf.setMaster(DEFUALT_SPARK_MASTER);
+      sparkConf.setMaster(DEFAULT_SPARK_MASTER);
     } else {
       sparkConf.setMaster(defMasterFromEnv);
     }
@@ -82,7 +82,7 @@ public class SparkUtil {
     sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
     sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
-    sparkConf = HoodieWriteClient.registerClasses(sparkConf);
+    HoodieWriteClient.registerClasses(sparkConf);
     JavaSparkContext jsc = new JavaSparkContext(sparkConf);
     jsc.hadoopConfiguration().setBoolean("parquet.enable.summary-metadata", false);
     FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());