
Fix rollback for compaction/commit operations; add check for null commit

- Fall back to the old way of rollback by listing all partitions
  - Added a null check to ensure only partitions that are to be rolled back are considered
  - Added the update location (commit time) to the workload stat
  - Added checks in the CompactedLogRecordScanner to guard against task retries
  - Introduced new rollback logic bounded by instant_time and target_instant_time (a simplified sketch follows the commit metadata below)
  - Reversed the order of log files handed to compaction
Author: Nishith Agarwal
Date: 2017-12-14 21:34:54 -08:00
Committed by: vinoth chandar
Parent: be0b1f3e57
Commit: 2116815261
10 changed files with 348 additions and 162 deletions
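Before the per-file diffs, a minimal, self-contained sketch (plain Java stand-ins, not the Hudi API) of the bounded rollback this change introduces: a rollback command block records both the instant at which the rollback runs (INSTANT_TIME) and the failed instant it is meant to undo (TARGET_INSTANT_TIME), and one such block is appended to the log of every file the failed commit updated. The record type and string keys below are hypothetical simplifications of HoodieCommandBlock and HoodieLogBlock.LogMetadataType.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: the real change puts this metadata on a HoodieCommandBlock
// (ROLLBACK_PREVIOUS_BLOCK) appended to each affected log file.
public class RollbackCommandSketch {

  record CommandBlock(Map<String, String> metadata) {}

  static CommandBlock rollbackCommandFor(String rollbackInstant, String failedInstant) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put("INSTANT_TIME", rollbackInstant);       // instant at which the rollback runs
    metadata.put("TARGET_INSTANT_TIME", failedInstant);  // instant whose blocks must be undone
    return new CommandBlock(metadata);
  }

  public static void main(String[] args) {
    List<String> fileIdsTouchedByFailedCommit = List.of("file-001", "file-002");
    fileIdsTouchedByFailedCommit.forEach(fileId ->
        System.out.println("append to log of " + fileId + ": "
            + rollbackCommandFor("102", "101").metadata()));
  }
}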

HoodieWriteClient.java

@@ -318,7 +318,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
partitionStat.getUpdateLocationToCount().entrySet().stream().forEach(entry -> { partitionStat.getUpdateLocationToCount().entrySet().stream().forEach(entry -> {
HoodieWriteStat writeStat = new HoodieWriteStat(); HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(entry.getKey()); writeStat.setFileId(entry.getKey());
writeStat.setNumUpdateWrites(entry.getValue()); writeStat.setPrevCommit(entry.getValue().getKey());
writeStat.setNumUpdateWrites(entry.getValue().getValue());
metadata.addWriteStat(path.toString(), writeStat); metadata.addWriteStat(path.toString(), writeStat);
}); });
}); });

HoodieAppendHandle.java

@@ -187,7 +187,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
} }
} catch (Exception e) { } catch (Exception e) {
throw new HoodieAppendException( throw new HoodieAppendException(
"Failed while appeding records to " + currentLogFile.getPath(), e); "Failed while appending records to " + currentLogFile.getPath(), e);
} }
} }

HoodieRealtimeTableCompactor.java

@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
@@ -86,7 +87,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath) .getLatestFileSlices(partitionPath)
.map(s -> new CompactionOperation(s.getDataFile().get(), .map(s -> new CompactionOperation(s.getDataFile().get(),
partitionPath, s.getLogFiles().collect(Collectors.toList()), config)) partitionPath, s.getLogFiles().sorted(HoodieLogFile.getLogVersionComparator().reversed())
.collect(Collectors.toList()), config))
.filter(c -> !c.getDeltaFilePaths().isEmpty()) .filter(c -> !c.getDeltaFilePaths().isEmpty())
.collect(toList()).iterator()).collect(); .collect(toList()).iterator()).collect();
log.info("Total of " + operations.size() + " compactions are retrieved"); log.info("Total of " + operations.size() + " compactions are retrieved");
@@ -144,7 +146,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
String maxInstantTime = metaClient.getActiveTimeline() String maxInstantTime = metaClient.getActiveTimeline()
.getTimelineOfActions( .getTimelineOfActions(
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION)) HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp(); .filterCompletedInstants().lastInstant().get().getTimestamp();
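The hunk above also fixes the order in which log files are handed to each CompactionOperation: the slice's log files are sorted with HoodieLogFile's log-version comparator and then reversed. A stand-alone illustration of that sorting pattern, using a hypothetical LogFile stand-in rather than the Hudi class:

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for a log file with a version number; not the Hudi HoodieLogFile class.
public class LogOrderSketch {

  record LogFile(String fileId, int logVersion) {}

  public static void main(String[] args) {
    Stream<LogFile> logFiles = Stream.of(
        new LogFile("f1", 1), new LogFile("f1", 3), new LogFile("f1", 2));

    // Same shape as the compactor change: sort by a version comparator, then reverse it.
    List<LogFile> ordered = logFiles
        .sorted(Comparator.comparingInt(LogFile::logVersion).reversed())
        .collect(Collectors.toList());

    System.out.println(ordered); // versions 3, 2, 1
  }
}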

HoodieCopyOnWriteTable.java

@@ -54,6 +54,7 @@ import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@@ -199,7 +200,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
private void assignUpdates(WorkloadProfile profile) { private void assignUpdates(WorkloadProfile profile) {
// each update location gets a partition // each update location gets a partition
WorkloadStat gStat = profile.getGlobalStat(); WorkloadStat gStat = profile.getGlobalStat();
for (Map.Entry<String, Long> updateLocEntry : gStat.getUpdateLocationToCount().entrySet()) { for (Map.Entry<String, Pair<String, Long>> updateLocEntry : gStat.getUpdateLocationToCount().entrySet()) {
addUpdateBucket(updateLocEntry.getKey()); addUpdateBucket(updateLocEntry.getKey());
} }
} }

HoodieMergeOnReadTable.java

@@ -32,11 +32,20 @@ import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieCompactionException; import com.uber.hoodie.exception.HoodieCompactionException;
import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.io.HoodieAppendHandle; import com.uber.hoodie.io.HoodieAppendHandle;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.Arrays; import java.util.Arrays;
@@ -47,32 +56,30 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
/** /**
* Implementation of a more real-time read-optimized Hoodie Table where * Implementation of a more real-time read-optimized Hoodie Table where
* * <p>
* INSERTS - Same as HoodieCopyOnWriteTable - Produce new files, block aligned to desired size (or) * INSERTS - Same as HoodieCopyOnWriteTable - Produce new files, block aligned to desired size (or)
* Merge with the smallest existing file, to expand it * Merge with the smallest existing file, to expand it
* * </p>
* <p>
* UPDATES - Appends the changes to a rolling log file maintained per file Id. Compaction merges the * UPDATES - Appends the changes to a rolling log file maintained per file Id. Compaction merges the
* log file into the base file. * log file into the base file.
* * </p>
* <p>
* WARNING - MOR table type does not support nested rollbacks, every rollback must be followed by an * WARNING - MOR table type does not support nested rollbacks, every rollback must be followed by an
* attempted commit action * attempted commit action
* </p>
*/ */
public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
HoodieCopyOnWriteTable<T> { HoodieCopyOnWriteTable<T> {
private static Logger logger = LogManager.getLogger(HoodieMergeOnReadTable.class); private static Logger logger = LogManager.getLogger(HoodieMergeOnReadTable.class);
public HoodieMergeOnReadTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { public HoodieMergeOnReadTable(HoodieWriteConfig config,
HoodieTableMetaClient metaClient) {
super(config, metaClient); super(config, metaClient);
} }
@@ -126,7 +133,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
} }
Map<String, HoodieInstant> commitsAndCompactions = Map<String, HoodieInstant> commitsAndCompactions =
this.getActiveTimeline() this.getActiveTimeline()
.getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION)) .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION,
HoodieActiveTimeline.DELTA_COMMIT_ACTION))
.getInstants() .getInstants()
.filter(i -> commits.contains(i.getTimestamp())) .filter(i -> commits.contains(i.getTimestamp()))
.collect(Collectors.toMap(i -> i.getTimestamp(), i -> i)); .collect(Collectors.toMap(i -> i.getTimestamp(), i -> i));
@@ -139,41 +147,28 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
Long startTime = System.currentTimeMillis(); Long startTime = System.currentTimeMillis();
List<HoodieRollbackStat> allRollbackStats = commits.stream().map(commit -> { List<HoodieRollbackStat> allRollbackStats = jsc.parallelize
(FSUtils.getAllPartitionPaths(this.metaClient.getFs(),
this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
.map((Function<String, List<HoodieRollbackStat>>) partitionPath -> {
return commits.stream().map(commit -> {
HoodieInstant instant = commitsAndCompactions.get(commit); HoodieInstant instant = commitsAndCompactions.get(commit);
List<HoodieRollbackStat> stats = null; HoodieRollbackStat hoodieRollbackStats = null;
switch (instant.getAction()) { switch (instant.getAction()) {
case HoodieTimeline.COMMIT_ACTION: case HoodieTimeline.COMMIT_ACTION:
try { try {
logger.info("Starting to rollback Commit/Compaction " + instant); Map<FileStatus, Boolean> results = super.deleteCleanedFiles(partitionPath, Arrays.asList(commit));
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.fromBytes(this.getCommitsTimeline().getInstantDetails(
new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get());
stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream()
.collect(Collectors.toList()))
.map((Function<String, HoodieRollbackStat>) partitionPath -> {
Map<FileStatus, Boolean> results = super
.deleteCleanedFiles(partitionPath, Arrays.asList(commit));
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.withDeletedFileResults(results).build(); .withDeletedFileResults(results).build();
}).collect();
logger.info("Finished rollback of Commit/Compaction " + instant);
break; break;
} catch (IOException io) { } catch (IOException io) {
throw new UncheckedIOException("Failed to rollback for commit " + commit, io); throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
} }
case HoodieTimeline.DELTA_COMMIT_ACTION: case HoodieTimeline.DELTA_COMMIT_ACTION:
try { try {
logger.info("Starting to rollback delta commit " + instant);
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(this.getCommitsTimeline().getInstantDetails( .fromBytes(this.getCommitTimeline().getInstantDetails(new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get());
new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get());
stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream()
.collect(Collectors.toList()))
.map((Function<String, HoodieRollbackStat>) partitionPath -> {
// read commit file and (either append delete blocks or delete file) // read commit file and (either append delete blocks or delete file)
Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>(); Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>(); Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();
@@ -183,16 +178,19 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
.deleteCleanedFiles(partitionPath, Arrays.asList(commit)); .deleteCleanedFiles(partitionPath, Arrays.asList(commit));
// append rollback blocks for updates // append rollback blocks for updates
if(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
.filter(wStat -> wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT) .filter(wStat -> {
return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT
&& wStat.getPrevCommit() != null;
})
.forEach(wStat -> { .forEach(wStat -> {
HoodieLogFormat.Writer writer = null; HoodieLogFormat.Writer writer = null;
try { try {
writer = HoodieLogFormat.newWriterBuilder() writer = HoodieLogFormat.newWriterBuilder()
.onParentPath( .onParentPath(new Path(this.getMetaClient().getBasePath(), partitionPath))
new Path(this.getMetaClient().getBasePath(), partitionPath))
.withFileId(wStat.getFileId()).overBaseCommit(wStat.getPrevCommit()) .withFileId(wStat.getFileId()).overBaseCommit(wStat.getPrevCommit())
.withFs(getMetaClient().getFs()) .withFs(this.metaClient.getFs())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
Long numRollbackBlocks = 0L; Long numRollbackBlocks = 0L;
// generate metadata // generate metadata
@@ -201,19 +199,12 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, commit); metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, commit);
// if update belongs to an existing log file // if update belongs to an existing log file
writer.appendBlock(new HoodieCommandBlock( writer = writer.appendBlock(new HoodieCommandBlock(
HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK,
metadata)); metadata));
numRollbackBlocks++; numRollbackBlocks++;
if (wStat.getNumDeletes() > 0) {
writer.appendBlock(new HoodieCommandBlock(
HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK,
metadata));
numRollbackBlocks++;
}
filesToNumBlocksRollback filesToNumBlocksRollback
.put(getMetaClient().getFs() .put(this.getMetaClient().getFs().getFileStatus(writer.getLogFile().getPath()),
.getFileStatus(writer.getLogFile().getPath()),
numRollbackBlocks); numRollbackBlocks);
} catch (IOException | InterruptedException io) { } catch (IOException | InterruptedException io) {
throw new HoodieRollbackException( throw new HoodieRollbackException(
@@ -226,26 +217,25 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
} }
} }
}); });
return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath)
.withDeletedFileResults(filesToDeletedStatus) .withDeletedFileResults(filesToDeletedStatus)
.withRollbackBlockAppendResults(filesToNumBlocksRollback).build(); .withRollbackBlockAppendResults(filesToNumBlocksRollback).build();
}).collect(); }
logger.info("Fnished rollback of delta commit " + instant);
break; break;
} catch (IOException io) { } catch (IOException io) {
throw new UncheckedIOException("Failed to rollback for commit " + commit, io); throw new UncheckedIOException("Failed to rollback for commit " + commit, io);
} }
} }
return stats; return hoodieRollbackStats;
}).flatMap(x -> x.stream()).collect(Collectors.toList()); }).collect(Collectors.toList());
}).flatMap(x -> x.iterator()).filter(x -> x != null).collect();
commitsAndCompactions.entrySet().stream() commitsAndCompactions.entrySet().stream()
.map(entry -> new HoodieInstant(true, entry.getValue().getAction(), .map(entry -> new HoodieInstant(true, entry.getValue().getAction(),
entry.getValue().getTimestamp())) entry.getValue().getTimestamp()))
.forEach(this.getActiveTimeline()::deleteInflight); .forEach(this.getActiveTimeline()::deleteInflight);
logger logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime));
return allRollbackStats; return allRollbackStats;
} }
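The rewritten rollback above inverts the old nesting: rather than reading each commit's metadata and fanning out only over the partitions that commit touched, it now lists all partitions of the dataset (FSUtils.getAllPartitionPaths), runs one Spark task per partition, and inside each task walks every commit to be rolled back, skipping write stats whose previous commit is null or the NULL_COMMIT placeholder. A simplified, Spark-free sketch of that control flow; every type and name here is an illustrative stand-in for the HoodieRollbackStat bookkeeping in the real code:

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Illustrative stand-ins only; the real code runs the outer loop as a Spark job
// over FSUtils.getAllPartitionPaths(...) and builds HoodieRollbackStat objects.
public class RollbackShapeSketch {

  record RollbackStat(String partitionPath, String commit) {}

  static RollbackStat rollbackOnePartitionForCommit(String partitionPath, String commit) {
    // delete files created by the commit and/or append rollback command blocks,
    // guarding against write stats whose previous commit is null or a placeholder
    return new RollbackStat(partitionPath, commit);
  }

  public static void main(String[] args) {
    List<String> partitions = List.of("2017/12/14", "2017/12/15");
    List<String> commitsToRollback = List.of("101", "102");

    List<RollbackStat> stats = partitions.stream()          // parallelized in the real code
        .flatMap(partition -> commitsToRollback.stream()
            .map(commit -> rollbackOnePartitionForCommit(partition, commit)))
        .filter(Objects::nonNull)                           // partitions untouched by a commit yield null
        .collect(Collectors.toList());

    System.out.println(stats.size() + " rollback stats collected");
  }
}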

WorkloadStat.java

@@ -17,6 +17,8 @@
package com.uber.hoodie.table; package com.uber.hoodie.table;
import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordLocation;
import org.apache.commons.lang3.tuple.Pair;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
@@ -29,7 +31,7 @@ public class WorkloadStat implements Serializable {
private long numUpdates = 0L; private long numUpdates = 0L;
private HashMap<String, Long> updateLocationToCount; private HashMap<String, Pair<String, Long>> updateLocationToCount;
public WorkloadStat() { public WorkloadStat() {
updateLocationToCount = new HashMap<>(); updateLocationToCount = new HashMap<>();
@@ -40,7 +42,7 @@ public class WorkloadStat implements Serializable {
} }
long addUpdates(HoodieRecordLocation location, long numUpdates) { long addUpdates(HoodieRecordLocation location, long numUpdates) {
updateLocationToCount.put(location.getFileId(), numUpdates); updateLocationToCount.put(location.getFileId(), Pair.of(location.getCommitTime(), numUpdates));
return this.numUpdates += numUpdates; return this.numUpdates += numUpdates;
} }
@@ -52,7 +54,7 @@ public class WorkloadStat implements Serializable {
return numInserts; return numInserts;
} }
public HashMap<String, Long> getUpdateLocationToCount() { public HashMap<String, Pair<String, Long>> getUpdateLocationToCount() {
return updateLocationToCount; return updateLocationToCount;
} }
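The WorkloadStat change above widens the update-location map from fileId -> update count to fileId -> (commit time of the record's current location, update count), which is what lets HoodieWriteClient (first hunk) record the previous commit on each write stat. The real code uses org.apache.commons.lang3.tuple.Pair; the sketch below is a dependency-free approximation with a JDK map entry:

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for the reworked WorkloadStat bookkeeping; not the Hudi class itself.
public class UpdateLocationSketch {

  // fileId -> (commit time the file was last written at, number of updates routed to it)
  private final Map<String, Map.Entry<String, Long>> updateLocationToCount = new HashMap<>();

  void addUpdates(String fileId, String commitTime, long numUpdates) {
    updateLocationToCount.put(fileId, new SimpleImmutableEntry<>(commitTime, numUpdates));
  }

  public static void main(String[] args) {
    UpdateLocationSketch stat = new UpdateLocationSketch();
    stat.addUpdates("file-001", "20171214213454", 42L);
    stat.updateLocationToCount.forEach((fileId, entry) ->
        System.out.println(fileId + " prevCommit=" + entry.getKey() + " updates=" + entry.getValue()));
  }
}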

TestMergeOnReadTable.java

@@ -463,7 +463,7 @@ public class TestMergeOnReadTable {
newCommitTime = "002"; newCommitTime = "002";
client.startCommitWithTime(newCommitTime); client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 200); records = dataGen.generateUpdates(newCommitTime, records);
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors // Verify there are no errors
@@ -556,6 +556,7 @@ public class TestMergeOnReadTable {
return HoodieWriteConfig.newBuilder().withPath(basePath) return HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withAutoCommit(autoCommit) .withAutoCommit(autoCommit)
.withAssumeDatePartitioning(true)
.withCompactionConfig( .withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
.withInlineCompaction(false).build()) .withInlineCompaction(false).build())

HoodieCompactedLogRecordScanner.java

@@ -16,9 +16,6 @@
package com.uber.hoodie.common.table.log; package com.uber.hoodie.common.table.log;
import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;
import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.LogMetadataType.INSTANT_TIME;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieLogFile;
@@ -32,6 +29,14 @@ import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Arrays; import java.util.Arrays;
@@ -43,13 +48,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK;
import org.apache.avro.generic.IndexedRecord; import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.LogMetadataType.INSTANT_TIME;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/** /**
* Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of
@@ -76,8 +77,8 @@ public class HoodieCompactedLogRecordScanner implements
private HoodieTableMetaClient hoodieTableMetaClient; private HoodieTableMetaClient hoodieTableMetaClient;
// Merge strategy to use when combining records from log // Merge strategy to use when combining records from log
private String payloadClassFQN; private String payloadClassFQN;
// Store only the last log blocks (needed to implement rollback) // Store the last instant log blocks (needed to implement rollback)
Deque<HoodieLogBlock> lastBlocks = new ArrayDeque<>(); Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime) { Schema readerSchema, String latestInstantTime) {
@@ -100,8 +101,8 @@ public class HoodieCompactedLogRecordScanner implements
HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, logFile, readerSchema, true); HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, logFile, readerSchema, true);
while (reader.hasNext()) { while (reader.hasNext()) {
HoodieLogBlock r = reader.next(); HoodieLogBlock r = reader.next();
String blockInstantTime = r.getLogMetadata().get(INSTANT_TIME); if (r.getBlockType() != CORRUPT_BLOCK &&
if (!HoodieTimeline.compareTimestamps(blockInstantTime, this.latestInstantTime, !HoodieTimeline.compareTimestamps(r.getLogMetadata().get(INSTANT_TIME), this.latestInstantTime,
HoodieTimeline.LESSER_OR_EQUAL)) { HoodieTimeline.LESSER_OR_EQUAL)) {
//hit a block with instant time greater than should be processed, stop processing further //hit a block with instant time greater than should be processed, stop processing further
break; break;
@@ -109,22 +110,33 @@ public class HoodieCompactedLogRecordScanner implements
switch (r.getBlockType()) { switch (r.getBlockType()) {
case AVRO_DATA_BLOCK: case AVRO_DATA_BLOCK:
log.info("Reading a data block from file " + logFile.getPath()); log.info("Reading a data block from file " + logFile.getPath());
// Consider the following scenario
// (Time 0, C1, Task T1) -> Running
// (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct DataBlock (B1) with commitTime C1
// (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2)
// (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2)
// Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same.
// Say, commit C1 eventually failed and a rollback is triggered.
// Rollback will write only 1 rollback block (R1) since it assumes one block is written per ingestion batch for a file,
// but in reality we need to rollback (B1 & B2)
// The following code ensures the same rollback block (R1) is used to rollback both B1 & B2
if(isNewInstantBlock(r)) {
// If this is a avro data block, then merge the last block records into the main result // If this is a avro data block, then merge the last block records into the main result
merge(records, lastBlocks); merge(records, currentInstantLogBlocks);
// store the last block }
lastBlocks.push(r); // store the current block
currentInstantLogBlocks.push(r);
break; break;
case DELETE_BLOCK: case DELETE_BLOCK:
log.info("Reading a delete block from file " + logFile.getPath()); log.info("Reading a delete block from file " + logFile.getPath());
String lastBlockInstantTime = lastBlocks.peek().getLogMetadata().get(INSTANT_TIME); if (isNewInstantBlock(r)) {
if (!lastBlockInstantTime.equals(blockInstantTime)) {
// Block with the keys listed as to be deleted, data and delete blocks written in different batches // Block with the keys listed as to be deleted, data and delete blocks written in different batches
// so it is safe to merge // so it is safe to merge
// This is a delete block, so lets merge any records from previous data block // This is a delete block, so lets merge any records from previous data block
merge(records, lastBlocks); merge(records, currentInstantLogBlocks);
} }
// store deletes so can be rolled back // store deletes so can be rolled back
lastBlocks.push(r); currentInstantLogBlocks.push(r);
break; break;
case COMMAND_BLOCK: case COMMAND_BLOCK:
log.info("Reading a command block from file " + logFile.getPath()); log.info("Reading a command block from file " + logFile.getPath());
@@ -137,28 +149,46 @@ public class HoodieCompactedLogRecordScanner implements
// Rollback the last read log block // Rollback the last read log block
// Get commit time from last record block, compare with targetCommitTime, rollback only if equal, // Get commit time from last record block, compare with targetCommitTime, rollback only if equal,
// this is required in scenarios of invalid/extra rollback blocks written due to failures during // this is required in scenarios of invalid/extra rollback blocks written due to failures during
// the rollback operation itself // the rollback operation itself and ensures the same rollback block (R1) is used to rollback
HoodieLogBlock lastBlock = lastBlocks.peek(); // both B1 & B2 with same instant_time
if (lastBlock != null && lastBlock.getBlockType() != CORRUPT_BLOCK && int numBlocksRolledBack = 0;
while(!currentInstantLogBlocks.isEmpty()) {
HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
// handle corrupt blocks separately since they may not have metadata
if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
log.info(
"Rolling back the last corrupted log block read in " + logFile.getPath());
currentInstantLogBlocks.pop();
numBlocksRolledBack++;
}
// rollback last data block or delete block
else if (lastBlock.getBlockType() != CORRUPT_BLOCK &&
targetInstantForCommandBlock targetInstantForCommandBlock
.contentEquals(lastBlock.getLogMetadata().get(INSTANT_TIME))) { .contentEquals(lastBlock.getLogMetadata().get(INSTANT_TIME))) {
log.info("Rolling back the last log block read in " + logFile.getPath()); log.info("Rolling back the last log block read in " + logFile.getPath());
lastBlocks.pop(); currentInstantLogBlocks.pop();
} else if (lastBlock != null && lastBlock.getBlockType() == CORRUPT_BLOCK) { numBlocksRolledBack++;
// handle corrupt blocks separately since they may not have metadata
log.info(
"Rolling back the last corrupted log block read in " + logFile.getPath());
lastBlocks.pop();
} else {
log.warn("Invalid or extra rollback command block in " + logFile.getPath());
} }
// invalid or extra rollback block
else if(!targetInstantForCommandBlock
.contentEquals(currentInstantLogBlocks.peek().getLogMetadata().get(INSTANT_TIME))) {
log.warn("Invalid or extra rollback command block in " + logFile.getPath());
break; break;
} }
// this should not happen ideally
else {
log.warn("Unable to apply rollback command block in " + logFile.getPath());
}
}
log.info("Number of applied rollback blocks " + numBlocksRolledBack);
break;
}
break; break;
case CORRUPT_BLOCK: case CORRUPT_BLOCK:
log.info("Found a corrupt block in " + logFile.getPath()); log.info("Found a corrupt block in " + logFile.getPath());
// If there is a corrupt block - we will assume that this was the next data block // If there is a corrupt block - we will assume that this was the next data block
lastBlocks.push(r); currentInstantLogBlocks.push(r);
break; break;
} }
} }
@@ -167,15 +197,26 @@ public class HoodieCompactedLogRecordScanner implements
throw new HoodieIOException("IOException when reading log file " + logFile); throw new HoodieIOException("IOException when reading log file " + logFile);
} }
// merge the last read block when all the blocks are done reading // merge the last read block when all the blocks are done reading
if (!lastBlocks.isEmpty()) { if (!currentInstantLogBlocks.isEmpty()) {
log.info("Merging the final data blocks in " + logFile.getPath()); log.info("Merging the final data blocks in " + logFile.getPath());
merge(records, lastBlocks); merge(records, currentInstantLogBlocks);
} }
} }
this.logRecords = Collections.unmodifiableCollection(records.values()); this.logRecords = Collections.unmodifiableCollection(records.values());
this.totalRecordsToUpdate = records.size(); this.totalRecordsToUpdate = records.size();
} }
/**
* Checks if the current logblock belongs to a later instant
* @param logBlock
* @return
*/
private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK
&& !logBlock.getLogMetadata().get(INSTANT_TIME)
.contentEquals(currentInstantLogBlocks.peek().getLogMetadata().get(INSTANT_TIME));
}
/** /**
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and merge * Iterate over the GenericRecord in the block, read the hoodie key and partition path and merge
* with the application specific payload if the same key was found before Sufficient to just merge * with the application specific payload if the same key was found before Sufficient to just merge
@@ -218,15 +259,18 @@ public class HoodieCompactedLogRecordScanner implements
private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records, private void merge(Map<String, HoodieRecord<? extends HoodieRecordPayload>> records,
Deque<HoodieLogBlock> lastBlocks) { Deque<HoodieLogBlock> lastBlocks) {
while (!lastBlocks.isEmpty()) { while (!lastBlocks.isEmpty()) {
HoodieLogBlock lastBlock = lastBlocks.pop(); // poll the element at the bottom of the stack since that's the order it was inserted
HoodieLogBlock lastBlock = lastBlocks.pollLast();
switch (lastBlock.getBlockType()) { switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK: case AVRO_DATA_BLOCK:
merge(records, loadRecordsFromBlock((HoodieAvroDataBlock) lastBlock)); merge(records, loadRecordsFromBlock((HoodieAvroDataBlock) lastBlock));
break; break;
case DELETE_BLOCK: case DELETE_BLOCK:
// TODO : If delete is the only block written and/or records are present in parquet file
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(records::remove); Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(records::remove);
break; break;
case CORRUPT_BLOCK: case CORRUPT_BLOCK:
log.warn("Found a corrupt block which was not rolled back");
break; break;
} }
} }
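The scanner changes above replace "remember only the last block" with "remember every block of the current instant" (currentInstantLogBlocks), so a single rollback command block can undo all duplicate data blocks left behind by task retries. A condensed, self-contained sketch of that loop; the types and names are illustrative, not the Hudi classes, and corrupt/delete block handling and the actual record merge are omitted:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Illustrative stand-ins; the real scanner also handles delete and corrupt blocks.
public class ScannerRollbackSketch {

  enum BlockType { DATA, COMMAND }

  record Block(BlockType type, String instantTime, String targetInstantTime) {}

  public static void main(String[] args) {
    // Two identical data blocks written for instant "101" (a task retry), then a rollback of "101".
    List<Block> logBlocks = List.of(
        new Block(BlockType.DATA, "101", null),
        new Block(BlockType.DATA, "101", null),
        new Block(BlockType.COMMAND, "102", "101"));

    Deque<Block> currentInstantBlocks = new ArrayDeque<>();
    for (Block block : logBlocks) {
      if (block.type() == BlockType.DATA) {
        if (isNewInstantBlock(currentInstantBlocks, block)) {
          // a new instant has started: the accumulated blocks are valid, merge them
          System.out.println("Merging " + currentInstantBlocks.size() + " blocks");
          currentInstantBlocks.clear();
        }
        currentInstantBlocks.push(block);
      } else { // COMMAND: roll back every pending block written by the target instant
        while (!currentInstantBlocks.isEmpty()
            && block.targetInstantTime().equals(currentInstantBlocks.peek().instantTime())) {
          System.out.println("Rolled back " + currentInstantBlocks.pop());
        }
      }
    }
    System.out.println(currentInstantBlocks.size() + " blocks left to merge"); // 0 here
  }

  static boolean isNewInstantBlock(Deque<Block> pending, Block block) {
    return !pending.isEmpty()
        && !block.instantTime().equals(pending.peek().instantTime());
  }
}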

HoodieLogFormatTest.java

@@ -520,23 +520,25 @@ public class HoodieLogFormatTest {
.collect(Collectors.toList()); .collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap(); Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
schema, metadata); schema, metadata);
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
// Write 2 // Write 2
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101");
List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
dataBlock = new HoodieAvroDataBlock(records2, schema, metadata); dataBlock = new HoodieAvroDataBlock(records2, schema, metadata);
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
// Rollback the last write // Rollback the last write
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "101");
HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlock commandBlock = new HoodieCommandBlock(
HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata);
writer = writer.appendBlock(commandBlock); writer = writer.appendBlock(commandBlock);
// Write 3 // Write 3
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "102");
List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream().map(record -> List<IndexedRecord> copyOfRecords3 = records3.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
@@ -552,8 +554,8 @@ public class HoodieLogFormatTest {
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
allLogFiles, allLogFiles,
schema, "100"); schema, "102");
assertEquals("We only read 200 records, but only 200 of them are valid", 200, assertEquals("We read 200 records from 2 write batches", 200,
scanner.getTotalLogRecords()); scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200); Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -583,12 +585,13 @@ public class HoodieLogFormatTest {
.collect(Collectors.toList()); .collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap(); Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
schema, metadata); schema, metadata);
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
writer.close(); writer.close();
// Write 2
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101");
// Append some arbit byte[] to thee end of the log (mimics a partially written commit) // Append some arbit byte[] to thee end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
@@ -605,6 +608,8 @@ public class HoodieLogFormatTest {
outputStream.close(); outputStream.close();
// Rollback the last write // Rollback the last write
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "102");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "101");
HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlock commandBlock = new HoodieCommandBlock(
HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata);
writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -613,6 +618,7 @@ public class HoodieLogFormatTest {
writer = writer.appendBlock(commandBlock); writer = writer.appendBlock(commandBlock);
// Write 3 // Write 3
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "103");
List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List<IndexedRecord> records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords3 = records3.stream().map(record -> List<IndexedRecord> copyOfRecords3 = records3.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
@@ -629,7 +635,7 @@ public class HoodieLogFormatTest {
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
allLogFiles, allLogFiles,
schema, "100"); schema, "103");
assertEquals("We would read 200 records", 200, assertEquals("We would read 200 records", 200,
scanner.getTotalLogRecords()); scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200); Set<String> readKeys = new HashSet<>(200);
@@ -660,12 +666,12 @@ public class HoodieLogFormatTest {
.collect(Collectors.toList()); .collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap(); Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
schema, metadata); schema, metadata);
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
// Write 2 // Write 2
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101");
List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords2 = records2.stream().map(record -> List<IndexedRecord> copyOfRecords2 = records2.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
@@ -682,6 +688,7 @@ public class HoodieLogFormatTest {
// Delete 50 keys // Delete 50 keys
List<String> deletedKeys = originalKeys.subList(0, 50); List<String> deletedKeys = originalKeys.subList(0, 50);
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "102");
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]),
metadata); metadata);
writer = writer.appendBlock(deleteBlock); writer = writer.appendBlock(deleteBlock);
@@ -693,7 +700,7 @@ public class HoodieLogFormatTest {
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
allLogFiles, allLogFiles,
schema, "100"); schema, "102");
assertEquals("We still would read 200 records", 200, assertEquals("We still would read 200 records", 200,
scanner.getTotalLogRecords()); scanner.getTotalLogRecords());
final List<String> readKeys = new ArrayList<>(200); final List<String> readKeys = new ArrayList<>(200);
@@ -706,12 +713,14 @@ public class HoodieLogFormatTest {
readKeys); readKeys);
// Rollback the last block // Rollback the last block
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "103");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "102");
HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlock commandBlock = new HoodieCommandBlock(
HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata);
writer = writer.appendBlock(commandBlock); writer = writer.appendBlock(commandBlock);
readKeys.clear(); readKeys.clear();
scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "100"); scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "101");
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals("Stream collect should return all 200 records after rollback of delete", 200, assertEquals("Stream collect should return all 200 records after rollback of delete", 200,
readKeys.size()); readKeys.size());
@@ -756,7 +765,7 @@ public class HoodieLogFormatTest {
metadata); metadata);
writer = writer.appendBlock(deleteBlock); writer = writer.appendBlock(deleteBlock);
// Attemp 1 : Write 2 rollback blocks (1 data block + 1 delete bloc) for a failed write // Attempt 1 : Write rollback block for a failed write
HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlock commandBlock = new HoodieCommandBlock(
HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata);
try { try {
@@ -766,8 +775,7 @@ public class HoodieLogFormatTest {
} catch (Exception e) { } catch (Exception e) {
// it's okay // it's okay
} }
// Attempt 2 : Write 2 rollback blocks (1 data block + 1 delete bloc) for a failed write // Attempt 2 : Write another rollback blocks for a failed write
writer = writer.appendBlock(commandBlock);
writer = writer.appendBlock(commandBlock); writer = writer.appendBlock(commandBlock);
List<String> allLogFiles = FSUtils List<String> allLogFiles = FSUtils
@@ -778,12 +786,13 @@ public class HoodieLogFormatTest {
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
allLogFiles, allLogFiles,
schema, "100"); schema, "100");
assertEquals("We would read 100 records", 100, // all data must be rolled back before merge
assertEquals("We would read 0 records", 0,
scanner.getTotalLogRecords()); scanner.getTotalLogRecords());
final List<String> readKeys = new ArrayList<>(100); final List<String> readKeys = new ArrayList<>();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals("Stream collect should return all 150 records", 100, readKeys.size()); assertEquals("Stream collect should return all 0 records", 0, readKeys.size());
} }
@Test @Test
@@ -850,12 +859,12 @@ public class HoodieLogFormatTest {
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap(); Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "101");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
schema, metadata); schema, metadata);
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
// Write invalid rollback for a failed write (possible for in-flight commits) // Write invalid rollback for a failed write (possible for in-flight commits)
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "101");
HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlock commandBlock = new HoodieCommandBlock(
HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata);
writer = writer.appendBlock(commandBlock); writer = writer.appendBlock(commandBlock);
@@ -873,4 +882,140 @@ public class HoodieLogFormatTest {
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals("Stream collect should return all 150 records", 100, readKeys.size()); assertEquals("Stream collect should return all 150 records", 100, readKeys.size());
} }
@Test
public void testAvroLogRecordReaderWithInsertsDeleteAndRollback()
throws IOException, URISyntaxException, InterruptedException {
// Write a 3 Data blocs with same InstantTime (written in same batch)
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream().map(record ->
HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema))
.collect(Collectors.toList());
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
schema, metadata);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
List<String> originalKeys = copyOfRecords1.stream()
.map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(
Collectors.toList());
// Delete 50 keys
List<String> deletedKeys = originalKeys.subList(0, 50);
HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]),
metadata);
writer = writer.appendBlock(deleteBlock);
// Write 1 rollback block for a failed write
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101");
HoodieCommandBlock commandBlock = new HoodieCommandBlock(
HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata);
writer = writer.appendBlock(commandBlock);
List<String> allLogFiles = FSUtils
.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString())
.collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
allLogFiles, schema, "101");
assertEquals("We would read 0 records", 0,
scanner.getTotalLogRecords());
}
@Test
public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback()
throws IOException, URISyntaxException, InterruptedException {
// Write a 3 Data blocs with same InstantTime (written in same batch)
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();
// Write 1
List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100");
metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100");
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
schema, metadata);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
writer = writer.appendBlock(dataBlock);
writer.close();
// Append some arbit byte[] to thee end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
// Write out a length that does not confirm with the content
outputStream.writeInt(100);
outputStream.flush();
outputStream.close();
// Append some arbit byte[] to thee end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
// Write out a length that does not confirm with the content
outputStream.writeInt(100);
outputStream.flush();
outputStream.close();
writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();
writer = writer.appendBlock(dataBlock);
writer.close();
// Append some arbit byte[] to thee end of the log (mimics a partially written commit)
fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
outputStream = fs.append(writer.getLogFile().getPath());
// create a block with
outputStream.write(HoodieLogFormat.MAGIC);
outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal());
// Write out a length that does not confirm with the content
outputStream.writeInt(100);
outputStream.flush();
outputStream.close();
writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
.overBaseCommit("100").withFs(fs).build();
// Write 1 rollback block for a failed write
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101");
HoodieCommandBlock commandBlock = new HoodieCommandBlock(
HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata);
writer = writer.appendBlock(commandBlock);
List<String> allLogFiles = FSUtils
.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString())
.collect(Collectors.toList());
HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath,
allLogFiles, schema, "101");
assertEquals("We would read 0 records", 0,
scanner.getTotalLogRecords());
}
} }

HoodieRealtimeInputFormat.java

@@ -124,7 +124,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf
// Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
String maxCommitTime = metaClient.getActiveTimeline() String maxCommitTime = metaClient.getActiveTimeline()
.getTimelineOfActions( .getTimelineOfActions(
Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION)) HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp(); .filterCompletedInstants().lastInstant().get().getTimestamp();
rtSplits.add( rtSplits.add(