diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 019153293..9ced403bd 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -182,6 +182,11 @@ public class HoodieCopyOnWriteTable extends Hoodi */ private HashMap bucketInfoMap; + /** + * List of all small files to be corrected + */ + List smallFiles = new ArrayList(); + UpsertPartitioner(WorkloadProfile profile) { updateLocationToBucket = new HashMap<>(); partitionPathToInsertBuckets = new HashMap<>(); @@ -296,7 +301,9 @@ public class HoodieCopyOnWriteTable extends Hoodi /** * Returns a list of small files in the given partition path */ - private List getSmallFiles(String partitionPath) { + protected List getSmallFiles(String partitionPath) { + + // smallFiles only for partitionPath List smallFileLocations = new ArrayList<>(); HoodieTimeline commitTimeline = getCompletedCommitTimeline(); @@ -315,6 +322,8 @@ public class HoodieCopyOnWriteTable extends Hoodi FSUtils.getFileId(filename)); sf.sizeBytes = file.getFileSize(); smallFileLocations.add(sf); + // Update the global small files list + smallFiles.add(sf); } } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index e97108086..88d7b8d8a 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -20,9 +20,11 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.HoodieRollbackStat; +import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.model.HoodieCommitMetadata; import 
com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; @@ -36,18 +38,20 @@ import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieCompactionException; import com.uber.hoodie.exception.HoodieRollbackException; +import com.uber.hoodie.exception.HoodieUpsertException; import com.uber.hoodie.io.HoodieAppendHandle; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.Partitioner; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -57,7 +61,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; - /** * Implementation of a more real-time read-optimized Hoodie Table where *

@@ -78,21 +81,89 @@ public class HoodieMergeOnReadTable extends private static Logger logger = LogManager.getLogger(HoodieMergeOnReadTable.class); - public HoodieMergeOnReadTable(HoodieWriteConfig config, - HoodieTableMetaClient metaClient) { + // UpsertPartitioner for MergeOnRead table type + private MergeOnReadUpsertPartitioner mergeOnReadUpsertPartitioner; + + public HoodieMergeOnReadTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) { super(config, metaClient); } + /** + * UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet + * files to larger ones without the need for an index in the logFile. + */ + class MergeOnReadUpsertPartitioner extends HoodieCopyOnWriteTable.UpsertPartitioner { + + MergeOnReadUpsertPartitioner(WorkloadProfile profile) { + super(profile); + } + + @Override + protected List getSmallFiles(String partitionPath) { + + // smallFiles only for partitionPath + List smallFileLocations = new ArrayList<>(); + + // Init here since this class (and member variables) might not have been initialized + HoodieTimeline commitTimeline = getCompletedCommitTimeline(); + + if (!commitTimeline.empty()) { + HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); + // find smallest file in partition and append to it + Optional smallFileSlice = getRTFileSystemView() + .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()) + .filter(fileSlice -> fileSlice.getLogFiles().count() < 1 && + fileSlice.getDataFile().get().getFileSize() < config.getParquetSmallFileLimit()) + .sorted((FileSlice left, FileSlice right) -> + left.getDataFile().get().getFileSize() < right.getDataFile().get().getFileSize() ? 
-1 : 1) + .findFirst(); + + if(smallFileSlice.isPresent()) { + String filename = smallFileSlice.get().getDataFile().get().getFileName(); + SmallFile sf = new SmallFile(); + sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), + FSUtils.getFileId(filename)); + sf.sizeBytes = smallFileSlice.get().getDataFile().get().getFileSize(); + smallFileLocations.add(sf); + // Update the global small files list + smallFiles.add(sf); + } + } + + return smallFileLocations; + } + + public List getSmallFileIds() { + return (List) smallFiles.stream().map(smallFile -> ((SmallFile) smallFile).location.getFileId()) + .collect(Collectors.toList()); + } + } + + @Override + public Partitioner getUpsertPartitioner(WorkloadProfile profile) { + if (profile == null) { + throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner."); + } + mergeOnReadUpsertPartitioner = new MergeOnReadUpsertPartitioner(profile); + return mergeOnReadUpsertPartitioner; + } + @Override public Iterator> handleUpdate(String commitTime, String fileId, Iterator> recordItr) throws IOException { logger.info("Merging updates for commit " + commitTime + " for file " + fileId); - HoodieAppendHandle appendHandle = - new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr); - appendHandle.doAppend(); - appendHandle.close(); - return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())) - .iterator(); + + if(mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { + logger.info("Small file corrections for updates for commit " + commitTime + " for file " + fileId); + return super.handleUpdate(commitTime, fileId, recordItr); + } else { + HoodieAppendHandle appendHandle = + new HoodieAppendHandle<>(config, commitTime, this, fileId, recordItr); + appendHandle.doAppend(); + appendHandle.close(); + return Collections.singletonList(Collections.singletonList(appendHandle.getWriteStatus())) + .iterator(); + } } @Override diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 5f0d51d2f..38ee2101a 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -19,11 +19,6 @@ package com.uber.hoodie.table; -import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.HoodieClientTestUtils; @@ -49,14 +44,6 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; import com.uber.hoodie.io.compact.HoodieCompactor; import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor; -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -73,6 +60,20 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class TestMergeOnReadTable { private transient JavaSparkContext jsc = null; @@ -257,7 +258,7 @@ public 
class TestMergeOnReadTable { } @Test - public void testSimpleInsertAndDelete() throws Exception { + public void testSimpleInsertUpdateAndDelete() throws Exception { HoodieWriteConfig cfg = getConfig(true); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); @@ -301,12 +302,12 @@ public class TestMergeOnReadTable { dataFilesToRead.findAny().isPresent()); /** - * Write 2 (only inserts, written to .log file) + * Write 2 (only updates, written to .log file) */ newCommitTime = "002"; client.startCommitWithTime(newCommitTime); - records = dataGen.generateInserts(newCommitTime, 20); + records = dataGen.generateUpdates(newCommitTime, records); writeRecords = jsc.parallelize(records, 1); statuses = client.upsert(writeRecords, newCommitTime).collect(); assertNoWriteErrors(statuses); @@ -548,6 +549,88 @@ public class TestMergeOnReadTable { }).findAny().isPresent()); } + @Test + public void testUpsertPartitioner() throws Exception { + HoodieWriteConfig cfg = getConfig(true); + HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + + /** + * Write 1 (only inserts, written as parquet file) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + List records = dataGen.generateInserts(newCommitTime, 20); + JavaRDD writeRecords = jsc.parallelize(records, 1); + + List statuses = client.upsert(writeRecords, newCommitTime).collect(); + assertNoWriteErrors(statuses); + + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg); + + Optional deltaCommit = + metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp()); + + Optional commit = + metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + 
assertFalse(commit.isPresent()); + + FileStatus[] allFiles = HoodieTestUtils + .listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient, + hoodieTable.getCommitsTimeline().filterCompletedInstants(), allFiles); + Stream dataFilesToRead = roView.getLatestDataFiles(); + Map parquetFileIdToSize = dataFilesToRead.collect(Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize)); + + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), + allFiles); + dataFilesToRead = roView.getLatestDataFiles(); + assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit", + dataFilesToRead.findAny().isPresent()); + + /** + * Write 2 (only updates + inserts, written to .log file + correction of existing parquet file size) + */ + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + + List newRecords = dataGen.generateUpdates(newCommitTime, records); + newRecords.addAll(dataGen.generateInserts(newCommitTime, 20)); + + statuses = client.upsert(jsc.parallelize(newRecords), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath()); + deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant(); + assertTrue(deltaCommit.isPresent()); + assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp()); + + commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + assertFalse(commit.isPresent()); + + allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath()); + roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getActiveTimeline().reload() + .getCommitsTimeline().filterCompletedInstants(), allFiles); + dataFilesToRead = roView.getLatestDataFiles(); + Map 
parquetFileIdToNewSize = dataFilesToRead.collect(Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize)); + + assertTrue(parquetFileIdToNewSize.entrySet().stream() + .filter(entry -> parquetFileIdToSize.get(entry.getKey()) < entry.getValue()).count() > 0); + + List dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()) + .collect(Collectors.toList()); + List recordsRead = HoodieMergeOnReadTestUtils + .getRecordsUsingInputFormat(dataFiles, basePath); + //Wrote 40 unique records across 2 batches (20 inserts in commit 001, then 20 updates + 20 new inserts in commit 002) + assertEquals("Must contain 40 records", 40, recordsRead.size()); + } + + private HoodieWriteConfig getConfig(Boolean autoCommit) { return getConfigBuilder(autoCommit).build(); } @@ -558,9 +641,9 @@ public class TestMergeOnReadTable { .withAutoCommit(autoCommit) .withAssumeDatePartitioning(true) .withCompactionConfig( - HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024) + HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) .withInlineCompaction(false).build()) - .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) + .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build()) .forTable("test-trip-table").withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()); }