1
0

Add Support for ordering and limiting results in CLI show commands

This commit is contained in:
Balaji Varadarajan
2018-04-24 10:56:05 -07:00
committed by vinoth chandar
parent b9b9b24993
commit c66004d79a
9 changed files with 561 additions and 106 deletions

View File

@@ -79,9 +79,9 @@ To view some basic information about the last 10 commits,
```
hoodie:trips->commits show
hoodie:trips->commits show --sortBy "Total Bytes Written" --desc true --limit 10
________________________________________________________________________________________________________________________________________________________________________
| CommitTime | Total Written (B)| Total Files Added| Total Files Updated| Total Partitions Written| Total Records Written| Total Update Records Written| Total Errors|
| CommitTime | Total Bytes Written| Total Files Added| Total Files Updated| Total Partitions Written| Total Records Written| Total Update Records Written| Total Errors|
|=======================================================================================================================================================================|
....
....
@@ -105,7 +105,7 @@ To understand how the writes spread across specific partitions,
```
hoodie:trips->commit showpartitions --commit 20161005165855
hoodie:trips->commit showpartitions --commit 20161005165855 --sortBy "Total Bytes Written" --desc true --limit 10
__________________________________________________________________________________________________________________________________________
| Partition Path| Total Files Added| Total Files Updated| Total Records Inserted| Total Records Updated| Total Bytes Written| Total Errors|
|=========================================================================================================================================|
@@ -117,7 +117,7 @@ If you need file level granularity , we can do the following
```
hoodie:trips->commit showfiles --commit 20161005165855
hoodie:trips->commit showfiles --commit 20161005165855 --sortBy "Partition Path"
________________________________________________________________________________________________________________________________________________________
| Partition Path| File ID | Previous Commit| Total Records Updated| Total Records Written| Total Bytes Written| Total Errors|
|=======================================================================================================================================================|
@@ -131,7 +131,7 @@ Since Hoodie directly manages file sizes for HDFS dataset, it might be good to g
```
hoodie:trips->stats filesizes --partitionPath 2016/09/01
hoodie:trips->stats filesizes --partitionPath 2016/09/01 --sortBy "95th" --desc true --limit 10
________________________________________________________________________________________________
| CommitTime | Min | 10th | 50th | avg | 95th | Max | NumFiles| StdDev |
|===============================================================================================|

View File

@@ -20,11 +20,89 @@ import dnl.utils.text.table.TextTable;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
/**
* Helper class to render table for hoodie-cli
*/
public class HoodiePrintHelper {
/**
 * Render a table with the given header and raw rows.
 *
 * @param header Column names
 * @param rows   Raw rows to be rendered
 * @return rendered table as a printable string
 */
public static String print(String[] header, String[][] rows) {
  return printTextTable(new TextTable(header, rows));
}
/**
 * Serialize a table to a printable string, with optional sorting and limiting.
 *
 * @param rowHeader               Row header
 * @param fieldNameToConverterMap Field-specific converters applied before rendering
 * @param sortByField             Sorting field; null or empty disables sorting
 * @param isDescending            Whether sorting is in descending order
 * @param limit                   Maximum rows to render; null or values <= 0 mean no limit
 * @param headerOnly              If true, render only the header
 * @param rows                    List of rows
 * @return Serialized form for printing
 */
public static String print(TableHeader rowHeader,
    Map<String, Function<Object, String>> fieldNameToConverterMap,
    String sortByField, boolean isDescending, Integer limit, boolean headerOnly,
    List<Comparable[]> rows) {
  if (headerOnly) {
    return HoodiePrintHelper.print(rowHeader);
  }
  // Be defensive about null arguments: the original expressions would NPE on a
  // null sortByField (isEmpty call) or a null limit (unboxing in "limit <= 0").
  Optional<String> sortFieldOpt = (sortByField == null || sortByField.isEmpty())
      ? Optional.empty() : Optional.of(sortByField);
  Optional<Integer> limitOpt = (limit == null || limit <= 0)
      ? Optional.empty() : Optional.of(limit);
  Table table = new Table(rowHeader, fieldNameToConverterMap,
      sortFieldOpt,
      Optional.of(isDescending),
      limitOpt).addAllRows(rows).flip();
  return HoodiePrintHelper.print(table);
}
/**
 * Render the rows of a fully-populated (flipped) table.
 *
 * @param buffer Table whose render-ready rows are printed
 * @return rendered output
 */
private static String print(Table buffer) {
  List<String> fieldNames = buffer.getFieldNames();
  String[] header = fieldNames.toArray(new String[fieldNames.size()]);
  String[][] rows = buffer.getRenderRows().stream()
      .map(row -> row.toArray(new String[row.size()]))
      .toArray(String[][]::new);
  return printTextTable(new TextTable(header, rows));
}
/**
 * Render only the header of the table (no data rows).
 *
 * @param header Table header
 * @return rendered output
 */
private static String print(TableHeader header) {
  List<String> fieldNames = header.getFieldNames();
  String[] head = fieldNames.toArray(new String[fieldNames.size()]);
  return printTextTable(new TextTable(head, new String[][]{}));
}
/**
* Print Text table
* @param textTable Text table to be printed
* @return rendered table contents as a string
*/
private static String printTextTable(TextTable textTable) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
textTable.printTable(ps, 4);

View File

@@ -0,0 +1,175 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.cli;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Table to be rendered. This class takes care of ordering
* rows and limiting before renderer renders it.
*/
/**
 * Table to be rendered. This class takes care of ordering rows and limiting
 * them before the renderer renders it.
 */
public class Table implements Iterable<List<String>> {

  // Header for this table
  private final TableHeader rowHeader;
  // User-specified per-field conversions applied before rendering
  private final Map<String, Function<Object, String>> fieldNameToConverterMap;
  // Optional attribute to track the sorting field
  private final Optional<String> orderingFieldNameOptional;
  // Whether sorting has to be in descending order (absent means ascending)
  private final Optional<Boolean> isDescendingOptional;
  // Optional limit on the number of entries rendered
  private final Optional<Integer> limitOptional;
  // Raw list of rows as added by the caller
  private final List<List<Comparable>> rawRows;
  // Flag to determine if all the rows have been added
  private boolean finishedAdding = false;
  // Rows ready for rendering (sorted, limited and converted to strings)
  private List<List<String>> renderRows;

  /**
   * @param rowHeader                 Table header
   * @param fieldNameToConverterMap   Field-specific converters applied before rendering
   * @param orderingFieldNameOptional Optional field name to sort by
   * @param isDescendingOptional      Optional descending flag
   * @param limitOptional             Optional cap on the number of rendered rows
   */
  public Table(TableHeader rowHeader,
      Map<String, Function<Object, String>> fieldNameToConverterMap,
      Optional<String> orderingFieldNameOptional,
      Optional<Boolean> isDescendingOptional,
      Optional<Integer> limitOptional) {
    this.rowHeader = rowHeader;
    this.fieldNameToConverterMap = fieldNameToConverterMap;
    this.orderingFieldNameOptional = orderingFieldNameOptional;
    this.isDescendingOptional = isDescendingOptional;
    this.limitOptional = limitOptional;
    this.rawRows = new ArrayList<>();
  }

  /**
   * Main API to add a row to the table.
   *
   * @param row Row; must have exactly as many fields as the header
   * @throws IllegalStateException    if {@link #flip()} has already been called
   * @throws IllegalArgumentException if the row width does not match the header
   */
  public Table add(List<Comparable> row) {
    if (finishedAdding) {
      throw new IllegalStateException("Container already marked done for adding. No more entries can be added.");
    }
    if (rowHeader.getFieldNames().size() != row.size()) {
      throw new IllegalArgumentException("Incorrect number of fields in row. Expected: "
          + rowHeader.getFieldNames().size() + ", Got: " + row.size() + ", Row: " + row);
    }
    this.rawRows.add(new ArrayList<>(row));
    return this;
  }

  /**
   * Add all rows.
   *
   * @param rows Rows to be added
   * @return this table, for chaining
   */
  public Table addAll(List<List<Comparable>> rows) {
    rows.forEach(r -> add(r));
    return this;
  }

  /**
   * Add all rows.
   *
   * @param rows Rows to be added
   * @return this table, for chaining
   */
  public Table addAllRows(List<Comparable[]> rows) {
    rows.forEach(r -> add(Arrays.asList(r)));
    return this;
  }

  /**
   * API to let the table know writing is over and reading is going to start.
   * Sorts, limits and stringifies the rows for rendering.
   */
  public Table flip() {
    this.finishedAdding = true;
    sortAndLimit();
    return this;
  }

  /**
   * Sort rows by the configured ordering field, if any.
   *
   * @return sorted copy of the rows, or the raw rows when no ordering field is set
   * @throws IllegalArgumentException if the ordering field is not a known column
   */
  private List<List<Comparable>> orderRows() {
    return orderingFieldNameOptional.map(orderingColumnName -> {
      // Resolve the column index once, instead of on every comparison, and fail
      // clearly (rather than with an IndexOutOfBoundsException) on a bad field name.
      final int sortIdx = rowHeader.indexOf(orderingColumnName);
      if (sortIdx < 0) {
        throw new IllegalArgumentException(
            "Sorting field not found in header: " + orderingColumnName);
      }
      final boolean descending = isDescendingOptional.orElse(false);
      return rawRows.stream().sorted((row1, row2) -> {
        int cmpRawResult = row1.get(sortIdx).compareTo(row2.get(sortIdx));
        return descending ? -cmpRawResult : cmpRawResult;
      }).collect(Collectors.toList());
    }).orElse(rawRows);
  }

  /**
   * Prepares for rendering. Rows are sorted, limited and converted to strings
   * using the field-specific converters (falling back to toString()).
   */
  private void sortAndLimit() {
    final int limit = this.limitOptional.orElse(rawRows.size());
    this.renderRows = orderRows().stream().limit(limit).map(row ->
        IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
          String fieldName = rowHeader.get(idx);
          Function<Object, String> converter = fieldNameToConverterMap.get(fieldName);
          return (converter != null) ? converter.apply(row.get(idx)) : row.get(idx).toString();
        }).collect(Collectors.toList())
    ).collect(Collectors.toList());
  }

  @Override
  public Iterator<List<String>> iterator() {
    if (!finishedAdding) {
      throw new IllegalStateException("Container must be flipped before reading the data");
    }
    return renderRows.iterator();
  }

  @Override
  public void forEach(Consumer<? super List<String>> action) {
    if (!finishedAdding) {
      throw new IllegalStateException("Container must be flipped before reading the data");
    }
    renderRows.forEach(action);
  }

  public List<String> getFieldNames() {
    return rowHeader.getFieldNames();
  }

  public List<List<String>> getRenderRows() {
    return renderRows;
  }
}

View File

@@ -0,0 +1,69 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.cli;
import java.util.ArrayList;
import java.util.List;
/**
* Header for the table to be rendered
*/
/**
 * Header for the table to be rendered. Maintains an ordered list of field
 * (column) names.
 */
public class TableHeader {

  // Ordered list of fields (columns)
  private final List<String> fieldNames = new ArrayList<>();

  /**
   * Add a field (column) to the table.
   *
   * @param fieldName field name
   * @return this header, for chaining
   */
  public TableHeader addTableHeaderField(String fieldName) {
    this.fieldNames.add(fieldName);
    return this;
  }

  /**
   * Get all field names in column order.
   */
  public List<String> getFieldNames() {
    return this.fieldNames;
  }

  /**
   * Index of the field in the table (-1 if not present).
   *
   * @param fieldName field name
   */
  public int indexOf(String fieldName) {
    return this.fieldNames.indexOf(fieldName);
  }

  /**
   * Lookup a field name by its column offset.
   */
  public String get(int index) {
    return this.fieldNames.get(index);
  }

  /**
   * Get the number of fields (columns) in the table.
   */
  public int getNumFields() {
    return this.fieldNames.size();
  }
}

View File

@@ -19,6 +19,7 @@ package com.uber.hoodie.cli.commands;
import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.TableHeader;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
@@ -26,6 +27,7 @@ import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.util.FSUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord;
@@ -47,16 +49,19 @@ public class ArchivedCommitsCommand implements CommandMarker {
}
@CliCommand(value = "show archived commits", help = "Read commits from archived files and show details")
public String showCommits(@CliOption(key = {
"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "10") final Integer limit)
public String showCommits(
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "10") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
System.out.println("===============> Showing only " + limit + " archived commits <===============");
String basePath = HoodieCLI.tableMetadata.getBasePath();
FileStatus[] fsStatuses = FSUtils.getFs(basePath, HoodieCLI.conf)
.globStatus(new Path(basePath + "/.hoodie/.commits_.archive*"));
List<String[]> allCommits = new ArrayList<>();
int commits = 0;
List<Comparable[]> allCommits = new ArrayList<>();
for (FileStatus fs : fsStatuses) {
//read the archived file
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath, HoodieCLI.conf),
@@ -68,62 +73,59 @@ public class ArchivedCommitsCommand implements CommandMarker {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> records = blk.getRecords();
readRecords.addAll(records);
if (commits == limit) {
break;
}
commits++;
}
List<String[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> readCommit(r))
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> readCommit(r))
.collect(Collectors.toList());
allCommits.addAll(readCommits);
if (commits == limit) {
break;
}
}
return HoodiePrintHelper.print(new String[] {"CommitTime", "CommitType", "CommitDetails"},
allCommits.toArray(new String[allCommits.size()][]));
TableHeader header = new TableHeader().addTableHeaderField("CommitTime")
.addTableHeaderField("CommitType")
.addTableHeaderField("CommitDetails");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, allCommits);
}
private String[] readCommit(GenericRecord record) {
List<String> commitDetails = new ArrayList<>();
private Comparable[] readCommit(GenericRecord record) {
List<Object> commitDetails = new ArrayList<>();
try {
switch (record.get("actionType").toString()) {
case HoodieTimeline.CLEAN_ACTION: {
commitDetails.add(record.get("commitTime").toString());
commitDetails.add(record.get("commitTime"));
commitDetails.add(record.get("actionType").toString());
commitDetails.add(record.get("hoodieCleanMetadata").toString());
break;
}
case HoodieTimeline.COMMIT_ACTION: {
commitDetails.add(record.get("commitTime").toString());
commitDetails.add(record.get("commitTime"));
commitDetails.add(record.get("actionType").toString());
commitDetails.add(record.get("hoodieCommitMetadata").toString());
break;
}
case HoodieTimeline.DELTA_COMMIT_ACTION: {
commitDetails.add(record.get("commitTime").toString());
commitDetails.add(record.get("commitTime"));
commitDetails.add(record.get("actionType").toString());
commitDetails.add(record.get("hoodieCommitMetadata").toString());
break;
}
case HoodieTimeline.ROLLBACK_ACTION: {
commitDetails.add(record.get("commitTime").toString());
commitDetails.add(record.get("commitTime"));
commitDetails.add(record.get("actionType").toString());
commitDetails.add(record.get("hoodieRollbackMetadata").toString());
break;
}
case HoodieTimeline.SAVEPOINT_ACTION: {
commitDetails.add(record.get("commitTime").toString());
commitDetails.add(record.get("commitTime"));
commitDetails.add(record.get("actionType").toString());
commitDetails.add(record.get("hoodieSavePointMetadata").toString());
break;
}
default:
return commitDetails.toArray(new String[commitDetails.size()]);
return commitDetails.toArray(new Comparable[commitDetails.size()]);
}
} catch (Exception e) {
e.printStackTrace();
}
return commitDetails.toArray(new String[commitDetails.size()]);
return commitDetails.toArray(new Comparable[commitDetails.size()]);
}
}

View File

@@ -20,6 +20,7 @@ import com.uber.hoodie.avro.model.HoodieCleanMetadata;
import com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.TableHeader;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
@@ -28,6 +29,7 @@ import com.uber.hoodie.common.util.AvroUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -56,22 +58,33 @@ public class CleansCommand implements CommandMarker {
}
@CliCommand(value = "cleans show", help = "Show the cleans")
public String showCleans() throws IOException {
public String showCleans(
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
List<HoodieInstant> cleans = timeline.getInstants().collect(Collectors.toList());
String[][] rows = new String[cleans.size()][];
List<Comparable[]> rows = new ArrayList<>();
Collections.reverse(cleans);
for (int i = 0; i < cleans.size(); i++) {
HoodieInstant clean = cleans.get(i);
HoodieCleanMetadata cleanMetadata = AvroUtils
.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
rows[i] = new String[] {clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
String.valueOf(cleanMetadata.getTotalFilesDeleted()), String.valueOf(cleanMetadata.getTimeTakenInMillis())};
rows.add(new Comparable[]{clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
}
return HoodiePrintHelper
.print(new String[] {"CleanTime", "EarliestCommandRetained", "Total Files Deleted", "Total Time Taken"},
rows);
TableHeader header = new TableHeader()
.addTableHeaderField("CleanTime")
.addTableHeaderField("EarliestCommandRetained")
.addTableHeaderField("Total Files Deleted")
.addTableHeaderField("Total Time Taken");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
}
@CliCommand(value = "cleans refresh", help = "Refresh the commits")
@@ -82,8 +95,15 @@ public class CleansCommand implements CommandMarker {
}
@CliCommand(value = "clean showpartitions", help = "Show partition level details of a clean")
public String showCleanPartitions(@CliOption(key = {"clean"}, help = "clean to show") final String commitTime)
public String showCleanPartitions(
@CliOption(key = {"clean"}, help = "clean to show") final String commitTime,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
HoodieInstant cleanInstant = new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, commitTime);
@@ -91,19 +111,25 @@ public class CleansCommand implements CommandMarker {
if (!timeline.containsInstant(cleanInstant)) {
return "Clean " + commitTime + " not found in metadata " + timeline;
}
HoodieCleanMetadata cleanMetadata = AvroUtils.deserializeHoodieCleanMetadata(
timeline.getInstantDetails(cleanInstant).get());
List<String[]> rows = new ArrayList<>();
List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, HoodieCleanPartitionMetadata> entry : cleanMetadata.getPartitionMetadata().entrySet()) {
String path = entry.getKey();
HoodieCleanPartitionMetadata stats = entry.getValue();
String policy = stats.getPolicy();
String totalSuccessDeletedFiles = String.valueOf(stats.getSuccessDeleteFiles().size());
String totalFailedDeletedFiles = String.valueOf(stats.getFailedDeleteFiles().size());
rows.add(new String[] {path, policy, totalSuccessDeletedFiles, totalFailedDeletedFiles});
Integer totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
Integer totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
rows.add(new Comparable[]{path, policy, totalSuccessDeletedFiles, totalFailedDeletedFiles});
}
return HoodiePrintHelper.print(
new String[] {"Partition Path", "Cleaning policy", "Total Files Successfully Deleted",
"Total Failed Deletions"}, rows.toArray(new String[rows.size()][]));
TableHeader header = new TableHeader()
.addTableHeaderField("Partition Path")
.addTableHeaderField("Cleaning policy")
.addTableHeaderField("Total Files Successfully Deleted")
.addTableHeaderField("Total Failed Deletions");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
}
}

View File

@@ -18,6 +18,7 @@ package com.uber.hoodie.cli.commands;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.TableHeader;
import com.uber.hoodie.cli.utils.InputStreamConsumer;
import com.uber.hoodie.cli.utils.SparkUtil;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
@@ -30,8 +31,10 @@ import com.uber.hoodie.common.util.NumericUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
@@ -65,28 +68,46 @@ public class CommitsCommand implements CommandMarker {
@CliCommand(value = "commits show", help = "Show the commits")
public String showCommits(@CliOption(key = {
"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "10") final Integer limit)
"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
throws IOException {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> commits = timeline.getInstants().collect(Collectors.toList());
String[][] rows = new String[commits.size()][];
List<Comparable[]> rows = new ArrayList<>();
Collections.reverse(commits);
for (int i = 0; i < commits.size(); i++) {
HoodieInstant commit = commits.get(i);
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get());
rows[i] = new String[] {commit.getTimestamp(),
NumericUtils.humanReadableByteCount(commitMetadata.fetchTotalBytesWritten()),
String.valueOf(commitMetadata.fetchTotalFilesInsert()),
String.valueOf(commitMetadata.fetchTotalFilesUpdated()),
String.valueOf(commitMetadata.fetchTotalPartitionsWritten()),
String.valueOf(commitMetadata.fetchTotalRecordsWritten()),
String.valueOf(commitMetadata.fetchTotalUpdateRecordsWritten()),
String.valueOf(commitMetadata.fetchTotalWriteErrors())};
rows.add(new Comparable[]{commit.getTimestamp(),
commitMetadata.fetchTotalBytesWritten(),
commitMetadata.fetchTotalFilesInsert(),
commitMetadata.fetchTotalFilesUpdated(),
commitMetadata.fetchTotalPartitionsWritten(),
commitMetadata.fetchTotalRecordsWritten(),
commitMetadata.fetchTotalUpdateRecordsWritten(),
commitMetadata.fetchTotalWriteErrors()});
}
return HoodiePrintHelper.print(
new String[] {"CommitTime", "Total Written (B)", "Total Files Added", "Total Files Updated",
"Total Partitions Written", "Total Records Written", "Total Update Records Written", "Total Errors"}, rows);
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Bytes Written", entry -> {
return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
});
TableHeader header = new TableHeader()
.addTableHeaderField("CommitTime")
.addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Files Added")
.addTableHeaderField("Total Files Updated")
.addTableHeaderField("Total Partitions Written")
.addTableHeaderField("Total Records Written")
.addTableHeaderField("Total Update Records Written")
.addTableHeaderField("Total Errors");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}
@CliCommand(value = "commits refresh", help = "Refresh the commits")
@@ -123,8 +144,15 @@ public class CommitsCommand implements CommandMarker {
}
@CliCommand(value = "commit showpartitions", help = "Show partition level details of a commit")
public String showCommitPartitions(@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime)
public String showCommitPartitions(
@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
@@ -133,7 +161,7 @@ public class CommitsCommand implements CommandMarker {
return "Commit " + commitTime + " not found in Commits " + timeline;
}
HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get());
List<String[]> rows = new ArrayList<String[]>();
List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, List<HoodieWriteStat>> entry : meta.getPartitionToWriteStats().entrySet()) {
String path = entry.getKey();
List<HoodieWriteStat> stats = entry.getValue();
@@ -154,19 +182,38 @@ public class CommitsCommand implements CommandMarker {
totalBytesWritten += stat.getTotalWriteBytes();
totalWriteErrors += stat.getTotalWriteErrors();
}
rows.add(new String[] {path, String.valueOf(totalFilesAdded), String.valueOf(totalFilesUpdated),
String.valueOf(totalRecordsInserted), String.valueOf(totalRecordsUpdated),
NumericUtils.humanReadableByteCount(totalBytesWritten), String.valueOf(totalWriteErrors)});
rows.add(new Comparable[]{path, totalFilesAdded, totalFilesUpdated,
totalRecordsInserted, totalRecordsUpdated,
totalBytesWritten, totalWriteErrors});
}
return HoodiePrintHelper.print(
new String[] {"Partition Path", "Total Files Added", "Total Files Updated", "Total Records Inserted",
"Total Records Updated", "Total Bytes Written", "Total Errors"}, rows.toArray(new String[rows.size()][]));
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Bytes Written", entry -> {
return NumericUtils.humanReadableByteCount((Long.valueOf(entry.toString())));
});
TableHeader header = new TableHeader()
.addTableHeaderField("Partition Path")
.addTableHeaderField("Total Files Added")
.addTableHeaderField("Total Files Updated")
.addTableHeaderField("Total Records Inserted")
.addTableHeaderField("Total Records Updated")
.addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Errors");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}
@CliCommand(value = "commit showfiles", help = "Show file level details of a commit")
public String showCommitFiles(@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime)
public String showCommitFiles(
@CliOption(key = {"commit"}, help = "Commit to show") final String commitTime,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {
"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
@@ -175,19 +222,27 @@ public class CommitsCommand implements CommandMarker {
return "Commit " + commitTime + " not found in Commits " + timeline;
}
HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get());
List<String[]> rows = new ArrayList<String[]>();
List<Comparable[]> rows = new ArrayList<>();
for (Map.Entry<String, List<HoodieWriteStat>> entry : meta.getPartitionToWriteStats().entrySet()) {
String path = entry.getKey();
List<HoodieWriteStat> stats = entry.getValue();
for (HoodieWriteStat stat : stats) {
rows.add(new String[] {path, stat.getFileId(), stat.getPrevCommit(), String.valueOf(stat.getNumUpdateWrites()),
String.valueOf(stat.getNumWrites()), String.valueOf(stat.getTotalWriteBytes()),
String.valueOf(stat.getTotalWriteErrors())});
rows.add(new Comparable[]{path, stat.getFileId(), stat.getPrevCommit(), stat.getNumUpdateWrites(),
stat.getNumWrites(), stat.getTotalWriteBytes(),
stat.getTotalWriteErrors()});
}
}
return HoodiePrintHelper.print(
new String[] {"Partition Path", "File ID", "Previous Commit", "Total Records Updated", "Total Records Written",
"Total Bytes Written", "Total Errors"}, rows.toArray(new String[rows.size()][]));
TableHeader header = new TableHeader()
.addTableHeaderField("Partition Path")
.addTableHeaderField("File ID")
.addTableHeaderField("Previous Commit")
.addTableHeaderField("Total Records Updated")
.addTableHeaderField("Total Records Written")
.addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Errors");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
}
@CliAvailabilityIndicator({"commits compare"})

View File

@@ -20,6 +20,7 @@ import com.beust.jcommander.internal.Maps;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.TableHeader;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
@@ -36,6 +37,7 @@ import com.uber.hoodie.hive.util.SchemaUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -65,8 +67,12 @@ public class HoodieLogFileCommand implements CommandMarker {
@CliCommand(value = "show logfile metadata", help = "Read commit metadata from log files")
public String showLogFileCommits(
@CliOption(key = "logFilePathPattern", mandatory = true, help = "Fully qualified path for the log file") final
String logFilePathPattern)
throws IOException {
String logFilePathPattern,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false")
final boolean headerOnly) throws IOException {
FileSystem fs = HoodieCLI.tableMetadata.getFs();
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
@@ -120,7 +126,7 @@ public class HoodieLogFileCommand implements CommandMarker {
}
}
}
String[][] rows = new String[totalEntries + 1][];
List<Comparable[]> rows = new ArrayList<>();
int i = 0;
ObjectMapper objectMapper = new ObjectMapper();
for (Map.Entry<String, List<Tuple3<HoodieLogBlockType,
@@ -129,19 +135,25 @@ public class HoodieLogFileCommand implements CommandMarker {
String instantTime = entry.getKey().toString();
for (Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>,
Map<HeaderMetadataType, String>>, Integer> tuple3 : entry.getValue()) {
String[] output = new String[5];
Comparable[] output = new Comparable[5];
output[0] = instantTime;
output[1] = String.valueOf(tuple3._3());
output[1] = tuple3._3();
output[2] = tuple3._1().toString();
output[3] = objectMapper.writeValueAsString(tuple3._2()._1());
output[4] = objectMapper.writeValueAsString(tuple3._2()._2());
rows[i] = output;
rows.add(output);
i++;
}
}
return HoodiePrintHelper
.print(new String[] {"InstantTime", "RecordCount", "BlockType", "HeaderMetadata", "FooterMetadata"},
rows);
TableHeader header = new TableHeader()
.addTableHeaderField("InstantTime")
.addTableHeaderField("RecordCount")
.addTableHeaderField("BlockType")
.addTableHeaderField("HeaderMetadata")
.addTableHeaderField("FooterMetadata");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
}
@CliCommand(value = "show logfile records", help = "Read records from log files")

View File

@@ -21,6 +21,7 @@ import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformReservoir;
import com.uber.hoodie.cli.HoodieCLI;
import com.uber.hoodie.cli.HoodiePrintHelper;
import com.uber.hoodie.cli.TableHeader;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
@@ -29,7 +30,11 @@ import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.NumericUtils;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -52,14 +57,20 @@ public class StatsCommand implements CommandMarker {
@CliCommand(value = "stats wa", help = "Write Amplification. Ratio of how many records were upserted to how many "
+ "records were actually written")
public String writeAmplificationStats() throws IOException {
public String writeAmplificationStats(
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false")
final boolean headerOnly) throws IOException {
long totalRecordsUpserted = 0;
long totalRecordsWritten = 0;
HoodieActiveTimeline activeTimeline = HoodieCLI.tableMetadata.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
String[][] rows = new String[new Long(timeline.countInstants()).intValue() + 1][];
List<Comparable[]> rows = new ArrayList<>();
int i = 0;
DecimalFormat df = new DecimalFormat("#.00");
for (HoodieInstant commitTime : timeline.getInstants().collect(Collectors.toList())) {
@@ -68,8 +79,8 @@ public class StatsCommand implements CommandMarker {
if (commit.fetchTotalUpdateRecordsWritten() > 0) {
waf = df.format((float) commit.fetchTotalRecordsWritten() / commit.fetchTotalUpdateRecordsWritten());
}
rows[i++] = new String[] {commitTime.getTimestamp(), String.valueOf(commit.fetchTotalUpdateRecordsWritten()),
String.valueOf(commit.fetchTotalRecordsWritten()), waf};
rows.add(new Comparable[]{commitTime.getTimestamp(), commit.fetchTotalUpdateRecordsWritten(),
commit.fetchTotalRecordsWritten(), waf});
totalRecordsUpserted += commit.fetchTotalUpdateRecordsWritten();
totalRecordsWritten += commit.fetchTotalRecordsWritten();
}
@@ -77,26 +88,33 @@ public class StatsCommand implements CommandMarker {
if (totalRecordsUpserted > 0) {
waf = df.format((float) totalRecordsWritten / totalRecordsUpserted);
}
rows[i] = new String[] {"Total", String.valueOf(totalRecordsUpserted), String.valueOf(totalRecordsWritten), waf};
return HoodiePrintHelper
.print(new String[] {"CommitTime", "Total Upserted", "Total Written", "Write Amplifiation Factor"},
rows);
rows.add(new Comparable[]{"Total", totalRecordsUpserted, totalRecordsWritten, waf});
TableHeader header = new TableHeader()
.addTableHeaderField("CommitTime")
.addTableHeaderField("Total Upserted")
.addTableHeaderField("Total Written")
.addTableHeaderField("Write Amplifiation Factor");
return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
}
private String[] printFileSizeHistogram(String commitTime, Snapshot s) {
return new String[] {commitTime, NumericUtils.humanReadableByteCount(s.getMin()),
NumericUtils.humanReadableByteCount(s.getValue(0.1)), NumericUtils.humanReadableByteCount(s.getMedian()),
NumericUtils.humanReadableByteCount(s.getMean()), NumericUtils.humanReadableByteCount(s.get95thPercentile()),
NumericUtils.humanReadableByteCount(s.getMax()), String.valueOf(s.size()),
NumericUtils.humanReadableByteCount(s.getStdDev())};
private Comparable[] printFileSizeHistogram(String commitTime, Snapshot s) {
return new Comparable[]{commitTime, s.getMin(),
s.getValue(0.1), s.getMedian(),
s.getMean(), s.get95thPercentile(),
s.getMax(), s.size(),
s.getStdDev()};
}
@CliCommand(value = "stats filesizes", help = "File Sizes. Display summary stats on sizes of files")
public String fileSizeStats(@CliOption(key = {
"partitionPath"}, help = "regex to select files, eg: 2016/08/02", unspecifiedDefaultValue = "*/*/*") final
String globRegex) throws IOException {
public String fileSizeStats(
@CliOption(key = {"partitionPath"},
help = "regex to select files, eg: 2016/08/02", unspecifiedDefaultValue = "*/*/*") final String globRegex,
@CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false")
final boolean headerOnly) throws IOException {
FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", HoodieCLI.tableMetadata.getBasePath(), globRegex);
@@ -115,17 +133,37 @@ public class StatsCommand implements CommandMarker {
globalHistogram.update(sz);
}
String[][] rows = new String[commitHistoMap.size() + 1][];
List<Comparable[]> rows = new ArrayList<>();
int ind = 0;
for (String commitTime : commitHistoMap.keySet()) {
Snapshot s = commitHistoMap.get(commitTime).getSnapshot();
rows[ind++] = printFileSizeHistogram(commitTime, s);
rows.add(printFileSizeHistogram(commitTime, s));
}
Snapshot s = globalHistogram.getSnapshot();
rows[ind++] = printFileSizeHistogram("ALL", s);
rows.add(printFileSizeHistogram("ALL", s));
return HoodiePrintHelper
.print(new String[] {"CommitTime", "Min", "10th", "50th", "avg", "95th", "Max", "NumFiles", "StdDev"},
rows);
Function<Object, String> converterFunction = entry -> {
return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
};
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Min", converterFunction);
fieldNameToConverterMap.put("10th", converterFunction);
fieldNameToConverterMap.put("50th", converterFunction);
fieldNameToConverterMap.put("avg", converterFunction);
fieldNameToConverterMap.put("95th", converterFunction);
fieldNameToConverterMap.put("Max", converterFunction);
fieldNameToConverterMap.put("StdDev", converterFunction);
TableHeader header = new TableHeader()
.addTableHeaderField("CommitTime")
.addTableHeaderField("Min")
.addTableHeaderField("10th")
.addTableHeaderField("50th")
.addTableHeaderField("avg")
.addTableHeaderField("95th")
.addTableHeaderField("Max")
.addTableHeaderField("NumFiles")
.addTableHeaderField("StdDev");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}
}