[HUDI-2844][CLI] Fixing archived Timeline crashing if timeline contains REPLACE_COMMIT (#4091)
This commit is contained in:
@@ -18,6 +18,10 @@
|
||||
|
||||
package org.apache.hudi.common.table.timeline;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
@@ -28,14 +32,10 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
@@ -51,7 +51,6 @@ import java.util.function.Function;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
|
||||
@@ -147,7 +146,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
|
||||
final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
|
||||
final String action = record.get(ACTION_TYPE_KEY).toString();
|
||||
if (loadDetails) {
|
||||
Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> {
|
||||
getMetadataKey(action).map(key -> {
|
||||
Object actionData = record.get(key);
|
||||
if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
|
||||
this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord)actionData));
|
||||
} else {
|
||||
@@ -159,22 +159,25 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
|
||||
return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime);
|
||||
}
|
||||
|
||||
private String getMetadataKey(String action) {
|
||||
@Nonnull
|
||||
private Option<String> getMetadataKey(String action) {
|
||||
switch (action) {
|
||||
case HoodieTimeline.CLEAN_ACTION:
|
||||
return "hoodieCleanMetadata";
|
||||
return Option.of("hoodieCleanMetadata");
|
||||
case HoodieTimeline.COMMIT_ACTION:
|
||||
return "hoodieCommitMetadata";
|
||||
case HoodieTimeline.DELTA_COMMIT_ACTION:
|
||||
return "hoodieCommitMetadata";
|
||||
return Option.of("hoodieCommitMetadata");
|
||||
case HoodieTimeline.ROLLBACK_ACTION:
|
||||
return "hoodieRollbackMetadata";
|
||||
return Option.of("hoodieRollbackMetadata");
|
||||
case HoodieTimeline.SAVEPOINT_ACTION:
|
||||
return "hoodieSavePointMetadata";
|
||||
return Option.of("hoodieSavePointMetadata");
|
||||
case HoodieTimeline.COMPACTION_ACTION:
|
||||
return "hoodieCompactionPlan";
|
||||
return Option.of("hoodieCompactionPlan");
|
||||
case HoodieTimeline.REPLACE_COMMIT_ACTION:
|
||||
return Option.of("hoodieReplaceCommitMetadata");
|
||||
default:
|
||||
throw new HoodieIOException("Unknown action in metadata " + action);
|
||||
LOG.error(String.format("Unknown action in metadata (%s)", action));
|
||||
return Option.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,35 +202,33 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
|
||||
private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
|
||||
Function<GenericRecord, Boolean> commitsFilter) {
|
||||
try {
|
||||
// list all files
|
||||
// List all files
|
||||
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
|
||||
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
|
||||
|
||||
// sort files by version suffix in reverse (implies reverse chronological order)
|
||||
// Sort files by version suffix in reverse (implies reverse chronological order)
|
||||
Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
|
||||
|
||||
List<HoodieInstant> instantsInRange = new ArrayList<>();
|
||||
for (FileStatus fs : fsStatuses) {
|
||||
//read the archived file
|
||||
// Read the archived file
|
||||
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
|
||||
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
|
||||
int instantsInPreviousFile = instantsInRange.size();
|
||||
//read the avro blocks
|
||||
// Read the avro blocks
|
||||
while (reader.hasNext()) {
|
||||
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
|
||||
// TODO If we can store additional metadata in datablock, we can skip parsing records
|
||||
// (such as startTime, endTime of records in the block)
|
||||
List<IndexedRecord> records = blk.getRecords();
|
||||
// filter blocks in desired time window
|
||||
Stream<HoodieInstant> instantsInBlkStream = records.stream()
|
||||
.filter(r -> commitsFilter.apply((GenericRecord) r))
|
||||
.map(r -> readCommit((GenericRecord) r, loadInstantDetails));
|
||||
|
||||
if (filter != null) {
|
||||
instantsInBlkStream = instantsInBlkStream.filter(filter::isInRange);
|
||||
}
|
||||
|
||||
instantsInRange.addAll(instantsInBlkStream.collect(Collectors.toList()));
|
||||
// Filter blocks in desired time window
|
||||
instantsInRange.addAll(
|
||||
records.stream()
|
||||
.filter(r -> commitsFilter.apply((GenericRecord) r))
|
||||
.map(r -> readCommit((GenericRecord) r, loadInstantDetails))
|
||||
.filter(c -> filter == null || filter.isInRange(c))
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
}
|
||||
|
||||
if (filter != null) {
|
||||
|
||||
Reference in New Issue
Block a user