Useful Hudi CLI commands to debug/analyze production workloads
This commit is contained in:
committed by
vinoth chandar
parent
07324e7a20
commit
25cd05b24e
@@ -50,6 +50,8 @@ public class ArchivedCommitsCommand implements CommandMarker {
|
|||||||
|
|
||||||
@CliCommand(value = "show archived commits", help = "Read commits from archived files and show details")
|
@CliCommand(value = "show archived commits", help = "Read commits from archived files and show details")
|
||||||
public String showCommits(
|
public String showCommits(
|
||||||
|
@CliOption(key = {"skipMetadata"}, help = "Skip displaying commit metadata", unspecifiedDefaultValue = "true")
|
||||||
|
boolean skipMetadata,
|
||||||
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "10") final Integer limit,
|
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "10") final Integer limit,
|
||||||
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
|
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
|
||||||
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
||||||
@@ -74,51 +76,65 @@ public class ArchivedCommitsCommand implements CommandMarker {
|
|||||||
List<IndexedRecord> records = blk.getRecords();
|
List<IndexedRecord> records = blk.getRecords();
|
||||||
readRecords.addAll(records);
|
readRecords.addAll(records);
|
||||||
}
|
}
|
||||||
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> readCommit(r))
|
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r ->
|
||||||
|
readCommit(r, skipMetadata))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
allCommits.addAll(readCommits);
|
allCommits.addAll(readCommits);
|
||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
TableHeader header = new TableHeader().addTableHeaderField("CommitTime")
|
TableHeader header = new TableHeader().addTableHeaderField("CommitTime")
|
||||||
.addTableHeaderField("CommitType")
|
.addTableHeaderField("CommitType");
|
||||||
.addTableHeaderField("CommitDetails");
|
|
||||||
|
if (!skipMetadata) {
|
||||||
|
header = header.addTableHeaderField("CommitDetails");
|
||||||
|
}
|
||||||
|
|
||||||
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, allCommits);
|
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, allCommits);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Comparable[] readCommit(GenericRecord record) {
|
private Comparable[] readCommit(GenericRecord record, boolean skipMetadata) {
|
||||||
List<Object> commitDetails = new ArrayList<>();
|
List<Object> commitDetails = new ArrayList<>();
|
||||||
try {
|
try {
|
||||||
switch (record.get("actionType").toString()) {
|
switch (record.get("actionType").toString()) {
|
||||||
case HoodieTimeline.CLEAN_ACTION: {
|
case HoodieTimeline.CLEAN_ACTION: {
|
||||||
commitDetails.add(record.get("commitTime"));
|
commitDetails.add(record.get("commitTime"));
|
||||||
commitDetails.add(record.get("actionType").toString());
|
commitDetails.add(record.get("actionType").toString());
|
||||||
|
if (!skipMetadata) {
|
||||||
commitDetails.add(record.get("hoodieCleanMetadata").toString());
|
commitDetails.add(record.get("hoodieCleanMetadata").toString());
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case HoodieTimeline.COMMIT_ACTION: {
|
case HoodieTimeline.COMMIT_ACTION: {
|
||||||
commitDetails.add(record.get("commitTime"));
|
commitDetails.add(record.get("commitTime"));
|
||||||
commitDetails.add(record.get("actionType").toString());
|
commitDetails.add(record.get("actionType").toString());
|
||||||
|
if (!skipMetadata) {
|
||||||
commitDetails.add(record.get("hoodieCommitMetadata").toString());
|
commitDetails.add(record.get("hoodieCommitMetadata").toString());
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case HoodieTimeline.DELTA_COMMIT_ACTION: {
|
case HoodieTimeline.DELTA_COMMIT_ACTION: {
|
||||||
commitDetails.add(record.get("commitTime"));
|
commitDetails.add(record.get("commitTime"));
|
||||||
commitDetails.add(record.get("actionType").toString());
|
commitDetails.add(record.get("actionType").toString());
|
||||||
|
if (!skipMetadata) {
|
||||||
commitDetails.add(record.get("hoodieCommitMetadata").toString());
|
commitDetails.add(record.get("hoodieCommitMetadata").toString());
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case HoodieTimeline.ROLLBACK_ACTION: {
|
case HoodieTimeline.ROLLBACK_ACTION: {
|
||||||
commitDetails.add(record.get("commitTime"));
|
commitDetails.add(record.get("commitTime"));
|
||||||
commitDetails.add(record.get("actionType").toString());
|
commitDetails.add(record.get("actionType").toString());
|
||||||
|
if (!skipMetadata) {
|
||||||
commitDetails.add(record.get("hoodieRollbackMetadata").toString());
|
commitDetails.add(record.get("hoodieRollbackMetadata").toString());
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case HoodieTimeline.SAVEPOINT_ACTION: {
|
case HoodieTimeline.SAVEPOINT_ACTION: {
|
||||||
commitDetails.add(record.get("commitTime"));
|
commitDetails.add(record.get("commitTime"));
|
||||||
commitDetails.add(record.get("actionType").toString());
|
commitDetails.add(record.get("actionType").toString());
|
||||||
|
if (!skipMetadata) {
|
||||||
commitDetails.add(record.get("hoodieSavePointMetadata").toString());
|
commitDetails.add(record.get("hoodieSavePointMetadata").toString());
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
|||||||
@@ -0,0 +1,273 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.cli.commands;
|
||||||
|
|
||||||
|
import com.uber.hoodie.cli.HoodieCLI;
|
||||||
|
import com.uber.hoodie.cli.HoodiePrintHelper;
|
||||||
|
import com.uber.hoodie.cli.TableHeader;
|
||||||
|
import com.uber.hoodie.common.model.FileSlice;
|
||||||
|
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||||
|
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||||
|
import com.uber.hoodie.common.table.timeline.HoodieDefaultTimeline;
|
||||||
|
import com.uber.hoodie.common.table.timeline.HoodieInstant;
|
||||||
|
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
|
||||||
|
import com.uber.hoodie.common.util.NumericUtils;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.function.BiPredicate;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.springframework.shell.core.CommandMarker;
|
||||||
|
import org.springframework.shell.core.annotation.CliCommand;
|
||||||
|
import org.springframework.shell.core.annotation.CliOption;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class FileSystemViewCommand implements CommandMarker {
|
||||||
|
|
||||||
|
@CliCommand(value = "show fsview all", help = "Show entire file-system view")
|
||||||
|
public String showAllFileSlices(
|
||||||
|
@CliOption(key = {"pathRegex"},
|
||||||
|
help = "regex to select files, eg: 2016/08/02", unspecifiedDefaultValue = "*/*/*") String globRegex,
|
||||||
|
@CliOption(key = {"readOptimizedOnly"}, help = "Only display read-optimized view",
|
||||||
|
unspecifiedDefaultValue = "false") boolean readOptimizedOnly,
|
||||||
|
@CliOption(key = {"maxInstant"}, help = "File-Slices upto this instant are displayed",
|
||||||
|
unspecifiedDefaultValue = "") String maxInstant,
|
||||||
|
@CliOption(key = {
|
||||||
|
"includeMax"}, help = "Include Max Instant", unspecifiedDefaultValue = "false") boolean includeMaxInstant,
|
||||||
|
@CliOption(key = {
|
||||||
|
"includeInflight"}, help = "Include Inflight Instants", unspecifiedDefaultValue = "false")
|
||||||
|
boolean includeInflight,
|
||||||
|
@CliOption(key = {"excludeCompaction"}, help = "Exclude compaction Instants", unspecifiedDefaultValue = "false")
|
||||||
|
boolean excludeCompaction,
|
||||||
|
@CliOption(key = {"limit"}, help = "Limit rows to be displayed", unspecifiedDefaultValue = "-1") Integer limit,
|
||||||
|
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
|
||||||
|
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
||||||
|
@CliOption(key = {
|
||||||
|
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
HoodieTableFileSystemView fsView = buildFileSystemView(globRegex, maxInstant, readOptimizedOnly, includeMaxInstant,
|
||||||
|
includeInflight, excludeCompaction);
|
||||||
|
List<Comparable[]> rows = new ArrayList<>();
|
||||||
|
fsView.getAllFileGroups().forEach(fg -> fg.getAllFileSlices().forEach(fs -> {
|
||||||
|
int idx = 0;
|
||||||
|
// For ReadOptimized Views, do not display any delta-file related columns
|
||||||
|
Comparable[] row = new Comparable[readOptimizedOnly ? 5 : 8];
|
||||||
|
row[idx++] = fg.getPartitionPath();
|
||||||
|
row[idx++] = fg.getId();
|
||||||
|
row[idx++] = fs.getBaseInstantTime();
|
||||||
|
row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getPath() : "";
|
||||||
|
row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getFileSize() : -1;
|
||||||
|
if (!readOptimizedOnly) {
|
||||||
|
row[idx++] = fs.getLogFiles().count();
|
||||||
|
row[idx++] = fs.getLogFiles().filter(lf -> lf.getFileSize().isPresent())
|
||||||
|
.mapToLong(lf -> lf.getFileSize().get()).sum();
|
||||||
|
row[idx++] = fs.getLogFiles().filter(lf -> lf.getFileSize().isPresent())
|
||||||
|
.collect(Collectors.toList()).toString();
|
||||||
|
}
|
||||||
|
rows.add(row);
|
||||||
|
}));
|
||||||
|
Function<Object, String> converterFunction =
|
||||||
|
entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
|
||||||
|
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
|
||||||
|
fieldNameToConverterMap.put("Total Delta File Size", converterFunction);
|
||||||
|
fieldNameToConverterMap.put("Data-File Size", converterFunction);
|
||||||
|
|
||||||
|
TableHeader header = new TableHeader()
|
||||||
|
.addTableHeaderField("Partition")
|
||||||
|
.addTableHeaderField("FileId")
|
||||||
|
.addTableHeaderField("Base-Instant")
|
||||||
|
.addTableHeaderField("Data-File")
|
||||||
|
.addTableHeaderField("Data-File Size");
|
||||||
|
if (!readOptimizedOnly) {
|
||||||
|
header = header.addTableHeaderField("Num Delta Files")
|
||||||
|
.addTableHeaderField("Total Delta File Size")
|
||||||
|
.addTableHeaderField("Delta Files");
|
||||||
|
}
|
||||||
|
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
@CliCommand(value = "show fsview latest", help = "Show latest file-system view")
|
||||||
|
public String showLatestFileSlices(
|
||||||
|
@CliOption(key = {"partitionPath"},
|
||||||
|
help = "A valid paritition path", mandatory = true) String partition,
|
||||||
|
@CliOption(key = {"readOptimizedOnly"}, help = "Only display read-optimized view",
|
||||||
|
unspecifiedDefaultValue = "false") boolean readOptimizedOnly,
|
||||||
|
@CliOption(key = {"maxInstant"}, help = "File-Slices upto this instant are displayed",
|
||||||
|
unspecifiedDefaultValue = "") String maxInstant,
|
||||||
|
@CliOption(key = {"merge"}, help = "Merge File Slices due to pending compaction",
|
||||||
|
unspecifiedDefaultValue = "true") final boolean merge,
|
||||||
|
@CliOption(key = {"includeMax"}, help = "Include Max Instant", unspecifiedDefaultValue = "false")
|
||||||
|
boolean includeMaxInstant,
|
||||||
|
@CliOption(key = {"includeInflight"}, help = "Include Inflight Instants", unspecifiedDefaultValue = "false")
|
||||||
|
boolean includeInflight,
|
||||||
|
@CliOption(key = {"excludeCompaction"}, help = "Exclude compaction Instants", unspecifiedDefaultValue = "false")
|
||||||
|
boolean excludeCompaction,
|
||||||
|
@CliOption(key = {"limit"}, help = "Limit rows to be displayed", unspecifiedDefaultValue = "-1") Integer limit,
|
||||||
|
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
|
||||||
|
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
||||||
|
@CliOption(key = {
|
||||||
|
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
HoodieTableFileSystemView fsView = buildFileSystemView(partition, maxInstant, readOptimizedOnly, includeMaxInstant,
|
||||||
|
includeInflight, excludeCompaction);
|
||||||
|
List<Comparable[]> rows = new ArrayList<>();
|
||||||
|
|
||||||
|
final Stream<FileSlice> fileSliceStream;
|
||||||
|
if (!merge) {
|
||||||
|
fileSliceStream = fsView.getLatestFileSlices(partition);
|
||||||
|
} else {
|
||||||
|
if (maxInstant.isEmpty()) {
|
||||||
|
maxInstant = HoodieCLI.tableMetadata.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant()
|
||||||
|
.get().getTimestamp();
|
||||||
|
}
|
||||||
|
fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition, maxInstant);
|
||||||
|
}
|
||||||
|
|
||||||
|
fileSliceStream.forEach(fs -> {
|
||||||
|
int idx = 0;
|
||||||
|
Comparable[] row = new Comparable[readOptimizedOnly ? 5 : 13];
|
||||||
|
row[idx++] = partition;
|
||||||
|
row[idx++] = fs.getFileId();
|
||||||
|
row[idx++] = fs.getBaseInstantTime();
|
||||||
|
row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getPath() : "";
|
||||||
|
|
||||||
|
long dataFileSize = fs.getDataFile().isPresent() ? fs.getDataFile().get().getFileSize() : -1;
|
||||||
|
row[idx++] = dataFileSize;
|
||||||
|
|
||||||
|
if (!readOptimizedOnly) {
|
||||||
|
row[idx++] = fs.getLogFiles().count();
|
||||||
|
row[idx++] = fs.getLogFiles().filter(lf -> lf.getFileSize().isPresent())
|
||||||
|
.mapToLong(lf -> lf.getFileSize().get()).sum();
|
||||||
|
long logFilesScheduledForCompactionTotalSize = fs.getLogFiles().filter(lf -> lf.getFileSize().isPresent())
|
||||||
|
.filter(lf -> lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
|
||||||
|
.mapToLong(lf -> lf.getFileSize().get()).sum();
|
||||||
|
row[idx++] = logFilesScheduledForCompactionTotalSize;
|
||||||
|
|
||||||
|
long logFilesUnscheduledTotalSize = fs.getLogFiles().filter(lf -> lf.getFileSize().isPresent())
|
||||||
|
.filter(lf -> !lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
|
||||||
|
.mapToLong(lf -> lf.getFileSize().get()).sum();
|
||||||
|
row[idx++] = logFilesUnscheduledTotalSize;
|
||||||
|
|
||||||
|
double logSelectedForCompactionToBaseRatio =
|
||||||
|
dataFileSize > 0 ? logFilesScheduledForCompactionTotalSize / (dataFileSize * 1.0) : -1;
|
||||||
|
row[idx++] = logSelectedForCompactionToBaseRatio;
|
||||||
|
double logUnscheduledToBaseRatio =
|
||||||
|
dataFileSize > 0 ? logFilesUnscheduledTotalSize / (dataFileSize * 1.0) : -1;
|
||||||
|
row[idx++] = logUnscheduledToBaseRatio;
|
||||||
|
|
||||||
|
row[idx++] = fs.getLogFiles().filter(lf -> lf.getFileSize().isPresent())
|
||||||
|
.filter(lf -> lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
|
||||||
|
.collect(Collectors.toList()).toString();
|
||||||
|
row[idx++] = fs.getLogFiles().filter(lf -> lf.getFileSize().isPresent())
|
||||||
|
.filter(lf -> !lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
|
||||||
|
.collect(Collectors.toList()).toString();
|
||||||
|
}
|
||||||
|
rows.add(row);
|
||||||
|
});
|
||||||
|
|
||||||
|
Function<Object, String> converterFunction =
|
||||||
|
entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
|
||||||
|
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
|
||||||
|
fieldNameToConverterMap.put("Data-File Size", converterFunction);
|
||||||
|
if (!readOptimizedOnly) {
|
||||||
|
fieldNameToConverterMap.put("Total Delta Size", converterFunction);
|
||||||
|
fieldNameToConverterMap.put("Delta Size - compaction scheduled", converterFunction);
|
||||||
|
fieldNameToConverterMap.put("Delta Size - compaction unscheduled", converterFunction);
|
||||||
|
}
|
||||||
|
|
||||||
|
TableHeader header = new TableHeader()
|
||||||
|
.addTableHeaderField("Partition")
|
||||||
|
.addTableHeaderField("FileId")
|
||||||
|
.addTableHeaderField("Base-Instant")
|
||||||
|
.addTableHeaderField("Data-File")
|
||||||
|
.addTableHeaderField("Data-File Size");
|
||||||
|
|
||||||
|
if (!readOptimizedOnly) {
|
||||||
|
header = header.addTableHeaderField("Num Delta Files")
|
||||||
|
.addTableHeaderField("Total Delta Size")
|
||||||
|
.addTableHeaderField("Delta Size - compaction scheduled")
|
||||||
|
.addTableHeaderField("Delta Size - compaction unscheduled")
|
||||||
|
.addTableHeaderField("Delta To Base Ratio - compaction scheduled")
|
||||||
|
.addTableHeaderField("Delta To Base Ratio - compaction unscheduled")
|
||||||
|
.addTableHeaderField("Delta Files - compaction scheduled")
|
||||||
|
.addTableHeaderField("Delta Files - compaction unscheduled");
|
||||||
|
}
|
||||||
|
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build File System View
|
||||||
|
* @param globRegex Path Regex
|
||||||
|
* @param maxInstant Max Instants to be used for displaying file-instants
|
||||||
|
* @param readOptimizedOnly Include only read optimized view
|
||||||
|
* @param includeMaxInstant Include Max instant
|
||||||
|
* @param includeInflight Include inflight instants
|
||||||
|
* @param excludeCompaction Exclude Compaction instants
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private HoodieTableFileSystemView buildFileSystemView(String globRegex, String maxInstant, boolean readOptimizedOnly,
|
||||||
|
boolean includeMaxInstant, boolean includeInflight, boolean excludeCompaction) throws IOException {
|
||||||
|
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(HoodieCLI.tableMetadata.getHadoopConf(),
|
||||||
|
HoodieCLI.tableMetadata.getBasePath(), true);
|
||||||
|
FileSystem fs = HoodieCLI.fs;
|
||||||
|
String globPath = String.format("%s/%s/*", HoodieCLI.tableMetadata.getBasePath(), globRegex);
|
||||||
|
FileStatus[] statuses = fs.globStatus(new Path(globPath));
|
||||||
|
Stream<HoodieInstant> instantsStream = null;
|
||||||
|
|
||||||
|
HoodieTimeline timeline = null;
|
||||||
|
if (readOptimizedOnly) {
|
||||||
|
timeline = metaClient.getActiveTimeline().getCommitTimeline();
|
||||||
|
} else if (excludeCompaction) {
|
||||||
|
timeline = metaClient.getActiveTimeline().getCommitsTimeline();
|
||||||
|
} else {
|
||||||
|
timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!includeInflight) {
|
||||||
|
timeline = timeline.filterCompletedInstants();
|
||||||
|
}
|
||||||
|
|
||||||
|
instantsStream = timeline.getInstants();
|
||||||
|
|
||||||
|
if (!maxInstant.isEmpty()) {
|
||||||
|
final BiPredicate<String, String> predicate;
|
||||||
|
if (includeMaxInstant) {
|
||||||
|
predicate = HoodieTimeline.GREATER_OR_EQUAL;
|
||||||
|
} else {
|
||||||
|
predicate = HoodieTimeline.GREATER;
|
||||||
|
}
|
||||||
|
instantsStream = instantsStream.filter(is -> predicate.test(maxInstant, is.getTimestamp()));
|
||||||
|
}
|
||||||
|
|
||||||
|
HoodieTimeline filteredTimeline = new HoodieDefaultTimeline(instantsStream,
|
||||||
|
(Function<HoodieInstant, Optional<byte[]>> & Serializable) metaClient.getActiveTimeline()::getInstantDetails);
|
||||||
|
return new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,130 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.cli.commands;
|
||||||
|
|
||||||
|
import static com.uber.hoodie.common.table.HoodieTimeline.ROLLBACK_ACTION;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import com.uber.hoodie.avro.model.HoodieRollbackMetadata;
|
||||||
|
import com.uber.hoodie.cli.HoodieCLI;
|
||||||
|
import com.uber.hoodie.cli.HoodiePrintHelper;
|
||||||
|
import com.uber.hoodie.cli.TableHeader;
|
||||||
|
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||||
|
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||||
|
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
|
||||||
|
import com.uber.hoodie.common.table.timeline.HoodieInstant;
|
||||||
|
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
|
||||||
|
import com.uber.hoodie.common.util.AvroUtils;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.springframework.shell.core.CommandMarker;
|
||||||
|
import org.springframework.shell.core.annotation.CliCommand;
|
||||||
|
import org.springframework.shell.core.annotation.CliOption;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class RollbacksCommand implements CommandMarker {
|
||||||
|
|
||||||
|
@CliCommand(value = "show rollbacks", help = "List all rollback instants")
|
||||||
|
public String showRollbacks(
|
||||||
|
@CliOption(key = {"limit"}, help = "Limit #rows to be displayed", unspecifiedDefaultValue = "10") Integer limit,
|
||||||
|
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
|
||||||
|
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
||||||
|
@CliOption(key = {
|
||||||
|
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
|
||||||
|
throws IOException {
|
||||||
|
HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.tableMetadata);
|
||||||
|
HoodieTimeline rollback = activeTimeline.getRollbackTimeline().filterCompletedInstants();
|
||||||
|
|
||||||
|
final List<Comparable[]> rows = new ArrayList<>();
|
||||||
|
rollback.getInstants().forEach(instant -> {
|
||||||
|
try {
|
||||||
|
HoodieRollbackMetadata metadata = AvroUtils.deserializeAvroMetadata(
|
||||||
|
activeTimeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
|
||||||
|
metadata.getCommitsRollback().forEach(c -> {
|
||||||
|
Comparable[] row = new Comparable[5];
|
||||||
|
row[0] = metadata.getStartRollbackTime();
|
||||||
|
row[1] = c;
|
||||||
|
row[2] = metadata.getTotalFilesDeleted();
|
||||||
|
row[3] = metadata.getTimeTakenInMillis();
|
||||||
|
row[4] = metadata.getPartitionMetadata() != null ? metadata.getPartitionMetadata().size() : 0;
|
||||||
|
rows.add(row);
|
||||||
|
});
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
TableHeader header = new TableHeader()
|
||||||
|
.addTableHeaderField("Instant")
|
||||||
|
.addTableHeaderField("Rolledback Instant")
|
||||||
|
.addTableHeaderField("Total Files Deleted")
|
||||||
|
.addTableHeaderField("Time taken in millis")
|
||||||
|
.addTableHeaderField("Total Partitions");
|
||||||
|
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
@CliCommand(value = "show rollback", help = "Show details of a rollback instant")
|
||||||
|
public String showRollback(
|
||||||
|
@CliOption(key = {"instant"}, help = "Rollback instant", mandatory = true) String rollbackInstant,
|
||||||
|
@CliOption(key = {"limit"}, help = "Limit #rows to be displayed", unspecifiedDefaultValue = "10") Integer limit,
|
||||||
|
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
|
||||||
|
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
|
||||||
|
@CliOption(key = {
|
||||||
|
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
|
||||||
|
throws IOException {
|
||||||
|
HoodieActiveTimeline activeTimeline = new RollbackTimeline(HoodieCLI.tableMetadata);
|
||||||
|
final List<Comparable[]> rows = new ArrayList<>();
|
||||||
|
HoodieRollbackMetadata metadata = AvroUtils.deserializeAvroMetadata(
|
||||||
|
activeTimeline.getInstantDetails(new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, rollbackInstant))
|
||||||
|
.get(), HoodieRollbackMetadata.class);
|
||||||
|
metadata.getPartitionMetadata().entrySet().forEach(e -> {
|
||||||
|
Stream.concat(e.getValue().getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
|
||||||
|
e.getValue().getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
|
||||||
|
.forEach(fileWithDeleteStatus -> {
|
||||||
|
Comparable[] row = new Comparable[5];
|
||||||
|
row[0] = metadata.getStartRollbackTime();
|
||||||
|
row[1] = metadata.getCommitsRollback().toString();
|
||||||
|
row[2] = e.getKey();
|
||||||
|
row[3] = fileWithDeleteStatus.getLeft();
|
||||||
|
row[4] = fileWithDeleteStatus.getRight();
|
||||||
|
rows.add(row);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
TableHeader header = new TableHeader()
|
||||||
|
.addTableHeaderField("Instant")
|
||||||
|
.addTableHeaderField("Rolledback Instants")
|
||||||
|
.addTableHeaderField("Partition")
|
||||||
|
.addTableHeaderField("Deleted File")
|
||||||
|
.addTableHeaderField("Succeeded");
|
||||||
|
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An Active timeline containing only rollbacks
|
||||||
|
*/
|
||||||
|
class RollbackTimeline extends HoodieActiveTimeline {
|
||||||
|
|
||||||
|
public RollbackTimeline(HoodieTableMetaClient metaClient) {
|
||||||
|
super(metaClient, ImmutableSet.<String>builder().add(ROLLBACK_EXTENSION).build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -440,4 +440,8 @@ public class HoodieTableFileSystemView implements TableFileSystemView,
|
|||||||
public Map<String, Pair<String, CompactionOperation>> getFileIdToPendingCompaction() {
|
public Map<String, Pair<String, CompactionOperation>> getFileIdToPendingCompaction() {
|
||||||
return fileIdToPendingCompaction;
|
return fileIdToPendingCompaction;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Stream<HoodieFileGroup> getAllFileGroups() {
|
||||||
|
return fileGroupMap.values().stream();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user