diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java index e78ce8cd7..147734990 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/ArchivedCommitsCommand.java @@ -58,7 +58,7 @@ public class ArchivedCommitsCommand implements CommandMarker { for(FileStatus fs : fsStatuses) { //read the archived file HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(), - new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema()); + new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(), false); List<IndexedRecord> readRecords = new ArrayList<>(); //read the avro blocks diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 67bebcd6c..f402825f7 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -19,7 +19,6 @@ package com.uber.hoodie; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.uber.hoodie.avro.model.HoodieCleanMetadata; import com.uber.hoodie.avro.model.HoodieRollbackMetadata; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; @@ -54,20 +53,16 @@ import com.uber.hoodie.metrics.HoodieMetrics; import com.uber.hoodie.table.UserDefinedBulkInsertPartitioner; import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.WorkloadProfile; -import org.apache.hadoop.fs.FileStatus; +import com.uber.hoodie.table.WorkloadStat; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.Partitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.storage.StorageLevel; -import org.apache.spark.util.LongAccumulator; import scala.Option; import scala.Tuple2; @@ -297,6 +292,39 @@ public class HoodieWriteClient implements Seriali return records; } + /** + * Save the workload profile in an intermediate file (here re-using commit files). + * This is useful when performing rollback for MOR datasets. Only updates are recorded + * in the workload profile metadata, since updates to log blocks are unknown across batches. + * Inserts (which are new parquet files) are rolled back based on commit time.
+ * // TODO : Create a new WorkloadProfile metadata file instead of using HoodieCommitMetadata + * @param profile + * @param commitTime + * @throws HoodieCommitException + */ + private void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, HoodieTable table, String commitTime) throws HoodieCommitException { + try { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + profile.getPartitionPaths().stream().forEach(path -> { + WorkloadStat partitionStat = profile.getWorkloadStat(path.toString()); + partitionStat.getUpdateLocationToCount().entrySet().stream().forEach(entry -> { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(entry.getKey()); + writeStat.setNumUpdateWrites(entry.getValue()); + metadata.addWriteStat(path.toString(), writeStat); + }); + }); + + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + Optional instant = activeTimeline.filterInflights().lastInstant(); + activeTimeline.saveToInflight(instant.get(), + Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch(IOException io) { + throw new HoodieCommitException("Failed to commit " + commitTime + " unable to save inflight metadata ", io); + } + } + private JavaRDD upsertRecordsInternal(JavaRDD> preppedRecords, String commitTime, HoodieTable hoodieTable, @@ -309,6 +337,7 @@ public class HoodieWriteClient implements Seriali if (hoodieTable.isWorkloadProfileNeeded()) { profile = new WorkloadProfile(preppedRecords); logger.info("Workload profile :" + profile); + saveWorkloadProfileMetadataToInflight(profile, hoodieTable, commitTime); } // partition using the insert partitioner @@ -608,7 +637,6 @@ public class HoodieWriteClient implements Seriali return true; } - private void rollback(List commits) { if(commits.isEmpty()) { logger.info("List of commits to rollback is empty"); @@ -660,60 +688,21 @@ public class HoodieWriteClient implements Seriali ", please rollback greater commits first"); } - // Atomically unpublish all the commits - commits.stream().filter(s -> !inflights.contains(s)) - .map(s -> new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, s)) - .forEach(activeTimeline::revertToInflight); - logger.info("Unpublished " + commits); + List stats = table.rollback(jsc, commits); // cleanup index entries commits.stream().forEach(s -> { if (!index.rollbackCommit(s)) { throw new HoodieRollbackException( - "Clean out index changes failed, for time :" + s); + "Rollback index changes failed, for time :" + s); } }); logger.info("Index rolled back for commits " + commits); - // delete all the data files for all these commits - logger.info("Clean out all parquet files generated for commits: " + commits); - final LongAccumulator numFilesDeletedCounter = jsc.sc().longAccumulator(); - List stats = jsc.parallelize( - FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) - .map((Function) partitionPath -> { - // Scan all partitions files with this commit time - logger.info("Cleaning path " + partitionPath); - FileSystem fs1 = FSUtils.getFs(); - FileStatus[] toBeDeleted = - fs1.listStatus(new Path(config.getBasePath(), partitionPath), path -> { - if(!path.toString().contains(".parquet")) { - return false; - } - String fileCommitTime = FSUtils.getCommitTime(path.getName()); - return commits.contains(fileCommitTime); - }); - Map results = Maps.newHashMap(); - for (FileStatus file : toBeDeleted) { - boolean success = fs1.delete(file.getPath(), false); - results.put(file, success); - 
logger.info("Delete file " + file.getPath() + "\t" + success); - if (success) { - numFilesDeletedCounter.add(1); - } - } - return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) - .withDeletedFileResults(results).build(); - }).collect(); - - // Remove the rolled back inflight commits - commits.stream().map(s -> new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, s)) - .forEach(activeTimeline::deleteInflight); - logger.info("Deleted inflight commits " + commits); - Optional durationInMs = Optional.empty(); if (context != null) { durationInMs = Optional.of(metrics.getDurationInMs(context.stop())); - Long numFilesDeleted = numFilesDeletedCounter.value(); + Long numFilesDeleted = stats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); } HoodieRollbackMetadata rollbackMetadata = diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 3fdcd77f0..7e4d106ec 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -16,6 +16,7 @@ package com.uber.hoodie.io; +import com.beust.jcommander.internal.Maps; import com.clearspring.analytics.util.Lists; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieDeltaWriteStat; @@ -27,6 +28,7 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.ReflectionUtils; @@ -45,6 +47,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -157,6 +160,8 @@ public class HoodieAppendHandle extends HoodieIOH List recordList = new ArrayList<>(); List keysToDelete = new ArrayList<>(); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, commitTime); records.stream().forEach(record -> { Optional indexedRecord = getIndexedRecord(record); if(indexedRecord.isPresent()) { @@ -166,9 +171,11 @@ public class HoodieAppendHandle extends HoodieIOH } }); try { - writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, schema)); + if(recordList.size() > 0) { + writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, schema, metadata)); + } if(keysToDelete.size() > 0) { - writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(String[]::new))); + writer = writer.appendBlock(new HoodieDeleteBlock(keysToDelete.stream().toArray(String[]::new), metadata)); } } catch (Exception e) { throw new HoodieAppendException( diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java index 5a10f91af..18279a498 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Optional; 
import java.util.stream.Collectors; import java.util.stream.Stream; + /** * Archiver to bound the growth of .commit files */ diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 003f8e699..b1b874120 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -18,6 +18,7 @@ package com.uber.hoodie.io.compact; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.CompactionWriteStat; import com.uber.hoodie.common.model.HoodieAvroPayload; @@ -147,7 +148,14 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { // Load all the delta commits since the last compaction commit and get all the blocks to be loaded and load it using CompositeAvroLogReader // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon. - HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, operation.getDeltaFilePaths(), readerSchema); + String maxInstantTime = metaClient.getActiveTimeline() + .getTimelineOfActions( + Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, + HoodieTimeline.COMPACTION_ACTION, + HoodieTimeline.DELTA_COMMIT_ACTION)) + .filterCompletedInstants().lastInstant().get().getTimestamp(); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, operation.getDeltaFilePaths(), readerSchema, maxInstantTime); if (!scanner.iterator().hasNext()) { return Lists.newArrayList(); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index e669e7c21..9c2a80e9b 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -16,9 +16,11 @@ package com.uber.hoodie.table; +import com.google.common.collect.Maps; import com.google.common.hash.Hashing; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.HoodieCleanStat; +import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieDataFile; @@ -28,6 +30,7 @@ import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; @@ -51,6 +54,8 @@ import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -59,11 +64,24 @@ import org.apache.parquet.avro.AvroReadSupport; import 
org.apache.parquet.hadoop.ParquetReader; import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Option; import scala.Tuple2; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + /** * Implementation of a very heavily read-optimized Hoodie Table where * @@ -73,7 +91,7 @@ import scala.Tuple2; * UPDATES - Produce a new version of the file, just replacing the updated records with new values * */ -public class HoodieCopyOnWriteTable extends HoodieTable { +public class HoodieCopyOnWriteTable extends HoodieTable { public HoodieCopyOnWriteTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { super(config, metaClient); } @@ -501,6 +519,65 @@ public class HoodieCopyOnWriteTable extends Hoodi } } + /** + * + * Common method used for cleaning out parquet files under a partition path during rollback of a set of commits + * @param partitionPath + * @param commits + * @return + * @throws IOException + */ + protected Map deleteCleanedFiles(String partitionPath, List commits) throws IOException { + logger.info("Cleaning path " + partitionPath); + FileSystem fs = FSUtils.getFs(); + FileStatus[] toBeDeleted = + fs.listStatus(new Path(config.getBasePath(), partitionPath), path -> { + if(!path.toString().contains(".parquet")) { + return false; + } + String fileCommitTime = FSUtils.getCommitTime(path.getName()); + return commits.contains(fileCommitTime); + }); + Map results = Maps.newHashMap(); + for (FileStatus file : toBeDeleted) { + boolean success = fs.delete(file.getPath(), false); + results.put(file, success); + logger.info("Delete file " + file.getPath() + "\t" + success); + } + return results; + } + + @Override + public List rollback(JavaSparkContext jsc, List commits) throws IOException { + String actionType = this.getCompactedCommitActionType(); + HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); + List inflights = this.getInflightCommitTimeline().getInstants().map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + + // Atomically unpublish all the commits + commits.stream().filter(s -> !inflights.contains(s)) + .map(s -> new HoodieInstant(false, actionType, s)) + .forEach(activeTimeline::revertToInflight); + logger.info("Unpublished " + commits); + + // delete all the data files for all these commits + logger.info("Clean out all parquet files generated for commits: " + commits); + List stats = jsc.parallelize( + FSUtils.getAllPartitionPaths(FSUtils.getFs(), this.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning())) + .map((Function) partitionPath -> { + // Scan all partitions files with this commit time + Map results = deleteCleanedFiles(partitionPath, commits); + return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) + .withDeletedFileResults(results).build(); + }).collect(); + + // Remove the rolled back inflight commits + commits.stream().map(s -> new HoodieInstant(true, actionType, s)) + .forEach(activeTimeline::deleteInflight); + logger.info("Deleted inflight commits " + commits); + return stats; + } + private static class PartitionCleanStat implements 
Serializable { private final String partitionPath; private final List deletePathPatterns = new ArrayList<>(); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 04f5e9300..fe84238e3 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -16,25 +16,47 @@ package com.uber.hoodie.table; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; +import com.uber.hoodie.common.HoodieRollbackStat; +import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieCompactionMetadata; +import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.log.HoodieLogFormat; +import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCompactionException; +import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.io.HoodieAppendHandle; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; -import java.util.Optional; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; -import org.apache.spark.api.java.JavaSparkContext; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + /** * Implementation of a more real-time read-optimized Hoodie Table where @@ -45,6 +67,8 @@ import org.apache.spark.api.java.JavaSparkContext; * UPDATES - Appends the changes to a rolling log file maintained per file Id. * Compaction merges the log file into the base file. 
* + * WARNING - MOR table type does not support nested rollbacks, every rollback + * must be followed by an attempted commit action */ public class HoodieMergeOnReadTable extends HoodieCopyOnWriteTable { private static Logger logger = LogManager.getLogger(HoodieMergeOnReadTable.class); @@ -94,4 +118,117 @@ public class HoodieMergeOnReadTable extends Hoodi } } + @Override + public List rollback(JavaSparkContext jsc, List commits) throws IOException { + + //At the moment, MOR table type does not support nested rollbacks + if(commits.size() > 1) { + throw new UnsupportedOperationException("Nested Rollbacks are not supported"); + } + Map commitsAndCompactions = + this.getActiveTimeline() + .getTimelineOfActions(Sets.newHashSet(HoodieActiveTimeline.COMMIT_ACTION, HoodieActiveTimeline.COMPACTION_ACTION, HoodieActiveTimeline.DELTA_COMMIT_ACTION)) + .getInstants() + .filter(i -> commits.contains(i.getTimestamp())) + .collect(Collectors.toMap(i -> i.getTimestamp(), i -> i)); + + // Atomically un-publish all non-inflight commits + commitsAndCompactions.entrySet().stream().map(entry -> entry.getValue()) + .filter(i -> !i.isInflight()).forEach(this.getActiveTimeline()::revertToInflight); + + logger.info("Unpublished " + commits); + + Long startTime = System.currentTimeMillis(); + + List allRollbackStats = commits.stream().map(commit -> { + HoodieInstant instant = commitsAndCompactions.get(commit); + List stats = null; + switch (instant.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.COMPACTION_ACTION: + try { + logger.info("Starting to rollback Commit/Compaction " + instant); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(this.getCommitTimeline().getInstantDetails(new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get()); + + stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream().collect(Collectors.toList())) + .map((Function) partitionPath -> { + Map results = super.deleteCleanedFiles(partitionPath, Arrays.asList(commit)); + return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) + .withDeletedFileResults(results).build(); + }).collect(); + logger.info("Finished rollback of Commit/Compaction " + instant); + break; + } catch (IOException io) { + throw new UncheckedIOException("Failed to rollback for commit " + commit, io); + } + case HoodieTimeline.DELTA_COMMIT_ACTION: + try { + logger.info("Starting to rollback delta commit " + instant); + + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(this.getCommitTimeline().getInstantDetails(new HoodieInstant(true, instant.getAction(), instant.getTimestamp())).get()); + + stats = jsc.parallelize(commitMetadata.getPartitionToWriteStats().keySet().stream().collect(Collectors.toList())) + .map((Function) partitionPath -> { + // read commit file and (either append delete blocks or delete file) + Map filesToDeletedStatus = new HashMap<>(); + Map filesToNumBlocksRollback = new HashMap<>(); + + // we do not know fileIds for inserts (first inserts are parquet files), delete all parquet files for the corresponding failed commit, if present (same as COW) + filesToDeletedStatus = super.deleteCleanedFiles(partitionPath, Arrays.asList(commit)); + + // append rollback blocks for updates + commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT).forEach(wStat -> { + HoodieLogFormat.Writer writer = null; + try { + writer = HoodieLogFormat.newWriterBuilder() 
+ .onParentPath(new Path(this.getMetaClient().getBasePath(), partitionPath)) + .withFileId(wStat.getFileId()).overBaseCommit(wStat.getPrevCommit()) + .withFs(FSUtils.getFs()).withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + Long numRollbackBlocks = 0L; + // generate metadata + Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, commit); + // if update belongs to an existing log file + writer.appendBlock(new HoodieCommandBlock(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata)); + numRollbackBlocks++; + if(wStat.getNumDeletes() > 0) { + writer.appendBlock(new HoodieCommandBlock(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata)); + numRollbackBlocks++; + } + filesToNumBlocksRollback.put(FSUtils.getFs().getFileStatus(writer.getLogFile().getPath()), numRollbackBlocks); + } catch (IOException | InterruptedException io) { + throw new HoodieRollbackException("Failed to rollback for commit " + commit, io); + } finally { + try { + writer.close(); + } catch (IOException io) { + throw new UncheckedIOException(io); + } + } + }); + return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) + .withDeletedFileResults(filesToDeletedStatus) + .withRollbackBlockAppendResults(filesToNumBlocksRollback).build(); + }).collect(); + logger.info("Finished rollback of delta commit " + instant); + break; + } catch (IOException io) { + throw new UncheckedIOException("Failed to rollback for commit " + commit, io); + } + } + return stats; + }).flatMap(x -> x.stream()).collect(Collectors.toList()); + + commitsAndCompactions.entrySet().stream() + .map(entry -> new HoodieInstant(true, entry.getValue().getAction(), entry.getValue().getTimestamp())) + .forEach(this.getActiveTimeline()::deleteInflight); + + logger.debug("Time(in ms) taken to finish rollback " + (System.currentTimeMillis() - startTime)); + + return allRollbackStats; + } + } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 9e8c2ab2c..8ed494f79 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -20,6 +20,7 @@ import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.avro.model.HoodieSavepointMetadata; import com.uber.hoodie.common.HoodieCleanStat; +import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieCompactionMetadata; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; @@ -33,6 +34,7 @@ import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCommitException; import com.uber.hoodie.exception.HoodieException; +import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.exception.HoodieSavepointException; import java.io.IOException; import java.io.Serializable; @@ -327,4 +329,17 @@ public abstract class HoodieTable implements Seri * of files cleaned. */ public abstract List<HoodieCleanStat> clean(JavaSparkContext jsc); + + /** + * Rollback the (inflight/committed) record changes with the given commit time.
+ * Four steps: + * (1) Atomically unpublish this commit + * (2) clean indexing data + * (3) clean new generated parquet files / log blocks + * (4) Finally, delete the .commit or .inflight file + * @param commits + * @return + * @throws HoodieRollbackException + */ + public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits) throws IOException; } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 2995ddf18..be4022dc1 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -181,6 +181,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); @@ -193,6 +195,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { HoodieTestUtils.doesCommitExist(basePath, newCommitTime)); newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, 100); JavaRDD updateRecords = jsc.parallelize(records, 1); result = client.upsert(updateRecords, newCommitTime); @@ -214,6 +218,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 1 (only inserts) */ String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); @@ -242,6 +248,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 2 (updates) */ newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, 100); LinkedHashMap recordsMap = new LinkedHashMap<>(); for (HoodieRecord rec : records) { @@ -299,6 +307,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write actual 200 insert records and ignore 100 delete records */ String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List fewRecordsForInsert = dataGen.generateInserts(newCommitTime, 200); List fewRecordsForDelete = dataGen.generateDeletes(newCommitTime, 100); @@ -327,6 +337,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 2 (deletes+writes) */ newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + fewRecordsForDelete = records.subList(0,50); List fewRecordsForUpdate = records.subList(50,100); records = dataGen.generateDeletesFromExistingRecords(fewRecordsForDelete); @@ -378,6 +390,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 1 (only inserts) */ String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 200); List statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); assertNoWriteErrors(statuses); @@ -386,6 +400,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 2 (updates) */ newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, records); statuses = 
client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors @@ -403,6 +419,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 3 (updates) */ newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, records); statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors @@ -412,6 +430,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 4 (updates) */ newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, records); statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors @@ -433,6 +453,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { table.getCompletedSavepointTimeline().getInstants().findFirst().get().getTimestamp()); // rollback and reupsert 004 client.rollback(newCommitTime); + + client.startCommitWithTime(newCommitTime); statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); @@ -462,6 +484,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 1 (only inserts) */ String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); @@ -472,6 +496,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 2 (updates) */ newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, records); statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors @@ -483,6 +509,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 3 (updates) */ newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, records); statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors @@ -502,6 +530,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { * Write 4 (updates) */ newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, records); statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); // Verify there are no errors @@ -917,7 +947,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { // Inserts => will write file1 String commitTime1 = "001"; - List inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~5000kb + client.startCommitWithTime(commitTime1); + List inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb Set keys1 = HoodieClientTestUtils.getRecordKeys(inserts1); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); @@ -933,6 +964,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { // Update + Inserts such that they just expand file1 String commitTime2 = "002"; + client.startCommitWithTime(commitTime2); List inserts2 = dataGen.generateInserts(commitTime2, 40); Set keys2 = HoodieClientTestUtils.getRecordKeys(inserts2); List insertsAndUpdates2 = new ArrayList<>(); @@ -958,6 +990,7 @@ public 
class TestHoodieClientOnCopyOnWriteStorage implements Serializable { // update + inserts such that file1 is updated and expanded, a new file2 is created. String commitTime3 = "003"; + client.startCommitWithTime(commitTime3); List insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 200); Set keys3 = HoodieClientTestUtils.getRecordKeys(insertsAndUpdates3); List updates3 = dataGen.generateUpdates(commitTime3, inserts2); @@ -1017,7 +1050,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { // Inserts => will write file1 String commitTime1 = "001"; - List inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~5000kb + client.startCommitWithTime(commitTime1); + List inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb Set keys1 = HoodieClientTestUtils.getRecordKeys(inserts1); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); List statuses= client.insert(insertRecordsRDD1, commitTime1).collect(); @@ -1033,6 +1067,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { // Second, set of Inserts should just expand file1 String commitTime2 = "002"; + client.startCommitWithTime(commitTime2); List inserts2 = dataGen.generateInserts(commitTime2, 40); Set keys2 = HoodieClientTestUtils.getRecordKeys(inserts2); JavaRDD insertRecordsRDD2 = jsc.parallelize(inserts2, 1); @@ -1055,6 +1090,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { // Lots of inserts such that file1 is updated and expanded, a new file2 is created. String commitTime3 = "003"; + client.startCommitWithTime(commitTime3); List insert3 = dataGen.generateInserts(commitTime3, 200); JavaRDD insertRecordsRDD3 = jsc.parallelize(insert3, 1); statuses = client.insert(insertRecordsRDD3, commitTime3).collect(); @@ -1369,6 +1405,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable { HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg); String commitTime = "000"; + client.startCommitWithTime(commitTime); + List records = dataGen.generateInserts(commitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index 2934f9795..690f1d89d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -112,7 +112,7 @@ public class TestHoodieCommitArchiveLog { //read the file HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(), - new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")), HoodieArchivedMetaEntry.getClassSchema()); + new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")), HoodieArchivedMetaEntry.getClassSchema(), false); int archivedRecordsCount = 0; List readRecords = new ArrayList<>(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index 1ff8c300f..784e35c37 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -138,6 +138,8 @@ public class TestHoodieCompactor { HoodieWriteConfig config = getConfig(); HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); String 
newCommitTime = "100"; + writeClient.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 100); JavaRDD recordsRDD = jsc.parallelize(records, 1); List statuses = writeClient.insert(recordsRDD, newCommitTime).collect(); @@ -147,6 +149,8 @@ public class TestHoodieCompactor { HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); newCommitTime = "101"; + writeClient.startCommitWithTime(newCommitTime); + List updatedRecords = dataGen.generateUpdates(newCommitTime, records); JavaRDD updatedRecordsRDD = jsc.parallelize(updatedRecords, 1); HoodieIndex index = new HoodieBloomIndex<>(config, jsc); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 13e93205e..69cd2ce4b 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -44,7 +44,6 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; - import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -148,13 +147,15 @@ public class TestMergeOnReadTable { @Test public void testSimpleInsertAndUpdate() throws Exception { - HoodieWriteConfig cfg = getConfig(); + HoodieWriteConfig cfg = getConfig(true); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); /** * Write 1 (only inserts) */ String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); @@ -166,12 +167,12 @@ public class TestMergeOnReadTable { HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); Optional deltaCommit = - metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); + metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); assertTrue(deltaCommit.isPresent()); assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp()); Optional commit = - metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); assertFalse(commit.isPresent()); FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); @@ -189,6 +190,8 @@ public class TestMergeOnReadTable { * Write 2 (updates) */ newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, 100); Map recordsMap = new HashMap<>(); for (HoodieRecord rec : records) { @@ -211,9 +214,9 @@ public class TestMergeOnReadTable { HoodieCompactor compactor = new HoodieRealtimeTableCompactor(); - HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(true)); - compactor.compact(jsc, getConfig(), table); + compactor.compact(jsc, getConfig(true), table); allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath()); roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); @@ -221,7 +224,7 @@ public class TestMergeOnReadTable { 
assertTrue(dataFilesToRead.findAny().isPresent()); // verify that there is a commit - table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(fs, cfg.getBasePath(), true), getConfig()); + table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(fs, cfg.getBasePath(), true), getConfig(false)); HoodieTimeline timeline = table.getCompletedCompactionCommitTimeline(); assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants()); String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp(); @@ -235,7 +238,7 @@ public class TestMergeOnReadTable { // Check if record level metadata is aggregated properly at the end of write. @Test public void testMetadataAggregateFromWriteStatus() throws Exception { - HoodieWriteConfig cfg = getConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build(); + HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); String newCommitTime = "001"; @@ -243,6 +246,8 @@ public class TestMergeOnReadTable { List records = dataGen.generateInserts(newCommitTime, 200); JavaRDD writeRecords = jsc.parallelize(records, 1); + client.startCommit(); + List statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); Map allWriteStatusMergedMetadataMap = MetadataMergeWriteStatus .mergeMetadataForWriteStatuses(statuses); @@ -253,13 +258,15 @@ public class TestMergeOnReadTable { @Test public void testSimpleInsertAndDelete() throws Exception { - HoodieWriteConfig cfg = getConfig(); + HoodieWriteConfig cfg = getConfig(true); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); /** * Write 1 (only inserts, written as parquet file) */ String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); List records = dataGen.generateInserts(newCommitTime, 20); JavaRDD writeRecords = jsc.parallelize(records, 1); @@ -294,6 +301,8 @@ public class TestMergeOnReadTable { * Write 2 (only inserts, written to .log file) */ newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateInserts(newCommitTime, 20); writeRecords = jsc.parallelize(records, 1); statuses = client.upsert(writeRecords, newCommitTime).collect(); @@ -303,6 +312,8 @@ public class TestMergeOnReadTable { * Write 2 (only deletes, written to .log file) */ newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + List fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records); statuses = client.upsert(jsc.parallelize(fewRecordsForDelete, 1), newCommitTime).collect(); @@ -328,18 +339,207 @@ public class TestMergeOnReadTable { assertEquals("Must contain 20 records", 20, recordsRead.size()); } - private HoodieWriteConfig getConfig() { - return getConfigBuilder().build(); + @Test + public void testCOWToMORConvertedDatasetRollback() throws Exception { + + //Set TableType to COW + HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE); + + HoodieWriteConfig cfg = getConfig(true); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + 
List statuses = client.upsert(writeRecords, newCommitTime).collect(); + //verify there are no errors + assertNoWriteErrors(statuses); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); + + Optional commit = + metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertTrue(commit.isPresent()); + assertEquals("commit should be 001", "001", commit.get().getTimestamp()); + + /** + * Write 2 (updates) + */ + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + + records = dataGen.generateUpdates(newCommitTime, records); + + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + //Set TableType to MOR + HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ); + + //rollback a COW commit when TableType is MOR + client.rollback(newCommitTime); + + metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); + FileStatus [] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); + + final String absentCommit = newCommitTime; + assertFalse(roView.getLatestDataFiles().filter(file -> { + if(absentCommit.equals(file.getCommitTime())) + return true; + else + return false; + }).findAny().isPresent()); } - private HoodieWriteConfig.Builder getConfigBuilder() { + @Test + public void testRollbackWithDeltaAndCompactionCommit() throws Exception { + + HoodieWriteConfig cfg = getConfig(true); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + + // Test delta commit rollback (with all log files) + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(newCommitTime, 200); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + List statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); + + Optional deltaCommit = + metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp()); + + Optional commit = + metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, + hoodieTable.getCompletedCompactionCommitTimeline(), allFiles); + Stream dataFilesToRead = roView.getLatestDataFiles(); + assertTrue(!dataFilesToRead.findAny().isPresent()); + + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); + dataFilesToRead = roView.getLatestDataFiles(); + assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit", + dataFilesToRead.findAny().isPresent()); + + /** + * Write 2 (updates) + */ + newCommitTime = "002"; + 
client.startCommitWithTime(newCommitTime); + + records = dataGen.generateUpdates(newCommitTime, 200); + + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp()); + + commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + List dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles); + + assertEquals(recordsRead.size(), 200); + + // Test delta commit rollback + client.rollback(newCommitTime); + + metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles); + dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles); + + assertEquals(recordsRead.size(), 200); + + + //Test compaction commit rollback + /** + * Write 2 (updates) + */ + newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + + records = dataGen.generateUpdates(newCommitTime, 400); + + statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + assertNoWriteErrors(statuses); + + HoodieCompactor compactor = new HoodieRealtimeTableCompactor(); + metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(true)); + + compactor.compact(jsc, getConfig(true), table); + + allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompactionCommitTimeline(), allFiles); + + final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsAndCompactionsTimeline().lastInstant().get().getTimestamp(); + + assertTrue(roView.getLatestDataFiles().filter(file -> { + if(compactedCommitTime.equals(file.getCommitTime())) + return true; + else + return false; + }).findAny().isPresent()); + + client.rollback(compactedCommitTime); + + allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); + hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompactionCommitTimeline(), allFiles); + + assertFalse(roView.getLatestDataFiles().filter(file -> { + if(compactedCommitTime.equals(file.getCommitTime())) + return true; + else + return false; + }).findAny().isPresent()); + } + + private HoodieWriteConfig getConfig(Boolean autoCommit) { + return getConfigBuilder(autoCommit).build(); + } + + private HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) { return HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .withAutoCommit(autoCommit) 
.withCompactionConfig( HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) .withInlineCompaction(false).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) - .forTable("test-trip-table").withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); } @@ -350,4 +550,4 @@ public class TestMergeOnReadTable { assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors()); } } -} +} \ No newline at end of file diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieRollbackStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieRollbackStat.java index c809538fd..e9d271dbc 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieRollbackStat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/HoodieRollbackStat.java @@ -16,10 +16,9 @@ package com.uber.hoodie.common; -import com.uber.hoodie.common.model.HoodieCleaningPolicy; -import com.uber.hoodie.common.table.timeline.HoodieInstant; import org.apache.hadoop.fs.FileStatus; +import java.io.File; import java.io.Serializable; import java.util.List; import java.util.Map; @@ -34,12 +33,19 @@ public class HoodieRollbackStat implements Serializable { private final List successDeleteFiles; // Files that could not be deleted private final List failedDeleteFiles; + // Count of HoodieLogFile to commandBlocks written for a particular rollback + private final Map commandBlocksCount; public HoodieRollbackStat(String partitionPath, List successDeleteFiles, - List failedDeleteFiles) { + List failedDeleteFiles, Map commandBlocksCount) { this.partitionPath = partitionPath; this.successDeleteFiles = successDeleteFiles; this.failedDeleteFiles = failedDeleteFiles; + this.commandBlocksCount = commandBlocksCount; + } + + public Map getCommandBlocksCount() { + return commandBlocksCount; } public String getPartitionPath() { @@ -61,6 +67,7 @@ public class HoodieRollbackStat implements Serializable { public static class Builder { private List successDeleteFiles; private List failedDeleteFiles; + private Map commandBlocksCount; private String partitionPath; public Builder withDeletedFileResults(Map deletedFiles) { @@ -72,13 +79,18 @@ public class HoodieRollbackStat implements Serializable { return this; } + public Builder withRollbackBlockAppendResults(Map commandBlocksCount) { + this.commandBlocksCount = commandBlocksCount; + return this; + } + public Builder withPartitionPath(String partitionPath) { this.partitionPath = partitionPath; return this; } public HoodieRollbackStat build() { - return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles); + return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount); } } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java index 5e73ddd69..1d26ec64c 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java @@ -21,20 +21,12 @@ import com.uber.hoodie.common.model.HoodieAvroPayload; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.table.HoodieTimeline; 
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; -import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum; import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.exception.HoodieIOException; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -43,6 +35,21 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.CORRUPT_BLOCK; +import static com.uber.hoodie.common.table.log.block.HoodieLogBlock.LogMetadataType.INSTANT_TIME; + /** * Scans through all the blocks in a list of HoodieLogFile and builds up a compacted/merged * list of records which will be used as a lookup table when merging the base columnar file @@ -63,69 +70,99 @@ public class HoodieCompactedLogRecordScanner implements Iterable logFilePaths, - Schema readerSchema) { + Schema readerSchema, String latestInstantTime) { this.readerSchema = readerSchema; + this.latestInstantTime = latestInstantTime; + // Store only the last log blocks (needed to implement rollback) + Deque lastBlocks = new ArrayDeque<>(); + // Store merged records for all versions for this log file Map> records = Maps.newHashMap(); // iterate over the paths - logFilePaths.stream().map(s -> new HoodieLogFile(new Path(s))).forEach(s -> { - log.info("Scanning log file " + s.getPath()); + Iterator logFilePathsItr = logFilePaths.iterator(); + while(logFilePathsItr.hasNext()) { + HoodieLogFile logFile = new HoodieLogFile(new Path(logFilePathsItr.next())); + log.info("Scanning log file " + logFile.getPath()); totalLogFiles.incrementAndGet(); try { // Use the HoodieLogFormatReader to iterate through the blocks in the log file - HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, s, readerSchema); - // Store the records loaded from the last data block (needed to implement rollback) - Map> recordsFromLastBlock = Maps.newHashMap(); - reader.forEachRemaining(r -> { + HoodieLogFormatReader reader = new HoodieLogFormatReader(fs, logFile, readerSchema, true); + while(reader.hasNext()) { + HoodieLogBlock r = reader.next(); + String blockInstantTime = r.getLogMetadata().get(INSTANT_TIME); + if(!HoodieTimeline.compareTimestamps(blockInstantTime, this.latestInstantTime, + HoodieTimeline.LESSER_OR_EQUAL)) { + //hit a block with instant time greater than should be processed, stop processing further + break; + } switch (r.getBlockType()) { case AVRO_DATA_BLOCK: - log.info("Reading a data block from file " + s.getPath()); + log.info("Reading a data block from file " + logFile.getPath()); // If this is a avro data block, then merge the last block records into the main result - 
merge(records, recordsFromLastBlock); - // Load the merged records into recordsFromLastBlock - HoodieAvroDataBlock dataBlock = (HoodieAvroDataBlock) r; - loadRecordsFromBlock(dataBlock, recordsFromLastBlock); + merge(records, lastBlocks); + // store the last block + lastBlocks.push(r); break; case DELETE_BLOCK: - log.info("Reading a delete block from file " + s.getPath()); - // This is a delete block, so lets merge any records from previous data block - merge(records, recordsFromLastBlock); - // Delete the keys listed as to be deleted - HoodieDeleteBlock deleteBlock = (HoodieDeleteBlock) r; - Arrays.stream(deleteBlock.getKeysToDelete()).forEach(records::remove); + log.info("Reading a delete block from file " + logFile.getPath()); + String lastBlockInstantTime = lastBlocks.peek().getLogMetadata().get(INSTANT_TIME); + if(!lastBlockInstantTime.equals(blockInstantTime)) { + // This delete block and the previous data block were written in different batches, + // so it is safe to merge + // This is a delete block, so let's merge any records from the previous data block + merge(records, lastBlocks); + } + // store the delete block so it can be rolled back + lastBlocks.push(r); break; case COMMAND_BLOCK: - log.info("Reading a command block from file " + s.getPath()); + log.info("Reading a command block from file " + logFile.getPath()); // This is a command block - take appropriate action based on the command HoodieCommandBlock commandBlock = (HoodieCommandBlock) r; - if (commandBlock.getType() == HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK) { - log.info("Rolling back the last data block read in " + s.getPath()); - // rollback the last read data block - recordsFromLastBlock.clear(); + String targetInstantForCommandBlock = r.getLogMetadata().get(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME); + switch (commandBlock.getType()) { // there can be different types of command blocks + case ROLLBACK_PREVIOUS_BLOCK: + // Rollback the last read log block + // Get commit time from last record block, compare with targetCommitTime, rollback only if equal, + // this is required in scenarios of invalid/extra rollback blocks written due to failures during + // the rollback operation itself + HoodieLogBlock lastBlock = lastBlocks.peek(); + if (lastBlock != null && lastBlock.getBlockType() != CORRUPT_BLOCK && + targetInstantForCommandBlock.contentEquals(lastBlock.getLogMetadata().get(INSTANT_TIME))) { + log.info("Rolling back the last log block read in " + logFile.getPath()); + lastBlocks.pop(); + } else if(lastBlock != null && lastBlock.getBlockType() == CORRUPT_BLOCK) { + // handle corrupt blocks separately since they may not have metadata + log.info("Rolling back the last corrupted log block read in " + logFile.getPath()); + lastBlocks.pop(); + } + else { + log.warn("Invalid or extra rollback command block in " + logFile.getPath()); + } + break; } break; case CORRUPT_BLOCK: - log.info("Found a corrupt block in " + s.getPath()); + log.info("Found a corrupt block in " + logFile.getPath()); // If there is a corrupt block - we will assume that this was the next data block - // so merge the last block records (TODO - handle when the corrupted block was a tombstone written partially?)
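The target-instant check in the COMMAND_BLOCK case above is what makes rollbacks idempotent: a stray or duplicated ROLLBACK_PREVIOUS_BLOCK (e.g. written because a rollback itself failed and was retried) only pops the deque when its TARGET_INSTANT_TIME matches the INSTANT_TIME of the block on top. A minimal standalone sketch of just that rule, using plain strings instead of the Hudi classes (all names here are illustrative, not part of the Hudi API):

import java.util.ArrayDeque;
import java.util.Deque;

class RollbackRuleSketch {
  // Pop the last block only when the rollback command targets exactly that block's instant
  static boolean applyRollback(Deque<String> lastBlockInstants, String targetInstant) {
    String last = lastBlockInstants.peek();
    if (last != null && last.equals(targetInstant)) {
      lastBlockInstants.pop();
      return true;
    }
    return false; // an extra or invalid rollback command is ignored
  }

  public static void main(String[] args) {
    Deque<String> instants = new ArrayDeque<>();
    instants.push("100");                               // data block written at instant 100
    System.out.println(applyRollback(instants, "100")); // true: the block is rolled back
    System.out.println(applyRollback(instants, "100")); // false: duplicate rollback is a no-op
  }
}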
- merge(records, recordsFromLastBlock); - recordsFromLastBlock.clear(); + lastBlocks.push(r); break; } - }); - - // merge the last read block when all the blocks are done reading - if (!recordsFromLastBlock.isEmpty()) { - log.info("Merging the final data block in " + s.getPath()); - merge(records, recordsFromLastBlock); } } catch (IOException e) { - throw new HoodieIOException("IOException when reading log file " + s); + throw new HoodieIOException("IOException when reading log file " + logFile); } - }); + // merge the last read block when all the blocks are done reading + if(!lastBlocks.isEmpty()) { + log.info("Merging the final data blocks in " + logFile.getPath()); + merge(records, lastBlocks); + } + } this.logRecords = Collections.unmodifiableCollection(records.values()); this.totalRecordsToUpdate = records.size(); } @@ -135,17 +172,14 @@ public class HoodieCompactedLogRecordScanner implements Iterable> recordsFromLastBlock) { - recordsFromLastBlock.clear(); + private Map> loadRecordsFromBlock(HoodieAvroDataBlock dataBlock) { + Map> recordsFromLastBlock = Maps.newHashMap(); List recs = dataBlock.getRecords(); totalLogRecords.addAndGet(recs.size()); recs.forEach(rec -> { String key = ((GenericRecord) rec).get(HoodieRecord.RECORD_KEY_METADATA_FIELD) - .toString(); + .toString(); String partitionPath = ((GenericRecord) rec).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD) .toString(); @@ -155,15 +189,39 @@ public class HoodieCompactedLogRecordScanner implements Iterable(new HoodieKey(key, hoodieRecord.getPartitionPath()), - combinedValue)); + .put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), + combinedValue)); } else { // Put the record as is recordsFromLastBlock.put(key, hoodieRecord); } }); + return recordsFromLastBlock; + } + + /** + * Merge the last seen log blocks with the accumulated records + * + * @param records + * @param lastBlocks + */ + private void merge(Map> records, + Deque lastBlocks) { + while (!lastBlocks.isEmpty()) { + HoodieLogBlock lastBlock = lastBlocks.pop(); + switch (lastBlock.getBlockType()) { + case AVRO_DATA_BLOCK: + merge(records, loadRecordsFromBlock((HoodieAvroDataBlock) lastBlock)); + break; + case DELETE_BLOCK: + Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(records::remove); + break; + case CORRUPT_BLOCK: + break; + } + } } /** @@ -178,9 +236,9 @@ public class HoodieCompactedLogRecordScanner implements Iterable(new HoodieKey(key, hoodieRecord.getPartitionPath()), - combinedValue)); + combinedValue)); } else { // Put the record as is records.put(key, hoodieRecord); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java index e08878467..bae8aa126 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java @@ -187,8 +187,8 @@ public interface HoodieLogFormat { return new WriterBuilder(); } - static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) + static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean readMetadata) throws IOException { - return new HoodieLogFormatReader(fs, logFile, readerSchema); + return new HoodieLogFormatReader(fs, logFile, readerSchema, readMetadata); } } diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java index ca4644634..d6a513a66 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatReader.java @@ -51,15 +51,17 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private static final byte[] magicBuffer = new byte[4]; private final Schema readerSchema; private HoodieLogBlock nextBlock = null; + private boolean readMetadata = true; - HoodieLogFormatReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize) throws IOException { + HoodieLogFormatReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readMetadata) throws IOException { this.inputStream = fs.open(logFile.getPath(), bufferSize); this.logFile = logFile; this.readerSchema = readerSchema; + this.readMetadata = readMetadata; } - HoodieLogFormatReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { - this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE); + HoodieLogFormatReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean readMetadata) throws IOException { + this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, readMetadata); } @Override @@ -93,11 +95,11 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { switch (blockType) { // based on type read the block case AVRO_DATA_BLOCK: - return HoodieAvroDataBlock.fromBytes(content, readerSchema); + return HoodieAvroDataBlock.fromBytes(content, readerSchema, readMetadata); case DELETE_BLOCK: - return HoodieDeleteBlock.fromBytes(content); + return HoodieDeleteBlock.fromBytes(content, readMetadata); case COMMAND_BLOCK: - return HoodieCommandBlock.fromBytes(content); + return HoodieCommandBlock.fromBytes(content, readMetadata); default: throw new HoodieNotSupportedException("Unsupported Block " + blockType); } @@ -113,7 +115,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { int corruptedBlockSize = (int) (nextBlockOffset - currentPos); byte[] content = new byte[corruptedBlockSize]; inputStream.readFully(content, 0, corruptedBlockSize); - return HoodieCorruptBlock.fromBytes(content); + return HoodieCorruptBlock.fromBytes(content, corruptedBlockSize, true); } private boolean isBlockCorrupt(int blocksize) throws IOException { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java index 796ded8c0..9019689b7 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieAvroDataBlock.java @@ -18,13 +18,6 @@ package com.uber.hoodie.common.table.log.block; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.exception.HoodieIOException; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; @@ -34,6 +27,15 @@ import 
org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + /** * DataBlock contains a list of records serialized using Avro. * The Datablock contains @@ -43,16 +45,21 @@ import org.apache.avro.io.EncoderFactory; * 4. Size of a record * 5. Actual avro serialized content of the record */ -public class HoodieAvroDataBlock implements HoodieLogBlock { +public class HoodieAvroDataBlock extends HoodieLogBlock { private List records; private Schema schema; - public HoodieAvroDataBlock(List records, Schema schema) { + public HoodieAvroDataBlock(List records, Schema schema, Map metadata) { + super(metadata); this.records = records; this.schema = schema; } + public HoodieAvroDataBlock(List records, Schema schema) { + this(records, schema, null); + } + public List getRecords() { return records; } @@ -63,19 +70,25 @@ public class HoodieAvroDataBlock implements HoodieLogBlock { @Override public byte[] getBytes() throws IOException { + GenericDatumWriter writer = new GenericDatumWriter<>(schema); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream output = new DataOutputStream(baos); - // 1. Compress and Write schema out + // 1. Write out metadata + if(super.getLogMetadata() != null) { + output.write(HoodieLogBlock.getLogMetadataBytes(super.getLogMetadata())); + } + + // 2. Compress and Write schema out byte[] schemaContent = HoodieAvroUtils.compress(schema.toString()); output.writeInt(schemaContent.length); output.write(schemaContent); - // 2. Write total number of records + // 3. Write total number of records output.writeInt(records.size()); - // 3. Write the records + // 4. Write the records records.forEach(s -> { ByteArrayOutputStream temp = new ByteArrayOutputStream(); Encoder encoder = EncoderFactory.get().binaryEncoder(temp, null); @@ -104,9 +117,15 @@ public class HoodieAvroDataBlock implements HoodieLogBlock { return HoodieLogBlockType.AVRO_DATA_BLOCK; } - public static HoodieLogBlock fromBytes(byte[] content, Schema readerSchema) throws IOException { - // 1. Read the schema written out + public static HoodieLogBlock fromBytes(byte[] content, Schema readerSchema, boolean readMetadata) throws IOException { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content)); + Map metadata = null; + // 1. Read the metadata written out, if applicable + if(readMetadata) { + metadata = HoodieLogBlock.getLogMetadata(dis); + } + // 1. 
Read the schema written out int schemaLength = dis.readInt(); byte[] compressedSchema = new byte[schemaLength]; dis.readFully(compressedSchema, 0, schemaLength); @@ -133,6 +152,6 @@ public class HoodieAvroDataBlock implements HoodieLogBlock { } dis.close(); - return new HoodieAvroDataBlock(records, readerSchema); + return new HoodieAvroDataBlock(records, readerSchema, metadata); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCommandBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCommandBlock.java index be407ab6b..247e2d0ee 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCommandBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCommandBlock.java @@ -16,25 +16,42 @@ package com.uber.hoodie.common.table.log.block; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; +import java.util.Map; /** * Command block issues a specific command to the scanner */ -public class HoodieCommandBlock implements HoodieLogBlock { +public class HoodieCommandBlock extends HoodieLogBlock { private final HoodieCommandBlockTypeEnum type; public enum HoodieCommandBlockTypeEnum {ROLLBACK_PREVIOUS_BLOCK} - public HoodieCommandBlock(HoodieCommandBlockTypeEnum type) { + public HoodieCommandBlock(HoodieCommandBlockTypeEnum type, Map metadata) { + super(metadata); this.type = type; } + public HoodieCommandBlock(HoodieCommandBlockTypeEnum type) { + this(type, null); + } + @Override public byte[] getBytes() throws IOException { - return ByteBuffer.allocate(4).putInt(type.ordinal()).array(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(baos); + if(super.getLogMetadata() != null) { + output.write(HoodieLogBlock.getLogMetadataBytes(super.getLogMetadata())); + } + output.writeInt(type.ordinal()); + output.close(); + return baos.toByteArray(); } public HoodieCommandBlockTypeEnum getType() { @@ -46,8 +63,13 @@ public class HoodieCommandBlock implements HoodieLogBlock { return HoodieLogBlockType.COMMAND_BLOCK; } - public static HoodieLogBlock fromBytes(byte[] content) { - int ordinal = ByteBuffer.wrap(content).getInt(); - return new HoodieCommandBlock(HoodieCommandBlockTypeEnum.values()[ordinal]); + public static HoodieLogBlock fromBytes(byte[] content, boolean readMetadata) throws IOException { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content)); + Map metadata = null; + if(readMetadata) { + metadata = HoodieLogBlock.getLogMetadata(dis); + } + int ordinal = dis.readInt(); + return new HoodieCommandBlock(HoodieCommandBlockTypeEnum.values()[ordinal], metadata); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java index c0b6dc231..1e79a6241 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieCorruptBlock.java @@ -16,23 +16,39 @@ package com.uber.hoodie.common.table.log.block; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.util.Map; 
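The fromBytes readers above (HoodieAvroDataBlock and HoodieCommandBlock), and those on the remaining block types, all parse the same metadata header, whose layout is defined by HoodieLogBlock further down: a 4-byte entry count, then for each entry the 4-byte LogMetadataType ordinal, a 4-byte value length, and the value bytes. A self-contained round-trip sketch of that layout, written against a plain enum and map rather than the Hudi classes:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

class MetadataWireFormatSketch {
  enum Type { INSTANT_TIME, TARGET_INSTANT_TIME }

  static byte[] write(Map<Type, String> metadata) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    out.writeInt(metadata.size());            // 1. number of entries
    for (Map.Entry<Type, String> e : metadata.entrySet()) {
      out.writeInt(e.getKey().ordinal());     // 2. enum ordinal of the key
      byte[] value = e.getValue().getBytes();
      out.writeInt(value.length);             // 3. length of the value
      out.write(value);                       // 4. value bytes
    }
    return baos.toByteArray();
  }

  static Map<Type, String> read(DataInputStream in) throws IOException {
    Map<Type, String> metadata = new HashMap<>();
    int count = in.readInt();
    while (count-- > 0) {
      Type type = Type.values()[in.readInt()];
      byte[] value = new byte[in.readInt()];
      in.readFully(value);
      metadata.put(type, new String(value));
    }
    return metadata;
  }

  public static void main(String[] args) throws IOException {
    Map<Type, String> m = new HashMap<>();
    m.put(Type.INSTANT_TIME, "100");
    byte[] bytes = write(m);
    System.out.println(read(new DataInputStream(new ByteArrayInputStream(bytes)))); // {INSTANT_TIME=100}
  }
}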
/** * Corrupt block is emitted whenever the scanner finds the length of the block written at the * beginning does not match (did not find a EOF or a sync marker after the length) */ -public class HoodieCorruptBlock implements HoodieLogBlock { +public class HoodieCorruptBlock extends HoodieLogBlock { private final byte[] corruptedBytes; - private HoodieCorruptBlock(byte[] corruptedBytes) { + private HoodieCorruptBlock(byte[] corruptedBytes, Map metadata) { + super(metadata); this.corruptedBytes = corruptedBytes; } + private HoodieCorruptBlock(byte[] corruptedBytes) { + this(corruptedBytes, null); + } + @Override public byte[] getBytes() throws IOException { - return corruptedBytes; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(baos); + if(super.getLogMetadata() != null) { + output.write(HoodieLogBlock.getLogMetadataBytes(super.getLogMetadata())); + } + output.write(corruptedBytes); + return baos.toByteArray(); } @Override @@ -40,7 +56,25 @@ public class HoodieCorruptBlock implements HoodieLogBlock { return HoodieLogBlockType.CORRUPT_BLOCK; } - public static HoodieLogBlock fromBytes(byte[] content) { - return new HoodieCorruptBlock(content); + public byte[] getCorruptedBytes() { + return corruptedBytes; + } + + public static HoodieLogBlock fromBytes(byte[] content, int blockSize, boolean readMetadata) throws IOException { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content)); + Map metadata = null; + int bytesRemaining = blockSize; + if(readMetadata) { + try { //attempt to read metadata + metadata = HoodieLogBlock.getLogMetadata(dis); + bytesRemaining = blockSize - HoodieLogBlock.getLogMetadataBytes(metadata).length; + } catch(IOException e) { + // unable to read metadata, possibly corrupted + metadata = null; + } + } + byte [] corruptedBytes = new byte[bytesRemaining]; + dis.readFully(corruptedBytes); + return new HoodieCorruptBlock(corruptedBytes, metadata); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java index b1f972022..c1256a4f6 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java @@ -16,24 +16,43 @@ package com.uber.hoodie.common.table.log.block; +import org.apache.commons.lang3.StringUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.charset.Charset; -import org.apache.commons.lang3.StringUtils; +import java.util.Map; /** * Delete block contains a list of keys to be deleted from scanning the blocks so far */ -public class HoodieDeleteBlock implements HoodieLogBlock { +public class HoodieDeleteBlock extends HoodieLogBlock { private final String[] keysToDelete; - public HoodieDeleteBlock(String[] keysToDelete) { + public HoodieDeleteBlock(String[] keysToDelete, Map metadata) { + super(metadata); this.keysToDelete = keysToDelete; } + public HoodieDeleteBlock(String[] keysToDelete) { + this(keysToDelete, null); + } + @Override public byte[] getBytes() throws IOException { - return StringUtils.join(keysToDelete, ',').getBytes(Charset.forName("utf-8")); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(baos); + 
if(super.getLogMetadata() != null) { + output.write(HoodieLogBlock.getLogMetadataBytes(super.getLogMetadata())); + } + byte [] bytesToWrite = StringUtils.join(keysToDelete, ',').getBytes(Charset.forName("utf-8")); + output.writeInt(bytesToWrite.length); + output.write(bytesToWrite); + return baos.toByteArray(); } public String[] getKeysToDelete() { @@ -45,7 +64,15 @@ public class HoodieDeleteBlock implements HoodieLogBlock { return HoodieLogBlockType.DELETE_BLOCK; } - public static HoodieLogBlock fromBytes(byte[] content) { - return new HoodieDeleteBlock(new String(content).split(",")); + public static HoodieLogBlock fromBytes(byte[] content, boolean readMetadata) throws IOException { + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(content)); + Map metadata = null; + if(readMetadata) { + metadata = HoodieLogBlock.getLogMetadata(dis); + } + int dataLength = dis.readInt(); + byte [] data = new byte[dataLength]; + dis.readFully(data); + return new HoodieDeleteBlock(new String(data).split(","), metadata); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java index 63d2f8c35..cf4f90859 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieLogBlock.java @@ -16,23 +16,105 @@ package com.uber.hoodie.common.table.log.block; +import com.google.common.collect.Maps; +import com.uber.hoodie.exception.HoodieException; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; +import java.util.Map; /** - * Abstract interface defining a block in HoodieLogFile + * Abstract class defining a block in HoodieLogFile */ -public interface HoodieLogBlock { - byte[] getBytes() throws IOException; - HoodieLogBlockType getBlockType(); +public abstract class HoodieLogBlock { + + public byte[] getBytes() throws IOException { + throw new HoodieException("No implementation was provided"); + } + public HoodieLogBlockType getBlockType() { + throw new HoodieException("No implementation was provided"); + } + + //log metadata for each log block + private Map logMetadata; /** * Type of the log block * WARNING: This enum is serialized as the ordinal. Only add new enums at the end. */ - enum HoodieLogBlockType { + public enum HoodieLogBlockType { COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK } + + /** + * Metadata abstraction for a HoodieLogBlock + * WARNING : This enum is serialized as the ordinal. Only add new enums at the end. + */ + public enum LogMetadataType { + INSTANT_TIME, + TARGET_INSTANT_TIME; + } + + public HoodieLogBlock(Map logMetadata) { + this.logMetadata = logMetadata; + } + + public Map getLogMetadata() { + return logMetadata; + } + + /** + * Convert log metadata to bytes + * 1. Write size of metadata + * 2. Write enum ordinal + * 3. 
Write actual bytes + * @param metadata + * @return + * @throws IOException + */ + public static byte [] getLogMetadataBytes(Map metadata) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream output = new DataOutputStream(baos); + output.writeInt(metadata.size()); + for(Map.Entry entry : metadata.entrySet()) { + output.writeInt(entry.getKey().ordinal()); + byte [] bytes = entry.getValue().getBytes(); + output.writeInt(bytes.length); + output.write(bytes); + } + return baos.toByteArray(); + } + + /** + * Convert bytes to LogMetadata, follow the same order as + * {@link HoodieLogBlock#getLogMetadataBytes} + * @param dis + * @return + * @throws IOException + */ + public static Map getLogMetadata(DataInputStream dis) throws IOException { + + Map metadata = Maps.newHashMap(); + // 1. Read the metadata written out + int metadataCount = dis.readInt(); + try { + while (metadataCount > 0) { + int metadataEntryIndex = dis.readInt(); + int metadataEntrySize = dis.readInt(); + byte[] metadataEntry = new byte[metadataEntrySize]; + dis.readFully(metadataEntry, 0, metadataEntrySize); + metadata.put(LogMetadataType.values()[metadataEntryIndex], new String(metadataEntry)); + metadataCount--; + } + return metadata; + } catch(EOFException eof) { + throw new IOException("Could not read metadata fields ", eof); + } + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index 2a5b46300..3d739f156 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -256,7 +256,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return readDataFromPath(detailPath); } - protected void moveInflightToComplete(HoodieInstant inflight, HoodieInstant completed, Optional data) { Path commitFilePath = new Path(metaPath, completed.getFileName()); @@ -290,6 +289,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } } + public void saveToInflight(HoodieInstant instant, Optional content) { + createFileInMetaPath(instant.getFileName(), content); + } + protected void createFileInMetaPath(String filename, Optional content) { Path fullPath = new Path(metaPath, filename); try { diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index f3ed87476..c41d129fa 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -16,12 +16,12 @@ package com.uber.hoodie.common.model; -import com.google.common.collect.Lists; - import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.uber.hoodie.avro.model.HoodieCleanMetadata; import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.table.HoodieTableConfig; @@ -30,6 +30,7 @@ import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; import 
com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; +import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; @@ -249,6 +250,8 @@ public class HoodieTestUtils { .overBaseCommit(location.getCommitTime()) .withFs(fs).build(); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, location.getCommitTime()); logWriter.appendBlock(new HoodieAvroDataBlock(s.getValue().stream().map(r -> { try { GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get(); @@ -260,7 +263,7 @@ public class HoodieTestUtils { } catch (IOException e) { return null; } - }).collect(Collectors.toList()), schema)); + }).collect(Collectors.toList()), schema, metadata)); logWriter.close(); } catch (Exception e) { fail(e.toString()); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java index 4e92d9aef..57530064c 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java @@ -16,6 +16,7 @@ package com.uber.hoodie.common.table.log; +import com.google.common.collect.Maps; import com.uber.hoodie.common.minicluster.MiniClusterUtil; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieRecord; @@ -24,16 +25,13 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; import com.uber.hoodie.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum; +import com.uber.hoodie.common.table.log.block.HoodieCorruptBlock; import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.common.util.SchemaTestUtil; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; @@ -49,7 +47,12 @@ import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema; @@ -107,8 +110,10 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); List records = SchemaTestUtil.generateTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); long size = writer.getCurrentSize(); assertTrue("We just wrote a block - size should be > 0", size > 0); @@ -124,8 
+129,10 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); List records = SchemaTestUtil.generateTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); // Write out a block writer = writer.appendBlock(dataBlock); // Get the size of the block @@ -138,7 +145,7 @@ public class HoodieLogFormatTest { .overBaseCommit("100").withFs(fs).withSizeThreshold(size - 1).build(); records = SchemaTestUtil.generateTestRecords(0, 100); dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); assertEquals("This should be a new log file and hence size should be 0", 0, writer.getCurrentSize()); @@ -152,8 +159,10 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); List records = SchemaTestUtil.generateTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); long size1 = writer.getCurrentSize(); writer.close(); @@ -163,7 +172,7 @@ public class HoodieLogFormatTest { .overBaseCommit("100").withFs(fs).build(); records = SchemaTestUtil.generateTestRecords(0, 100); dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); long size2 = writer.getCurrentSize(); assertTrue("We just wrote a new block - size2 should be > size1", size2 > size1); @@ -178,7 +187,7 @@ public class HoodieLogFormatTest { .overBaseCommit("100").withFs(fs).build(); records = SchemaTestUtil.generateTestRecords(0, 100); dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); long size3 = writer.getCurrentSize(); assertTrue("We just wrote a new block - size3 should be > size2", size3 > size2); @@ -202,8 +211,10 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); List records = SchemaTestUtil.generateTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); long size1 = writer.getCurrentSize(); // do not close this writer - this simulates a data node appending to a log dying without closing the file @@ -214,7 +225,7 @@ public class HoodieLogFormatTest { .overBaseCommit("100").withFs(fs).build(); records = SchemaTestUtil.generateTestRecords(0, 100); dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); long size2 = writer.getCurrentSize(); assertTrue("We just wrote a new block - size2 should be > size1", size2 > size1); @@ -232,12 +243,14 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
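Every test in this file now repeats the same stamping pattern before appending a block. For quick reference, here it is with the generic parameters restored (the angle brackets were lost in extraction, so the exact type arguments are my reconstruction from HoodieLogBlock's API; the helper name and throws clause are likewise approximations based on the tests' signatures):

// Hypothetical helper capturing the recurring pattern; writer, records and
// schema come from the enclosing test
private static Writer appendStampedBlock(Writer writer, List<IndexedRecord> records,
    Schema schema, String instantTime) throws IOException, InterruptedException {
  Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
  metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, instantTime);
  return writer.appendBlock(new HoodieAvroDataBlock(records, schema, metadata));
}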
.overBaseCommit("100").withFs(fs).build(); List records = SchemaTestUtil.generateTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); writer.close(); - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), true); assertTrue("We wrote a block, we should be able to read it", reader.hasNext()); HoodieLogBlock nextBlock = reader.next(); assertEquals("The next block should be a data block", HoodieLogBlockType.AVRO_DATA_BLOCK, @@ -257,8 +270,10 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); List records1 = SchemaTestUtil.generateTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -267,7 +282,7 @@ public class HoodieLogFormatTest { .overBaseCommit("100").withFs(fs).build(); List records2 = SchemaTestUtil.generateTestRecords(0, 100); dataBlock = new HoodieAvroDataBlock(records2, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -277,11 +292,11 @@ public class HoodieLogFormatTest { .overBaseCommit("100").withFs(fs).build(); List records3 = SchemaTestUtil.generateTestRecords(0, 100); dataBlock = new HoodieAvroDataBlock(records3, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); writer.close(); - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), true); assertTrue("First block should be available", reader.hasNext()); HoodieLogBlock nextBlock = reader.next(); HoodieAvroDataBlock dataBlockRead = (HoodieAvroDataBlock) nextBlock; @@ -312,8 +327,10 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); List records = SchemaTestUtil.generateTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, - getSimpleSchema()); + getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -324,20 +341,24 @@ public class HoodieLogFormatTest { outputStream.write(HoodieLogFormat.MAGIC); outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); // Write out a length that does not confirm with the content - outputStream.writeInt(100); + outputStream.writeInt(1000); + // Write out some metadata + // TODO : test for failure to write metadata - NA ? 
+ outputStream.write(HoodieLogBlock.getLogMetadataBytes(metadata)); outputStream.write("something-random".getBytes()); outputStream.flush(); outputStream.close(); // First round of reads - we should be able to read the first block and then EOF - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), true); assertTrue("First block should be available", reader.hasNext()); reader.next(); assertTrue("We should have corrupted block next", reader.hasNext()); HoodieLogBlock block = reader.next(); assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType()); - assertEquals("", "something-random", new String(block.getBytes())); + HoodieCorruptBlock corruptBlock = (HoodieCorruptBlock) block; + assertEquals("", "something-random", new String(corruptBlock.getCorruptedBytes())); assertFalse("There should be no more block left", reader.hasNext()); // Simulate another failure back to back @@ -346,7 +367,10 @@ public class HoodieLogFormatTest { outputStream.write(HoodieLogFormat.MAGIC); outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); // Write out a length that does not confirm with the content - outputStream.writeInt(100); + outputStream.writeInt(1000); + // Write out some metadata + // TODO : test for failure to write metadata - NA ? + outputStream.write(HoodieLogBlock.getLogMetadataBytes(metadata)); outputStream.write("something-else-random".getBytes()); outputStream.flush(); outputStream.close(); @@ -356,12 +380,12 @@ public class HoodieLogFormatTest { .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); records = SchemaTestUtil.generateTestRecords(0, 100); - dataBlock = new HoodieAvroDataBlock(records, getSimpleSchema()); + dataBlock = new HoodieAvroDataBlock(records, getSimpleSchema(), metadata); writer = writer.appendBlock(dataBlock); writer.close(); // Second round of reads - we should be able to read the first and last block - reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema()); + reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), true); assertTrue("First block should be available", reader.hasNext()); reader.next(); assertTrue("We should get the 1st corrupted block next", reader.hasNext()); @@ -370,7 +394,8 @@ public class HoodieLogFormatTest { block = reader.next(); assertEquals("The read block should be a corrupt block", HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType()); - assertEquals("", "something-else-random", new String(block.getBytes())); + corruptBlock = (HoodieCorruptBlock) block; + assertEquals("", "something-else-random", new String(corruptBlock.getCorruptedBytes())); assertTrue("We should get the last block next", reader.hasNext()); reader.next(); assertFalse("We should have no more blocks left", reader.hasNext()); @@ -388,12 +413,15 @@ public class HoodieLogFormatTest { // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); - HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + schema, metadata); writer = writer.appendBlock(dataBlock); // Write 2 List records2 = 
SchemaTestUtil.generateHoodieTestRecords(0, 100); - dataBlock = new HoodieAvroDataBlock(records2, schema); + dataBlock = new HoodieAvroDataBlock(records2, schema, metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -402,8 +430,7 @@ public class HoodieLogFormatTest { .map(s -> s.getPath().toString()) .collect(Collectors.toList()); - HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, - schema); + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100"); assertEquals("", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -428,22 +455,27 @@ public class HoodieLogFormatTest { // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); - HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); + + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + schema, metadata); writer = writer.appendBlock(dataBlock); // Write 2 List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); - dataBlock = new HoodieAvroDataBlock(records2, schema); + dataBlock = new HoodieAvroDataBlock(records2, schema, metadata); writer = writer.appendBlock(dataBlock); // Rollback the last write HoodieCommandBlock commandBlock = new HoodieCommandBlock( - HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK); + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); writer = writer.appendBlock(commandBlock); // Write 3 List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); - dataBlock = new HoodieAvroDataBlock(records3, schema); + dataBlock = new HoodieAvroDataBlock(records3, schema, metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -452,9 +484,8 @@ public class HoodieLogFormatTest { .map(s -> s.getPath().toString()) .collect(Collectors.toList()); - HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, - schema); - assertEquals("We still would read 300 records, but only 200 of them are valid", 300, + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100"); + assertEquals("We only read 200 records, since 200 of them are valid", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); @@ -479,7 +510,11 @@ public class HoodieLogFormatTest { // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); - HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + schema, metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -491,13 +526,16 @@ public class HoodieLogFormatTest { outputStream.writeInt(HoodieLogBlockType.AVRO_DATA_BLOCK.ordinal()); // Write out a length that does not confirm with the content outputStream.writeInt(100); + // Write out some metadata + // TODO : test for failure to write metadata - NA ? 
+ outputStream.write(HoodieLogBlock.getLogMetadataBytes(metadata)); outputStream.write("something-random".getBytes()); outputStream.flush(); outputStream.close(); // Rollback the last write HoodieCommandBlock commandBlock = new HoodieCommandBlock( - HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK); + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") .overBaseCommit("100").withFs(fs).build(); @@ -505,7 +543,7 @@ public class HoodieLogFormatTest { // Write 3 List records3 = SchemaTestUtil.generateHoodieTestRecords(0, 100); - dataBlock = new HoodieAvroDataBlock(records3, schema); + dataBlock = new HoodieAvroDataBlock(records3, schema, metadata); writer = writer.appendBlock(dataBlock); writer.close(); @@ -514,8 +552,7 @@ public class HoodieLogFormatTest { .map(s -> s.getPath().toString()) .collect(Collectors.toList()); - HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, - schema); + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100"); assertEquals("We would read 200 records", 200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); @@ -531,7 +568,7 @@ public class HoodieLogFormatTest { } @Test - public void testAvroLogRecordReaderWithDelete() + public void testAvroLogRecordReaderWithDeleteAndRollback() throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); // Set a small threshold so that every block is a new version @@ -541,12 +578,16 @@ public class HoodieLogFormatTest { // Write 1 List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); - HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, schema); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + schema, metadata); writer = writer.appendBlock(dataBlock); // Write 2 List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); - dataBlock = new HoodieAvroDataBlock(records2, schema); + dataBlock = new HoodieAvroDataBlock(records2, schema, metadata); writer = writer.appendBlock(dataBlock); records1.addAll(records2); @@ -557,7 +598,8 @@ public class HoodieLogFormatTest { // Delete 50 keys List deletedKeys = originalKeys.subList(0, 50); - HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50])); + + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), metadata); writer = writer.appendBlock(deleteBlock); List allLogFiles = FSUtils @@ -565,17 +607,170 @@ public class HoodieLogFormatTest { .map(s -> s.getPath().toString()) .collect(Collectors.toList()); - HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, - schema); + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100"); assertEquals("We still would read 200 records", 200, scanner.getTotalLogRecords()); - List readKeys = new ArrayList<>(200); + final List readKeys = new ArrayList<>(200); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); - assertEquals("Stream collect should return all 200 records", 150, readKeys.size()); + 
assertEquals("Stream collect should return all 150 records", 150, readKeys.size()); originalKeys.removeAll(deletedKeys); Collections.sort(originalKeys); Collections.sort(readKeys); - assertEquals("CompositeAvroLogReader should return 200 records from 2 versions", originalKeys, + assertEquals("CompositeAvroLogReader should return 150 records from 2 versions", originalKeys, readKeys); + + // Rollback the last block + HoodieCommandBlock commandBlock = new HoodieCommandBlock( + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); + writer = writer.appendBlock(commandBlock); + + readKeys.clear(); + scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100"); + scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); + assertEquals("Stream collect should return all 200 records after rollback of delete", 200, readKeys.size()); + } + + @Test + public void testAvroLogRecordReaderWithFailedRollbacks() + throws IOException, URISyntaxException, InterruptedException { + + // Write a Data block and Delete block with same InstantTime (written in same batch) + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + schema, metadata); + writer = writer.appendBlock(dataBlock); + + // Write 2 + List records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + dataBlock = new HoodieAvroDataBlock(records2, schema, metadata); + writer = writer.appendBlock(dataBlock); + + List originalKeys = records1.stream() + .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + .collect( + Collectors.toList()); + + // Delete 50 keys + List deletedKeys = originalKeys.subList(0, 50); + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), metadata); + writer = writer.appendBlock(deleteBlock); + + // Attemp 1 : Write 2 rollback blocks (1 data block + 1 delete bloc) for a failed write + HoodieCommandBlock commandBlock = new HoodieCommandBlock( + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); + try { + writer = writer.appendBlock(commandBlock); + // Say job failed, retry writing 2 rollback in the next rollback(..) 
attempt + throw new Exception("simulating failure"); + } catch(Exception e) { + // it's okay + } + // Attempt 2 : Write 2 rollback blocks (1 data block + 1 delete block) for a failed write + writer = writer.appendBlock(commandBlock); + writer = writer.appendBlock(commandBlock); + + List allLogFiles = FSUtils + .getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100"); + assertEquals("We would read 100 records", 100, + scanner.getTotalLogRecords()); + + final List readKeys = new ArrayList<>(100); + scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); + assertEquals("Stream collect should return all 100 records", 100, readKeys.size()); + } + + @Test + public void testAvroLogRecordReaderWithInsertDeleteAndRollback() + throws IOException, URISyntaxException, InterruptedException { + + // Write a Data block and Delete block with same InstantTime (written in same batch) + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + Map metadata = Maps.newHashMap(); + metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "100"); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + schema, metadata); + writer = writer.appendBlock(dataBlock); + + List originalKeys = records1.stream() + .map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) + .collect( + Collectors.toList()); + + // Delete 50 keys + List deletedKeys = originalKeys.subList(0, 50); + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deletedKeys.toArray(new String[50]), metadata); + writer = writer.appendBlock(deleteBlock); + + // Write 2 rollback blocks (1 data block + 1 delete block) for a failed write + HoodieCommandBlock commandBlock = new HoodieCommandBlock( + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); + writer = writer.appendBlock(commandBlock); + writer = writer.appendBlock(commandBlock); + + List allLogFiles = FSUtils + .getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100"); + assertEquals("We would read 0 records", 0, + scanner.getTotalLogRecords()); + } + + @Test + public void testAvroLogRecordReaderWithInvalidRollback() throws IOException, URISyntaxException, InterruptedException { + Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); + // Set a small threshold so that every block is a new version + Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1") + .overBaseCommit("100").withFs(fs).build(); + + // Write 1 + List records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100); + Map metadata = Maps.newHashMap(); +
metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, "100"); + metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, "101"); + HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, + schema, metadata); + writer = writer.appendBlock(dataBlock); + + // Write invalid rollback for a failed write (possible for in-flight commits) + HoodieCommandBlock commandBlock = new HoodieCommandBlock( + HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata); + writer = writer.appendBlock(commandBlock); + + List allLogFiles = FSUtils + .getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") + .map(s -> s.getPath().toString()) + .collect(Collectors.toList()); + + HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs, allLogFiles, schema, "100"); + assertEquals("We still would read 100 records", 100, + scanner.getTotalLogRecords()); + final List readKeys = new ArrayList<>(100); + scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); + assertEquals("Stream collect should return all 100 records", 100, readKeys.size()); } } diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java index b68a299db..591fb56aa 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -18,16 +18,12 @@ package com.uber.hoodie.hadoop.realtime; -import com.google.common.collect.Lists; import com.uber.hoodie.common.model.HoodieAvroPayload; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.exception.HoodieIOException; -import java.util.Set; -import java.util.TreeMap; -import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericFixed; @@ -49,6 +45,9 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; +import parquet.avro.AvroSchemaConverter; +import parquet.hadoop.ParquetFileReader; +import parquet.schema.MessageType; import java.io.IOException; import java.util.ArrayList; @@ -56,10 +55,9 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; - -import parquet.avro.AvroSchemaConverter; -import parquet.hadoop.ParquetFileReader; -import parquet.schema.MessageType; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; /** * Record Reader implementation to merge fresh avro data with base parquet data, to support real time @@ -127,7 +125,7 @@ public class HoodieRealtimeRecordReader implements RecordReader the commit we are trying to read (if using readCommit() API) diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java index 990808193..a93189e3a 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java +++
@@ -19,11 +19,13 @@ package com.uber.hoodie.hadoop.realtime;
 
+import com.google.common.collect.Maps;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.table.log.HoodieLogFormat;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.SchemaTestUtil;
@@ -55,6 +57,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertTrue;
@@ -81,7 +84,9 @@ public class HoodieRealtimeRecordReaderTest {
       records.add(SchemaTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0"));
     }
     Schema writeSchema = records.get(0).getSchema();
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema);
+    Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
+    metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, newCommit);
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, writeSchema, metadata);
     writer = writer.appendBlock(dataBlock);
     long size = writer.getCurrentSize();
     return writer;
diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
index d9d39977b..5f0dc9337 100644
--- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
+++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HoodieHiveClient.java
@@ -395,7 +395,7 @@ public class HoodieHiveClient {
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   private MessageType readSchemaFromLogFile(Optional<String> lastCompactionCommitOpt, Path path) throws IOException {
-    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
+    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true);
     HoodieAvroDataBlock lastBlock = null;
     while (reader.hasNext()) {
       HoodieLogBlock block = reader.next();
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
index 3c23c7ef0..c69b96840 100644
--- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
@@ -16,10 +16,8 @@ package com.uber.hoodie.hive;
 
-import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_TASK_PARTITIONID;
-import static org.junit.Assert.fail;
-
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.uber.hoodie.avro.HoodieAvroWriteSupport;
 import com.uber.hoodie.common.BloomFilter;
@@ -38,19 +36,10 @@ import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.log.HoodieLogFormat;
 import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
+import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.SchemaTestUtil;
 import com.uber.hoodie.hive.util.HiveTestService;
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.commons.io.FileUtils;
@@ -70,6 +59,19 @@ import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.junit.runners.model.InitializationError;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_TASK_PARTITIONID;
+import static org.junit.Assert.fail;
+
 @SuppressWarnings("SameParameterValue")
 public class TestUtil {
@@ -306,7 +308,9 @@ public class TestUtil {
     List<IndexedRecord> records = (isLogSchemaSimple ? SchemaTestUtil
         .generateTestRecords(0, 100) : SchemaTestUtil.generateEvolvedTestRecords(100, 100));
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, schema);
+    Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
+    metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, dataFile.getCommitTime());
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, schema, metadata);
     logWriter.appendBlock(dataBlock);
     logWriter.close();
     return logWriter.getLogFile();
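
Reviewer note, not part of the diff: the pattern this change applies at every call site above is that each log block is constructed with a LogMetadata map carrying at least INSTANT_TIME, and a failed write is neutralized by appending a ROLLBACK_PREVIOUS_BLOCK command block whose TARGET_INSTANT_TIME matches the instant of the block(s) to be discarded. Below is a minimal sketch of that protocol, using only the constructors and builder calls visible in this diff. The class name LogRollbackSketch, the method writeThenRollback, the nested location of HoodieCommandBlockTypeEnum, and the thrown-exception list are assumptions inferred from the tests; the scanner's skipping behaviour is taken from the assertions above, not from the reader implementation itself.

import java.io.IOException;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieCommandBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;

/** Illustrative only -- shows the block-level rollback protocol exercised by the tests above. */
public class LogRollbackSketch {

  public static void writeThenRollback(HoodieLogFormat.Writer writer, List<IndexedRecord> records,
      Schema schema, String instantTime) throws IOException, InterruptedException {
    // Every block written for this instant carries the same metadata map.
    Map<HoodieLogBlock.LogMetadataType, String> metadata = Maps.newHashMap();
    metadata.put(HoodieLogBlock.LogMetadataType.INSTANT_TIME, instantTime);
    metadata.put(HoodieLogBlock.LogMetadataType.TARGET_INSTANT_TIME, instantTime);

    // The data block records which instant produced it.
    writer = writer.appendBlock(new HoodieAvroDataBlock(records, schema, metadata));

    // To abort the write, append a command block targeting the same instant. Per the tests,
    // the scanner drops blocks whose instant matches a later rollback target, while a rollback
    // whose TARGET_INSTANT_TIME matches nothing (the "invalid rollback" case) is ignored.
    writer.appendBlock(new HoodieCommandBlock(
        HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK, metadata));
  }
}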