From 63f1b123554c9c350958beebb539a538bcd52cd2 Mon Sep 17 00:00:00 2001
From: Nishith Agarwal
Date: Fri, 25 Aug 2017 14:12:19 -0700
Subject: [PATCH] adding ability to read archived files written in log format

---
 .../cli/commands/ArchivedCommitsCommand.java  | 170 ++++++++++++++++++
 1 file changed, 170 insertions(+)
 create mode 100644 hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java

diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java
new file mode 100644
index 000000000..e78ce8cd7
--- /dev/null
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.cli.commands;
+
+import com.uber.hoodie.avro.model.HoodieArchivedMetaEntry;
+import com.uber.hoodie.cli.HoodieCLI;
+import com.uber.hoodie.cli.HoodiePrintHelper;
+import com.uber.hoodie.common.model.HoodieLogFile;
+import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
+import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.util.FSUtils;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * CLI command that lists the entries stored in a dataset's archived commit files.
+ */
+@Component
+public class ArchivedCommitsCommand implements CommandMarker {
+
+  /**
+   * Makes the command available only once {@code HoodieCLI.tableMetadata} has been set.
+   *
+   * @return {@code true} when {@code HoodieCLI.tableMetadata} is not {@code null}
+   */
+  @CliAvailabilityIndicator({"show archived commits"})
+  public boolean isShowArchivedCommitAvailable() {
+    return HoodieCLI.tableMetadata != null;
+  }
+
+  /**
+   * Reads the archived commit files under the dataset base path and renders their entries as a table.
+   *
+   * @param limit maximum number of entries to show across all archive files; a non-positive value
+   *     shows none
+   * @return the table produced by {@code HoodiePrintHelper.print}
+   * @throws IOException if listing, reading or closing an archive file fails
+   */
+  @CliCommand(value = "show archived commits", help = "Read commits from archived files and show details")
+  public String showCommits(
+      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "10")
+      final Integer limit) throws IOException {
+
+    System.out.println("===============> Showing only " + limit + " archived commits <===============");
+    FileStatus[] fsStatuses = FSUtils.getFs().globStatus(
+        new Path(HoodieCLI.tableMetadata.getBasePath() + "/.hoodie/.commits_.archive*"));
+    List<String[]> allCommits = new ArrayList<>();
+    // Defensive: FileSystem#globStatus can return null instead of an empty array.
+    if (fsStatuses != null) {
+      for (FileStatus fs : fsStatuses) {
+        // The limit covers the whole listing, so each file may only fill what is still missing.
+        int remaining = limit - allCommits.size();
+        if (remaining <= 0) {
+          break;
+        }
+        allCommits.addAll(readArchivedCommits(fs.getPath(), remaining));
+      }
+    }
+    return HoodiePrintHelper.print(
+        new String[] {"CommitTime", "CommitType", "CommitDetails"},
+        allCommits.toArray(new String[allCommits.size()][]));
+  }
+
+  /**
+   * Reads at most {@code maxCommits} entries from a single archive file.
+   *
+   * @param archivePath path of the archive file to read
+   * @param maxCommits maximum number of rows to return; callers pass a positive value
+   * @return one table row per archived entry, in the order the entries were read
+   * @throws IOException if the file cannot be opened, read or closed
+   */
+  private List<String[]> readArchivedCommits(Path archivePath, int maxCommits) throws IOException {
+    List<IndexedRecord> readRecords = new ArrayList<>();
+    // try-with-resources closes the reader even when reading a block fails.
+    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(),
+        new HoodieLogFile(archivePath), HoodieArchivedMetaEntry.getClassSchema())) {
+      // Stop pulling blocks as soon as enough records are buffered to satisfy the limit.
+      while (readRecords.size() < maxCommits && reader.hasNext()) {
+        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+        readRecords.addAll(blk.getRecords());
+      }
+    }
+    return readRecords.stream()
+        .map(r -> (GenericRecord) r)
+        .map(this::readCommit)
+        .limit(maxCommits)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Converts one archived entry into a table row.
+   *
+   * The row always has three cells: commit time, action type and metadata. Cells that cannot be
+   * read stay empty, so one unreadable entry neither aborts the listing nor yields a short row.
+   *
+   * @param record archived entry to convert
+   * @return a three-element row matching the printed table header
+   */
+  private String[] readCommit(GenericRecord record) {
+    String[] row = {"", "", ""};
+    try {
+      row[0] = asString(record.get("commitTime"));
+      row[1] = asString(record.get("actionType"));
+      String metadataField = metadataFieldFor(row[1]);
+      if (metadataField != null) {
+        row[2] = asString(record.get(metadataField));
+      }
+    } catch (RuntimeException e) {
+      // Best effort: report the problem and keep whatever cells were already filled in.
+      System.err.println("Could not fully read archived entry: " + e);
+    }
+    return row;
+  }
+
+  /**
+   * Maps a timeline action type to the record field that holds its metadata.
+   *
+   * @param actionType non-null action type taken from the archived entry
+   * @return the metadata field name, or {@code null} for an action type this command does not handle
+   */
+  private static String metadataFieldFor(String actionType) {
+    switch (actionType) {
+      case HoodieTimeline.CLEAN_ACTION:
+        return "hoodieCleanMetadata";
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.DELTA_COMMIT_ACTION:
+        return "hoodieCommitMetadata";
+      case HoodieTimeline.COMPACTION_ACTION:
+        return "hoodieCompactionMetadata";
+      case HoodieTimeline.ROLLBACK_ACTION:
+        return "hoodieRollbackMetadata";
+      case HoodieTimeline.SAVEPOINT_ACTION:
+        return "hoodieSavePointMetadata";
+      default:
+        return null;
+    }
+  }
+
+  /** Returns {@code value.toString()}, or an empty string when {@code value} is {@code null}. */
+  private static String asString(Object value) {
+    return value == null ? "" : value.toString();
+  }
+}
\ No newline at end of file