Small File Size correction handling for MOR table type
This commit is contained in:
committed by
vinoth chandar
parent
2116815261
commit
30049383f5
@@ -19,11 +19,6 @@
|
||||
package com.uber.hoodie.table;
|
||||
|
||||
|
||||
import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.uber.hoodie.HoodieWriteClient;
|
||||
import com.uber.hoodie.WriteStatus;
|
||||
import com.uber.hoodie.common.HoodieClientTestUtils;
|
||||
@@ -49,14 +44,6 @@ import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.index.HoodieIndex;
|
||||
import com.uber.hoodie.io.compact.HoodieCompactor;
|
||||
import com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
@@ -73,6 +60,20 @@ import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestMergeOnReadTable {
|
||||
|
||||
private transient JavaSparkContext jsc = null;
|
||||
@@ -257,7 +258,7 @@ public class TestMergeOnReadTable {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleInsertAndDelete() throws Exception {
|
||||
public void testSimpleInsertUpdateAndDelete() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfig(true);
|
||||
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
|
||||
|
||||
@@ -301,12 +302,12 @@ public class TestMergeOnReadTable {
|
||||
dataFilesToRead.findAny().isPresent());
|
||||
|
||||
/**
|
||||
* Write 2 (only inserts, written to .log file)
|
||||
* Write 2 (only updates, written to .log file)
|
||||
*/
|
||||
newCommitTime = "002";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
records = dataGen.generateInserts(newCommitTime, 20);
|
||||
records = dataGen.generateUpdates(newCommitTime, records);
|
||||
writeRecords = jsc.parallelize(records, 1);
|
||||
statuses = client.upsert(writeRecords, newCommitTime).collect();
|
||||
assertNoWriteErrors(statuses);
|
||||
@@ -548,6 +549,88 @@ public class TestMergeOnReadTable {
|
||||
}).findAny().isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsertPartitioner() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfig(true);
|
||||
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts, written as parquet file)
|
||||
*/
|
||||
String newCommitTime = "001";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
|
||||
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg);
|
||||
|
||||
Optional<HoodieInstant> deltaCommit =
|
||||
metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
|
||||
assertTrue(deltaCommit.isPresent());
|
||||
assertEquals("Delta commit should be 001", "001", deltaCommit.get().getTimestamp());
|
||||
|
||||
Optional<HoodieInstant> commit =
|
||||
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
|
||||
assertFalse(commit.isPresent());
|
||||
|
||||
FileStatus[] allFiles = HoodieTestUtils
|
||||
.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
|
||||
TableFileSystemView.ReadOptimizedView roView = new HoodieTableFileSystemView(metaClient,
|
||||
hoodieTable.getCommitsTimeline().filterCompletedInstants(), allFiles);
|
||||
Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
|
||||
Map<String, Long> parquetFileIdToSize = dataFilesToRead.collect(Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize));
|
||||
|
||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(),
|
||||
allFiles);
|
||||
dataFilesToRead = roView.getLatestDataFiles();
|
||||
assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
|
||||
dataFilesToRead.findAny().isPresent());
|
||||
|
||||
/**
|
||||
* Write 2 (only updates + inserts, written to .log file + correction of existing parquet file size)
|
||||
*/
|
||||
newCommitTime = "002";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> newRecords = dataGen.generateUpdates(newCommitTime, records);
|
||||
newRecords.addAll(dataGen.generateInserts(newCommitTime, 20));
|
||||
|
||||
statuses = client.upsert(jsc.parallelize(newRecords), newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
|
||||
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
|
||||
assertTrue(deltaCommit.isPresent());
|
||||
assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp());
|
||||
|
||||
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
|
||||
assertFalse(commit.isPresent());
|
||||
|
||||
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
|
||||
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getActiveTimeline().reload()
|
||||
.getCommitsTimeline().filterCompletedInstants(), allFiles);
|
||||
dataFilesToRead = roView.getLatestDataFiles();
|
||||
Map<String, Long> parquetFileIdToNewSize = dataFilesToRead.collect(Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize));
|
||||
|
||||
assertTrue(parquetFileIdToNewSize.entrySet().stream()
|
||||
.filter(entry -> parquetFileIdToSize.get(entry.getKey()) < entry.getValue()).count() > 0);
|
||||
|
||||
List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath())
|
||||
.collect(Collectors.toList());
|
||||
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils
|
||||
.getRecordsUsingInputFormat(dataFiles, basePath);
|
||||
//Wrote 20 records in 2 batches
|
||||
assertEquals("Must contain 40 records", 40, recordsRead.size());
|
||||
}
|
||||
|
||||
|
||||
private HoodieWriteConfig getConfig(Boolean autoCommit) {
|
||||
return getConfigBuilder(autoCommit).build();
|
||||
}
|
||||
@@ -558,9 +641,9 @@ public class TestMergeOnReadTable {
|
||||
.withAutoCommit(autoCommit)
|
||||
.withAssumeDatePartitioning(true)
|
||||
.withCompactionConfig(
|
||||
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
|
||||
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
|
||||
.withInlineCompaction(false).build())
|
||||
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
||||
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
|
||||
.forTable("test-trip-table").withIndexConfig(
|
||||
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user