1
0

[HUDI-571] Add 'commits show archived' command to CLI

This commit is contained in:
Satish Kotha
2020-01-22 13:50:34 -08:00
committed by n3nash
parent c1516df8ac
commit 462fd02556
8 changed files with 385 additions and 201 deletions

View File

@@ -28,9 +28,12 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
@@ -38,7 +41,10 @@ import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,6 +57,49 @@ import java.util.stream.Collectors;
@Component
public class CommitsCommand implements CommandMarker {
/**
 * Renders the completed commits of the given timeline as a formatted table.
 *
 * @param timeline    timeline (active or archived) whose completed commits are printed
 * @param limit       maximum number of rows to print, -1 for no limit
 * @param sortByField row field to sort by, empty string for default ordering
 * @param descending  sort direction when sortByField is set
 * @param headerOnly  when true, print only the table header
 * @return the rendered table as a string
 * @throws IOException if commit metadata cannot be deserialized
 */
private String printCommits(HoodieDefaultTimeline timeline,
                            final Integer limit, final String sortByField,
                            final boolean descending,
                            final boolean headerOnly) throws IOException {
  final List<Comparable[]> rows = new ArrayList<>();
  final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
      .getInstants().collect(Collectors.toList());
  // timeline can be read from multiple files. So sort is needed instead of reversing the collection
  Collections.sort(commits, HoodieInstant.COMPARATOR.reversed());
  // enhanced-for instead of an index loop: the index was only used to fetch the element
  for (final HoodieInstant commit : commits) {
    final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
        timeline.getInstantDetails(commit).get(),
        HoodieCommitMetadata.class);
    rows.add(new Comparable[]{commit.getTimestamp(),
        commitMetadata.fetchTotalBytesWritten(),
        commitMetadata.fetchTotalFilesInsert(),
        commitMetadata.fetchTotalFilesUpdated(),
        commitMetadata.fetchTotalPartitionsWritten(),
        commitMetadata.fetchTotalRecordsWritten(),
        commitMetadata.fetchTotalUpdateRecordsWritten(),
        commitMetadata.fetchTotalWriteErrors()});
  }
  final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
  // parseDouble avoids the needless boxing of Double.valueOf and is consistent with the
  // converter used by the other commands in this class
  fieldNameToConverterMap.put("Total Bytes Written",
      entry -> NumericUtils.humanReadableByteCount(Double.parseDouble(entry.toString())));
  final TableHeader header = new TableHeader()
      .addTableHeaderField("CommitTime")
      .addTableHeaderField("Total Bytes Written")
      .addTableHeaderField("Total Files Added")
      .addTableHeaderField("Total Files Updated")
      .addTableHeaderField("Total Partitions Written")
      .addTableHeaderField("Total Records Written")
      .addTableHeaderField("Total Update Records Written")
      .addTableHeaderField("Total Errors");
  return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}
@CliCommand(value = "commits show", help = "Show the commits")
public String showCommits(
@CliOption(key = {"limit"}, help = "Limit commits",
@@ -62,26 +111,39 @@ public class CommitsCommand implements CommandMarker {
throws IOException {
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
for (HoodieInstant commit : commits) {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
rows.add(new Comparable[] {commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(),
commitMetadata.fetchTotalFilesInsert(), commitMetadata.fetchTotalFilesUpdated(),
commitMetadata.fetchTotalPartitionsWritten(), commitMetadata.fetchTotalRecordsWritten(),
commitMetadata.fetchTotalUpdateRecordsWritten(), commitMetadata.fetchTotalWriteErrors()});
return printCommits(activeTimeline, limit, sortByField, descending, headerOnly);
}
/**
 * CLI command "commits show archived": prints commits from the archived timeline that
 * fall in the window (startTs, endTs]. Defaults to the window from 10 days ago to 1 day ago.
 * Instant details are held in memory only for the duration of the call.
 */
@CliCommand(value = "commits show archived", help = "Show the archived commits")
public String showArchivedCommits(
@CliOption(key = {"startTs"}, mandatory = false, help = "start time for commits, default: now - 10 days")
String startTs,
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for commits, default: now - 1 day")
String endTs,
@CliOption(key = {"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1")
final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "")
final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false")
final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false")
final boolean headerOnly)
throws IOException {
// apply the documented defaults when the user omits the window bounds
if (StringUtils.isNullOrEmpty(startTs)) {
startTs = getTimeDaysAgo(10);
}
if (StringUtils.isNullOrEmpty(endTs)) {
endTs = getTimeDaysAgo(1);
}
HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline();
try {
// details must be loaded eagerly; the archived timeline does not lazy-load them
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
return printCommits(archivedTimeline.findInstantsInRange(startTs, endTs),
limit, sortByField, descending, headerOnly);
} finally {
// clear the instant details from memory after printing to reduce usage
archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
}
Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString()))));
TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
.addTableHeaderField("Total Partitions Written").addTableHeaderField("Total Records Written")
.addTableHeaderField("Total Update Records Written").addTableHeaderField("Total Errors");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}
@CliCommand(value = "commits refresh", help = "Refresh the commits")
@@ -241,4 +303,9 @@ public class CommitsCommand implements CommandMarker {
+ HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
}
/**
 * Formats "now minus {@code numberOfDays} days" as a commit-timestamp string.
 * NOTE(review): COMMIT_FORMATTER is a statically shared formatter; if it is a
 * SimpleDateFormat it is not thread-safe — confirm CLI commands run single-threaded.
 */
private String getTimeDaysAgo(int numberOfDays) {
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
}
}

View File

@@ -19,24 +19,18 @@
package org.apache.hudi.io;
import org.apache.hudi.HoodieClientTestHarness;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import com.google.common.collect.Sets;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
@@ -44,7 +38,8 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -197,35 +192,18 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
instants.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "105")));
// read the file
Reader reader =
HoodieLogFormat.newReader(dfs, new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1_1-0-1")),
HoodieArchivedMetaEntry.getClassSchema());
int archivedRecordsCount = 0;
List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks and validate the number of records written in each avro block
int numBlocks = 0;
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> records = blk.getRecords();
readRecords.addAll(records);
archivedRecordsCount += records.size();
numBlocks++;
}
System.out.println("Read Records :" + readRecords.stream().map(r -> (GenericRecord) r)
.map(r -> r.get("actionType") + "_" + r.get("actionState") + "_" + r.get("commitTime")).collect(Collectors.toList()));
assertEquals("Total archived records and total read records are the same count", 24, archivedRecordsCount);
assertTrue("Average Archived records per block is greater than 1", archivedRecordsCount / numBlocks > 1);
// make sure the archived commits are the same as the (originalcommits - commitsleft)
Set<String> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> {
return r.get("commitTime").toString();
}).collect(Collectors.toSet());
HoodieArchivedTimeline archivedTimeline = new HoodieArchivedTimeline(metaClient);
assertEquals("Total archived records and total read records are the same count",
24, archivedTimeline.countInstants());
//make sure the archived commits are the same as the (originalcommits - commitsleft)
Set<String> readCommits =
archivedTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
assertEquals("Read commits map should match the originalCommits - commitsLoadedFromArchival",
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits);
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits);
// verify in-flight instants after archive
verifyInflightInstants(metaClient, 2);
reader.close();
}
@Test
@@ -397,6 +375,37 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "107")));
}
@Test
public void checkArchiveCommitTimeline() throws IOException, InterruptedException {
  // Archival config: retain 1 commit, archive down to 2 once more than 3 exist.
  HoodieWriteConfig cfg =
      HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
          .withParallelism(2, 2).forTable("test-trip-table")
          .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
          .build();
  metaClient = HoodieTableMetaClient.reload(metaClient);
  HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);

  HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf());
  HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
  HoodieTestDataGenerator.createCommitFile(basePath, "2", dfs.getConf());
  HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2");
  HoodieTestDataGenerator.createCommitFile(basePath, "3", dfs.getConf());
  HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3");
  // add 2 more commits to pass the filter criteria set in the compaction config above;
  // these remain in the active timeline, so no HoodieInstant handles are needed for them
  // (the original declared unused locals instant4/instant5 — removed)
  HoodieTestDataGenerator.createCommitFile(basePath, "4", dfs.getConf());
  HoodieTestDataGenerator.createCommitFile(basePath, "5", dfs.getConf());

  boolean result = archiveLog.archiveIfRequired(jsc);
  assertTrue(result);

  // the three oldest commits must now be readable from the archived timeline
  HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
  List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
  // diamond operator fixes the original's raw 'new HashSet(...)' (unchecked warning)
  assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
}
private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) {
HoodieTimeline timeline = metaClient.getActiveTimeline().reload()
.getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterInflights();

View File

@@ -135,6 +135,7 @@ public class HoodieWriteStat implements Serializable {
/**
* Total number of rollback blocks seen in a compaction operation.
*/
@Nullable
private long totalRollbackBlocks;
/**
@@ -290,7 +291,7 @@ public class HoodieWriteStat implements Serializable {
return totalRollbackBlocks;
}
public void setTotalRollbackBlocks(Long totalRollbackBlocks) {
public void setTotalRollbackBlocks(long totalRollbackBlocks) {
this.totalRollbackBlocks = totalRollbackBlocks;
}

View File

@@ -234,6 +234,14 @@ public interface HoodieTimeline extends Serializable {
return predicateToApply.test(commit1, commit2);
}
/**
 * Return true if specified timestamp is in range (startTs, endTs].
 */
static boolean isInRange(String timestamp, String startTs, String endTs) {
  // named intermediates spell out the half-open/closed bounds of the interval
  final boolean strictlyAfterStart = HoodieTimeline.compareTimestamps(timestamp, startTs, GREATER);
  final boolean atOrBeforeEnd = HoodieTimeline.compareTimestamps(timestamp, endTs, LESSER_OR_EQUAL);
  return strictlyAfterStart && atOrBeforeEnd;
}
/**
 * Returns a COMPLETED-state copy of the given instant, preserving its action and timestamp.
 */
static HoodieInstant getCompletedInstant(final HoodieInstant instant) {
return new HoodieInstant(State.COMPLETED, instant.getAction(), instant.getTimestamp());
}

View File

@@ -27,7 +27,6 @@ import org.apache.hudi.exception.HoodieIOException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -45,7 +44,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
@@ -134,93 +132,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
in.defaultReadObject();
}
/**
* Get all instants (commits, delta commits) that produce new data, in the active timeline.
*/
public HoodieTimeline getCommitsTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
}
/**
* Get all instants (commits, delta commits, in-flight/request compaction) that produce new data, in the active
* timeline * With Async compaction a requested/inflight compaction-instant is a valid baseInstant for a file-slice as
* there could be delta-commits with that baseInstant.
*/
@Override
public HoodieTimeline getCommitsAndCompactionTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION));
}
/**
* Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions, in the active
* timeline.
*/
public HoodieTimeline getAllCommitsTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION,
SAVEPOINT_ACTION, ROLLBACK_ACTION));
}
/**
* Get only pure commits (inflight and completed) in the active timeline.
*/
public HoodieTimeline getCommitTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION));
}
/**
* Get only the delta commits (inflight and completed) in the active timeline.
*/
public HoodieTimeline getDeltaCommitTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
* Get a timeline of a specific set of actions. useful to create a merged timeline of multiple actions.
*
* @param actions actions allowed in the timeline
*/
public HoodieTimeline getTimelineOfActions(Set<String> actions) {
return new HoodieDefaultTimeline(getInstants().filter(s -> actions.contains(s.getAction())),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
* Get only the cleaner action (inflight and completed) in the active timeline.
*/
public HoodieTimeline getCleanerTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
* Get only the rollback action (inflight and completed) in the active timeline.
*/
public HoodieTimeline getRollbackTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
* Get only the save point action (inflight and completed) in the active timeline.
*/
public HoodieTimeline getSavePointTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
* Get only the restore action (inflight and completed) in the active timeline.
*/
public HoodieTimeline getRestoreTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
protected Stream<HoodieInstant> filterInstantsByAction(String action) {
return getInstants().filter(s -> s.getAction().equals(action));
}
public void createNewInstant(HoodieInstant instant) {
LOG.info("Creating a new instant " + instant);
// Create the in-flight file

View File

@@ -18,24 +18,36 @@
package org.apache.hudi.common.table.timeline;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
@@ -49,34 +61,27 @@ import java.util.stream.Collectors;
* This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
*/
public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
private static final Pattern ARCHIVE_FILE_PATTERN =
Pattern.compile("^\\.commits_\\.archive\\.([0-9]*)$");
private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE = "commits";
private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits";
private static final String ACTION_TYPE_KEY = "actionType";
private HoodieTableMetaClient metaClient;
private Map<String, byte[]> readCommits = new HashMap<>();
private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);
/**
* Loads instants between (startTs, endTs].
* Note that there is no lazy loading, so this may not work if a really long time range (endTs - startTs) is specified.
* TBD: Should we enforce maximum time range?
*/
public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
// Read back the commits to make sure
Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
try (SequenceFile.Reader reader =
new SequenceFile.Reader(metaClient.getHadoopConf(), SequenceFile.Reader.file(archiveLogPath))) {
Text key = new Text();
Text val = new Text();
while (reader.next(key, val)) {
// TODO - limit the number of commits loaded in memory. this could get very large.
// This is okay because only tooling will load the archived commit timeline today
readCommits.put(key.toString(), Arrays.copyOf(val.getBytes(), val.getLength()));
}
this.setInstants(readCommits.keySet().stream().map(s -> new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, s))
.collect(Collectors.toList()));
} catch (IOException e) {
throw new HoodieIOException("Could not load archived commit timeline from path " + archiveLogPath, e);
}
this.metaClient = metaClient;
setInstants(this.loadInstants(false));
// multiple casts will make this lambda serializable -
// http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
this.metaClient = metaClient;
}
/**
@@ -96,7 +101,16 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
}
public static Path getArchiveLogPath(String archiveFolder) {
return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE);
return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX);
}
/**
 * Eagerly loads commit details for instants in (startTs, endTs] into the in-memory map.
 * There is no lazy loading, so very large ranges may use significant memory.
 */
public void loadInstantDetailsInMemory(String startTs, String endTs) {
loadInstants(startTs, endTs);
}
/**
 * Drops cached commit details for instants in (startTs, endTs]; counterpart of
 * {@link #loadInstantDetailsInMemory(String, String)}.
 */
public void clearInstantDetailsFromMemory(String startTs, String endTs) {
this.findInstantsInRange(startTs, endTs).getInstants().forEach(instant ->
this.readCommits.remove(instant.getTimestamp()));
}
@Override
@@ -108,4 +122,136 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return new HoodieArchivedTimeline(metaClient);
}
/**
 * Builds a HoodieInstant from an archived record. When {@code loadDetails} is true,
 * also caches the action's metadata payload (as UTF-8 bytes) in {@code readCommits}.
 */
private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
final String commitTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
final String action = record.get(ACTION_TYPE_KEY).toString();
if (loadDetails) {
// the metadata field may be absent for some records; Option guards the null case
Option.ofNullable(record.get(getMetadataKey(action))).map(actionData ->
this.readCommits.put(commitTime, actionData.toString().getBytes(StandardCharsets.UTF_8))
);
}
return new HoodieInstant(false, action, commitTime);
}
/**
 * Maps a timeline action to the field name that holds its metadata in the archived record.
 *
 * @throws HoodieIOException for actions with no known metadata field
 */
private String getMetadataKey(String action) {
  switch (action) {
    case HoodieTimeline.CLEAN_ACTION:
      return "hoodieCleanMetadata";
    case HoodieTimeline.COMMIT_ACTION:
    case HoodieTimeline.DELTA_COMMIT_ACTION:
      // commits and delta commits share the same metadata payload field
      return "hoodieCommitMetadata";
    case HoodieTimeline.ROLLBACK_ACTION:
      return "hoodieRollbackMetadata";
    case HoodieTimeline.SAVEPOINT_ACTION:
      return "hoodieSavePointMetadata";
    default:
      throw new HoodieIOException("Unknown action in metadata " + action);
  }
}
/**
 * Loads all archived instants, with no time-range filtering.
 */
private List<HoodieInstant> loadInstants(boolean loadInstantDetails) {
return loadInstants(null, loadInstantDetails);
}
/**
 * Loads archived instants in (startTs, endTs], always including their details.
 */
private List<HoodieInstant> loadInstants(String startTs, String endTs) {
return loadInstants(new TimeRangeFilter(startTs, endTs), true);
}
/**
 * Reads selected instants from the archived log files. Do NOT use this directly; use one of
 * the helper methods above.
 * If {@code loadInstantDetails} is true, this also updates the 'readCommits' map with commit details.
 * If {@code filter} is non-null, only the filtered instants are loaded.
 */
private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
try {
// list all files
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
// sort files by version suffix in reverse (implies reverse chronological order)
Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
List<HoodieInstant> instantsInRange = new ArrayList<>();
for (FileStatus fs : fsStatuses) {
//read the archived file
HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
try {
int instantsInPreviousFile = instantsInRange.size();
//read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
// TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block)
List<IndexedRecord> records = blk.getRecords();
// filter blocks in desired time window
Stream<HoodieInstant> instantsInBlkStream = records.stream()
.map(r -> readCommit((GenericRecord) r, loadInstantDetails));
if (filter != null) {
instantsInBlkStream = instantsInBlkStream.filter(filter::isInRange);
}
instantsInRange.addAll(instantsInBlkStream.collect(Collectors.toList()));
}
if (filter != null) {
int instantsInCurrentFile = instantsInRange.size() - instantsInPreviousFile;
if (instantsInPreviousFile > 0 && instantsInCurrentFile == 0) {
// Note that this is an optimization to skip reading unnecessary archived files
// This signals we crossed lower bound of desired time window.
break;
}
}
} finally {
// manual close in finally; NOTE(review): confirm whether HoodieLogFormat.Reader
// implements AutoCloseable so this could become try-with-resources
reader.close();
}
}
return instantsInRange;
} catch (IOException e) {
throw new HoodieIOException(
"Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
}
}
/**
 * Predicate over instants: accepts timestamps in the interval (startTs, endTs].
 */
private static class TimeRangeFilter {
private final String startTs;
private final String endTs;
public TimeRangeFilter(String startTs, String endTs) {
this.startTs = startTs;
this.endTs = endTs;
}
// delegates to HoodieTimeline.isInRange: strictly after startTs, at or before endTs
public boolean isInRange(HoodieInstant instant) {
return HoodieTimeline.isInRange(instant.getTimestamp(), this.startTs, this.endTs);
}
}
/**
 * Sort files by reverse order of version suffix in file name.
 */
public static class ArchiveFileVersionComparator implements Comparator<FileStatus>, Serializable {
@Override
public int compare(FileStatus f1, FileStatus f2) {
// arguments deliberately swapped to produce descending (newest-first) order
return Integer.compare(getArchivedFileSuffix(f2), getArchivedFileSuffix(f1));
}
// Extracts the numeric suffix from ".commits_.archive.N"; returns 0 when the name does
// not match the pattern or the suffix is unparseable, so such files sort last.
private int getArchivedFileSuffix(FileStatus f) {
try {
Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(f.getPath().getName());
if (fileMatcher.matches()) {
return Integer.parseInt(fileMatcher.group(1));
}
} catch (NumberFormatException e) {
// log and ignore any format warnings
LOG.warn("error getting suffix for archived file: " + f.getPath());
}
// return default value in case of any errors
return 0;
}
}
}

View File

@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.Serializable;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
@@ -126,8 +127,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
@Override
public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) {
return new HoodieDefaultTimeline(
instants.stream().filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), startTs, GREATER)
&& HoodieTimeline.compareTimestamps(s.getTimestamp(), endTs, LESSER_OR_EQUAL)),
instants.stream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)),
details);
}
@@ -143,6 +143,83 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
return new HoodieDefaultTimeline(instants.stream().filter(filter), details);
}
/**
 * Get all instants (commits, delta commits) that produce new data, in this timeline.
 */
public HoodieTimeline getCommitsTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
}
/**
 * Get all instants (commits, delta commits, clean, compaction, savepoint, rollback) that result in actions,
 * in this timeline.
 */
public HoodieTimeline getAllCommitsTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION,
SAVEPOINT_ACTION, ROLLBACK_ACTION));
}
/**
 * Get only pure commits (inflight and completed) in this timeline.
 */
public HoodieTimeline getCommitTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION));
}
/**
 * Get only the delta commits (inflight and completed) in this timeline.
 */
public HoodieTimeline getDeltaCommitTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
 * Get a timeline of a specific set of actions. Useful to create a merged timeline of multiple actions.
 *
 * @param actions actions allowed in the timeline
 */
public HoodieTimeline getTimelineOfActions(Set<String> actions) {
return new HoodieDefaultTimeline(getInstants().filter(s -> actions.contains(s.getAction())),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
 * Get only the cleaner action (inflight and completed) in this timeline.
 */
public HoodieTimeline getCleanerTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
 * Get only the rollback action (inflight and completed) in this timeline.
 */
public HoodieTimeline getRollbackTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
 * Get only the save point action (inflight and completed) in this timeline.
 */
public HoodieTimeline getSavePointTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
 * Get only the restore action (inflight and completed) in this timeline.
 */
public HoodieTimeline getRestoreTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
 * Stream of instants whose action exactly matches {@code action}.
 */
protected Stream<HoodieInstant> filterInstantsByAction(String action) {
return getInstants().filter(s -> s.getAction().equals(action));
}
@Override
public boolean empty() {
return !instants.stream().findFirst().isPresent();

View File

@@ -21,20 +21,13 @@ package org.apache.hudi.common.table;
import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -100,32 +93,4 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
assertArrayEquals("Commit value should be \"test-detail\"", "test-detail".getBytes(),
activeCommitTimeline.getInstantDetails(completedInstant).get());
}
@Test
public void checkArchiveCommitTimeline() throws IOException {
Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
SequenceFile.Writer writer =
SequenceFile.createWriter(metaClient.getHadoopConf(), SequenceFile.Writer.file(archiveLogPath),
SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
writer.append(new Text("1"), new Text("data1"));
writer.append(new Text("2"), new Text("data2"));
writer.append(new Text("3"), new Text("data3"));
IOUtils.closeStream(writer);
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2");
HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3");
assertEquals(Arrays.asList(instant1, instant2, instant3),
archivedTimeline.getInstants().collect(Collectors.toList()));
assertArrayEquals(new Text("data1").getBytes(), archivedTimeline.getInstantDetails(instant1).get());
assertArrayEquals(new Text("data2").getBytes(), archivedTimeline.getInstantDetails(instant2).get());
assertArrayEquals(new Text("data3").getBytes(), archivedTimeline.getInstantDetails(instant3).get());
}
}