1
0

Async Compaction Main API changes

This commit is contained in:
Balaji Varadarajan
2018-05-23 23:09:25 -07:00
committed by vinoth chandar
parent 9b78523d62
commit 2f8ce93030
18 changed files with 878 additions and 267 deletions

View File

@@ -0,0 +1,508 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie;
import static com.uber.hoodie.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.CompactionUtils;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.spark.api.java.JavaRDD;
import org.junit.Test;
/**
 * Test cases covering the interaction between async compaction and ingestion on a
 * MERGE_ON_READ table: scheduling compactions, handling inflight compactions/ingestions,
 * rollback behavior, and validation of file-slices before and after compaction.
 */
public class TestAsyncCompaction extends TestHoodieClientBase {

  /**
   * Builds the write config used by these tests.
   *
   * @param autoCommit whether writes commit automatically
   */
  private HoodieWriteConfig getConfig(boolean autoCommit) {
    return getConfigBuilder(autoCommit).build();
  }

  /**
   * Config builder for async-compaction tests: inline compaction is disabled (compaction is
   * driven explicitly by each test) and file-size limits are set large so small-file handling
   * does not interfere with the expected file-slice layout.
   */
  private HoodieWriteConfig.Builder getConfigBuilder(boolean autoCommit) {
    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true).withCompactionConfig(
            HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024).withInlineCompaction(false)
                .withMaxNumDeltaCommitsBeforeCompaction(1).build())
        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
        .forTable("test-trip-table")
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
  }

  @Test
  public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
    // Rollback inflight ingestion when there is pending compaction
    HoodieWriteConfig cfg = getConfig(false);
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true);

    String firstInstantTime = "001";
    String secondInstantTime = "004";
    String compactionInstantTime = "005";
    String inflightInstantTime = "006";
    String nextInflightInstantTime = "007";

    int numRecs = 2000;

    List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
    records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime),
        records, cfg, true, new ArrayList<>());

    // Schedule compaction but do not run it
    scheduleCompaction(compactionInstantTime, client, cfg);
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    // Leave an ingestion dangling inflight (skipCommit=true) after the pending compaction
    createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);

    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieInstant pendingCompactionInstant =
        metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
    assertEquals("Pending Compaction instant has expected instant time",
        compactionInstantTime, pendingCompactionInstant.getTimestamp());
    HoodieInstant inflightInstant =
        metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
    assertEquals("inflight instant has expected instant time",
        inflightInstantTime, inflightInstant.getTimestamp());

    // Starting the next commit must roll back the dangling inflight ingestion
    client.startCommitWithTime(nextInflightInstantTime);

    // Validate: only the new inflight remains, and the pending compaction survived the rollback
    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    inflightInstant =
        metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
    assertEquals("inflight instant has expected instant time",
        nextInflightInstantTime, inflightInstant.getTimestamp());
    assertEquals("Expect only one inflight instant", 1,
        metaClient.getActiveTimeline().filterInflightsExcludingCompaction().getInstants().count());
    // Expect pending Compaction to be present
    pendingCompactionInstant =
        metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
    assertEquals("Pending Compaction instant has expected instant time",
        compactionInstantTime, pendingCompactionInstant.getTimestamp());
  }

  @Test
  public void testInflightCompaction() throws Exception {
    // There is inflight compaction. Subsequent compaction run must work correctly
    HoodieWriteConfig cfg = getConfig(true);
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true);

    String firstInstantTime = "001";
    String secondInstantTime = "004";
    String compactionInstantTime = "005";
    String thirdInstantTime = "006";
    String fourthInstantTime = "007";

    int numRecs = 2000;

    List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
    records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime),
        records, cfg, true, new ArrayList<>());

    // Schedule and mark compaction instant as inflight
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    // NOTE: table handle is captured before scheduling; executeCompaction reloads the
    // timeline when listing file-slices, so the snapshot is safe to reuse here.
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    scheduleCompaction(compactionInstantTime, client, cfg);
    moveCompactionFromRequestedToInflight(compactionInstantTime, client, cfg);

    // Complete ingestions
    runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime),
        records, cfg, false, Arrays.asList(compactionInstantTime));

    // execute inflight compaction
    executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
  }

  @Test
  public void testScheduleIngestionBeforePendingCompaction() throws Exception {
    // Case: Failure case. Latest pending compaction instant time must be earlier than this instant time
    HoodieWriteConfig cfg = getConfig(false);
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true);

    String firstInstantTime = "001";
    String secondInstantTime = "004";
    String failedInstantTime = "005";
    String compactionInstantTime = "006";

    int numRecs = 2000;

    List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
    records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime),
        records, cfg, true, new ArrayList<>());

    // Schedule compaction but do not run it
    scheduleCompaction(compactionInstantTime, client, cfg);
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieInstant pendingCompactionInstant =
        metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
    assertEquals("Pending Compaction instant has expected instant time",
        compactionInstantTime, pendingCompactionInstant.getTimestamp());

    boolean gotException = false;
    try {
      // Ingesting at an instant time earlier than the pending compaction must be rejected
      runNextDeltaCommits(client, Arrays.asList(failedInstantTime),
          records, cfg, false, Arrays.asList(compactionInstantTime));
    } catch (IllegalArgumentException iex) {
      // Latest pending compaction instant time must be earlier than this instant time. Should fail here
      gotException = true;
    }
    assertTrue("Latest pending compaction instant time must be earlier than this instant time", gotException);
  }

  @Test
  public void testScheduleCompactionAfterPendingIngestion() throws Exception {
    // Case: Failure case. Earliest ingestion inflight instant time must be later than compaction time
    HoodieWriteConfig cfg = getConfig(false);
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true);

    String firstInstantTime = "001";
    String secondInstantTime = "004";
    String inflightInstantTime = "005";
    String compactionInstantTime = "006";

    int numRecs = 2000;

    List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
    records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime),
        records, cfg, true, new ArrayList<>());

    // Leave an ingestion dangling inflight before scheduling the compaction
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);

    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieInstant inflightInstant =
        metaClient.getActiveTimeline().filterInflightsExcludingCompaction().firstInstant().get();
    assertEquals("inflight instant has expected instant time",
        inflightInstantTime, inflightInstant.getTimestamp());

    boolean gotException = false;
    try {
      // Schedule compaction but do not run it
      scheduleCompaction(compactionInstantTime, client, cfg);
    } catch (IllegalArgumentException iex) {
      // Earliest ingestion inflight instant time must be later than compaction time. Should fail here
      gotException = true;
    }
    assertTrue("Earliest ingestion inflight instant time must be later than compaction time", gotException);
  }

  @Test
  public void testScheduleCompactionWithOlderOrSameTimestamp() throws Exception {
    // Case: Failure case. Compaction instant time must be strictly newer than all committed instants
    HoodieWriteConfig cfg = getConfig(false);
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true);

    String firstInstantTime = "001";
    String secondInstantTime = "004";
    String compactionInstantTime = "002"; // older than the committed "004"

    int numRecs = 2000;

    List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
    records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime),
        records, cfg, true, new ArrayList<>());

    boolean gotException = false;
    try {
      // Schedule compaction with a timestamp older than the latest committed instant
      scheduleCompaction(compactionInstantTime, client, cfg);
    } catch (IllegalArgumentException iex) {
      gotException = true;
    }
    assertTrue("Compaction Instant to be scheduled cannot have older timestamp", gotException);

    // Schedule with timestamp same as that of committed instant
    gotException = false;
    String dupCompactionInstantTime = secondInstantTime;
    try {
      scheduleCompaction(dupCompactionInstantTime, client, cfg);
    } catch (IllegalArgumentException iex) {
      gotException = true;
    }
    assertTrue("Compaction Instant to be scheduled cannot have same timestamp as committed instant",
        gotException);

    compactionInstantTime = "006";
    scheduleCompaction(compactionInstantTime, client, cfg);
    gotException = false;
    try {
      // Schedule compaction with the same timestamp as a pending compaction
      scheduleCompaction(dupCompactionInstantTime, client, cfg);
    } catch (IllegalArgumentException iex) {
      gotException = true;
    }
    assertTrue("Compaction Instant to be scheduled cannot have same timestamp as a pending compaction",
        gotException);
  }

  @Test
  public void testCompactionAfterTwoDeltaCommits() throws Exception {
    // No Delta Commits after compaction request
    HoodieWriteConfig cfg = getConfig(true);
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true);

    String firstInstantTime = "001";
    String secondInstantTime = "004";
    String compactionInstantTime = "005";

    int numRecs = 2000;

    List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
    records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime),
        records, cfg, true, new ArrayList<>());

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false);
  }

  @Test
  public void testInterleavedCompaction() throws Exception {
    // Case: Two delta commits before and after compaction schedule
    HoodieWriteConfig cfg = getConfig(true);
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true);

    String firstInstantTime = "001";
    String secondInstantTime = "004";
    String compactionInstantTime = "005";
    String thirdInstantTime = "006";
    String fourthInstantTime = "007";

    int numRecs = 2000;

    List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
    records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime),
        records, cfg, true, new ArrayList<>());

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    scheduleCompaction(compactionInstantTime, client, cfg);

    runNextDeltaCommits(client, Arrays.asList(thirdInstantTime, fourthInstantTime),
        records, cfg, false, Arrays.asList(compactionInstantTime));
    executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
  }

  /**
   * HELPER METHODS FOR TESTING
   **/

  /**
   * Validates the file-slice layout after a delta commit: slices pending compaction must be
   * log-only on top of the compaction base-instant; all other slices must have a base-instant
   * at or before the latest delta commit.
   */
  private void validateDeltaCommit(String latestDeltaCommit,
      final Map<String, Pair<String, HoodieCompactionOperation>> fileIdToCompactionOperation,
      HoodieWriteConfig cfg) throws IOException {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
    fileSliceList.forEach(fileSlice -> {
      Pair<String, HoodieCompactionOperation> opPair = fileIdToCompactionOperation.get(fileSlice.getFileId());
      if (opPair != null) {
        // Slice is pending compaction: new writes must land in log files on the compaction base-instant
        assertEquals("Expect baseInstant to match compaction Instant",
            opPair.getKey(), fileSlice.getBaseInstantTime());
        assertTrue("Expect atleast one log file to be present where the latest delta commit was written",
            fileSlice.getLogFiles().count() > 0);
        assertFalse("Expect no data-file to be present", fileSlice.getDataFile().isPresent());
      } else {
        assertTrue("Expect baseInstant to be less than or equal to latestDeltaCommit",
            fileSlice.getBaseInstantTime().compareTo(latestDeltaCommit) <= 0);
      }
    });
  }

  /**
   * Runs delta commits for the given instant times, optionally bootstrapping with an insert for
   * the first instant, and validates each commit against the pending compaction operations.
   *
   * @param insertFirst whether the first instant is an insert (remaining are updates)
   * @param expPendingCompactionInstants compaction instants expected to be pending before running
   * @return the records used in the last delta commit
   */
  private List<HoodieRecord> runNextDeltaCommits(HoodieWriteClient client, List<String> deltaInstants,
      List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst,
      List<String> expPendingCompactionInstants) throws Exception {

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    List<Pair<HoodieInstant, HoodieCompactionPlan>> pendingCompactions =
        CompactionUtils.getAllPendingCompactionPlans(metaClient);
    List<String> gotPendingCompactionInstants =
        pendingCompactions.stream().map(pc -> pc.getKey().getTimestamp()).sorted().collect(Collectors.toList());
    assertEquals(expPendingCompactionInstants, gotPendingCompactionInstants);

    Map<String, Pair<String, HoodieCompactionOperation>> fileIdToCompactionOperation =
        CompactionUtils.getAllPendingCompactionOperations(metaClient);

    if (insertFirst) {
      // Use first instant for inserting records
      String firstInstant = deltaInstants.get(0);
      deltaInstants = deltaInstants.subList(1, deltaInstants.size());
      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
      client.startCommitWithTime(firstInstant);
      JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, firstInstant);
      List<WriteStatus> statusList = statuses.collect();
      if (!cfg.shouldAutoCommit()) {
        client.commit(firstInstant, statuses);
      }
      TestHoodieClientBase.assertNoWriteErrors(statusList);
      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
      HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
      List<HoodieDataFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
      assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
          dataFilesToRead.stream().findAny().isPresent());
      validateDeltaCommit(firstInstant, fileIdToCompactionOperation, cfg);
    }

    int numRecords = records.size();
    for (String instantTime : deltaInstants) {
      records = dataGen.generateUpdates(instantTime, numRecords);
      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
      createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
      validateDeltaCommit(instantTime, fileIdToCompactionOperation, cfg);
    }
    return records;
  }

  /**
   * Transitions a scheduled compaction from REQUESTED to INFLIGHT and verifies the transition
   * on a reloaded timeline.
   */
  private void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteClient client,
      HoodieWriteConfig cfg) throws IOException {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
    // Sanity check: a valid compaction plan must have been serialized with the requested instant
    AvroUtils.deserializeCompactionPlan(
        metaClient.getActiveTimeline().getInstantAuxiliaryDetails(compactionInstant).get());
    metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
    HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
        .filter(in -> in.getTimestamp().equals(compactionInstantTime)).findAny().get();
    assertTrue("Instant must be marked inflight", instant.isInflight());
  }

  /**
   * Schedules a compaction at the given instant and asserts it became the latest pending
   * compaction instant.
   */
  private void scheduleCompaction(String compactionInstantTime, HoodieWriteClient client, HoodieWriteConfig cfg)
      throws IOException {
    client.scheduleCompactionAtInstant(compactionInstantTime, Optional.empty());
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
    assertEquals("Last compaction instant must be the one set",
        compactionInstantTime, instant.getTimestamp());
  }

  /** Schedules a compaction and immediately executes + validates it. */
  private void scheduleAndExecuteCompaction(String compactionInstantTime,
      HoodieWriteClient client, HoodieTable table, HoodieWriteConfig cfg, int expectedNumRecs,
      boolean hasDeltaCommitAfterPendingCompaction) throws IOException {
    scheduleCompaction(compactionInstantTime, client, cfg);
    executeCompaction(compactionInstantTime, client, table, cfg, expectedNumRecs, hasDeltaCommitAfterPendingCompaction);
  }

  /**
   * Runs an already-scheduled compaction and validates the resulting file-slices, the commit
   * timeline, and the record count.
   *
   * @param hasDeltaCommitAfterPendingCompaction whether delta commits landed after the compaction
   *        was scheduled (in which case every slice must carry log files)
   */
  private void executeCompaction(String compactionInstantTime,
      HoodieWriteClient client, HoodieTable table, HoodieWriteConfig cfg, int expectedNumRecs,
      boolean hasDeltaCommitAfterPendingCompaction) throws IOException {

    client.compact(compactionInstantTime);
    List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table, cfg);
    assertTrue("Ensure latest file-slices are not empty", fileSliceList.stream().findAny().isPresent());
    assertTrue("Verify all file-slices have base-instant same as compaction instant",
        fileSliceList.stream().allMatch(fs -> fs.getBaseInstantTime().equals(compactionInstantTime)));
    assertTrue("Verify all file-slices have data-files",
        fileSliceList.stream().allMatch(fs -> fs.getDataFile().isPresent()));

    if (hasDeltaCommitAfterPendingCompaction) {
      assertTrue("Verify all file-slices have atleast one log-file",
          fileSliceList.stream().allMatch(fs -> fs.getLogFiles().count() > 0));
    } else {
      assertTrue("Verify all file-slices have no log-files",
          fileSliceList.stream().noneMatch(fs -> fs.getLogFiles().count() > 0));
    }

    // verify that there is a commit
    table = HoodieTable.getHoodieTable(
        new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true), cfg, jsc);
    HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
    assertEquals("Expect compaction instant time to be the latest commit time",
        latestCompactionCommitTime, compactionInstantTime);
    assertEquals("Must contain expected records", expectedNumRecs,
        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count());
  }

  /**
   * Performs one delta commit at the given instant. When skipCommit=true (and auto-commit is off),
   * the commit is intentionally left inflight so callers can test rollback behavior.
   */
  private List<WriteStatus> createNextDeltaCommit(String instantTime, List<HoodieRecord> records,
      HoodieWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, boolean skipCommit) {
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

    client.startCommitWithTime(instantTime);

    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
    List<WriteStatus> statusList = statuses.collect();
    TestHoodieClientBase.assertNoWriteErrors(statusList);
    if (!cfg.shouldAutoCommit() && !skipCommit) {
      client.commit(instantTime, statuses);
    }

    Optional<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().reload().getDeltaCommitTimeline()
        .filterCompletedInstants().lastInstant();
    if (skipCommit && !cfg.shouldAutoCommit()) {
      // Commit was skipped, so the latest COMPLETED delta commit must predate this instant
      assertTrue("Delta commit should not be latest instant",
          deltaCommit.get().getTimestamp().compareTo(instantTime) < 0);
    } else {
      assertTrue(deltaCommit.isPresent());
      assertEquals("Delta commit should be latest instant", instantTime, deltaCommit.get().getTimestamp());
    }
    return statusList;
  }

  /** Lists the latest data-files visible through the completed commit timeline. */
  private List<HoodieDataFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
    HoodieTableFileSystemView view =
        new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitTimeline(), allFiles);
    return view.getLatestDataFiles().collect(Collectors.toList());
  }

  /** Latest file-slices across all default test partitions, from a freshly reloaded timeline. */
  private List<FileSlice> getCurrentLatestFileSlices(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
    HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
        table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
    return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
        .flatMap(view::getLatestFileSlices).collect(Collectors.toList());
  }

  @Override
  protected HoodieTableType getTableType() {
    // These tests exercise async compaction, which only applies to MERGE_ON_READ tables
    return HoodieTableType.MERGE_ON_READ;
  }
}

View File

@@ -26,6 +26,7 @@ import com.uber.hoodie.common.HoodieClientTestUtils;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
@@ -48,6 +49,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -71,6 +73,7 @@ public class TestHoodieClientBase implements Serializable {
public void init() throws IOException {
// Initialize a local spark env
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieClient"));
jsc.setLogLevel("ERROR");
//SQLContext stuff
sqlContext = new SQLContext(jsc);
@@ -80,7 +83,14 @@ public class TestHoodieClientBase implements Serializable {
folder.create();
basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
if (fs instanceof LocalFileSystem) {
LocalFileSystem lfs = (LocalFileSystem) fs;
// With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
// This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
// So, for the tests, we enforce checksum verification to circumvent the problem
lfs.setVerifyChecksum(true);
}
HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, getTableType());
dataGen = new HoodieTestDataGenerator();
}
@@ -112,7 +122,7 @@ public class TestHoodieClientBase implements Serializable {
*
* @param statuses List of Write Status
*/
void assertNoWriteErrors(List<WriteStatus> statuses) {
static void assertNoWriteErrors(List<WriteStatus> statuses) {
// Verify there are no errors
for (WriteStatus status : statuses) {
assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
@@ -433,4 +443,8 @@ public class TestHoodieClientBase implements Serializable {
R apply(T1 v1, T2 v2, T3 v3) throws IOException;
}
protected HoodieTableType getTableType() {
return HoodieTableType.COPY_ON_WRITE;
}
}

View File

@@ -35,6 +35,7 @@ import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieMemoryConfig;
import com.uber.hoodie.config.HoodieStorageConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import com.uber.hoodie.io.compact.HoodieCompactor;
@@ -90,7 +91,9 @@ public class TestHoodieCompactor {
}
private HoodieWriteConfig getConfig() {
return getConfigBuilder().build();
return getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.build();
}
private HoodieWriteConfig.Builder getConfigBuilder() {
@@ -103,12 +106,14 @@ public class TestHoodieCompactor {
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = HoodieNotSupportedException.class)
public void testCompactionOnCopyOnWriteFail() throws Exception {
HoodieTestUtils.initTableType(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
compactor.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
}
@Test
@@ -123,8 +128,9 @@ public class TestHoodieCompactor {
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
writeClient.insert(recordsRDD, newCommitTime).collect();
JavaRDD<WriteStatus> result = compactor
.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
JavaRDD<WriteStatus> result =
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
assertTrue("If there is nothing to compact, result will be empty", result.isEmpty());
}
@@ -171,8 +177,9 @@ public class TestHoodieCompactor {
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<WriteStatus> result = compactor
.compact(jsc, getConfig(), table, HoodieActiveTimeline.createNewCommitTime());
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
JavaRDD<WriteStatus> result =
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
// Verify that all partition paths are present in the WriteStatus result
for (String partitionPath : dataGen.getPartitionPaths()) {

View File

@@ -41,6 +41,7 @@ import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.TableFileSystemView;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
@@ -202,7 +203,7 @@ public class TestMergeOnReadTable {
commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
String compactionCommitTime = client.startCompaction();
String compactionCommitTime = client.scheduleCompaction(Optional.empty()).get().toString();
client.compact(compactionCommitTime);
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
@@ -522,14 +523,15 @@ public class TestMergeOnReadTable {
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
String compactionCommit = client.startCompaction();
JavaRDD<WriteStatus> writeStatus = client.compact(compactionCommit);
client.commitCompaction(compactionCommit, writeStatus);
String compactionInstantTime = client.scheduleCompaction(Optional.empty()).get().toString();
JavaRDD<WriteStatus> ws = client.compact(compactionInstantTime);
client.commitCompaction(compactionInstantTime, ws, Optional.empty());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
List<HoodieDataFile> dataFiles2 = roView.getLatestDataFiles().collect(Collectors.toList());
final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get()
.getTimestamp();
@@ -679,12 +681,13 @@ public class TestMergeOnReadTable {
}
}
// Do a compaction
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Mark 2nd delta-instant as completed
metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Optional.empty());
String commitTime = writeClient.startCompaction();
JavaRDD<WriteStatus> result = writeClient.compact(commitTime);
// Do a compaction
String compactionInstantTime = writeClient.scheduleCompaction(Optional.empty()).get().toString();
JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
// Verify that recently written compacted data file has no log file
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
@@ -741,9 +744,9 @@ public class TestMergeOnReadTable {
Assert.assertTrue(totalUpsertTime > 0);
// Do a compaction
String commitTime = writeClient.startCompaction();
statuses = writeClient.compact(commitTime);
writeClient.commitCompaction(commitTime, statuses);
String compactionInstantTime = writeClient.scheduleCompaction(Optional.empty()).get().toString();
statuses = writeClient.compact(compactionInstantTime);
writeClient.commitCompaction(compactionInstantTime, statuses, Optional.empty());
// total time taken for scanning log files should be greater than 0
long timeTakenForScanner = statuses.map(writeStatus -> writeStatus.getStat().getRuntimeStats().getTotalScanTime())
.reduce((a, b) -> a + b).longValue();
@@ -782,11 +785,11 @@ public class TestMergeOnReadTable {
Assert.assertTrue(numLogFiles > 0);
// Do a compaction
String commitTime = writeClient.startCompaction();
String commitTime = writeClient.scheduleCompaction(Optional.empty()).get().toString();
statuses = writeClient.compact(commitTime);
Assert.assertTrue(statuses.map(status -> status.getStat().getPath().contains("parquet")).count() == numLogFiles);
Assert.assertEquals(statuses.count(), numLogFiles);
writeClient.commitCompaction(commitTime, statuses);
writeClient.commitCompaction(commitTime, statuses, Optional.empty());
}
@Test
@@ -824,6 +827,8 @@ public class TestMergeOnReadTable {
writeClient.commit(newCommitTime, statuses);
// rollback a successful commit
// Sleep for small interval to force a new rollback start time.
Thread.sleep(5);
writeClient.rollback(newCommitTime);
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -875,12 +880,12 @@ public class TestMergeOnReadTable {
Assert.assertTrue(numLogFiles > 0);
// Do a compaction
newCommitTime = writeClient.startCompaction();
newCommitTime = writeClient.scheduleCompaction(Optional.empty()).get().toString();
statuses = writeClient.compact(newCommitTime);
// Ensure all log files have been compacted into parquet files
Assert.assertTrue(statuses.map(status -> status.getStat().getPath().contains("parquet")).count() == numLogFiles);
Assert.assertEquals(statuses.count(), numLogFiles);
writeClient.commitCompaction(newCommitTime, statuses);
writeClient.commitCompaction(newCommitTime, statuses, Optional.empty());
// Trigger a rollback of compaction
writeClient.rollback(newCommitTime);
for (String partitionPath : dataGen.getPartitionPaths()) {