diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index b55fc5ae6..aa2e3f479 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -436,14 +436,10 @@ public class HoodieWriteClient implements Seriali
       // Save was a success
       // Do a inline compaction if enabled
       if (config.isInlineCompaction()) {
-        Optional compactionMetadata = table.compact(jsc);
-        if (compactionMetadata.isPresent()) {
-          logger.info("Compacted successfully on commit " + commitTime);
-          metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
-        } else {
-          logger.info("Compaction did not run for commit " + commitTime);
-          metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
-        }
+        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
+        forceCompact();
+      } else {
+        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
       }
 
       // We cannot have unbounded commit files. Archive commits if we have to archive
@@ -822,6 +818,37 @@ public class HoodieWriteClient implements Seriali
         new HoodieInstant(true, commitActionType, commitTime));
   }
 
+  /**
+   * Performs a compaction operation on a dataset.
+   * WARNING: Compaction operation cannot be executed asynchronously. Please always use this serially
+   * before or after an insert/upsert action.
+   * @param compactionCommitTime
+   * @throws IOException
+   */
+  private void compact(String compactionCommitTime) throws IOException {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable table = HoodieTable
+        .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
+    Optional compactionMetadata = table.compact(jsc, compactionCommitTime);
+    if (compactionMetadata.isPresent()) {
+      logger.info("Compacted successfully on commit " + compactionCommitTime);
+    } else {
+      logger.info("Compaction did not run for commit " + compactionCommitTime);
+    }
+  }
+
+  /**
+   * Performs a compaction operation on a dataset.
+   * WARNING: Compaction operation cannot be executed asynchronously. Please always use this serially
+   * before or after an insert/upsert action.
+   * @throws IOException
+   */
+  public String forceCompact() throws IOException {
+    String compactionCommitTime = HoodieActiveTimeline.createNewCommitTime();
+    compact(compactionCommitTime);
+    return compactionCommitTime;
+  }
+
   public static SparkConf registerClasses(SparkConf conf) {
     conf.registerKryoClasses(
         new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
index e192a2416..d5bcd9ee6 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactor.java
@@ -24,6 +24,8 @@ import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.Serializable;
 import java.util.Date;
+import java.util.Optional;
+
 import org.apache.spark.api.java.JavaSparkContext;
 
 /**
@@ -35,7 +37,7 @@ public interface HoodieCompactor extends Serializable {
    * Compact the delta files with the data files
    */
  HoodieCompactionMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config,
-      HoodieTable hoodieTable) throws Exception;
+      HoodieTable hoodieTable, String compactionCommitTime) throws Exception;
 
  // Helper methods
 
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index 7db2da2f7..e794492eb 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -64,7 +64,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
 
   @Override
   public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config,
-      HoodieTable hoodieTable) throws IOException {
+      HoodieTable hoodieTable, String compactionCommitTime) throws IOException {
     Preconditions.checkArgument(
         hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
         "HoodieRealtimeTableCompactor can only compact table of type "
@@ -74,8 +74,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
 
     // TODO - rollback any compactions in flight
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
-    String compactionCommit = startCompactionCommit(hoodieTable);
-    log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit);
+    log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
 
     List partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(),
         metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
@@ -101,7 +100,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     log.info("After filtering, Compacting " + operations + " files");
 
     List updateStatusMap = jsc.parallelize(operations, operations.size())
-        .map(s -> executeCompaction(metaClient, config, s, compactionCommit))
+        .map(s -> executeCompaction(metaClient, config, s, compactionCommitTime))
         .flatMap(new FlatMapFunction, CompactionWriteStat>() {
           @Override
           public Iterator call(
@@ -120,10 +119,10 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
 
     //noinspection ConstantConditions
     if (isCompactionSucceeded(metadata)) {
-      log.info("Compaction succeeded " + compactionCommit);
-      commitCompaction(compactionCommit, metaClient, metadata);
+      log.info("Compaction succeeded " + compactionCommitTime);
+      commitCompaction(compactionCommitTime, metaClient, metadata);
     } else {
-      log.info("Compaction failed " + compactionCommit);
+      log.info("Compaction failed " + compactionCommitTime);
     }
     return metadata;
   }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index 3bcce373b..543b4c26d 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -486,7 +486,7 @@ public class HoodieCopyOnWriteTable extends Hoodi
   }
 
   @Override
-  public Optional compact(JavaSparkContext jsc) {
+  public Optional compact(JavaSparkContext jsc, String commitCompactionTime) {
     logger.info("Nothing to compact in COW storage format");
     return Optional.empty();
   }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 1a2cfa1c0..3be0479c7 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -39,6 +39,13 @@ import com.uber.hoodie.exception.HoodieCompactionException;
 import com.uber.hoodie.exception.HoodieRollbackException;
 import com.uber.hoodie.io.HoodieAppendHandle;
 import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
@@ -49,12 +56,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
 
 
 /**
@@ -92,7 +93,7 @@ public class HoodieMergeOnReadTable extends
   }
 
   @Override
-  public Optional compact(JavaSparkContext jsc) {
+  public Optional compact(JavaSparkContext jsc, String compactionCommitTime) {
     logger.info("Checking if compaction needs to be run on " + config.getBasePath());
     Optional lastCompaction = getActiveTimeline().getCompactionTimeline()
         .filterCompletedInstants().lastInstant();
@@ -113,7 +114,7 @@ public class HoodieMergeOnReadTable extends
     logger.info("Compacting merge on read table " + config.getBasePath());
     HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
     try {
-      return Optional.of(compactor.compact(jsc, config, this));
+      return Optional.of(compactor.compact(jsc, config, this, compactionCommitTime));
     } catch (IOException e) {
       throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
     }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index 88f7f9b4b..b434c7509 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -279,7 +279,8 @@ public abstract class HoodieTable implements Seri
    * Run Compaction on the table. Compaction arranges the data so that it is optimized for data
    * access
    */
-  public abstract Optional compact(JavaSparkContext jsc);
+  public abstract Optional compact(JavaSparkContext jsc,
+      String commitCompactionTime);
 
   /**
    * Clean partition paths according to cleaning policy and returns the number of files cleaned.
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
index 40ebc1829..d13949c71 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
@@ -16,9 +16,6 @@
 
 package com.uber.hoodie.io;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.HoodieClientTestUtils;
@@ -41,10 +38,6 @@ import com.uber.hoodie.index.bloom.HoodieBloomIndex;
 import com.uber.hoodie.io.compact.HoodieCompactor;
 import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
 import com.uber.hoodie.table.HoodieTable;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -53,6 +46,15 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestHoodieCompactor {
 
   private transient JavaSparkContext jsc = null;
@@ -107,7 +109,7 @@ public class TestHoodieCompactor {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
 
-    compactor.compact(jsc, getConfig(), table);
+    compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
   }
 
   @Test
@@ -123,7 +125,7 @@ public class TestHoodieCompactor {
     writeClient.insert(recordsRDD, newCommitTime).collect();
 
     HoodieCompactionMetadata result =
-        compactor.compact(jsc, getConfig(), table);
+        compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
     String basePath = table.getMetaClient().getBasePath();
     assertTrue("If there is nothing to compact, result will be empty",
         result.getFileIdAndFullPaths(basePath).isEmpty());
@@ -177,7 +179,7 @@ public class TestHoodieCompactor {
     table = HoodieTable.getHoodieTable(metaClient, config);
 
     HoodieCompactionMetadata result =
-        compactor.compact(jsc, getConfig(), table);
+        compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
 
     // Verify that recently written compacted data file has no log file
     metaClient = new HoodieTableMetaClient(fs, basePath);
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
index 9602a02ff..337d06610 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
@@ -19,11 +19,6 @@
 
 package com.uber.hoodie.table;
 
-import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.HoodieClientTestUtils;
@@ -39,6 +34,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.TableFileSystemView;
+import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
 import com.uber.hoodie.common.util.FSUtils;
@@ -49,14 +45,6 @@ import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.index.HoodieIndex;
 import com.uber.hoodie.io.compact.HoodieCompactor;
 import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -73,6 +61,20 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class TestMergeOnReadTable {
 
   private transient JavaSparkContext jsc = null;
@@ -218,7 +220,7 @@ public class TestMergeOnReadTable {
 
     HoodieCompactor compactor = new HoodieRealtimeTableCompactor();
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(true));
-    compactor.compact(jsc, getConfig(true), table);
+    compactor.compact(jsc, getConfig(true), table, HoodieActiveTimeline.createNewCommitTime());
 
     allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
     roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(),
@@ -517,7 +519,7 @@ public class TestMergeOnReadTable {
     metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(true));
-    compactor.compact(jsc, getConfig(true), table);
+    compactor.compact(jsc, getConfig(true), table, HoodieActiveTimeline.createNewCommitTime());
 
     allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
 
     metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
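
Usage note (reviewer aid, not part of the patch): a minimal sketch of how a caller might drive the new public forceCompact() API when inline compaction is disabled, keeping compaction strictly serial with ingestion as the added Javadoc warns. Only forceCompact() comes from this diff; the config builder and ingestion calls (withPath, withInlineCompaction, startCommit, upsert, commit) are assumed from the existing HoodieWriteClient/HoodieWriteConfig API and may differ by version.

import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;

public class ExplicitCompactionSketch {

  // Ingest one batch, then compact, strictly in sequence (compaction must not run concurrently
  // with ingestion). Returns the commit time of the compaction that was triggered.
  public static String ingestThenCompact(JavaSparkContext jsc, String basePath,
      JavaRDD<HoodieRecord> recordsRDD) throws IOException {
    // Assumed builder calls: disable inline compaction so commit() no longer compacts implicitly.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
        .withCompactionConfig(
            HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
        .build();
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);

    String commitTime = client.startCommit();
    JavaRDD<WriteStatus> statuses = client.upsert(recordsRDD, commitTime);
    client.commit(commitTime, statuses);

    // New in this diff: allocates a fresh compaction commit time and runs compaction serially.
    return client.forceCompact();
  }
}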