1
0

Separating out compaction() API

This commit is contained in:
Nishith Agarwal
2017-11-13 10:36:33 -08:00
committed by vinoth chandar
parent e45679f5e2
commit 9b610f82c7
8 changed files with 85 additions and 51 deletions

View File

@@ -436,14 +436,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// Save was a success // Save was a success
// Do a inline compaction if enabled // Do a inline compaction if enabled
if (config.isInlineCompaction()) { if (config.isInlineCompaction()) {
Optional<HoodieCompactionMetadata> compactionMetadata = table.compact(jsc); metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true");
if (compactionMetadata.isPresent()) { forceCompact();
logger.info("Compacted successfully on commit " + commitTime); } else {
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true"); metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
} else {
logger.info("Compaction did not run for commit " + commitTime);
metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false");
}
} }
// We cannot have unbounded commit files. Archive commits if we have to archive // We cannot have unbounded commit files. Archive commits if we have to archive
@@ -822,6 +818,37 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
new HoodieInstant(true, commitActionType, commitTime)); new HoodieInstant(true, commitActionType, commitTime));
} }
/**
* Performs a compaction operation on a dataset.
* WARNING: Compaction operation cannot be executed asynchronously. Please always use this serially
* before or after an insert/upsert action.
* @param compactionCommitTime
* @throws IOException
*/
private void compact(String compactionCommitTime) throws IOException {
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
Optional<HoodieCompactionMetadata> compactionMetadata = table.compact(jsc, compactionCommitTime);
if (compactionMetadata.isPresent()) {
logger.info("Compacted successfully on commit " + compactionCommitTime);
} else {
logger.info("Compaction did not run for commit " + compactionCommitTime);
}
}
/**
* Performs a compaction operation on a dataset.
* WARNING: Compaction operation cannot be executed asynchronously. Please always use this serially
* before or after an insert/upsert action.
* @throws IOException
*/
public String forceCompact() throws IOException {
String compactionCommitTime = HoodieActiveTimeline.createNewCommitTime();
compact(compactionCommitTime);
return compactionCommitTime;
}
public static SparkConf registerClasses(SparkConf conf) { public static SparkConf registerClasses(SparkConf conf) {
conf.registerKryoClasses( conf.registerKryoClasses(
new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class}); new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});

View File

@@ -24,6 +24,8 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
import java.util.Optional;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
/** /**
@@ -35,7 +37,7 @@ public interface HoodieCompactor extends Serializable {
* Compact the delta files with the data files * Compact the delta files with the data files
*/ */
HoodieCompactionMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config, HoodieCompactionMetadata compact(JavaSparkContext jsc, final HoodieWriteConfig config,
HoodieTable hoodieTable) throws Exception; HoodieTable hoodieTable, String compactionCommitTime) throws Exception;
// Helper methods // Helper methods

View File

@@ -64,7 +64,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
@Override @Override
public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config, public HoodieCompactionMetadata compact(JavaSparkContext jsc, HoodieWriteConfig config,
HoodieTable hoodieTable) throws IOException { HoodieTable hoodieTable, String compactionCommitTime) throws IOException {
Preconditions.checkArgument( Preconditions.checkArgument(
hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ, hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
"HoodieRealtimeTableCompactor can only compact table of type " "HoodieRealtimeTableCompactor can only compact table of type "
@@ -74,8 +74,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
// TODO - rollback any compactions in flight // TODO - rollback any compactions in flight
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
String compactionCommit = startCompactionCommit(hoodieTable); log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommitTime);
log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit);
List<String> partitionPaths = List<String> partitionPaths =
FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(),
config.shouldAssumeDatePartitioning()); config.shouldAssumeDatePartitioning());
@@ -101,7 +100,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
log.info("After filtering, Compacting " + operations + " files"); log.info("After filtering, Compacting " + operations + " files");
List<CompactionWriteStat> updateStatusMap = List<CompactionWriteStat> updateStatusMap =
jsc.parallelize(operations, operations.size()) jsc.parallelize(operations, operations.size())
.map(s -> executeCompaction(metaClient, config, s, compactionCommit)) .map(s -> executeCompaction(metaClient, config, s, compactionCommitTime))
.flatMap(new FlatMapFunction<List<CompactionWriteStat>, CompactionWriteStat>() { .flatMap(new FlatMapFunction<List<CompactionWriteStat>, CompactionWriteStat>() {
@Override @Override
public Iterator<CompactionWriteStat> call( public Iterator<CompactionWriteStat> call(
@@ -120,10 +119,10 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
//noinspection ConstantConditions //noinspection ConstantConditions
if (isCompactionSucceeded(metadata)) { if (isCompactionSucceeded(metadata)) {
log.info("Compaction succeeded " + compactionCommit); log.info("Compaction succeeded " + compactionCommitTime);
commitCompaction(compactionCommit, metaClient, metadata); commitCompaction(compactionCommitTime, metaClient, metadata);
} else { } else {
log.info("Compaction failed " + compactionCommit); log.info("Compaction failed " + compactionCommitTime);
} }
return metadata; return metadata;
} }

View File

@@ -486,7 +486,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
} }
@Override @Override
public Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc) { public Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc, String commitCompactionTime) {
logger.info("Nothing to compact in COW storage format"); logger.info("Nothing to compact in COW storage format");
return Optional.empty(); return Optional.empty();
} }

View File

@@ -39,6 +39,13 @@ import com.uber.hoodie.exception.HoodieCompactionException;
import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.io.HoodieAppendHandle; import com.uber.hoodie.io.HoodieAppendHandle;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.util.Arrays; import java.util.Arrays;
@@ -49,12 +56,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
/** /**
@@ -92,7 +93,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
} }
@Override @Override
public Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc) { public Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc, String compactionCommitTime) {
logger.info("Checking if compaction needs to be run on " + config.getBasePath()); logger.info("Checking if compaction needs to be run on " + config.getBasePath());
Optional<HoodieInstant> lastCompaction = getActiveTimeline().getCompactionTimeline() Optional<HoodieInstant> lastCompaction = getActiveTimeline().getCompactionTimeline()
.filterCompletedInstants().lastInstant(); .filterCompletedInstants().lastInstant();
@@ -113,7 +114,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
logger.info("Compacting merge on read table " + config.getBasePath()); logger.info("Compacting merge on read table " + config.getBasePath());
HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor(); HoodieRealtimeTableCompactor compactor = new HoodieRealtimeTableCompactor();
try { try {
return Optional.of(compactor.compact(jsc, config, this)); return Optional.of(compactor.compact(jsc, config, this, compactionCommitTime));
} catch (IOException e) { } catch (IOException e) {
throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e); throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
} }

View File

@@ -279,7 +279,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* Run Compaction on the table. Compaction arranges the data so that it is optimized for data * Run Compaction on the table. Compaction arranges the data so that it is optimized for data
* access * access
*/ */
public abstract Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc); public abstract Optional<HoodieCompactionMetadata> compact(JavaSparkContext jsc,
String commitCompactionTime);
/** /**
* Clean partition paths according to cleaning policy and returns the number of files cleaned. * Clean partition paths according to cleaning policy and returns the number of files cleaned.

View File

@@ -16,9 +16,6 @@
package com.uber.hoodie.io; package com.uber.hoodie.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieClientTestUtils;
@@ -41,10 +38,6 @@ import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieCompactor;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import com.uber.hoodie.table.HoodieTable; import com.uber.hoodie.table.HoodieTable;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
@@ -53,6 +46,15 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestHoodieCompactor { public class TestHoodieCompactor {
private transient JavaSparkContext jsc = null; private transient JavaSparkContext jsc = null;
@@ -107,7 +109,7 @@ public class TestHoodieCompactor {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
compactor.compact(jsc, getConfig(), table); compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
} }
@Test @Test
@@ -123,7 +125,7 @@ public class TestHoodieCompactor {
writeClient.insert(recordsRDD, newCommitTime).collect(); writeClient.insert(recordsRDD, newCommitTime).collect();
HoodieCompactionMetadata result = HoodieCompactionMetadata result =
compactor.compact(jsc, getConfig(), table); compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
String basePath = table.getMetaClient().getBasePath(); String basePath = table.getMetaClient().getBasePath();
assertTrue("If there is nothing to compact, result will be empty", assertTrue("If there is nothing to compact, result will be empty",
result.getFileIdAndFullPaths(basePath).isEmpty()); result.getFileIdAndFullPaths(basePath).isEmpty());
@@ -177,7 +179,7 @@ public class TestHoodieCompactor {
table = HoodieTable.getHoodieTable(metaClient, config); table = HoodieTable.getHoodieTable(metaClient, config);
HoodieCompactionMetadata result = HoodieCompactionMetadata result =
compactor.compact(jsc, getConfig(), table); compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
// Verify that recently written compacted data file has no log file // Verify that recently written compacted data file has no log file
metaClient = new HoodieTableMetaClient(fs, basePath); metaClient = new HoodieTableMetaClient(fs, basePath);

View File

@@ -19,11 +19,6 @@
package com.uber.hoodie.table; package com.uber.hoodie.table;
import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.WriteStatus; import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.HoodieClientTestUtils; import com.uber.hoodie.common.HoodieClientTestUtils;
@@ -39,6 +34,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView; import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
@@ -49,14 +45,6 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieCompactor;
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@@ -73,6 +61,20 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestMergeOnReadTable { public class TestMergeOnReadTable {
private transient JavaSparkContext jsc = null; private transient JavaSparkContext jsc = null;
@@ -218,7 +220,7 @@ public class TestMergeOnReadTable {
HoodieCompactor compactor = new HoodieRealtimeTableCompactor(); HoodieCompactor compactor = new HoodieRealtimeTableCompactor();
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(true)); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(true));
compactor.compact(jsc, getConfig(true), table); compactor.compact(jsc, getConfig(true), table, HoodieActiveTimeline.createNewCommitTime());
allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(fs, cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(),
@@ -517,7 +519,7 @@ public class TestMergeOnReadTable {
metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(true)); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(true));
compactor.compact(jsc, getConfig(true), table); compactor.compact(jsc, getConfig(true), table, HoodieActiveTimeline.createNewCommitTime());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath()); metaClient = new HoodieTableMetaClient(fs, cfg.getBasePath());