1
0

[HUDI-699] Fix CompactionCommand and add unit test for CompactionCommand (#2325)

This commit is contained in:
hongdd
2021-04-08 15:35:33 +08:00
committed by GitHub
parent 18459d4045
commit ecdbd2517f
11 changed files with 725 additions and 84 deletions

View File

@@ -143,4 +143,25 @@ public class HoodieTableHeaderFields {
public static final String HEADER_TOTAL_ROLLBACK_BLOCKS = "Total Rollback Blocks"; public static final String HEADER_TOTAL_ROLLBACK_BLOCKS = "Total Rollback Blocks";
public static final String HEADER_TOTAL_LOG_RECORDS = "Total Log Records"; public static final String HEADER_TOTAL_LOG_RECORDS = "Total Log Records";
public static final String HEADER_TOTAL_UPDATED_RECORDS_COMPACTED = "Total Updated Records Compacted"; public static final String HEADER_TOTAL_UPDATED_RECORDS_COMPACTED = "Total Updated Records Compacted";
/**
* Fields of Compaction.
*/
public static final String HEADER_INSTANT_BLANK_TIME = "Instant Time";
public static final String HEADER_FILE_PATH = "File Path";
public static final String HEADER_COMPACTION_INSTANT_TIME = "Compaction " + HEADER_INSTANT_BLANK_TIME;
public static final String HEADER_STATE = "State";
public static final String HEADER_TOTAL_FILES_TO_BE_COMPACTED = "Total FileIds to be Compacted";
public static final String HEADER_EXTRA_METADATA = "Extra Metadata";
public static final String HEADER_DATA_FILE_PATH = "Data " + HEADER_FILE_PATH;
public static final String HEADER_TOTAL_DELTA_FILES = "Total " + HEADER_DELTA_FILES;
public static final String HEADER_METRICS = "getMetrics";
public static final String HEADER_BASE_INSTANT_TIME = "Base " + HEADER_INSTANT_BLANK_TIME;
public static final String HEADER_BASE_DATA_FILE = "Base Data File";
public static final String HEADER_VALID = "Valid";
public static final String HEADER_ERROR = "Error";
public static final String HEADER_SOURCE_FILE_PATH = "Source " + HEADER_FILE_PATH;
public static final String HEADER_DESTINATION_FILE_PATH = "Destination " + HEADER_FILE_PATH;
public static final String HEADER_RENAME_EXECUTED = "Rename Executed?";
public static final String HEADER_RENAME_SUCCEEDED = "Rename Succeeded?";
} }

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand; import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.CommitUtil; import org.apache.hudi.cli.utils.CommitUtil;
@@ -97,8 +98,7 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only", @CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly) unspecifiedDefaultValue = "false") final boolean headerOnly) {
throws IOException {
HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline(); HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
return printAllCompactions(activeTimeline, return printAllCompactions(activeTimeline,
@@ -139,8 +139,7 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only", @CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly) unspecifiedDefaultValue = "false") final boolean headerOnly) {
throws Exception {
if (StringUtils.isNullOrEmpty(startTs)) { if (StringUtils.isNullOrEmpty(startTs)) {
startTs = CommitUtil.getTimeDaysAgo(10); startTs = CommitUtil.getTimeDaysAgo(10);
} }
@@ -150,7 +149,7 @@ public class CompactionCommand implements CommandMarker {
HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline(); HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); archivedTimeline.loadCompactionDetailsInMemory(startTs, endTs);
try { try {
return printAllCompactions(archivedTimeline, return printAllCompactions(archivedTimeline,
compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline), compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline),
@@ -175,25 +174,25 @@ public class CompactionCommand implements CommandMarker {
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline(); HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMPACTION_ACTION, compactionInstantTime); HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
String startTs = CommitUtil.addHours(compactionInstantTime, -1);
String endTs = CommitUtil.addHours(compactionInstantTime, 1);
try { try {
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime);
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan( HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
archivedTimeline.getInstantDetails(instant).get()); archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema());
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly); return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
} finally { } finally {
archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs); archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime);
} }
} }
@CliCommand(value = "compaction schedule", help = "Schedule Compaction") @CliCommand(value = "compaction schedule", help = "Schedule Compaction")
public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G", public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G",
help = "Spark executor memory") final String sparkMemory, help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting", @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
unspecifiedDefaultValue = "") final String propsFilePath, unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array", @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
unspecifiedDefaultValue = "") final String[] configs) throws Exception { unspecifiedDefaultValue = "") final String[] configs,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf(); boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized); HoodieCLI.initFS(initialized);
@@ -204,8 +203,9 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), client.getBasePath(), String cmd = SparkCommand.COMPACT_SCHEDULE.toString();
client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory, propsFilePath); sparkLauncher.addAppArgs(cmd, master, sparkMemory, client.getBasePath(),
client.getTableConfig().getTableName(), compactionInstantTime, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher); UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch(); Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process); InputStreamConsumer.captureOutput(process);
@@ -222,6 +222,8 @@ public class CompactionCommand implements CommandMarker {
help = "Parallelism for hoodie compaction") final String parallelism, help = "Parallelism for hoodie compaction") final String parallelism,
@CliOption(key = "schemaFilePath", mandatory = true, @CliOption(key = "schemaFilePath", mandatory = true,
help = "Path for Avro schema file") final String schemaFilePath, help = "Path for Avro schema file") final String schemaFilePath,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local",
help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory, help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry, @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
@@ -249,9 +251,9 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), client.getBasePath(), sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), master, sparkMemory, client.getBasePath(),
client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath, client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath,
sparkMemory, retry, propsFilePath); retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher); UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch(); Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process); InputStreamConsumer.captureOutput(process);
@@ -279,15 +281,15 @@ public class CompactionCommand implements CommandMarker {
.filter(pair -> pair.getRight() != null) .filter(pair -> pair.getRight() != null)
.collect(Collectors.toList()); .collect(Collectors.toList());
Set<HoodieInstant> committedInstants = timeline.getCommitTimeline().filterCompletedInstants() Set<String> committedInstants = timeline.getCommitTimeline().filterCompletedInstants()
.getInstants().collect(Collectors.toSet()); .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
List<Comparable[]> rows = new ArrayList<>(); List<Comparable[]> rows = new ArrayList<>();
for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan : compactionPlans) { for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan : compactionPlans) {
HoodieCompactionPlan plan = compactionPlan.getRight(); HoodieCompactionPlan plan = compactionPlan.getRight();
HoodieInstant instant = compactionPlan.getLeft(); HoodieInstant instant = compactionPlan.getLeft();
final HoodieInstant.State state; final HoodieInstant.State state;
if (committedInstants.contains(instant)) { if (committedInstants.contains(instant.getTimestamp())) {
state = HoodieInstant.State.COMPLETED; state = HoodieInstant.State.COMPLETED;
} else { } else {
state = instant.getState(); state = instant.getState();
@@ -304,10 +306,12 @@ public class CompactionCommand implements CommandMarker {
} }
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State") TableHeader header = new TableHeader()
.addTableHeaderField("Total FileIds to be Compacted"); .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPACTION_INSTANT_TIME)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_TO_BE_COMPACTED);
if (includeExtraMetadata) { if (includeExtraMetadata) {
header = header.addTableHeaderField("Extra Metadata"); header = header.addTableHeaderField(HoodieTableHeaderFields.HEADER_EXTRA_METADATA);
} }
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
} }
@@ -326,14 +330,17 @@ public class CompactionCommand implements CommandMarker {
private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline, private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
HoodieInstant instant) { HoodieInstant instant) {
if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) { // filter inflight compaction
return null; if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())
} else { && HoodieInstant.State.INFLIGHT.equals(instant.getState())) {
try { try {
return TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get()); return TimelineMetadataUtils.deserializeAvroRecordMetadata(archivedTimeline.getInstantDetails(instant).get(),
} catch (IOException e) { HoodieCompactionPlan.getClassSchema());
throw new HoodieIOException(e.getMessage(), e); } catch (Exception e) {
throw new HoodieException(e.getMessage(), e);
} }
} else {
return null;
} }
} }
@@ -362,7 +369,7 @@ public class CompactionCommand implements CommandMarker {
} }
} }
private String printCompaction(HoodieCompactionPlan compactionPlan, protected String printCompaction(HoodieCompactionPlan compactionPlan,
String sortByField, String sortByField,
boolean descending, boolean descending,
int limit, int limit,
@@ -376,9 +383,13 @@ public class CompactionCommand implements CommandMarker {
} }
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id") TableHeader header = new TableHeader()
.addTableHeaderField("Base Instant").addTableHeaderField("Data File Path") .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
.addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics"); .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_INSTANT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DATA_FILE_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_DELTA_FILES)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_METRICS);
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
} }
@@ -404,7 +415,7 @@ public class CompactionCommand implements CommandMarker {
public String validateCompaction( public String validateCompaction(
@CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant, @CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism, @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") String sortByField,
@@ -444,9 +455,13 @@ public class CompactionCommand implements CommandMarker {
}); });
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Base Instant Time") TableHeader header = new TableHeader()
.addTableHeaderField("Base Data File").addTableHeaderField("Num Delta Files").addTableHeaderField("Valid") .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField("Error"); .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_INSTANT_TIME)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_DATA_FILE)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELTA_FILES)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_VALID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_ERROR);
output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit,
headerOnly, rows); headerOnly, rows);
@@ -463,7 +478,7 @@ public class CompactionCommand implements CommandMarker {
public String unscheduleCompaction( public String unscheduleCompaction(
@CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant, @CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism, @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV, @CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV,
@CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun, @CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun,
@@ -508,7 +523,8 @@ public class CompactionCommand implements CommandMarker {
@CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId") @CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule Compaction for a fileId")
public String unscheduleCompactFile( public String unscheduleCompactFile(
@CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId, @CliOption(key = "fileId", mandatory = true, help = "File Id") final String fileId,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, @CliOption(key = "partitionPath", mandatory = true, help = "partition path") final String partitionPath,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV, @CliOption(key = {"skipValidation"}, help = "skip validation", unspecifiedDefaultValue = "false") boolean skipV,
@CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun, @CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun,
@@ -529,7 +545,7 @@ public class CompactionCommand implements CommandMarker {
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), master, sparkMemory, client.getBasePath(), sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), master, sparkMemory, client.getBasePath(),
fileId, outputPathStr, "1", Boolean.valueOf(skipV).toString(), fileId, partitionPath, outputPathStr, "1", Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString()); Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch(); Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process); InputStreamConsumer.captureOutput(process);
@@ -554,7 +570,7 @@ public class CompactionCommand implements CommandMarker {
public String repairCompaction( public String repairCompaction(
@CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant, @CliOption(key = "instant", mandatory = true, help = "Compaction Instant") String compactionInstant,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism, @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = "Parallelism") String parallelism,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master ") String master, @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory, @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = "executor memory") String sparkMemory,
@CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun, @CliOption(key = {"dryRun"}, help = "Dry Run Mode", unspecifiedDefaultValue = "false") boolean dryRun,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") Integer limit,
@@ -616,9 +632,13 @@ public class CompactionCommand implements CommandMarker {
}); });
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Source File Path") TableHeader header = new TableHeader()
.addTableHeaderField("Destination File Path").addTableHeaderField("Rename Executed?") .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField("Rename Succeeded?").addTableHeaderField("Error"); .addTableHeaderField(HoodieTableHeaderFields.HEADER_SOURCE_FILE_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_DESTINATION_FILE_PATH)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_RENAME_EXECUTED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_RENAME_SUCCEEDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_ERROR);
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
} else { } else {

View File

@@ -79,9 +79,7 @@ public class SparkMain {
SparkCommand cmd = SparkCommand.valueOf(command); SparkCommand cmd = SparkCommand.valueOf(command);
JavaSparkContext jsc = sparkMasterContained(cmd) JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]));
? SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]))
: SparkUtil.initJavaSparkConf("hoodie-cli-" + command);
int returnCode = 0; int returnCode = 0;
try { try {
switch (cmd) { switch (cmd) {
@@ -112,29 +110,29 @@ public class SparkMain {
Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs); Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
break; break;
case COMPACT_RUN: case COMPACT_RUN:
assert (args.length >= 9); assert (args.length >= 10);
propsFilePath = null; propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[8])) { if (!StringUtils.isNullOrEmpty(args[9])) {
propsFilePath = args[8]; propsFilePath = args[9];
} }
configs = new ArrayList<>(); configs = new ArrayList<>();
if (args.length > 9) { if (args.length > 10) {
configs.addAll(Arrays.asList(args).subList(9, args.length)); configs.addAll(Arrays.asList(args).subList(9, args.length));
} }
returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6], returnCode = compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7],
Integer.parseInt(args[7]), false, propsFilePath, configs); Integer.parseInt(args[8]), false, propsFilePath, configs);
break; break;
case COMPACT_SCHEDULE: case COMPACT_SCHEDULE:
assert (args.length >= 6); assert (args.length >= 7);
propsFilePath = null; propsFilePath = null;
if (!StringUtils.isNullOrEmpty(args[5])) { if (!StringUtils.isNullOrEmpty(args[6])) {
propsFilePath = args[5]; propsFilePath = args[6];
} }
configs = new ArrayList<>(); configs = new ArrayList<>();
if (args.length > 6) { if (args.length > 7) {
configs.addAll(Arrays.asList(args).subList(6, args.length)); configs.addAll(Arrays.asList(args).subList(7, args.length));
} }
returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs); returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, true, propsFilePath, configs);
break; break;
case COMPACT_VALIDATE: case COMPACT_VALIDATE:
assert (args.length == 7); assert (args.length == 7);
@@ -148,9 +146,9 @@ public class SparkMain {
returnCode = 0; returnCode = 0;
break; break;
case COMPACT_UNSCHEDULE_FILE: case COMPACT_UNSCHEDULE_FILE:
assert (args.length == 9); assert (args.length == 10);
doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), doCompactUnscheduleFile(jsc, args[3], args[4], args[5], args[6], Integer.parseInt(args[7]),
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8])); Boolean.parseBoolean(args[8]), Boolean.parseBoolean(args[9]));
returnCode = 0; returnCode = 0;
break; break;
case COMPACT_UNSCHEDULE_PLAN: case COMPACT_UNSCHEDULE_PLAN:
@@ -209,14 +207,6 @@ public class SparkMain {
System.exit(returnCode); System.exit(returnCode);
} }
private static boolean sparkMasterContained(SparkCommand command) {
List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE, SparkCommand.SAVEPOINT,
SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK, SparkCommand.BOOTSTRAP);
return masterContained.contains(command);
}
protected static void clean(JavaSparkContext jsc, String basePath, String propsFilePath, protected static void clean(JavaSparkContext jsc, String basePath, String propsFilePath,
List<String> configs) { List<String> configs) {
HoodieCleaner.Config cfg = new HoodieCleaner.Config(); HoodieCleaner.Config cfg = new HoodieCleaner.Config();
@@ -280,13 +270,14 @@ public class SparkMain {
new HoodieCompactionAdminTool(cfg).run(jsc); new HoodieCompactionAdminTool(cfg).run(jsc);
} }
private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String outputPath, private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String partitionPath,
int parallelism, boolean skipValidation, boolean dryRun) String outputPath, int parallelism, boolean skipValidation, boolean dryRun)
throws Exception { throws Exception {
HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config(); HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath; cfg.basePath = basePath;
cfg.operation = Operation.UNSCHEDULE_FILE; cfg.operation = Operation.UNSCHEDULE_FILE;
cfg.outputPath = outputPath; cfg.outputPath = outputPath;
cfg.partitionPath = partitionPath;
cfg.fileId = fileId; cfg.fileId = fileId;
cfg.parallelism = parallelism; cfg.parallelism = parallelism;
cfg.dryRun = dryRun; cfg.dryRun = dryRun;
@@ -295,7 +286,7 @@ public class SparkMain {
} }
private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant, private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule, String propsFilePath, int parallelism, String schemaFile, int retry, boolean schedule, String propsFilePath,
List<String> configs) { List<String> configs) {
HoodieCompactor.Config cfg = new HoodieCompactor.Config(); HoodieCompactor.Config cfg = new HoodieCompactor.Config();
cfg.basePath = basePath; cfg.basePath = basePath;
@@ -308,7 +299,6 @@ public class SparkMain {
cfg.runSchedule = schedule; cfg.runSchedule = schedule;
cfg.propsFilePath = propsFilePath; cfg.propsFilePath = propsFilePath;
cfg.configs = configs; cfg.configs = configs;
jsc.getConf().set("spark.executor.memory", sparkMemory);
return new HoodieCompactor(jsc, cfg).compact(retry); return new HoodieCompactor(jsc, cfg).compact(retry);
} }

View File

@@ -194,7 +194,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
// archived 101 and 102 instants, remove 103 and 104 instant // archived 101 and 102 instants, remove 103 and 104 instant
data.remove("103"); data.remove("103");
data.remove("104"); data.remove("104");
String expected = generateExpectData(3, data); String expected = generateExpectData(1, data);
expected = removeNonWordAndStripSpace(expected); expected = removeNonWordAndStripSpace(expected);
String got = removeNonWordAndStripSpace(cr.getResult().toString()); String got = removeNonWordAndStripSpace(cr.getResult().toString());
assertEquals(expected, got); assertEquals(expected, got);

View File

@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.commands;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Test Cases for {@link CompactionCommand}.
*/
/**
 * Test Cases for {@link CompactionCommand}.
 */
public class TestCompactionCommand extends AbstractShellIntegrationTest {

  // Name of the table created by each test and its location under the test base path.
  private String tableName;
  private String tablePath;

  @BeforeEach
  public void init() {
    tableName = "test_table";
    tablePath = basePath + tableName;
  }

  /**
   * Compaction commands are only valid for MERGE_ON_READ tables; running
   * 'compactions show all' against a COPY_ON_WRITE table must fail with a
   * {@link HoodieException}.
   */
  @Test
  public void testVerifyTableType() throws IOException {
    // create COW table.
    new TableCommand().createTable(
        tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
        "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName());

    // expect HoodieException for COPY_ON_WRITE table.
    assertThrows(HoodieException.class,
        () -> new CompactionCommand().compactionsAll(false, -1, "", false, false));
  }

  /**
   * Test case for command 'compactions show all'.
   */
  @Test
  public void testCompactionsAll() throws IOException {
    // create MOR table.
    new TableCommand().createTable(
        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
        "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName());

    // Schedule requested compactions at instants 001/003/005/007 with 3/4/3/3 operations.
    CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), false, 3, 4, 3, 3);

    HoodieCLI.getTableMetaClient().reloadActiveTimeline();

    CommandResult cr = getShell().executeCommand("compactions show all");
    System.out.println(cr.getResult().toString());

    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
        .addTableHeaderField("Total FileIds to be Compacted");

    // Expected file-id count per instant; mirrors the setup call above.
    Map<String, Integer> fileIds = new HashMap<>();
    fileIds.put("001", 3);
    fileIds.put("003", 4);
    fileIds.put("005", 3);
    fileIds.put("007", 3);

    // The command prints instants in reverse chronological order, all still REQUESTED.
    List<Comparable[]> rows = new ArrayList<>();
    Arrays.asList("001", "003", "005", "007").stream().sorted(Comparator.reverseOrder()).forEach(instant -> {
      rows.add(new Comparable[] {instant, "REQUESTED", fileIds.get(instant)});
    });

    String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
    assertEquals(expected, cr.getResult().toString());
  }

  /**
   * Test case for command 'compaction show'.
   */
  @Test
  public void testCompactionShow() throws IOException {
    // create MOR table.
    new TableCommand().createTable(
        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
        "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName());

    CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), false, 3, 4, 3, 3);

    HoodieCLI.getTableMetaClient().reloadActiveTimeline();

    CommandResult cr = getShell().executeCommand("compaction show --instant 001");
    // NOTE(review): this test only verifies the command executes without throwing;
    // it makes no assertion on the output. Consider asserting the rendered plan.
    System.out.println(cr.getResult().toString());
  }

  /**
   * Sets up a MOR table with scheduled compactions at 001/003/005/007 (1/2/3/4
   * file slices respectively) and transitions all of them to COMPLETED so that
   * they are eligible for archiving.
   */
  private void generateCompactionInstances() throws IOException {
    // create MOR table.
    new TableCommand().createTable(
        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
        "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName());

    CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), true, 1, 2, 3, 4);

    HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().reloadActiveTimeline();
    // Complete the four scheduled compactions.
    Arrays.asList("001", "003", "005", "007").forEach(timestamp -> {
      activeTimeline.transitionCompactionInflightToComplete(
          new HoodieInstant(HoodieInstant.State.INFLIGHT, COMPACTION_ACTION, timestamp), Option.empty());
    });

    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
  }

  /**
   * Forces archiving of completed instants so the 'showarchived' commands have
   * archived compactions to read.
   */
  private void generateArchive() throws IOException {
    // Generate archive
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
        .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
        .forTable("test-trip-table").build();
    // archive
    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
    HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
    archiveLog.archiveIfRequired(context);
  }

  /**
   * Test case for command 'compactions showarchived'.
   */
  @Test
  public void testCompactionsShowArchived() throws IOException {
    generateCompactionInstances();

    generateArchive();

    CommandResult cr = getShell().executeCommand("compactions showarchived --startTs 001 --endTs 005");

    // generate result: only instants within [001, 005] are expected, newest first.
    Map<String, Integer> fileMap = new HashMap<>();
    fileMap.put("001", 1);
    fileMap.put("003", 2);
    fileMap.put("005", 3);
    List<Comparable[]> rows = Arrays.asList("005", "003", "001").stream().map(i ->
        new Comparable[] {i, HoodieInstant.State.COMPLETED, fileMap.get(i)}).collect(Collectors.toList());
    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
        .addTableHeaderField("Total FileIds to be Compacted");
    String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows);
    expected = removeNonWordAndStripSpace(expected);
    String got = removeNonWordAndStripSpace(cr.getResult().toString());
    assertEquals(expected, got);
  }

  /**
   * Test case for command 'compaction showarchived'.
   */
  @Test
  public void testCompactionShowArchived() throws IOException {
    generateCompactionInstances();

    String instance = "001";
    // get compaction plan before compaction is archived, for building the expected output.
    HoodieCompactionPlan plan = TimelineMetadataUtils.deserializeCompactionPlan(
        HoodieCLI.getTableMetaClient().reloadActiveTimeline().readCompactionPlanAsBytes(
            HoodieTimeline.getCompactionRequestedInstant(instance)).get());

    generateArchive();

    CommandResult cr = getShell().executeCommand("compaction showarchived --instant " + instance);

    // generate expected output from the pre-archive plan and compare normalized strings.
    String expected = new CompactionCommand().printCompaction(plan, "", false, -1, false);
    expected = removeNonWordAndStripSpace(expected);
    String got = removeNonWordAndStripSpace(cr.getResult().toString());
    assertEquals(expected, got);
  }
}

View File

@@ -0,0 +1,330 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.cli.integ;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.client.CompactionAdminClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.TestCompactionAdminClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Integration test class for {@link org.apache.hudi.cli.commands.CompactionCommand}.
* <p/>
* A command use SparkLauncher need load jars under lib which generate during mvn package.
* Use integration test instead of unit test.
*/
public class ITTestCompactionCommand extends AbstractShellIntegrationTest {

  // Per-test table location under the shared base path; populated in init().
  private String tablePath;
  private String tableName;

  @BeforeEach
  public void init() throws IOException {
    tableName = "test_table_" + ITTestCompactionCommand.class.getName();
    tablePath = Paths.get(basePath, tableName).toString();

    HoodieCLI.conf = jsc.hadoopConfiguration();
    // Create table and connect
    new TableCommand().createTable(
        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");

    metaClient.setBasePath(tablePath);
    metaClient = HoodieTableMetaClient.reload(metaClient);
  }

  /**
   * Test case for command 'compaction schedule'.
   */
  @Test
  public void testScheduleCompact() throws IOException {
    // generate commits
    generateCommits();

    // max.delta.commits=1 guarantees the scheduler finds enough delta commits to compact.
    CommandResult cr = getShell().executeCommand(
        String.format("compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1 --sparkMaster %s",
            "local"));

    assertAll("Command run failed",
        () -> assertTrue(cr.isSuccess()),
        () -> assertTrue(
            cr.getResult().toString().startsWith("Attempted to schedule compaction for")));

    // there is 1 requested compaction
    HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
    assertEquals(1, timeline.filterPendingCompactionTimeline().countInstants());
  }

  /**
   * Test case for command 'compaction run'.
   */
  @Test
  public void testCompact() throws IOException {
    // generate commits
    generateCommits();

    String instance = prepareScheduleCompaction();

    // 'compaction run' needs a schema file on disk; write the test schema to a temp path.
    String schemaPath = Paths.get(basePath, "compaction.schema").toString();
    writeSchemaToTmpFile(schemaPath);

    CommandResult cr2 = getShell().executeCommand(
        String.format("compaction run --parallelism %s --schemaFilePath %s --sparkMaster %s",
            2, schemaPath, "local"));

    assertAll("Command run failed",
        () -> assertTrue(cr2.isSuccess()),
        () -> assertTrue(
            cr2.getResult().toString().startsWith("Compaction successfully completed for")));

    // assert compaction complete: the scheduled instant must now appear in the completed timeline.
    assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
            .filterCompletedInstants().getInstants()
            .map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
        "Pending compaction must be completed");
  }

  /**
   * Test case for command 'compaction validate'.
   */
  @Test
  public void testValidateCompaction() throws IOException {
    // generate commits
    generateCommits();

    String instance = prepareScheduleCompaction();

    CommandResult cr = getShell().executeCommand(
        String.format("compaction validate --instant %s --sparkMaster %s", instance, "local"));

    assertAll("Command run failed",
        () -> assertTrue(cr.isSuccess()),
        () -> assertTrue(
            // compaction requested should be valid
            cr.getResult().toString().contains("COMPACTION PLAN VALID")));
  }

  /**
   * This function mainly tests the workflow of 'compaction unschedule' command.
   * The real test of {@link org.apache.hudi.client.CompactionAdminClient#unscheduleCompactionPlan}
   * is {@link TestCompactionAdminClient#testUnscheduleCompactionPlan()}.
   */
  @Test
  public void testUnscheduleCompaction() throws Exception {
    // generate commits
    generateCommits();

    String instance = prepareScheduleCompaction();

    CommandResult cr = getShell().executeCommand(
        String.format("compaction unschedule --instant %s --sparkMaster %s", instance, "local"));

    // Always has no file: no log file was written after scheduling, so no renames are needed.
    assertAll("Command run failed",
        () -> assertTrue(cr.isSuccess()),
        () -> assertEquals("No File renames needed to unschedule pending compaction. Operation successful.",
            cr.getResult().toString()));
  }

  /**
   * This function mainly tests the workflow of 'compaction unscheduleFileId' command.
   * The real test of {@link org.apache.hudi.client.CompactionAdminClient#unscheduleCompactionFileId}
   * is {@link TestCompactionAdminClient#testUnscheduleCompactionFileId}.
   */
  @Test
  public void testUnscheduleCompactFile() throws IOException {
    int numEntriesPerInstant = 10;
    CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
        numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);

    // Pick an arbitrary operation from the plan scheduled at instant 001 to unschedule.
    CompactionOperation op = CompactionOperation.convertFromAvroRecordInstance(
        CompactionUtils.getCompactionPlan(metaClient, "001").getOperations().stream().findFirst().get());

    CommandResult cr = getShell().executeCommand(
        String.format("compaction unscheduleFileId --fileId %s --partitionPath %s --sparkMaster %s",
            op.getFileGroupId().getFileId(), op.getFileGroupId().getPartitionPath(), "local"));

    // Output table must report success ("true") for every row and no failures ("false").
    assertAll("Command run failed",
        () -> assertTrue(cr.isSuccess()),
        () -> assertTrue(removeNonWordAndStripSpace(cr.getResult().toString()).contains("true")),
        () -> assertFalse(removeNonWordAndStripSpace(cr.getResult().toString()).contains("false")));
  }

  /**
   * This function mainly tests the workflow of 'compaction repair' command.
   * The real test of {@link org.apache.hudi.client.CompactionAdminClient#repairCompaction}
   * is {@link TestCompactionAdminClient#testRepairCompactionPlan}.
   */
  @Test
  public void testRepairCompaction() throws Exception {
    int numEntriesPerInstant = 10;
    String compactionInstant = "001";
    CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
        numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);

    metaClient.reloadActiveTimeline();

    // Deliberately break the plan: apply the unscheduling renames, then unschedule,
    // leaving files that 'compaction repair' must rename back.
    CompactionAdminClient client = new CompactionAdminClient(new HoodieSparkEngineContext(jsc), metaClient.getBasePath());
    List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
        client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
    renameFiles.forEach(lfPair -> {
      try {
        metaClient.getFs().rename(lfPair.getLeft().getPath(), lfPair.getRight().getPath());
      } catch (IOException e) {
        throw new HoodieIOException(e.getMessage(), e);
      }
    });

    client.unscheduleCompactionPlan(compactionInstant, false, 1, false);

    CommandResult cr = getShell().executeCommand(
        String.format("compaction repair --instant %s --sparkMaster %s", compactionInstant, "local"));

    // All Executes is succeeded, result contains true and has no false
    // Expected:
    // ║ File Id │ Source File Path │ Destination File Path │ Rename Executed? │ Rename Succeeded? │ Error ║
    // ║ * │ * │ * │ true │ true │ ║
    assertAll("Command run failed",
        () -> assertTrue(cr.isSuccess()),
        () -> assertTrue(removeNonWordAndStripSpace(cr.getResult().toString()).contains("true")),
        () -> assertFalse(removeNonWordAndStripSpace(cr.getResult().toString()).contains("false")));
  }

  // Schedules a compaction via the shell and returns the timestamp of the resulting
  // pending compaction instant; fails the test if none was created.
  private String prepareScheduleCompaction() {
    // generate requested compaction
    CommandResult cr = getShell().executeCommand(
        String.format("compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1 --sparkMaster %s",
            "local"));
    assertTrue(cr.isSuccess());

    // get compaction instance
    HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
    Option<String> instance =
        timeline.filterPendingCompactionTimeline().firstInstant().map(HoodieInstant::getTimestamp);
    assertTrue(instance.isPresent(), "Must have pending compaction.");
    return instance.get();
  }

  // Writes the trip example Avro schema to the given path for 'compaction run'.
  private void writeSchemaToTmpFile(String schemaPath) throws IOException {
    try (BufferedWriter out = new BufferedWriter(new FileWriter(schemaPath))) {
      out.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
    }
  }

  // Seeds the MOR table with an insert (001), an upsert (002) and a delete (003)
  // so delta commits exist for compaction scheduling.
  private void generateCommits() throws IOException {
    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

    // Create the write client to write some records in
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
        .withDeleteParallelism(2).forTable(tableName)
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();

    SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);

    List<HoodieRecord> records = insert(jsc, client, dataGen);
    upsert(jsc, client, dataGen, records);
    delete(jsc, client, records);
  }

  // Inserts 10 records at commit 001 and returns them for later updates/deletes.
  private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
                                    HoodieTestDataGenerator dataGen) throws IOException {
    // inserts
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
    operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
    return records;
  }

  // Upserts the inserted records plus 2 fresh updates at commit 002.
  private void upsert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
                      HoodieTestDataGenerator dataGen, List<HoodieRecord> records)
      throws IOException {
    // updates
    String newCommitTime = "002";
    client.startCommitWithTime(newCommitTime);

    List<HoodieRecord> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
    records.addAll(toBeUpdated);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
    operateFunc(SparkRDDWriteClient::upsert, client, writeRecords, newCommitTime);
  }

  // Deletes half of the known records at commit 003.
  private void delete(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
                      List<HoodieRecord> records) {
    // Delete
    String newCommitTime = "003";
    client.startCommitWithTime(newCommitTime);

    // just delete half of the records
    int numToDelete = records.size() / 2;
    List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
    client.delete(deleteRecords, newCommitTime);
  }

  // Thin adapter that applies a write function (insert/upsert) with a uniform signature.
  private JavaRDD<WriteStatus> operateFunc(
      HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
      SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
      throws IOException {
    return writeFn.apply(client, writeRecords, commitTime);
  }
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.cli.testutils; package org.apache.hudi.cli.testutils;
import org.apache.hudi.common.model.HoodieTableType;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@@ -37,4 +38,8 @@ public abstract class AbstractShellIntegrationTest extends AbstractShellBaseInte
public void teardown() throws Exception { public void teardown() throws Exception {
cleanupResources(); cleanupResources();
} }
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
}
} }

View File

@@ -383,7 +383,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
* @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule * @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule
* compaction. * compaction.
*/ */
protected List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionPlan( public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionPlan(
HoodieTableMetaClient metaClient, String compactionInstant, int parallelism, HoodieTableMetaClient metaClient, String compactionInstant, int parallelism,
Option<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException { Option<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
HoodieTableFileSystemView fsView = fsViewOpt.isPresent() ? fsViewOpt.get() HoodieTableFileSystemView fsView = fsViewOpt.isPresent() ? fsViewOpt.get()

View File

@@ -464,7 +464,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
assertTrue(result); assertTrue(result);
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3); List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet())); assertEquals(new HashSet<>(archivedInstants),
archivedTimeline.filterCompletedInstants().getInstants().collect(Collectors.toSet()));
assertFalse(wrapperFs.exists(markerPath)); assertFalse(wrapperFs.exists(markerPath));
} }

View File

@@ -18,12 +18,14 @@
package org.apache.hudi.common.table.timeline; package org.apache.hudi.common.table.timeline;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
@@ -43,6 +45,7 @@ import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@@ -66,6 +69,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits"; private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits";
private static final String ACTION_TYPE_KEY = "actionType"; private static final String ACTION_TYPE_KEY = "actionType";
private static final String ACTION_STATE = "actionState";
private HoodieTableMetaClient metaClient; private HoodieTableMetaClient metaClient;
private Map<String, byte[]> readCommits = new HashMap<>(); private Map<String, byte[]> readCommits = new HashMap<>();
@@ -108,6 +112,22 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
loadInstants(startTs, endTs); loadInstants(startTs, endTs);
} }
public void loadCompactionDetailsInMemory(String compactionInstantTime) {
loadCompactionDetailsInMemory(compactionInstantTime, compactionInstantTime);
}
public void loadCompactionDetailsInMemory(String startTs, String endTs) {
// load compactionPlan
loadInstants(new TimeRangeFilter(startTs, endTs), true, record ->
record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
&& HoodieInstant.State.INFLIGHT.toString().equals(record.get(ACTION_STATE).toString())
);
}
public void clearInstantDetailsFromMemory(String instantTime) {
this.readCommits.remove(instantTime);
}
public void clearInstantDetailsFromMemory(String startTs, String endTs) { public void clearInstantDetailsFromMemory(String startTs, String endTs) {
this.findInstantsInRange(startTs, endTs).getInstants().forEach(instant -> this.findInstantsInRange(startTs, endTs).getInstants().forEach(instant ->
this.readCommits.remove(instant.getTimestamp())); this.readCommits.remove(instant.getTimestamp()));
@@ -126,11 +146,16 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString(); final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
final String action = record.get(ACTION_TYPE_KEY).toString(); final String action = record.get(ACTION_TYPE_KEY).toString();
if (loadDetails) { if (loadDetails) {
Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> {
this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8)) if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
); this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord)actionData));
} else {
this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8));
}
return null;
});
} }
return new HoodieInstant(false, action, instantTime); return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime);
} }
private String getMetadataKey(String action) { private String getMetadataKey(String action) {
@@ -145,6 +170,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return "hoodieRollbackMetadata"; return "hoodieRollbackMetadata";
case HoodieTimeline.SAVEPOINT_ACTION: case HoodieTimeline.SAVEPOINT_ACTION:
return "hoodieSavePointMetadata"; return "hoodieSavePointMetadata";
case HoodieTimeline.COMPACTION_ACTION:
return "hoodieCompactionPlan";
default: default:
throw new HoodieIOException("Unknown action in metadata " + action); throw new HoodieIOException("Unknown action in metadata " + action);
} }
@@ -158,12 +185,18 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return loadInstants(new TimeRangeFilter(startTs, endTs), true); return loadInstants(new TimeRangeFilter(startTs, endTs), true);
} }
private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
return loadInstants(filter, loadInstantDetails, record -> true);
}
/** /**
* This is method to read selected instants. Do NOT use this directly use one of the helper methods above * This is method to read selected instants. Do NOT use this directly use one of the helper methods above
* If loadInstantDetails is set to true, this would also update 'readCommits' map with commit details * If loadInstantDetails is set to true, this would also update 'readCommits' map with commit details
* If filter is specified, only the filtered instants are loaded * If filter is specified, only the filtered instants are loaded
* If commitsFilter is specified, only the filtered records are loaded
*/ */
private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) { private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
Function<GenericRecord, Boolean> commitsFilter) {
try { try {
// list all files // list all files
FileStatus[] fsStatuses = metaClient.getFs().globStatus( FileStatus[] fsStatuses = metaClient.getFs().globStatus(
@@ -187,6 +220,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
List<IndexedRecord> records = blk.getRecords(); List<IndexedRecord> records = blk.getRecords();
// filter blocks in desired time window // filter blocks in desired time window
Stream<HoodieInstant> instantsInBlkStream = records.stream() Stream<HoodieInstant> instantsInBlkStream = records.stream()
.filter(r -> commitsFilter.apply((GenericRecord) r))
.map(r -> readCommit((GenericRecord) r, loadInstantDetails)); .map(r -> readCommit((GenericRecord) r, loadInstantDetails));
if (filter != null) { if (filter != null) {
@@ -254,4 +288,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return 0; return 0;
} }
} }
@Override
public HoodieDefaultTimeline getWriteTimeline() {
// filter in-memory instants
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
return new HoodieDefaultTimeline(getInstants().filter(i ->
readCommits.keySet().contains(i.getTimestamp()))
.filter(s -> validActions.contains(s.getAction())), details);
}
} }

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.timeline; package org.apache.hudi.common.table.timeline;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -33,12 +34,14 @@ import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter; import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader; import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableByteArrayInput; import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase; import org.apache.avro.specific.SpecificRecordBase;
@@ -176,4 +179,13 @@ public class TimelineMetadataUtils {
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz); ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
return fileReader.next(); return fileReader.next();
} }
public static <T extends SpecificRecordBase> T deserializeAvroRecordMetadata(byte[] bytes, Schema schema)
throws IOException {
return deserializeAvroRecordMetadata(HoodieAvroUtils.bytesToAvro(bytes, schema), schema);
}
public static <T extends SpecificRecordBase> T deserializeAvroRecordMetadata(Object object, Schema schema) {
return (T) SpecificData.get().deepCopy(schema, object);
}
} }