[HUDI-571] Add show archived compaction(s) to CLI
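This change adds two archived-timeline commands to hudi-cli. For illustration only, they would be invoked from the CLI shell roughly as follows (command and option names come from the @CliCommand/@CliOption annotations in the diff below; the instant timestamps are made-up placeholders and the tabular output is elided):

    compactions show archived --startTs 20200101000000 --endTs 20200110000000
    compaction show archived --instant 20200102020000 --limit 10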
hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -21,6 +21,7 @@ package org.apache.hudi.cli.commands;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.utils.CommitUtil;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -41,10 +42,8 @@ import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;

 import java.io.IOException;
-import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -186,10 +185,10 @@ public class CommitsCommand implements CommandMarker {
                                       final boolean headerOnly)
       throws IOException {
     if (StringUtils.isNullOrEmpty(startTs)) {
-      startTs = getTimeDaysAgo(10);
+      startTs = CommitUtil.getTimeDaysAgo(10);
     }
     if (StringUtils.isNullOrEmpty(endTs)) {
-      endTs = getTimeDaysAgo(1);
+      endTs = CommitUtil.getTimeDaysAgo(1);
     }
     HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline();
     try {
@@ -362,10 +361,4 @@ public class CommitsCommand implements CommandMarker {
     return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
         + HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
   }
-
-  private String getTimeDaysAgo(int numberOfDays) {
-    Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
-    return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
-  }
-
 }
hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -26,16 +26,20 @@ import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
+import org.apache.hudi.cli.utils.CommitUtil;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.AvroUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.func.OperationResult;
@@ -61,8 +65,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 /**
  * CLI command to display compaction related options.
@@ -95,51 +101,9 @@ public class CompactionCommand implements CommandMarker {
       throws IOException {
     HoodieTableMetaClient client = checkAndGetMetaClient();
     HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline();
-    HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    Set<String> committed = commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
-
-    List<HoodieInstant> instants = timeline.getReverseOrderedInstants().collect(Collectors.toList());
-    List<Comparable[]> rows = new ArrayList<>();
-    for (HoodieInstant instant : instants) {
-      HoodieCompactionPlan compactionPlan = null;
-      if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
-        try {
-          // This could be a completed compaction. Assume a compaction request file is present but skip if fails
-          compactionPlan = AvroUtils.deserializeCompactionPlan(
-              activeTimeline.readCompactionPlanAsBytes(
-                  HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
-        } catch (HoodieIOException ioe) {
-          // SKIP
-        }
-      } else {
-        compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
-            HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
-      }
-
-      if (null != compactionPlan) {
-        State state = instant.getState();
-        if (committed.contains(instant.getTimestamp())) {
-          state = State.COMPLETED;
-        }
-        if (includeExtraMetadata) {
-          rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
-              compactionPlan.getOperations() == null ? 0 : compactionPlan.getOperations().size(),
-              compactionPlan.getExtraMetadata().toString()});
-        } else {
-          rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
-              compactionPlan.getOperations() == null ? 0 : compactionPlan.getOperations().size()});
-        }
-      }
-    }
-
-    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
-        .addTableHeaderField("Total FileIds to be Compacted");
-    if (includeExtraMetadata) {
-      header = header.addTableHeaderField("Extra Metadata");
-    }
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+    return printAllCompactions(activeTimeline,
+        compactionPlanReader(this::readCompactionPlanForActiveTimeline, activeTimeline),
+        includeExtraMetadata, sortByField, descending, limit, headerOnly);
   }

   @CliCommand(value = "compaction show", help = "Shows compaction details for a specific compaction instant")
@@ -159,19 +123,68 @@ public class CompactionCommand implements CommandMarker {
         activeTimeline.readCompactionPlanAsBytes(
             HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());

-    List<Comparable[]> rows = new ArrayList<>();
-    if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
-      for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
-        rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
-            op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
-      }
-    }
-
-    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id")
-        .addTableHeaderField("Base Instant").addTableHeaderField("Data File Path")
-        .addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+    return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
   }

@CliCommand(value = "compactions show archived", help = "Shows compaction details for specified time window")
|
||||
public String compactionShowArchived(
|
||||
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
|
||||
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
|
||||
@CliOption(key = {"startTs"}, mandatory = false, help = "start time for compactions, default: now - 10 days")
|
||||
String startTs,
|
||||
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for compactions, default: now - 1 day")
|
||||
String endTs,
|
||||
@CliOption(key = {"limit"}, help = "Limit compactions",
|
||||
unspecifiedDefaultValue = "-1") final Integer limit,
|
||||
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
|
||||
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
||||
@CliOption(key = {"headeronly"}, help = "Print Header Only",
|
||||
unspecifiedDefaultValue = "false") final boolean headerOnly)
|
||||
throws Exception {
|
||||
if (StringUtils.isNullOrEmpty(startTs)) {
|
||||
startTs = CommitUtil.getTimeDaysAgo(10);
|
||||
}
|
||||
if (StringUtils.isNullOrEmpty(endTs)) {
|
||||
endTs = CommitUtil.getTimeDaysAgo(1);
|
||||
}
|
||||
|
||||
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
|
||||
TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id")
|
||||
.addTableHeaderField("Base Instant").addTableHeaderField("Data File Path")
|
||||
.addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
|
||||
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
|
||||
HoodieTableMetaClient client = checkAndGetMetaClient();
|
||||
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
|
||||
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
|
||||
try {
|
||||
return printAllCompactions(archivedTimeline,
|
||||
compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline),
|
||||
includeExtraMetadata, sortByField, descending, limit, headerOnly);
|
||||
} finally {
|
||||
archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
|
||||
}
|
||||
}
|
||||
|
||||
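The load/clear pairing above bounds how long archived instant details stay in memory. As a minimal sketch (not part of this change), the same try/finally window could be factored into a hypothetical helper; it assumes only the loadInstantDetailsInMemory and clearInstantDetailsFromMemory methods seen in this diff:

    import java.util.function.Supplier;
    import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;

    class ArchivedTimelineWindows {
      // Hypothetical helper: load details for [startTs, endTs], run the body,
      // and always clear the cached details, even when the body throws.
      static <R> R withInstantDetails(HoodieArchivedTimeline timeline, String startTs, String endTs,
                                      Supplier<R> body) {
        timeline.loadInstantDetailsInMemory(startTs, endTs);
        try {
          return body.get();
        } finally {
          timeline.clearInstantDetailsFromMemory(startTs, endTs);
        }
      }
    }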
@CliCommand(value = "compaction show archived", help = "Shows compaction details for a specific compaction instant")
|
||||
public String compactionShowArchived(
|
||||
@CliOption(key = "instant", mandatory = true,
|
||||
help = "instant time") final String compactionInstantTime,
|
||||
@CliOption(key = {"limit"}, help = "Limit commits",
|
||||
unspecifiedDefaultValue = "-1") final Integer limit,
|
||||
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
|
||||
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
||||
@CliOption(key = {"headeronly"}, help = "Print Header Only",
|
||||
unspecifiedDefaultValue = "false") final boolean headerOnly)
|
||||
throws Exception {
|
||||
HoodieTableMetaClient client = checkAndGetMetaClient();
|
||||
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
|
||||
HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
|
||||
HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
|
||||
String startTs = CommitUtil.addHours(compactionInstantTime, -1);
|
||||
String endTs = CommitUtil.addHours(compactionInstantTime, 1);
|
||||
try {
|
||||
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
|
||||
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
|
||||
archivedTimeline.getInstantDetails(instant).get());
|
||||
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
|
||||
} finally {
|
||||
archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
|
||||
}
|
||||
}
|
||||
|
||||
@CliCommand(value = "compaction schedule", help = "Schedule Compaction")
|
||||
@@ -249,6 +262,126 @@ public class CompactionCommand implements CommandMarker {
     return "Compaction successfully completed for " + compactionInstantTime;
   }

+  /**
+   * Prints all compaction details.
+   */
+  private String printAllCompactions(HoodieDefaultTimeline timeline,
+                                     Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader,
+                                     boolean includeExtraMetadata,
+                                     String sortByField,
+                                     boolean descending,
+                                     int limit,
+                                     boolean headerOnly) {
+
+    Stream<HoodieInstant> instantsStream = timeline.getCommitsAndCompactionTimeline().getReverseOrderedInstants();
+    List<Pair<HoodieInstant, HoodieCompactionPlan>> compactionPlans = instantsStream
+        .map(instant -> Pair.of(instant, compactionPlanReader.apply(instant)))
+        .filter(pair -> pair.getRight() != null)
+        .collect(Collectors.toList());
+
+    Set<HoodieInstant> committedInstants = timeline.getCommitTimeline().filterCompletedInstants()
+        .getInstants().collect(Collectors.toSet());
+
+    List<Comparable[]> rows = new ArrayList<>();
+    for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan : compactionPlans) {
+      HoodieCompactionPlan plan = compactionPlan.getRight();
+      HoodieInstant instant = compactionPlan.getLeft();
+      final HoodieInstant.State state;
+      if (committedInstants.contains(instant)) {
+        state = HoodieInstant.State.COMPLETED;
+      } else {
+        state = instant.getState();
+      }
+
+      if (includeExtraMetadata) {
+        rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
+            plan.getOperations() == null ? 0 : plan.getOperations().size(),
+            plan.getExtraMetadata().toString()});
+      } else {
+        rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
+            plan.getOperations() == null ? 0 : plan.getOperations().size()});
+      }
+    }
+
+    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
+    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
+        .addTableHeaderField("Total FileIds to be Compacted");
+    if (includeExtraMetadata) {
+      header = header.addTableHeaderField("Extra Metadata");
+    }
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+  }
+
+  /**
+   * Reading compaction plans differs between timelines, so the timeline-specific read logic is bound
+   * into a partial function here. These read methods could become part of HoodieDefaultTimeline and be
+   * overridden where necessary, but the BiFunctions below contain 'hacky' exception blocks, so they are
+   * kept local to the CLI.
+   */
+  private <T extends HoodieDefaultTimeline, U extends HoodieInstant, V extends HoodieCompactionPlan>
+      Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader(
+          BiFunction<T, HoodieInstant, HoodieCompactionPlan> f, T timeline) {
+
+    return (y) -> f.apply(timeline, y);
+  }
+
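The compactionPlanReader method above is plain partial application: a two-argument BiFunction is specialized by fixing its first argument, leaving a one-argument Function. A self-contained sketch of the same pattern, independent of Hudi:

    import java.util.Collections;
    import java.util.function.BiFunction;
    import java.util.function.Function;

    public class PartialApplyDemo {
      // Bind the first argument of a BiFunction, yielding a one-argument Function.
      static <T, U, R> Function<U, R> bindFirst(BiFunction<T, U, R> f, T first) {
        return u -> f.apply(first, u);
      }

      public static void main(String[] args) {
        BiFunction<String, Integer, String> repeat =
            (s, n) -> String.join("", Collections.nCopies(n, s));
        Function<Integer, String> ha = bindFirst(repeat, "ha");
        System.out.println(ha.apply(3)); // prints: hahaha
      }
    }

This is what lets the two timeline-specific readers both be handed to printAllCompactions as a single Function<HoodieInstant, HoodieCompactionPlan>.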
+  private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
+                                                                     HoodieInstant instant) {
+    if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
+      return null;
+    } else {
+      try {
+        return AvroUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * TODO: Can this be made part of HoodieActiveTimeline or a utility class?
+   */
+  private HoodieCompactionPlan readCompactionPlanForActiveTimeline(HoodieActiveTimeline activeTimeline,
+                                                                   HoodieInstant instant) {
+    try {
+      if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
+        try {
+          // This could be a completed compaction. Assume a compaction request file is present, but skip if the read fails.
+          return AvroUtils.deserializeCompactionPlan(
+              activeTimeline.readCompactionPlanAsBytes(
+                  HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
+        } catch (HoodieIOException ioe) {
+          // SKIP
+          return null;
+        }
+      } else {
+        return AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
+            HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+
+  private String printCompaction(HoodieCompactionPlan compactionPlan,
+                                 String sortByField,
+                                 boolean descending,
+                                 int limit,
+                                 boolean headerOnly) {
+    List<Comparable[]> rows = new ArrayList<>();
+    if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
+      for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
+        rows.add(new Comparable[]{op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
+            op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
+      }
+    }
+
+    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
+    TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id")
+        .addTableHeaderField("Base Instant").addTableHeaderField("Data File Path")
+        .addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+  }
+
   private static String getTmpSerializerFile() {
     return TMP_DIR + UUID.randomUUID().toString() + ".ser";
   }
hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -21,9 +21,15 @@ package org.apache.hudi.cli.utils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;

 import java.io.IOException;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
 import java.util.List;

 /**
@@ -42,4 +48,21 @@ public class CommitUtil {
     }
     return totalNew;
   }
+
+  public static String getTimeDaysAgo(int numberOfDays) {
+    Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
+    return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
+  }
+
+  /**
+   * Adds hours to the specified time; if hours < 0, hours are subtracted.
+   * For example, with compactionCommitTime "20200202020000":
+   * a) hours = +1 returns 20200202030000
+   * b) hours = -1 returns 20200202010000
+   */
+  public static String addHours(String compactionCommitTime, int hours) throws ParseException {
+    Instant instant = HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).toInstant();
+    ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+    return HoodieActiveTimeline.COMMIT_FORMATTER.format(Date.from(commitDateTime.plusHours(hours).toInstant()));
+  }
 }
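For a quick sanity check of the addHours examples in the javadoc, here is a standalone sketch; it assumes HoodieActiveTimeline.COMMIT_FORMATTER uses the "yyyyMMddHHmmss" pattern that the sample instants in this diff follow:

    import java.text.SimpleDateFormat;
    import java.time.ZoneId;
    import java.time.ZonedDateTime;
    import java.util.Date;

    public class CommitTimeDemo {
      // Stand-in for HoodieActiveTimeline.COMMIT_FORMATTER (assumed pattern).
      private static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss");

      public static void main(String[] args) throws Exception {
        Date d = COMMIT_FORMATTER.parse("20200202020000");
        ZonedDateTime commit = ZonedDateTime.ofInstant(d.toInstant(), ZoneId.systemDefault());
        // Mirrors CommitUtil.addHours(..., +1) and addHours(..., -1):
        System.out.println(COMMIT_FORMATTER.format(Date.from(commit.plusHours(1).toInstant())));  // 20200202030000
        System.out.println(COMMIT_FORMATTER.format(Date.from(commit.minusHours(1).toInstant()))); // 20200202010000
      }
    }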