diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index df80ba587..e960e1be5 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -318,7 +318,8 @@ public class HoodieWriteClient implements Seriali partitionStat.getUpdateLocationToCount().entrySet().stream().forEach(entry -> { HoodieWriteStat writeStat = new HoodieWriteStat(); writeStat.setFileId(entry.getKey()); - writeStat.setNumUpdateWrites(entry.getValue()); + writeStat.setPrevCommit(entry.getValue().getKey()); + writeStat.setNumUpdateWrites(entry.getValue().getValue()); metadata.addWriteStat(path.toString(), writeStat); }); }); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index fb107444d..9752dc444 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -187,7 +187,7 @@ public class HoodieAppendHandle extends HoodieIOH } } catch (Exception e) { throw new HoodieAppendException( - "Failed while appeding records to " + currentLogFile.getPath(), e); + "Failed while appending records to " + currentLogFile.getPath(), e); } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 501e452c4..138992469 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -86,7 +87,8 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { .flatMap((FlatMapFunction) partitionPath -> fileSystemView .getLatestFileSlices(partitionPath) .map(s -> new CompactionOperation(s.getDataFile().get(), - partitionPath, s.getLogFiles().collect(Collectors.toList()), config)) + partitionPath, s.getLogFiles().sorted(HoodieLogFile.getLogVersionComparator().reversed()) + .collect(Collectors.toList()), config)) .filter(c -> !c.getDeltaFilePaths().isEmpty()) .collect(toList()).iterator()).collect(); log.info("Total of " + operations.size() + " compactions are retrieved"); @@ -144,7 +146,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); String maxInstantTime = metaClient.getActiveTimeline() .getTimelineOfActions( - Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, + Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index cb706a389..019153293 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -54,6 +54,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -199,7 +200,7 @@ public class HoodieCopyOnWriteTable extends Hoodi private void assignUpdates(WorkloadProfile profile) { // each update location gets a partition WorkloadStat gStat = profile.getGlobalStat(); - for (Map.Entry updateLocEntry : gStat.getUpdateLocationToCount().entrySet()) { + for (Map.Entry> updateLocEntry : gStat.getUpdateLocationToCount().entrySet()) { addUpdateBucket(updateLocEntry.getKey()); } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 1e507c645..e97108086 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -32,11 +32,20 @@ import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCompactionException; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.io.HoodieAppendHandle; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; @@ -47,38 +56,36 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; /** * Implementation of a more real-time read-optimized Hoodie Table where - * + *

* INSERTS - Same as HoodieCopyOnWriteTable - Produce new files, block aligned to desired size (or) * Merge with the smallest existing file, to expand it - * + *

+ *

* UPDATES - Appends the changes to a rolling log file maintained per file Id. Compaction merges the * log file into the base file. - * + *

+ *

* WARNING - MOR table type does not support nested rollbacks, every rollback must be followed by an * attempted commit action + *

*/ public class HoodieMergeOnReadTable extends HoodieCopyOnWriteTable { private static Logger logger = LogManager.getLogger(HoodieMergeOnReadTable.class); - public HoodieMergeOnReadTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { + public HoodieMergeOnReadTable(HoodieWriteConfig config, + HoodieTableMetaClient metaClient) { super(config, metaClient); } @Override public Iterator> handleUpdate(String commitTime, String fileId, - Iterator> recordItr) throws IOException { + Iterator> recordItr) throws IOException { logger.info("Merging updates for commit " + commitTime + " for file " + fileId); HoodieAppendHandle appendHandle = new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr); @@ -126,7 +133,8 @@ public class HoodieMergeOnReadTable extends } Map commitsAndCompactions = this.getActiveTimeline() - .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION)) + .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, + HoodieActiveTimeline.DELTA_COMMIT_ACTION)) .getInstants() .filter(i -> commits.contains(i.getTimestamp())) .collect(Collectors.toMap(i -> i.getTimestamp(), i -> i)); @@ -139,41 +147,28 @@ public class HoodieMergeOnReadTable extends Long startTime = System.currentTimeMillis(); - List allRollbackStats = commits.stream().map(commit -> { - HoodieInstant instant = commitsAndCompactions.get(commit); - List stats = null; - switch (instant.getAction()) { - case HoodieTimeline.COMMIT_ACTION: - try { - logger.info("Starting to rollback Commit/Compaction " + instant); - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(this.getCommitsTimeline().getInstantDetails( - new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get()); - - stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream() - .collect(Collectors.toList())) - .map((Function) partitionPath -> { - Map results = super - .deleteCleanedFiles(partitionPath, Arrays.asList(commit)); - return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) + List allRollbackStats = jsc.parallelize + (FSUtils.getAllPartitionPaths(this.metaClient.getFs(), + this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) + .map((Function>) partitionPath -> { + return commits.stream().map(commit -> { + HoodieInstant instant = commitsAndCompactions.get(commit); + HoodieRollbackStat hoodieRollbackStats = null; + switch (instant.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + try { + Map results = super.deleteCleanedFiles(partitionPath, Arrays.asList(commit)); + hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) .withDeletedFileResults(results).build(); - }).collect(); - logger.info("Finished rollback of Commit/Compaction " + instant); - break; - } catch (IOException io) { - throw new UncheckedIOException("Failed to rollback for commit " + commit, io); - } - case HoodieTimeline.DELTA_COMMIT_ACTION: - try { - logger.info("Starting to rollback delta commit " + instant); + break; + } catch (IOException io) { + throw new UncheckedIOException("Failed to rollback for commit " + commit, io); + } + case HoodieTimeline.DELTA_COMMIT_ACTION: + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(this.getCommitTimeline().getInstantDetails(new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get()); - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(this.getCommitsTimeline().getInstantDetails( - new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get()); - - stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream() - .collect(Collectors.toList())) - .map((Function) partitionPath -> { // read commit file and (either append delete blocks or delete file) Map filesToDeletedStatus = new HashMap<>(); Map filesToNumBlocksRollback = new HashMap<>(); @@ -183,69 +178,64 @@ public class HoodieMergeOnReadTable extends .deleteCleanedFiles(partitionPath, Arrays.asList(commit)); // append rollback blocks for updates - commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() - .filter(wStat -> wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT) - .forEach(wStat -> { - HoodieLogFormat.Writer writer = null; - try { - writer = HoodieLogFormat.newWriterBuilder() - .onParentPath( - new Path(this.getMetaClient().getBasePath(), partitionPath)) - .withFileId(wStat.getFileId()).overBaseCommit(wStat.getPrevCommit()) - .withFs(getMetaClient().getFs()) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - Long numRollbackBlocks = 0L; - // generate metadata - Map metadata = Maps.newHashMap(); - metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, - metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); - metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, commit); - // if update belongs to an existing log file - writer.appendBlock(new HoodieCommandBlock( - HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, - metadata)); - numRollbackBlocks++; - if (wStat.getNumDeletes() > 0) { - writer.appendBlock(new HoodieCommandBlock( + if(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + commitMetadata.getPartitionToWriteStats().get(partitionPath).stream() + .filter(wStat -> { + return wStat != null && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT + && wStat.getPrevCommit() != null; + }) + .forEach(wStat -> { + HoodieLogFormat.Writer writer = null; + try { + writer = HoodieLogFormat.newWriterBuilder() + .onParentPath(new Path(this.getMetaClient().getBasePath(), partitionPath)) + .withFileId(wStat.getFileId()).overBaseCommit(wStat.getPrevCommit()) + .withFs(this.metaClient.getFs()) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + Long numRollbackBlocks = 0L; + // generate metadata + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, + metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, commit); + // if update belongs to an existing log file + writer = writer.appendBlock(new HoodieCommandBlock( HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata)); numRollbackBlocks++; + filesToNumBlocksRollback + .put(this.getMetaClient().getFs().getFileStatus(writer.getLogFile().getPath()), + numRollbackBlocks); + } catch (IOException | InterruptedException io) { + throw new HoodieRollbackException( + "Failed to rollback for commit " + commit, io); + } finally { + try { + writer.close(); + } catch (IOException io) { + throw new UncheckedIOException(io); + } } - filesToNumBlocksRollback - .put(getMetaClient().getFs() - .getFileStatus(writer.getLogFile().getPath()), - numRollbackBlocks); - } catch (IOException | InterruptedException io) { - throw new HoodieRollbackException( - "Failed to rollback for commit " + commit, io); - } finally { - try { - writer.close(); - } catch (IOException io) { - throw new UncheckedIOException(io); - } - } - }); - return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) - .withDeletedFileResults(filesToDeletedStatus) - .withRollbackBlockAppendResults(filesToNumBlocksRollback).build(); - }).collect(); - logger.info("Fnished rollback of delta commit " + instant); - break; - } catch (IOException io) { - throw new UncheckedIOException("Failed to rollback for commit " + commit, io); - } - } - return stats; - }).flatMap(x -> x.stream()).collect(Collectors.toList()); + }); + hoodieRollbackStats = HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) + .withDeletedFileResults(filesToDeletedStatus) + .withRollbackBlockAppendResults(filesToNumBlocksRollback).build(); + } + break; + } catch (IOException io) { + throw new UncheckedIOException("Failed to rollback for commit " + commit, io); + } + } + return hoodieRollbackStats; + }).collect(Collectors.toList()); + }).flatMap(x -> x.iterator()).filter(x -> x != null).collect(); commitsAndCompactions.entrySet().stream() .map(entry -> new HoodieInstant(true, entry.getValue().getAction(), entry.getValue().getTimestamp())) .forEach(this.getActiveTimeline()::deleteInflight); - logger - .debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); + logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); return allRollbackStats; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java index 10bf6735a..33764fa59 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java @@ -17,6 +17,8 @@ package com.uber.hoodie.table; import com.uber.hoodie.common.model.HoodieRecordLocation; +import org.apache.commons.lang3.tuple.Pair; + import java.io.Serializable; import java.util.HashMap; @@ -29,7 +31,7 @@ public class WorkloadStat implements Serializable { private long numUpdates = 0L; - private HashMap updateLocationToCount; + private HashMap> updateLocationToCount; public WorkloadStat() { updateLocationToCount = new HashMap<>(); @@ -40,7 +42,7 @@ public class WorkloadStat implements Serializable { } long addUpdates(HoodieRecordLocation location, long numUpdates) { - updateLocationToCount.put(location.getFileId(), numUpdates); + updateLocationToCount.put(location.getFileId(), Pair.of(location.getCommitTime(), numUpdates)); return this.numUpdates += numUpdates; } @@ -52,7 +54,7 @@ public class WorkloadStat implements Serializable { return numInserts; } - public HashMap getUpdateLocationToCount() { + public HashMap> getUpdateLocationToCount() { return updateLocationToCount; } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index a9ed76379..5f0d51d2f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -463,7 +463,7 @@ public class TestMergeOnReadTable { newCommitTime = "002"; client.startCommitWithTime(newCommitTime); - records = dataGen.generateUpdates(newCommitTime, 200); + records = dataGen.generateUpdates(newCommitTime, records); statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors @@ -556,6 +556,7 @@ public class TestMergeOnReadTable { return HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withAutoCommit(autoCommit) + .withAssumeDatePartitioning(true) .withCompactionConfig( HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) .withInlineCompaction(false).build()) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java index 140d5d4fa..905da5c11 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java @@ -16,9 +16,6 @@ package com.uber.hoodie.common.table.log; -import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK; -import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.LogMetadataType.INSTANT_TIME; - import com.google.common.collect.Maps; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieLogFile; @@ -32,6 +29,14 @@ import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.exception.HoodieIOException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; @@ -43,13 +48,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; + +import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK; +import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.LogMetadataType.INSTANT_TIME; /** * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged list of @@ -76,8 +77,8 @@ public class HoodieCompactedLogRecordScanner implements private HoodieTableMetaClient hoodieTableMetaClient; // Merge strategy to use when combining records from log private String payloadClassFQN; - // Store only the last log blocks (needed to implement rollback) - Deque lastBlocks = new ArrayDeque<>(); + // Store the last instant log blocks (needed to implement rollback) + Deque currentInstantLogBlocks = new ArrayDeque<>(); public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime) { @@ -100,8 +101,8 @@ public class HoodieCompactedLogRecordScanner implements HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, logFile, readerSchema, true); while (reader.hasNext()) { HoodieLogBlock r = reader.next(); - String blockInstantTime = r.getLogMetadata().get(INSTANT_TIME); - if (!HoodieTimeline.compareTimestamps(blockInstantTime, this.latestInstantTime, + if (r.getBlockType() != CORRUPT_BLOCK && + !HoodieTimeline.compareTimestamps(r.getLogMetadata().get(INSTANT_TIME), this.latestInstantTime, HoodieTimeline.LESSER_OR_EQUAL)) { //hit a block with instant time greater than should be processed, stop processing further break; @@ -109,22 +110,33 @@ public class HoodieCompactedLogRecordScanner implements switch (r.getBlockType()) { case AVRO_DATA_BLOCK: log.info("Reading a data block from file " + logFile.getPath()); - // If this is a avro data block, then merge the last block records into the main result - merge(records, lastBlocks); - // store the last block - lastBlocks.push(r); + // Consider the following scenario + // (Time 0, C1, Task T1) -> Running + // (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block or a correct DataBlock (B1) with commitTime C1 + // (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the attempt number is 2) + // (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock B2) + // Now a logFile L1 can have 2 correct Datablocks (B1 and B2) which are the same. + // Say, commit C1 eventually failed and a rollback is triggered. + // Rollback will write only 1 rollback block (R1) since it assumes one block is written per ingestion batch for a file, + // but in reality we need to rollback (B1 & B2) + // The following code ensures the same rollback block (R1) is used to rollback both B1 & B2 + if(isNewInstantBlock(r)) { + // If this is a avro data block, then merge the last block records into the main result + merge(records, currentInstantLogBlocks); + } + // store the current block + currentInstantLogBlocks.push(r); break; case DELETE_BLOCK: log.info("Reading a delete block from file " + logFile.getPath()); - String lastBlockInstantTime = lastBlocks.peek().getLogMetadata().get(INSTANT_TIME); - if (!lastBlockInstantTime.equals(blockInstantTime)) { + if (isNewInstantBlock(r)) { // Block with the keys listed as to be deleted, data and delete blocks written in different batches // so it is safe to merge // This is a delete block, so lets merge any records from previous data block - merge(records, lastBlocks); + merge(records, currentInstantLogBlocks); } // store deletes so can be rolled back - lastBlocks.push(r); + currentInstantLogBlocks.push(r); break; case COMMAND_BLOCK: log.info("Reading a command block from file " + logFile.getPath()); @@ -137,28 +149,46 @@ public class HoodieCompactedLogRecordScanner implements // Rollback the last read log block // Get commit time from last record block, compare with targetCommitTime, rollback only if equal, // this is required in scenarios of invalid/extra rollback blocks written due to failures during - // the rollback operation itself - HoodieLogBlock lastBlock = lastBlocks.peek(); - if (lastBlock != null && lastBlock.getBlockType() != CORRUPT_BLOCK && - targetInstantForCommandBlock - .contentEquals(lastBlock.getLogMetadata().get(INSTANT_TIME))) { - log.info("Rolling back the last log block read in " + logFile.getPath()); - lastBlocks.pop(); - } else if (lastBlock != null && lastBlock.getBlockType() == CORRUPT_BLOCK) { + // the rollback operation itself and ensures the same rollback block (R1) is used to rollback + // both B1 & B2 with same instant_time + int numBlocksRolledBack = 0; + while(!currentInstantLogBlocks.isEmpty()) { + HoodieLogBlock lastBlock = currentInstantLogBlocks.peek(); // handle corrupt blocks separately since they may not have metadata - log.info( - "Rolling back the last corrupted log block read in " + logFile.getPath()); - lastBlocks.pop(); - } else { - log.warn("Invalid or extra rollback command block in " + logFile.getPath()); + if (lastBlock.getBlockType() == CORRUPT_BLOCK) { + log.info( + "Rolling back the last corrupted log block read in " + logFile.getPath()); + currentInstantLogBlocks.pop(); + numBlocksRolledBack++; + } + // rollback last data block or delete block + else if (lastBlock.getBlockType() != CORRUPT_BLOCK && + targetInstantForCommandBlock + .contentEquals(lastBlock.getLogMetadata().get(INSTANT_TIME))) { + log.info("Rolling back the last log block read in " + logFile.getPath()); + currentInstantLogBlocks.pop(); + numBlocksRolledBack++; + } + // invalid or extra rollback block + else if(!targetInstantForCommandBlock + .contentEquals(currentInstantLogBlocks.peek().getLogMetadata().get(INSTANT_TIME))) { + log.warn("Invalid or extra rollback command block in " + logFile.getPath()); + break; + } + // this should not happen ideally + else { + log.warn("Unable to apply rollback command block in " + logFile.getPath()); + } } + log.info("Number of applied rollback blocks " + numBlocksRolledBack); break; + } break; case CORRUPT_BLOCK: log.info("Found a corrupt block in " + logFile.getPath()); // If there is a corrupt block - we will assume that this was the next data block - lastBlocks.push(r); + currentInstantLogBlocks.push(r); break; } } @@ -167,15 +197,26 @@ public class HoodieCompactedLogRecordScanner implements throw new HoodieIOException("IOException when reading log file " + logFile); } // merge the last read block when all the blocks are done reading - if (!lastBlocks.isEmpty()) { + if (!currentInstantLogBlocks.isEmpty()) { log.info("Merging the final data blocks in " + logFile.getPath()); - merge(records, lastBlocks); + merge(records, currentInstantLogBlocks); } } this.logRecords = Collections.unmodifiableCollection(records.values()); this.totalRecordsToUpdate = records.size(); } + /** + * Checks if the current logblock belongs to a later instant + * @param logBlock + * @return + */ + private boolean isNewInstantBlock(HoodieLogBlock logBlock) { + return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK + && !logBlock.getLogMetadata().get(INSTANT_TIME) + .contentEquals(currentInstantLogBlocks.peek().getLogMetadata().get(INSTANT_TIME)); + } + /** * Iterate over the GenericRecord in the block, read the hoodie key and partition path and merge * with the application specific payload if the same key was found before Sufficient to just merge @@ -218,15 +259,18 @@ public class HoodieCompactedLogRecordScanner implements private void merge(Map> records, Deque lastBlocks) { while (!lastBlocks.isEmpty()) { - HoodieLogBlock lastBlock = lastBlocks.pop(); + // poll the element at the bottom of the stack since that's the order it was inserted + HoodieLogBlock lastBlock = lastBlocks.pollLast(); switch (lastBlock.getBlockType()) { case AVRO_DATA_BLOCK: merge(records, loadRecordsFromBlock((HoodieAvroDataBlock) lastBlock)); break; case DELETE_BLOCK: + // TODO : If delete is the only block written and/or records are present in parquet file Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(records::remove); break; case CORRUPT_BLOCK: + log.warn("Found a corrupt block which was not rolled back"); break; } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java index 00fd9ca5e..a8be14ee4 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java @@ -520,23 +520,25 @@ public class HoodieLogFormatTest { .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); - metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema, metadata); writer = writer.appendBlock(dataBlock); // Write 2 + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101"); List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); dataBlock = new HoodieAvroDataBlock(records2, schema, metadata); writer = writer.appendBlock(dataBlock); // Rollback the last write + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "101"); HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); writer = writer.appendBlock(commandBlock); // Write 3 + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "102"); List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List copyOfRecords3 = records3.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) @@ -552,8 +554,8 @@ public class HoodieLogFormatTest { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, - schema, "100"); - assertEquals("We only read 200 records, but only 200 of them are valid", 200, + schema, "102"); + assertEquals("We read 200 records from 2 write batches", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -583,12 +585,13 @@ public class HoodieLogFormatTest { .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); - metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema, metadata); writer = writer.appendBlock(dataBlock); writer.close(); + // Write 2 + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101"); // Append some arbit byte[] to thee end of the log (mimics a partially written commit) fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); @@ -605,6 +608,8 @@ public class HoodieLogFormatTest { outputStream.close(); // Rollback the last write + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "102"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "101"); HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) @@ -613,6 +618,7 @@ public class HoodieLogFormatTest { writer = writer.appendBlock(commandBlock); // Write 3 + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "103"); List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List copyOfRecords3 = records3.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) @@ -629,7 +635,7 @@ public class HoodieLogFormatTest { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, - schema, "100"); + schema, "103"); assertEquals("We would read 200 records", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); @@ -660,12 +666,12 @@ public class HoodieLogFormatTest { .collect(Collectors.toList()); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); - metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema, metadata); writer = writer.appendBlock(dataBlock); // Write 2 + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101"); List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); List copyOfRecords2 = records2.stream().map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) @@ -682,6 +688,7 @@ public class HoodieLogFormatTest { // Delete 50 keys List deletedKeys = originalKeys.subList(0, 50); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "102"); HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), metadata); writer = writer.appendBlock(deleteBlock); @@ -693,7 +700,7 @@ public class HoodieLogFormatTest { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, - schema, "100"); + schema, "102"); assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords()); final List readKeys = new ArrayList<>(200); @@ -706,12 +713,14 @@ public class HoodieLogFormatTest { readKeys); // Rollback the last block + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "103"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "102"); HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); writer = writer.appendBlock(commandBlock); readKeys.clear(); - scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "100"); + scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "101"); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 200 records after rollback of delete", 200, readKeys.size()); @@ -756,7 +765,7 @@ public class HoodieLogFormatTest { metadata); writer = writer.appendBlock(deleteBlock); - // Attemp 1 : Write 2 rollback blocks (1 data block + 1 delete bloc) for a failed write + // Attempt 1 : Write rollback block for a failed write HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); try { @@ -766,8 +775,7 @@ public class HoodieLogFormatTest { } catch (Exception e) { // it's okay } - // Attempt 2 : Write 2 rollback blocks (1 data block + 1 delete bloc) for a failed write - writer = writer.appendBlock(commandBlock); + // Attempt 2 : Write another rollback blocks for a failed write writer = writer.appendBlock(commandBlock); List allLogFiles = FSUtils @@ -778,12 +786,13 @@ public class HoodieLogFormatTest { HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, allLogFiles, schema, "100"); - assertEquals("We would read 100 records", 100, + // all data must be rolled back before merge + assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords()); - final List readKeys = new ArrayList<>(100); + final List readKeys = new ArrayList<>(); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); - assertEquals("Stream collect should return all 150 records", 100, readKeys.size()); + assertEquals("Stream collect should return all 0 records", 0, readKeys.size()); } @Test @@ -850,12 +859,12 @@ public class HoodieLogFormatTest { List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); Map metadata = Maps.newHashMap(); metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); - metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "101"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema, metadata); writer = writer.appendBlock(dataBlock); // Write invalid rollback for a failed write (possible for in-flight commits) + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "101"); HoodieCommandBlock commandBlock = new HoodieCommandBlock( HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); writer = writer.appendBlock(commandBlock); @@ -873,4 +882,140 @@ public class HoodieLogFormatTest { scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals("Stream collect should return all 150 records", 100, readKeys.size()); } + + @Test + public void testAvroLogRecordReaderWithInsertsDeleteAndRollback() + throws IOException, URISyntaxException, InterruptedException { + + // Write a 3 Data blocs with same InstantTime (written in same batch) + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + List copyOfRecords1 = records1.stream().map(record -> + HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)) + .collect(Collectors.toList()); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + schema, metadata); + writer = writer.appendBlock(dataBlock); + writer = writer.appendBlock(dataBlock); + writer = writer.appendBlock(dataBlock); + + List originalKeys = copyOfRecords1.stream() + .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + .collect( + Collectors.toList()); + + // Delete 50 keys + List deletedKeys = originalKeys.subList(0, 50); + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), + metadata); + writer = writer.appendBlock(deleteBlock); + + // Write 1 rollback block for a failed write + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101"); + HoodieCommandBlock commandBlock = new HoodieCommandBlock( + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); + writer = writer.appendBlock(commandBlock); + + List allLogFiles = FSUtils + .getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, + allLogFiles, schema, "101"); + assertEquals("We would read 0 records", 0, + scanner.getTotalLogRecords()); + } + + @Test + public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback() + throws IOException, URISyntaxException, InterruptedException { + + // Write a 3 Data blocs with same InstantTime (written in same batch) + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + schema, metadata); + writer = writer.appendBlock(dataBlock); + writer = writer.appendBlock(dataBlock); + writer = writer.appendBlock(dataBlock); + + writer.close(); + // Append some arbit byte[] to thee end of the log (mimics a partially written commit) + fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); + FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); + // create a block with + outputStream.write(HoodieLogFormat.MAGIC); + outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); + // Write out a length that does not confirm with the content + outputStream.writeInt(100); + outputStream.flush(); + outputStream.close(); + + // Append some arbit byte[] to thee end of the log (mimics a partially written commit) + fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); + outputStream = fs.append(writer.getLogFile().getPath()); + // create a block with + outputStream.write(HoodieLogFormat.MAGIC); + outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); + // Write out a length that does not confirm with the content + outputStream.writeInt(100); + outputStream.flush(); + outputStream.close(); + + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + + writer = writer.appendBlock(dataBlock); + writer.close(); + + // Append some arbit byte[] to thee end of the log (mimics a partially written commit) + fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); + outputStream = fs.append(writer.getLogFile().getPath()); + // create a block with + outputStream.write(HoodieLogFormat.MAGIC); + outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); + // Write out a length that does not confirm with the content + outputStream.writeInt(100); + outputStream.flush(); + outputStream.close(); + + writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + // Write 1 rollback block for a failed write + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "101"); + HoodieCommandBlock commandBlock = new HoodieCommandBlock( + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); + writer = writer.appendBlock(commandBlock); + + List allLogFiles = FSUtils + .getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, basePath, + allLogFiles, schema, "101"); + assertEquals("We would read 0 records", 0, + scanner.getTotalLogRecords()); + } } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java index 6add30565..939f58297 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeInputFormat.java @@ -124,7 +124,7 @@ public class HoodieRealtimeInputFormat extends HoodieInputFormat implements Conf // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table String maxCommitTime = metaClient.getActiveTimeline() .getTimelineOfActions( - Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, + Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); rtSplits.add(