From 0eaa21111a1e1bcd9f0597c62b33d31866ac43c6 Mon Sep 17 00:00:00 2001
From: Nishith Agarwal
Date: Wed, 28 Feb 2018 15:58:19 -0800
Subject: [PATCH] Refactoring compaction as a first-level API in WriteClient, similar to upsert/insert
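
Compaction is now a client-driven operation: callers obtain a commit time
with startCompaction(), execute with compact(), and (when auto-commit is
disabled) finalize with commitCompaction(). A minimal usage sketch of the
happy path with auto-commit enabled (illustrative only; it assumes an
already-built HoodieWriteConfig and JavaSparkContext, which are not part of
this patch):

    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
    // Schedule the compaction: marks an inflight COMMIT_ACTION on the timeline
    String compactionCommitTime = writeClient.startCompaction();
    // Run it; with auto-commit on, commitOnAutoCommit() finalizes the commit
    JavaRDD<WriteStatus> statuses = writeClient.compact(compactionCommitTime);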
---
 .../com/uber/hoodie/HoodieWriteClient.java    | 118 ++++++++++-
 .../hoodie/config/HoodieCompactionConfig.java |   2 +-
 .../hoodie/io/compact/HoodieCompactor.java    |   4 +-
 .../compact/HoodieRealtimeTableCompactor.java | 194 ++++++++----------
 .../hoodie/table/HoodieCopyOnWriteTable.java  |  13 +-
 .../hoodie/table/HoodieMergeOnReadTable.java  |  22 +-
 .../com/uber/hoodie/table/HoodieTable.java    |  17 +-
 .../uber/hoodie/io/TestHoodieCompactor.java   |  34 +--
 .../hoodie/table/TestMergeOnReadTable.java    | 127 +++++++++---
 9 files changed, 328 insertions(+), 203 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 81224fab7..42778e7dd 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -40,6 +40,7 @@ import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieCommitException;
+import com.uber.hoodie.exception.HoodieCompactionException;
 import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.exception.HoodieInsertException;
 import com.uber.hoodie.exception.HoodieRollbackException;
@@ -353,10 +354,11 @@ public class HoodieWriteClient implements Seriali
     return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
   }
 
-  private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD) {
+  private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD,
+      String actionType) {
     if (config.shouldAutoCommit()) {
       logger.info("Auto commit enabled: Committing " + commitTime);
-      boolean commitResult = commit(commitTime, resultRDD);
+      boolean commitResult = commit(commitTime, resultRDD, Optional.empty(), actionType);
       if (!commitResult) {
         throw new HoodieCommitException("Failed to commit " + commitTime);
       }
@@ -454,7 +456,7 @@ public class HoodieWriteClient implements Seriali
     JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, table);
     // Trigger the insert and collect statuses
     statuses = statuses.persist(config.getWriteStatusStorageLevel());
-    commitOnAutoCommit(commitTime, statuses);
+    commitOnAutoCommit(commitTime, statuses, table.getCommitActionType());
     return statuses;
   }
@@ -482,6 +484,14 @@ public class HoodieWriteClient implements Seriali
 
   public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
       Optional<Map<String, String>> extraMetadata) {
+    HoodieTable table = HoodieTable.getHoodieTable(
+        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
+        config);
+    return commit(commitTime, writeStatuses, extraMetadata, table.getCommitActionType());
+  }
+
+  private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
+      Optional<Map<String, String>> extraMetadata, String actionType) {
     logger.info("Commiting " + commitTime);
     // Create a Hoodie table which encapsulated the commits and files visible
@@ -519,7 +529,6 @@ public class HoodieWriteClient implements Seriali
     }
 
     try {
-      String actionType = table.getCommitActionType();
       activeTimeline.saveAsComplete(
           new HoodieInstant(true, actionType, commitTime),
           Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -917,17 +926,80 @@ public class HoodieWriteClient implements Seriali
         new HoodieInstant(true, commitActionType, commitTime));
   }
 
+  /**
+   * Provides a new commit time for a compaction (commit) operation
+   */
+  public String startCompaction() {
+    String commitTime = HoodieActiveTimeline.createNewCommitTime();
+    logger.info("Generate a new commit time " + commitTime);
+    startCompactionWithTime(commitTime);
+    return commitTime;
+  }
+
+  /**
+   * Since the MOR table type defaults to {@link HoodieTimeline#DELTA_COMMIT_ACTION}, we need to
+   * explicitly set the action type to {@link HoodieTimeline#COMMIT_ACTION} for compaction
+   */
+  public void startCompactionWithTime(String commitTime) {
+    HoodieTable table = HoodieTable.getHoodieTable(
+        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
+        config);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    String commitActionType = HoodieTimeline.COMMIT_ACTION;
+    activeTimeline.createInflight(
+        new HoodieInstant(true, commitActionType, commitTime));
+  }
+
   /**
    * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed
    * asynchronously. Please always use this serially before or after an insert/upsert action.
    */
-  private void compact(String compactionCommitTime) throws IOException {
+  public JavaRDD<WriteStatus> compact(String commitTime) throws IOException {
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable table = HoodieTable.getHoodieTable(
         new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
         config);
-    Optional<HoodieCommitMetadata> compactionMetadata = table.compact(jsc, compactionCommitTime);
-    if (compactionMetadata.isPresent()) {
+    JavaRDD<WriteStatus> statuses = table.compact(jsc, commitTime);
+    // Trigger the insert and collect statuses
+    statuses = statuses.persist(config.getWriteStatusStorageLevel());
+    String actionType = HoodieActiveTimeline.COMMIT_ACTION;
+    commitOnAutoCommit(commitTime, statuses, actionType);
+    return statuses;
+  }
+
+  /**
+   * Commit a compaction operation
+   * @param commitTime
+   * @param writeStatuses
+   * @param extraMetadata
+   */
+  public void commitCompaction(String commitTime, JavaRDD<WriteStatus> writeStatuses,
+      Optional<Map<String, String>> extraMetadata) {
+    String commitCompactionActionType = HoodieActiveTimeline.COMMIT_ACTION;
+    commit(commitTime, writeStatuses, extraMetadata, commitCompactionActionType);
+  }
+
+  /**
+   * Commit a compaction operation
+   * @param commitTime
+   * @param writeStatuses
+   */
+  public void commitCompaction(String commitTime, JavaRDD<WriteStatus> writeStatuses) {
+    String commitCompactionActionType = HoodieActiveTimeline.COMMIT_ACTION;
+    commit(commitTime, writeStatuses, Optional.empty(), commitCompactionActionType);
+  }
+
+  /**
+   * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed
+   * asynchronously. Please always use this serially before or after an insert/upsert action.
+   */
+  private void forceCompact(String compactionCommitTime) throws IOException {
+    // Create a Hoodie table which encapsulates the commits and files visible
+    HoodieTableMetaClient metaClient =
+        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
+    JavaRDD<WriteStatus> compactedStatuses = table.compact(jsc, compactionCommitTime);
+    if (!compactedStatuses.isEmpty()) {
+      commitForceCompaction(compactedStatuses, metaClient, compactionCommitTime);
       logger.info("Compacted successfully on commit " + compactionCommitTime);
     } else {
       logger.info("Compaction did not run for commit " + compactionCommitTime);
@@ -938,12 +1010,38 @@
    * Performs a compaction operation on a dataset. WARNING: Compaction operation cannot be executed
    * asynchronously. Please always use this serially before or after an insert/upsert action.
    */
-  public String forceCompact() throws IOException {
-    String compactionCommitTime = HoodieActiveTimeline.createNewCommitTime();
-    compact(compactionCommitTime);
+  private String forceCompact() throws IOException {
+    String compactionCommitTime = startCompaction();
+    forceCompact(compactionCommitTime);
     return compactionCommitTime;
   }
 
+  private void commitForceCompaction(JavaRDD<WriteStatus> writeStatuses,
+      HoodieTableMetaClient metaClient,
+      String compactionCommitTime) {
+    List<HoodieWriteStat> updateStatusMap = writeStatuses.map(writeStatus -> writeStatus.getStat())
+        .collect();
+
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
+    for (HoodieWriteStat stat : updateStatusMap) {
+      metadata.addWriteStat(stat.getPartitionPath(), stat);
+    }
+
+    logger.info("Compaction finished with result " + metadata);
+
+    logger.info("Committing Compaction " + compactionCommitTime);
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+    try {
+      activeTimeline.saveAsComplete(
+          new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, compactionCommitTime),
+          Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    } catch (IOException e) {
+      throw new HoodieCompactionException(
+          "Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e);
+    }
+  }
+
   public static SparkConf registerClasses(SparkConf conf) {
     conf.registerKryoClasses(
         new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
index 6579ccf9f..56257fdc3 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
@@ -42,7 +42,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
 
   // Turn on inline compaction - after fw delta commits a inline compaction will be run
   public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
-  private static final String DEFAULT_INLINE_COMPACT = "true";
+  private static final String DEFAULT_INLINE_COMPACT = "false";
 
   // Run a compaction every N delta commits
   public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max.delta.commits";
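
Note: the DEFAULT_INLINE_COMPACT flip above makes inline compaction opt-in. A
hedged sketch of how a writer would opt back in through the builder API that
this patch exercises in TestMergeOnReadTable (basePath is a placeholder, and
the surrounding builder calls are assumptions, not part of this patch):

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true) // default is now "false"
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .build())
        .build();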
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
index fe8227f2d..9011a82b2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
@@ -16,12 +16,14 @@
 package com.uber.hoodie.io.compact;
 
+import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.table.HoodieTable;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.Serializable;
@@ -35,7 +37,7 @@ public interface HoodieCompactor extends Serializable {
   /**
    * Compact the delta files with the data files
    */
-  HoodieCommitMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config,
+  JavaRDD<WriteStatus> compact(JavaSparkContext jsc, final HoodieWriteConfig config,
       HoodieTable hoodieTable, String compactionCommitTime) throws Exception;
 
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index 709c0e9fe..62a606efa 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -16,44 +16,36 @@
 package com.uber.hoodie.io.compact;
 
+import static java.util.stream.Collectors.toList;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodieTableType;
-import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.TableFileSystemView;
 import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
-import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
-import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.exception.HoodieCompactionException;
 import com.uber.hoodie.table.HoodieCopyOnWriteTable;
 import com.uber.hoodie.table.HoodieTable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-import static java.util.stream.Collectors.toList;
-
 /**
  * HoodieRealtimeTableCompactor compacts a hoodie table with merge on read storage. Computes all
 * possible compactions, passes it through a CompactionFilter and executes all the compactions and
@@ -66,8 +58,78 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
 
   private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class);
 
   @Override
-  public HoodieCommitMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config,
-      HoodieTable hoodieTable, String compactionCommitTime) throws IOException {
+  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieWriteConfig config,
+      HoodieTable hoodieTable, String compactionCommitTime) throws IOException {
+
+    List<CompactionOperation> operations = getCompactionWorkload(jsc, hoodieTable, config,
+        compactionCommitTime);
+    if (operations == null) {
+      return jsc.emptyRDD();
+    }
+    return executeCompaction(jsc, operations, hoodieTable, config, compactionCommitTime);
+  }
+
+  private JavaRDD<WriteStatus> executeCompaction(JavaSparkContext jsc,
+      List<CompactionOperation> operations,
+      HoodieTable hoodieTable,
+      HoodieWriteConfig config, String compactionCommitTime) throws IOException {
+
+    log.info("After filtering, Compacting " + operations + " files");
+    return jsc.parallelize(operations, operations.size())
+        .map(s -> compact(hoodieTable, config, s, compactionCommitTime))
+        .flatMap(writeStatusesItr -> writeStatusesItr.iterator());
+  }
+
+  private List<WriteStatus> compact(HoodieTable hoodieTable,
+      HoodieWriteConfig config, CompactionOperation operation, String commitTime)
+      throws IOException {
+    FileSystem fs = hoodieTable.getMetaClient().getFs();
+    Schema readerSchema =
+        HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+
+    log.info("Compacting base " + operation.getDataFilePath() + " with delta files " + operation
+        .getDeltaFilePaths() + " for commit " + commitTime);
+    // TODO - FIX THIS
+    // Reads the entire avro file. Always only specific blocks should be read from the avro file (failure recover).
+    // Load all the delta commits since the last compaction commit and get all the blocks to be loaded and load it using CompositeAvroLogReader
+    // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
+    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    String maxInstantTime = metaClient.getActiveTimeline()
+        .getTimelineOfActions(
+            Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION,
+                HoodieTimeline.DELTA_COMMIT_ACTION))
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
+
+    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs,
+        metaClient.getBasePath(), operation.getDeltaFilePaths(), readerSchema, maxInstantTime,
+        config.getMaxMemorySizePerCompactionInBytes(), config.getCompactionLazyBlockReadEnabled(),
+        config.getCompactionReverseLogReadEnabled());
+    if (!scanner.iterator().hasNext()) {
+      return Lists.newArrayList();
+    }
+
+    // Compacting is very similar to applying updates to existing file
+    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metaClient);
+    Iterator<List<WriteStatus>> result = table
+        .handleUpdate(commitTime, operation.getFileId(), scanner.getRecords());
+    Iterable<List<WriteStatus>> resultIterable = () -> result;
+    return StreamSupport.stream(resultIterable.spliterator(), false)
+        .flatMap(Collection::stream)
+        .map(s -> {
+          s.getStat().setTotalRecordsToBeUpdate(scanner.getTotalRecordsToUpdate());
+          s.getStat().setTotalLogFiles(scanner.getTotalLogFiles());
+          s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
+          s.getStat().setPartitionPath(operation.getPartitionPath());
+          return s;
+        })
+        .collect(toList());
+  }
+
+  private List<CompactionOperation> getCompactionWorkload(JavaSparkContext jsc,
+      HoodieTable hoodieTable,
+      HoodieWriteConfig config, String compactionCommitTime)
+      throws IOException {
+
     Preconditions.checkArgument(
         hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
         "HoodieRealtimeTableCompactor can only compact table of type "
@@ -89,8 +151,9 @@
         .flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
             .getLatestFileSlices(partitionPath)
             .map(s -> new CompactionOperation(s.getDataFile().get(),
-                partitionPath, s.getLogFiles().sorted(HoodieLogFile.getLogVersionComparator().reversed())
-                .collect(Collectors.toList()), config))
+                partitionPath,
+                s.getLogFiles().sorted(HoodieLogFile.getLogVersionComparator().reversed())
+                    .collect(Collectors.toList()), config))
             .filter(c -> !c.getDeltaFilePaths().isEmpty())
             .collect(toList()).iterator()).collect();
     log.info("Total of " + operations.size() + " compactions are retrieved");
@@ -101,96 +164,7 @@
       log.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
       return null;
     }
-
-    log.info("After filtering, Compacting " + operations + " files");
-    List<HoodieWriteStat> updateStatusMap =
-        jsc.parallelize(operations, operations.size())
-            .map(s -> executeCompaction(hoodieTable, config, s, compactionCommitTime))
-            .flatMap(writeStatList -> writeStatList.iterator())
-            .collect();
-
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
-    for (HoodieWriteStat stat : updateStatusMap) {
-      metadata.addWriteStat(stat.getPartitionPath(), stat);
-    }
-
-    log.info("Compaction finished with result " + metadata);
-
-    //noinspection ConstantConditions
-    if (isCompactionSucceeded(metadata)) {
-      log.info("Compaction succeeded " + compactionCommitTime);
-      commitCompaction(compactionCommitTime, metaClient, metadata);
-    } else {
-      log.info("Compaction failed " + compactionCommitTime);
-    }
-    return metadata;
-  }
-
-  private boolean isCompactionSucceeded(HoodieCommitMetadata result) {
-    //TODO figure out a success factor for a compaction
-    return true;
-  }
-
-
-  private List<HoodieWriteStat> executeCompaction(HoodieTable hoodieTable,
-      HoodieWriteConfig config, CompactionOperation operation, String commitTime)
-      throws IOException {
-    FileSystem fs = hoodieTable.getMetaClient().getFs();
-    Schema readerSchema =
-        HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-
-    log.info("Compacting base " + operation.getDataFilePath() + " with delta files " + operation
-        .getDeltaFilePaths() + " for commit " + commitTime);
-    // TODO - FIX THIS
-    // Reads the entire avro file. Always only specific blocks should be read from the avro file (failure recover).
-    // Load all the delta commits since the last compaction commit and get all the blocks to be loaded and load it using CompositeAvroLogReader
-    // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.
-    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    String maxInstantTime = metaClient.getActiveTimeline()
-        .getTimelineOfActions(
-            Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION,
-                HoodieTimeline.DELTA_COMMIT_ACTION))
-        .filterCompletedInstants().lastInstant().get().getTimestamp();
-
-    HoodieCompactedLogRecordScanner scanner = new HoodieCompactedLogRecordScanner(fs,
-        metaClient.getBasePath(),
-        operation.getDeltaFilePaths(), readerSchema, maxInstantTime, config.getMaxMemorySizePerCompactionInBytes(),
-        config.getCompactionLazyBlockReadEnabled(), config.getCompactionReverseLogReadEnabled());
-    if (!scanner.iterator().hasNext()) {
-      return Lists.newArrayList();
-    }
-
-    // Compacting is very similar to applying updates to existing file
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metaClient);
-    Iterator<List<WriteStatus>> result = table
-        .handleUpdate(commitTime, operation.getFileId(), scanner.getRecords());
-    Iterable<List<WriteStatus>> resultIterable = () -> result;
-    return StreamSupport.stream(resultIterable.spliterator(), false)
-        .flatMap(Collection::stream)
-        .map(WriteStatus::getStat)
-        .map(s -> {
-          s.setTotalRecordsToBeUpdate(scanner.getTotalRecordsToUpdate());
-          s.setTotalLogFiles(scanner.getTotalLogFiles());
-          s.setTotalLogRecords(scanner.getTotalLogRecords());
-          s.setPartitionPath(operation.getPartitionPath());
-          return s;})
-        .collect(toList());
-  }
-
-  public boolean commitCompaction(String commitTime, HoodieTableMetaClient metaClient,
-      HoodieCommitMetadata metadata) {
-    log.info("Committing Compaction " + commitTime);
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-
-    try {
-      activeTimeline.saveAsComplete(
-          new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime),
-          Optional.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-    } catch (IOException e) {
-      throw new HoodieCompactionException(
-          "Failed to commit " + metaClient.getBasePath() + " at time " + commitTime, e);
-    }
-    return true;
+    return operations;
   }
 }
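
Note: the compactor is now split into a planning step (getCompactionWorkload)
and an execution step (executeCompaction), and no longer commits the result
itself. A sketch of driving the compactor directly, as TestHoodieCompactor
does below (construction of jsc, config, and table omitted):

    HoodieCompactor compactor = new HoodieRealtimeTableCompactor();
    JavaRDD<WriteStatus> result =
        compactor.compact(jsc, config, table, HoodieActiveTimeline.createNewCommitTime());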
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index 5b4f176b6..ea80630fa 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -35,6 +35,7 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieIOException;
+import com.uber.hoodie.exception.HoodieNotSupportedException;
 import com.uber.hoodie.exception.HoodieUpsertException;
 import com.uber.hoodie.func.LazyInsertIterable;
 import com.uber.hoodie.io.HoodieCleanHelper;
@@ -64,6 +65,7 @@ import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
@@ -414,6 +416,11 @@ public class HoodieCopyOnWriteTable extends Hoodi
     return true;
   }
 
+  @Override
+  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String commitTime) {
+    throw new HoodieNotSupportedException("Compaction is not supported from a CopyOnWrite table");
+  }
+
   public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLoc,
       Iterator<HoodieRecord<T>> recordItr)
@@ -513,12 +520,6 @@ public class HoodieCopyOnWriteTable extends Hoodi
     return handleUpsertPartition(commitTime, partition, recordItr, partitioner);
   }
 
-  @Override
-  public Optional<HoodieCommitMetadata> compact(JavaSparkContext jsc, String commitCompactionTime) {
-    logger.info("Nothing to compact in COW storage format");
-    return Optional.empty();
-  }
-
   /**
    * Performs cleaning of partition paths according to cleaning policy and returns the number of
    * files cleaned. Handles skews in partitions to clean by making files to clean as the unit of
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 9cca2ea90..078747774 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -41,14 +41,6 @@ import com.uber.hoodie.exception.HoodieRollbackException;
 import com.uber.hoodie.exception.HoodieUpsertException;
 import com.uber.hoodie.io.HoodieAppendHandle;
 import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -60,6 +52,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
 
 /**
  * Implementation of a more real-time read-optimized Hoodie Table where
@@ -167,7 +167,7 @@
   }
 
   @Override
-  public Optional<HoodieCommitMetadata> compact(JavaSparkContext jsc, String compactionCommitTime) {
+  public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionCommitTime) {
     logger.info("Checking if compaction needs to be run on " + config.getBasePath());
     Optional<HoodieInstant> lastCompaction = getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
@@ -182,13 +182,13 @@
       logger.info("Not running compaction as only " + deltaCommitsSinceLastCompaction
           + " delta commits was found since last compaction " + deltaCommitsSinceTs
           + ". Waiting for " + config.getInlineCompactDeltaCommitMax());
-      return Optional.empty();
+      return jsc.emptyRDD();
     }
 
     logger.info("Compacting merge on read table " + config.getBasePath());
     HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
     try {
-      return Optional.of(compactor.compact(jsc, config, this, compactionCommitTime));
+      return compactor.compact(jsc, config, this, compactionCommitTime);
     } catch (IOException e) {
       throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
     }
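
Note: HoodieTable.compact() now returns JavaRDD<WriteStatus> on every path (an
empty RDD when there is nothing to compact) instead of
Optional<HoodieCommitMetadata>. A sketch of the table-level contract as
HoodieWriteClient.forceCompact() uses it above (metaClient and config are
assumed to be already constructed):

    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
    JavaRDD<WriteStatus> compacted = table.compact(jsc, compactionCommitTime);
    if (!compacted.isEmpty()) {
      // finalize the compaction commit, e.g. via commitForceCompaction(...)
    }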
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index c3b9d1f70..728fc560a 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -20,7 +20,6 @@ import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
 import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.HoodieRollbackStat;
-import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.model.HoodieWriteStat;
@@ -43,9 +42,8 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import scala.Tuple2;
 
@@ -57,8 +55,6 @@ public abstract class HoodieTable implements Seri
   protected final HoodieWriteConfig config;
   protected final HoodieTableMetaClient metaClient;
 
-  private static Logger logger = LogManager.getLogger(HoodieTable.class);
-
   protected HoodieTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
     this.config = config;
     this.metaClient = metaClient;
@@ -240,7 +236,6 @@ public abstract class HoodieTable implements Seri
   public abstract Iterator<List<WriteStatus>> handleInsertPartition(String commitTime, Integer partition,
       Iterator<HoodieRecord<T>> recordIterator, Partitioner partitioner);
 
-
   public static HoodieTable getHoodieTable(
       HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
     switch (metaClient.getTableType()) {
@@ -254,11 +249,10 @@ public abstract class HoodieTable implements Seri
   }
 
   /**
-   * Run Compaction on the table. Compaction arranges the data so that it is optimized for data
-   * access
+   * Run Compaction on the table.
+   * Compaction arranges the data so that it is optimized for data access
    */
-  public abstract Optional<HoodieCommitMetadata> compact(JavaSparkContext jsc,
-      String commitCompactionTime);
+  public abstract JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String commitTime);
 
   /**
    * Clean partition paths according to cleaning policy and returns the number of files cleaned.
@@ -279,5 +273,6 @@ public abstract class HoodieTable implements Seri
    * @param writeStatuses List of WriteStatus
    * @return number of files finalized
    */
-  public abstract Optional<Integer> finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses);
+  public abstract Optional<Integer> finalizeWrite(JavaSparkContext jsc,
+      List<WriteStatus> writeStatuses);
 }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
index e05e73364..c1dd84034 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
@@ -24,12 +24,10 @@ import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.HoodieClientTestUtils;
 import com.uber.hoodie.common.HoodieTestDataGenerator;
 import com.uber.hoodie.common.model.FileSlice;
-import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
-import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.config.HoodieCompactionConfig;
@@ -125,15 +123,14 @@ public class TestHoodieCompactor {
     JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
     writeClient.insert(recordsRDD, newCommitTime).collect();
 
-    HoodieCommitMetadata result =
+    JavaRDD<WriteStatus> result =
         compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
-    String basePath = table.getMetaClient().getBasePath();
     assertTrue("If there is nothing to compact, result will be empty",
-        result == null || result.getFileIdAndFullPaths(basePath).isEmpty());
+        result.isEmpty());
   }
 
   @Test
-  public void testLogFileCountsAfterCompaction() throws Exception {
+  public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
     HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
@@ -179,28 +176,15 @@ public class TestHoodieCompactor {
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     table = HoodieTable.getHoodieTable(metaClient, config);
 
-    HoodieCommitMetadata result =
+    JavaRDD<WriteStatus> result =
         compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
 
-    // Verify that recently written compacted data file has no log file
-    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    table = HoodieTable.getHoodieTable(metaClient, config);
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
-
-    assertTrue("Compaction commit should be > than last insert",
-        HoodieTimeline.compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime,
-            HoodieTimeline.GREATER));
-
+    // Verify that all partition paths are present in the WriteStatus result
     for (String partitionPath : dataGen.getPartitionPaths()) {
-      List<FileSlice> groupedLogFiles = table.getRTFileSystemView()
-          .getLatestFileSlices(partitionPath)
-          .collect(Collectors.toList());
-      for (FileSlice slice : groupedLogFiles) {
-        assertTrue(
-            "After compaction there should be no log files visiable on a Realtime view",
-            slice.getLogFiles().collect(Collectors.toList()).isEmpty());
-      }
-      assertTrue(result.getPartitionToWriteStats().containsKey(partitionPath));
+      List<WriteStatus> writeStatuses = result.collect();
+      assertTrue(writeStatuses.stream()
+          .filter(writeStatus -> writeStatus.getStat().getPartitionPath()
+              .contentEquals(partitionPath)).count() > 0);
     }
   }
 
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
index 38ee2101a..3de5654d8 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
@@ -19,6 +19,11 @@
 package com.uber.hoodie.table;
 
+import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.HoodieClientTestUtils;
@@ -26,6 +31,7 @@ import com.uber.hoodie.common.HoodieMergeOnReadTestUtils;
 import com.uber.hoodie.common.HoodieTestDataGenerator;
 import com.uber.hoodie.common.TestRawTripPayload.MetadataMergeWriteStatus;
 import com.uber.hoodie.common.minicluster.HdfsTestService;
+import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
@@ -42,8 +48,15 @@ import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieStorageConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.index.HoodieIndex;
-import com.uber.hoodie.io.compact.HoodieCompactor;
-import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
+import com.uber.hoodie.index.bloom.HoodieBloomIndex;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,29 +70,15 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 public class TestMergeOnReadTable {
 
   private transient JavaSparkContext jsc = null;
   private transient SQLContext sqlContext;
   private static String basePath = null;
-  private HoodieCompactor compactor;
 
   //NOTE : Be careful in using DFS (FileSystem.class) vs LocalFs(RawLocalFileSystem.class)
   //The implementation and gurantees of many API's differ, for example check rename(src,dst)
@@ -125,7 +124,6 @@ public class TestMergeOnReadTable {
     HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.MERGE_ON_READ);
 
     sqlContext = new SQLContext(jsc); // SQLContext stuff
-    compactor = new HoodieRealtimeTableCompactor();
   }
 
   @After
@@ -207,10 +205,8 @@ public class TestMergeOnReadTable {
     commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
 
-    HoodieCompactor compactor = new HoodieRealtimeTableCompactor();
-    HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(true));
-
-    compactor.compact(jsc, getConfig(true), table, HoodieActiveTimeline.createNewCommitTime());
+    String compactionCommitTime = client.startCompaction();
+    client.compact(compactionCommitTime);
 
     allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
     roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(),
@@ -219,7 +215,7 @@
     assertTrue(dataFilesToRead.findAny().isPresent());
 
     // verify that there is a commit
-    table = HoodieTable.getHoodieTable(
+    HoodieTable table = HoodieTable.getHoodieTable(
         new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true),
         getConfig(false));
     HoodieTimeline timeline = table.getCommitTimeline().filterCompletedInstants();
@@ -508,12 +504,10 @@
     statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
     assertNoWriteErrors(statuses);
 
-    HoodieCompactor compactor = new HoodieRealtimeTableCompactor();
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-    HoodieTable table = HoodieTable
-        .getHoodieTable(metaClient, getConfig(true));
 
-    compactor.compact(jsc, getConfig(true), table, HoodieActiveTimeline.createNewCommitTime());
+    String compactionCommit = client.startCompaction();
+    client.compact(compactionCommit);
 
     allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
@@ -630,6 +624,83 @@
     assertEquals("Must contain 40 records", 40, recordsRead.size());
   }
 
+  @Test
+  @Ignore
+  public void testLogFileCountsAfterCompaction() throws Exception {
+    // insert 100 records
+    HoodieWriteConfig config = getConfig(true);
+    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    String newCommitTime = "100";
+    writeClient.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+    JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+    List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();
+
+    // Update all the 100 records
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
+        basePath);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
+
+    newCommitTime = "101";
+    writeClient.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
+    JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
+    HoodieIndex index = new HoodieBloomIndex<>(config, jsc);
+    updatedRecords = index.tagLocation(updatedRecordsRDD, table).collect();
+
+    // Write them to corresponding avro logfiles
+    HoodieTestUtils
+        .writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
+            HoodieTestDataGenerator.avroSchema, updatedRecords);
+
+    // Verify that all data file has one log file
+    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    table = HoodieTable.getHoodieTable(metaClient, config);
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<FileSlice> groupedLogFiles =
+          table.getRTFileSystemView().getLatestFileSlices(partitionPath)
+              .collect(Collectors.toList());
+      for (FileSlice fileSlice : groupedLogFiles) {
+        assertEquals("There should be 1 log file written for every data file", 1,
+            fileSlice.getLogFiles().count());
+      }
+    }
+
+    // Do a compaction
+    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    table = HoodieTable.getHoodieTable(metaClient, config);
+
+    String commitTime = writeClient.startCompaction();
+    JavaRDD<WriteStatus> result =
+        writeClient.compact(commitTime);
+
+    // Verify that recently written compacted data file has no log file
+    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    table = HoodieTable.getHoodieTable(metaClient, config);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+
+    assertTrue("Compaction commit should be > than last insert",
+        HoodieTimeline.compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime,
+            HoodieTimeline.GREATER));
+
+    for (String partitionPath : dataGen.getPartitionPaths()) {
+      List<FileSlice> groupedLogFiles = table.getRTFileSystemView()
+          .getLatestFileSlices(partitionPath)
+          .collect(Collectors.toList());
+      for (FileSlice slice : groupedLogFiles) {
+        assertTrue(
+            "After compaction there should be no log files visible on a Realtime view",
+            slice.getLogFiles().collect(Collectors.toList()).isEmpty());
+      }
+      List<WriteStatus> writeStatuses = result.collect();
+      assertTrue(writeStatuses.stream()
+          .filter(writeStatus -> writeStatus.getStat().getPartitionPath()
+              .contentEquals(partitionPath)).count() > 0);
+    }
+  }
 
   private HoodieWriteConfig getConfig(Boolean autoCommit) {
     return getConfigBuilder(autoCommit).build();
   }
@@ -642,7 +713,7 @@
         .withAssumeDatePartitioning(true)
         .withCompactionConfig(
             HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-                .withInlineCompaction(false).build())
+                .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
        .forTable("test-trip-table").withIndexConfig(
             HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
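
Note: when auto-commit is disabled (as the tests above do via
getConfigBuilder(false)), compact() returns the statuses without finalizing
the instant and the caller commits explicitly. A sketch of that manual flow
(illustrative; client is an assumed HoodieWriteClient, and validating the
statuses is left to the caller):

    String compactionCommitTime = client.startCompaction();
    JavaRDD<WriteStatus> statuses = client.compact(compactionCommitTime);
    // inspect statuses for errors before finalizing
    client.commitCompaction(compactionCommitTime, statuses);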