1
0

[HUDI-696] Add unit test for CommitsCommand (#1724)

This commit is contained in:
hongdd
2020-06-18 21:42:13 +08:00
committed by GitHub
parent 5099a91edd
commit f3a701757b
9 changed files with 513 additions and 43 deletions

View File

@@ -31,6 +31,7 @@ public class HoodieTableHeaderFields {
public static final String HEADER_CLEAN_TIME = "CleanTime";
public static final String HEADER_EARLIEST_COMMAND_RETAINED = "EarliestCommandRetained";
public static final String HEADER_CLEANING_POLICY = "Cleaning policy";
public static final String HEADER_FILE_SIZE = "File Size";
public static final String HEADER_TOTAL_FILES_DELETED = "Total Files Deleted";
public static final String HEADER_TOTAL_FILES_SUCCESSFULLY_DELETED = "Total Files Successfully Deleted";
@@ -58,7 +59,7 @@ public class HoodieTableHeaderFields {
public static final String HEADER_DELTA_SIZE = "Delta Size";
public static final String HEADER_DELTA_FILES = "Delta Files";
public static final String HEADER_TOTAL_DELTA_SIZE = "Total " + HEADER_DELTA_SIZE;
public static final String HEADER_TOTAL_DELTA_FILE_SIZE = "Total Delta File Size";
public static final String HEADER_TOTAL_DELTA_FILE_SIZE = "Total Delta " + HEADER_FILE_SIZE;
public static final String HEADER_NUM_DELTA_FILES = "Num " + HEADER_DELTA_FILES;
/**
@@ -78,7 +79,7 @@ public class HoodieTableHeaderFields {
* Fields of Repair.
*/
public static final String HEADER_METADATA_PRESENT = "Metadata Present?";
public static final String HEADER_REPAIR_ACTION = "Action";
public static final String HEADER_ACTION = "Action";
public static final String HEADER_HOODIE_PROPERTY = "Property";
public static final String HEADER_OLD_VALUE = "Old Value";
public static final String HEADER_NEW_VALUE = "New Value";
@@ -113,4 +114,30 @@ public class HoodieTableHeaderFields {
public static final String HEADER_HISTOGRAM_NUM_FILES = "NumFiles";
public static final String HEADER_HISTOGRAM_STD_DEV = "StdDev";
/**
* Fields of Commit.
* Column headers used by the commit tables that CommitsCommand builds with TableHeader.
*/
// CommitsCommand registers a converter under this header that renders the value
// through NumericUtils.humanReadableByteCount.
public static final String HEADER_TOTAL_BYTES_WRITTEN = "Total Bytes Written";
public static final String HEADER_TOTAL_FILES_ADDED = "Total Files Added";
public static final String HEADER_TOTAL_FILES_UPDATED = "Total Files Updated";
public static final String HEADER_TOTAL_PARTITIONS_WRITTEN = "Total Partitions Written";
public static final String HEADER_TOTAL_RECORDS_WRITTEN = "Total Records Written";
public static final String HEADER_TOTAL_UPDATE_RECORDS_WRITTEN = "Total Update Records Written";
// The two headers below appear in the CommitsCommand table whose first column is the partition path.
public static final String HEADER_TOTAL_RECORDS_INSERTED = "Total Records Inserted";
public static final String HEADER_TOTAL_RECORDS_UPDATED = "Total Records Updated";
// Shared by several CommitsCommand tables as the error-count column.
public static final String HEADER_TOTAL_ERRORS = "Total Errors";
/**
* Fields of commit metadata.
* Column headers used by the commit metadata table that CommitsCommand builds with TableHeader.
*/
// Also used by the CommitsCommand table that lists partition path and file id.
public static final String HEADER_PREVIOUS_COMMIT = "Previous Commit";
public static final String HEADER_NUM_WRITES = "Num Writes";
public static final String HEADER_NUM_INSERTS = "Num Inserts";
public static final String HEADER_NUM_DELETES = "Num Deletes";
public static final String HEADER_NUM_UPDATE_WRITES = "Num Update Writes";
public static final String HEADER_TOTAL_LOG_BLOCKS = "Total Log Blocks";
// "LogBlocks" (no space) matches the header text that was previously hard-coded in
// CommitsCommand; keep it as-is so the printed column name does not change.
public static final String HEADER_TOTAL_CORRUPT_LOG_BLOCKS = "Total Corrupt LogBlocks";
public static final String HEADER_TOTAL_ROLLBACK_BLOCKS = "Total Rollback Blocks";
public static final String HEADER_TOTAL_LOG_RECORDS = "Total Log Records";
public static final String HEADER_TOTAL_UPDATED_RECORDS_COMPACTED = "Total Updated Records Compacted";
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
@@ -84,19 +85,19 @@ public class CommitsCommand implements CommandMarker {
}
final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Bytes Written", entry -> {
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> {
return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
});
final TableHeader header = new TableHeader()
.addTableHeaderField("CommitTime")
.addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Files Added")
.addTableHeaderField("Total Files Updated")
.addTableHeaderField("Total Partitions Written")
.addTableHeaderField("Total Records Written")
.addTableHeaderField("Total Update Records Written")
.addTableHeaderField("Total Errors");
.addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending,
limit, headerOnly, rows, tempTableName);
@@ -136,17 +137,26 @@ public class CommitsCommand implements CommandMarker {
}
final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Bytes Written", entry -> {
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> {
return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
});
TableHeader header = new TableHeader().addTableHeaderField("Action").addTableHeaderField("Instant")
.addTableHeaderField("Partition").addTableHeaderField("File Id").addTableHeaderField("Prev Instant")
.addTableHeaderField("Num Writes").addTableHeaderField("Num Inserts").addTableHeaderField("Num Deletes")
.addTableHeaderField("Num Update Writes").addTableHeaderField("Total Write Errors")
.addTableHeaderField("Total Log Blocks").addTableHeaderField("Total Corrupt LogBlocks")
.addTableHeaderField("Total Rollback Blocks").addTableHeaderField("Total Log Records")
.addTableHeaderField("Total Updated Records Compacted").addTableHeaderField("Total Write Bytes");
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_WRITES)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_INSERTS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELETES)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_UPDATE_WRITES)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_BLOCKS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_CORRUPT_LOG_BLOCKS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ROLLBACK_BLOCKS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_RECORDS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATED_RECORDS_COMPACTED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN);
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
limit, headerOnly, rows, tempTableName);
@@ -216,7 +226,10 @@ public class CommitsCommand implements CommandMarker {
@CliCommand(value = "commit rollback", help = "Rollback a commit")
public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String instantTime,
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
@@ -226,7 +239,7 @@ public class CommitsCommand implements CommandMarker {
}
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), instantTime,
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime,
HoodieCLI.getTableMetaClient().getBasePath());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -286,12 +299,16 @@ public class CommitsCommand implements CommandMarker {
}
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry ->
NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
TableHeader header = new TableHeader().addTableHeaderField("Partition Path")
.addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
.addTableHeaderField("Total Records Inserted").addTableHeaderField("Total Records Updated")
.addTableHeaderField("Total Bytes Written").addTableHeaderField("Total Errors");
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending,
limit, headerOnly, rows, exportTableName);
@@ -328,27 +345,30 @@ public class CommitsCommand implements CommandMarker {
}
}
TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File ID")
.addTableHeaderField("Previous Commit").addTableHeaderField("Total Records Updated")
.addTableHeaderField("Total Records Written").addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Errors").addTableHeaderField("File Size");
TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_SIZE);
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending,
limit, headerOnly, rows, exportTableName);
}
@CliCommand(value = "commits compare", help = "Compare commits with another Hoodie table")
public String compareCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path)
throws Exception {
public String compareCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {
HoodieTableMetaClient source = HoodieCLI.getTableMetaClient();
HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.conf, path);
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String targetLatestCommit =
targetTimeline.getInstants().iterator().hasNext() ? "0" : targetTimeline.lastInstant().get().getTimestamp();
targetTimeline.getInstants().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0";
String sourceLatestCommit =
sourceTimeline.getInstants().iterator().hasNext() ? "0" : sourceTimeline.lastInstant().get().getTimestamp();
sourceTimeline.getInstants().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0";
if (sourceLatestCommit != null
&& HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {

View File

@@ -136,7 +136,7 @@ public class RepairsCommand implements CommandMarker {
}
return HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_REPAIR_ACTION}, rows);
HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows);
}
@CliCommand(value = "repair overwrite-hoodie-props", help = "Overwrite hoodie.properties with provided file. Risky operation. Proceed with caution!")

View File

@@ -125,7 +125,7 @@ public class RollbacksCommand implements CommandMarker {
/**
* An Active timeline containing only rollbacks.
*/
static class RollbackTimeline extends HoodieActiveTimeline {
public static class RollbackTimeline extends HoodieActiveTimeline {
public RollbackTimeline(HoodieTableMetaClient metaClient) {
super(metaClient, CollectionUtils.createImmutableSet(HoodieTimeline.ROLLBACK_EXTENSION));

View File

@@ -71,8 +71,8 @@ public class SparkMain {
int returnCode = 0;
switch (cmd) {
case ROLLBACK:
assert (args.length == 3);
returnCode = rollback(jsc, args[1], args[2]);
assert (args.length == 5);
returnCode = rollback(jsc, args[3], args[4]);
break;
case DEDUPLICATE:
assert (args.length == 7);
@@ -174,7 +174,7 @@ public class SparkMain {
List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE, SparkCommand.SAVEPOINT,
SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT);
SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK);
return masterContained.contains(command);
}