Issue-329 : Refactoring TestHoodieClientOnCopyOnWriteStorage and adding test-cases
This commit is contained in:
committed by
vinoth chandar
parent
a4049329a5
commit
6c226ca21a
700
hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
Normal file
700
hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
Normal file
@@ -0,0 +1,700 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie;
|
||||
|
||||
import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
|
||||
import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
|
||||
import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.uber.hoodie.common.HoodieCleanStat;
|
||||
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
|
||||
import com.uber.hoodie.common.model.HoodieCommitMetadata;
|
||||
import com.uber.hoodie.common.model.HoodieDataFile;
|
||||
import com.uber.hoodie.common.model.HoodieFileGroup;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieTableType;
|
||||
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||
import com.uber.hoodie.common.model.HoodieWriteStat;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||
import com.uber.hoodie.common.table.TableFileSystemView;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieInstant;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.config.HoodieCompactionConfig;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.index.HoodieIndex;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.scheduler.SparkListener;
|
||||
import org.apache.spark.scheduler.SparkListenerTaskEnd;
|
||||
import org.apache.spark.util.AccumulatorV2;
|
||||
import org.junit.Test;
|
||||
import scala.Option;
|
||||
import scala.collection.Iterator;
|
||||
|
||||
/**
|
||||
* Test Cleaning related logic
|
||||
*/
|
||||
public class TestCleaner extends TestHoodieClientBase {
|
||||
|
||||
private static final int BIG_BATCH_INSERT_SIZE = 500;
|
||||
private static Logger logger = LogManager.getLogger(TestHoodieClientBase.class);
|
||||
|
||||
  /**
   * Helper method to do the first (big) batch of inserts for the clean-by-versions/commits tests.
   *
   * <p>Inserts {@code BIG_BATCH_INSERT_SIZE} records through the supplied insert API, then verifies
   * that exactly one commit exists, that an inline clean ran as part of it, and that all records
   * are tagged to that commit.
   *
   * @param cfg Hoodie Write Config
   * @param client Hoodie Client
   * @param recordGenFunction Function to generate records for insertion
   * @param insertFn Insertion API for testing
   * @throws Exception in case of error
   */
  private void insertFirstBigBatchForClientCleanerTest(
      HoodieWriteConfig cfg,
      HoodieWriteClient client,
      Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {

    /*
     * do a big insert (this is basically same as insert part of upsert, just adding it here so we
     * can catch breakages in insert(), if the implementation diverges.)
     */
    HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
    String newCommitTime = client.startCommit();

    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 5);

    List<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime).collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);

    // verify that there is a commit
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
    assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
    // Should have BIG_BATCH_INSERT_SIZE (500) records in table (check using Index), all in
    // locations marked at commit.
    // NOTE(review): getConfig() is used here rather than the passed-in cfg — confirm intentional.
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());

    assertFalse(table.getCompletedCommitTimeline().empty());
    String commitTime = table.getCompletedCommitTimeline().getInstants().findFirst().get().getTimestamp();
    // Inline cleaning is expected to have run alongside the commit above
    assertFalse(table.getCompletedCleanTimeline().empty());
    assertEquals("The clean instant should be the same as the commit instant", commitTime,
        table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());

    List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table).collect();
    checkTaggedRecords(taggedRecords, newCommitTime);
  }
|
||||
|
||||
  /**
   * Test Clean-By-Versions using the insert/upsert API.
   */
  @Test
  public void testInsertAndCleanByVersions() throws Exception {
    testInsertAndCleanByVersions(HoodieWriteClient::insert, HoodieWriteClient::upsert, false);
  }
|
||||
|
||||
  /**
   * Test Clean-By-Versions using prepped versions of the insert/upsert API.
   */
  @Test
  public void testInsertPreppedAndCleanByVersions() throws Exception {
    testInsertAndCleanByVersions(HoodieWriteClient::insertPreppedRecords,
        HoodieWriteClient::upsertPreppedRecords, true);
  }
|
||||
|
||||
  /**
   * Test Clean-By-Versions using the bulk-insert/upsert API.
   */
  @Test
  public void testBulkInsertAndCleanByVersions() throws Exception {
    testInsertAndCleanByVersions(HoodieWriteClient::bulkInsert, HoodieWriteClient::upsert, false);
  }
|
||||
|
||||
  /**
   * Test Clean-By-Versions using prepped versions of the bulk-insert/upsert API.
   */
  @Test
  public void testBulkInsertPreppedAndCleanByVersions() throws Exception {
    testInsertAndCleanByVersions(
        (client, recordRDD, commitTime) -> client.bulkInsertPreppedRecords(recordRDD, commitTime, Option.empty()),
        HoodieWriteClient::upsertPreppedRecords, true);
  }
|
||||
|
||||
  /**
   * Test Helper for Cleaning-by-versions logic from a HoodieWriteClient API perspective.
   *
   * <p>Inserts a big batch, then repeatedly upserts with inline cleaning and verifies that every
   * file group retains at most {@code maxVersions} data-file versions, and that the retained
   * versions are the latest ones.
   *
   * @param insertFn Insert API to be tested
   * @param upsertFn Upsert API to be tested
   * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
   * record generation to also tag the records (de-dupe is implicit as we use uniq record-gen APIs)
   * @throws Exception in case of errors
   */
  private void testInsertAndCleanByVersions(
      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> upsertFn,
      boolean isPreppedAPI
  ) throws Exception {
    int maxVersions = 2; // keep upto 2 versions for each file
    HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(
        HoodieCompactionConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
            .retainFileVersions(maxVersions).build())
        .withParallelism(1, 1).withBulkInsertParallelism(1)
        .build();
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);

    // Record generators, optionally wrapped to pre-tag records for the prepped APIs
    final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
        generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);

    final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
        generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);

    insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);

    // Keep doing some writes and clean inline. Make sure we have expected number of files
    // remaining.
    HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).stream().forEach(newCommitTime -> {
      try {
        client.startCommitWithTime(newCommitTime);
        List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100);

        List<WriteStatus> statuses =
            upsertFn.apply(client, jsc.parallelize(records, 1), newCommitTime).collect();
        // Verify there are no errors
        assertNoWriteErrors(statuses);

        // NOTE(review): getConfig() is used here rather than the local cfg — confirm intentional.
        HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
        HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig());
        HoodieTimeline timeline = table.getCommitsTimeline();

        TableFileSystemView fsView = table.getFileSystemView();
        // Need to ensure the following
        for (String partitionPath : dataGen.getPartitionPaths()) {
          // compute all the versions of all files, from time 0
          HashMap<String, TreeSet<String>> fileIdToVersions = new HashMap<>();
          for (HoodieInstant entry : timeline.getInstants().collect(Collectors.toList())) {
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
                .fromBytes(timeline.getInstantDetails(entry).get());

            for (HoodieWriteStat wstat : commitMetadata.getWriteStats(partitionPath)) {
              if (!fileIdToVersions.containsKey(wstat.getFileId())) {
                fileIdToVersions.put(wstat.getFileId(), new TreeSet<>());
              }
              // a "version" is the commit time embedded in the data-file name
              fileIdToVersions.get(wstat.getFileId()).add(FSUtils.getCommitTime(new Path(wstat.getPath()).getName()));
            }
          }

          List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());

          for (HoodieFileGroup fileGroup : fileGroups) {
            // No file has more than maxVersions data-file versions
            String fileId = fileGroup.getId();
            List<HoodieDataFile> dataFiles = fileGroup.getAllDataFiles().collect(Collectors.toList());

            assertTrue("fileId " + fileId + " has more than " + maxVersions + " versions",
                dataFiles.size() <= maxVersions);

            // Each file, has the latest N versions (i.e cleaning gets rid of older versions)
            List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
            for (int i = 0; i < dataFiles.size(); i++) {
              assertEquals("File " + fileId + " does not have latest versions on commits" + commitedVersions,
                  Iterables.get(dataFiles, i).getCommitTime(), commitedVersions.get(commitedVersions.size() - 1 - i));
            }
          }
        }
      } catch (IOException ioe) {
        throw new RuntimeException(ioe);
      }
    });
  }
|
||||
|
||||
  /**
   * Test Clean-By-Commits using the insert/upsert API.
   */
  @Test
  public void testInsertAndCleanByCommits() throws Exception {
    testInsertAndCleanByCommits(HoodieWriteClient::insert, HoodieWriteClient::upsert, false);
  }
|
||||
|
||||
  /**
   * Test Clean-By-Commits using prepped versions of the insert/upsert API.
   */
  @Test
  public void testInsertPreppedAndCleanByCommits() throws Exception {
    testInsertAndCleanByCommits(HoodieWriteClient::insertPreppedRecords,
        HoodieWriteClient::upsertPreppedRecords, true);
  }
|
||||
|
||||
  /**
   * Test Clean-By-Commits using prepped versions of the bulk-insert/upsert API.
   */
  @Test
  public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
    testInsertAndCleanByCommits(
        (client, recordRDD, commitTime) -> client.bulkInsertPreppedRecords(recordRDD, commitTime, Option.empty()),
        HoodieWriteClient::upsertPreppedRecords, true);
  }
|
||||
|
||||
  /**
   * Test Clean-By-Commits using the bulk-insert/upsert API.
   */
  @Test
  public void testBulkInsertAndCleanByCommits() throws Exception {
    testInsertAndCleanByCommits(HoodieWriteClient::bulkInsert, HoodieWriteClient::upsert, false);
  }
|
||||
|
||||
/**
|
||||
* Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective
|
||||
*
|
||||
* @param insertFn Insert API to be tested
|
||||
* @param upsertFn Upsert API to be tested
|
||||
* @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
|
||||
* record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs)
|
||||
* @throws Exception in case of errors
|
||||
*/
|
||||
private void testInsertAndCleanByCommits(
|
||||
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
|
||||
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> upsertFn,
|
||||
boolean isPreppedAPI
|
||||
) throws Exception {
|
||||
int maxCommits = 3; // keep upto 3 commits from the past
|
||||
HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(
|
||||
HoodieCompactionConfig.newBuilder()
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
|
||||
.withParallelism(1, 1).withBulkInsertParallelism(1).build();
|
||||
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
|
||||
|
||||
final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
|
||||
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
|
||||
|
||||
final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
|
||||
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
|
||||
|
||||
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
|
||||
|
||||
// Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
|
||||
HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).stream().forEach(newCommitTime -> {
|
||||
try {
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100);
|
||||
|
||||
List<WriteStatus> statuses =
|
||||
upsertFn.apply(client, jsc.parallelize(records, 1), newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||
HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg);
|
||||
HoodieTimeline activeTimeline = table1.getCompletedCommitTimeline();
|
||||
Optional<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
|
||||
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
|
||||
if (earliestRetainedCommit.isPresent()) {
|
||||
acceptableCommits.removeAll(
|
||||
activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp()).getInstants()
|
||||
.collect(Collectors.toSet()));
|
||||
acceptableCommits.add(earliestRetainedCommit.get());
|
||||
}
|
||||
|
||||
TableFileSystemView fsView = table1.getFileSystemView();
|
||||
// Need to ensure the following
|
||||
for (String partitionPath : dataGen.getPartitionPaths()) {
|
||||
List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
|
||||
for (HoodieFileGroup fileGroup : fileGroups) {
|
||||
Set<String> commitTimes = new HashSet<>();
|
||||
fileGroup.getAllDataFiles().forEach(value -> {
|
||||
logger.debug("Data File - " + value);
|
||||
commitTimes.add(value.getCommitTime());
|
||||
});
|
||||
assertEquals("Only contain acceptable versions of file should be present",
|
||||
acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes);
|
||||
}
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException(ioe);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
  /**
   * Test HoodieTable.clean() Cleaning-by-versions logic (retain 1 file version per file group).
   */
  @Test
  public void testKeepLatestFileVersions() throws IOException {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withCleanerPolicy(
            HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
        .build();

    // make 1 commit, with 1 file per partition
    HoodieTestUtils.createCommitFiles(basePath, "000");

    String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "000");
    String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "000");
    HoodieTable table = HoodieTable.getHoodieTable(
        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config);

    // Only one version of each file exists, so nothing is cleanable yet
    List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
    assertEquals("Must not clean any files", 0,
        getCleanStat(hoodieCleanStatsOne, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
    assertEquals("Must not clean any files", 0,
        getCleanStat(hoodieCleanStatsOne, DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().size());
    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0));
    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_SECOND_PARTITION_PATH, "000", file1P1C0));

    // make next commit, with 1 insert & 1 update per partition
    HoodieTestUtils.createCommitFiles(basePath, "001");
    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), config);

    String file2P0C1 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
    String file2P1C1 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
    HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
    HoodieTestUtils.createDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update

    // The updated file groups now have 2 versions; the "000" versions must be removed
    List<HoodieCleanStat> hoodieCleanStatsTwo = table.clean(jsc);
    assertEquals("Must clean 1 file", 1,
        getCleanStat(hoodieCleanStatsTwo, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
    assertEquals("Must clean 1 file", 1,
        getCleanStat(hoodieCleanStatsTwo, DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().size());
    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file2P0C1));
    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_SECOND_PARTITION_PATH, "001", file2P1C1));
    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0));
    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_SECOND_PARTITION_PATH, "000", file1P1C0));

    // make next commit, with 2 updates to existing files, and 1 insert
    HoodieTestUtils.createCommitFiles(basePath, "002");
    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
        config);

    HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
    HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
    String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "002");

    // Both updated file groups lose their "001" versions
    List<HoodieCleanStat> hoodieCleanStatsThree = table.clean(jsc);
    assertEquals("Must clean two files", 2,
        getCleanStat(hoodieCleanStatsThree, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0));
    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file2P0C1));
    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "002", file3P0C2));

    // No cleaning on partially written file, with no commit.
    HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update
    List<HoodieCleanStat> hoodieCleanStatsFour = table.clean(jsc);
    assertEquals("Must not clean any files", 0,
        getCleanStat(hoodieCleanStatsFour, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
    assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "002", file3P0C2));
  }
|
||||
|
||||
  /**
   * Test HoodieTable.clean() Cleaning-by-versions logic for a MERGE_ON_READ table with log files.
   */
  @Test
  public void testKeepLatestFileVersionsMOR() throws IOException {

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withCleanerPolicy(
            HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
        .build();

    HoodieTableMetaClient metaClient = HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath,
        HoodieTableType.MERGE_ON_READ);

    // Make 3 files, one base file and 2 log files associated with base file
    String file1P0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "000");
    String file2P0L0 = HoodieTestUtils
        .createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Optional.empty());
    // NOTE(review): file2P0L1 is never read afterwards — the existence checks at the end reuse
    // file2P0L0 for both log versions; confirm that is intentional.
    String file2P0L1 = HoodieTestUtils
        .createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Optional.of(2));
    // make 1 compaction commit
    HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000");

    // Make 4 files, one base file and 3 log files associated with base file
    HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file1P0);
    file2P0L0 = HoodieTestUtils
        .createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file1P0, Optional.empty());
    file2P0L0 = HoodieTestUtils
        .createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file1P0, Optional.of(2));
    file2P0L0 = HoodieTestUtils
        .createNewLogFile(fs, basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file1P0, Optional.of(3));
    // make 1 compaction commit
    HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001");

    // Cleaning should remove the "000" version of the base file and its 2 log files
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config);
    List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
    assertEquals("Must clean three files, one parquet and 2 log files", 3,
        getCleanStat(hoodieCleanStats, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
    assertFalse(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file1P0));
    assertFalse(
        HoodieTestUtils.doesLogFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file2P0L0, Optional.empty()));
    assertFalse(
        HoodieTestUtils.doesLogFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file2P0L0, Optional.of(2)));
  }
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files
|
||||
*/
|
||||
@Test
|
||||
public void testKeepLatestCommits() throws IOException {
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withCleanerPolicy(
|
||||
HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()).build();
|
||||
|
||||
// make 1 commit, with 1 file per partition
|
||||
HoodieTestUtils.createCommitFiles(basePath, "000");
|
||||
|
||||
String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "000");
|
||||
String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "000");
|
||||
|
||||
HoodieTable table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
|
||||
assertEquals("Must not clean any files", 0,
|
||||
getCleanStat(hoodieCleanStatsOne, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
|
||||
assertEquals("Must not clean any files", 0,
|
||||
getCleanStat(hoodieCleanStatsOne, DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().size());
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_SECOND_PARTITION_PATH, "000", file1P1C0));
|
||||
|
||||
// make next commit, with 1 insert & 1 update per partition
|
||||
HoodieTestUtils.createCommitFiles(basePath, "001");
|
||||
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
|
||||
config);
|
||||
|
||||
String file2P0C1 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
|
||||
String file2P1C1 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
|
||||
HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
|
||||
HoodieTestUtils.createDataFile(basePath, DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsTwo = table.clean(jsc);
|
||||
assertEquals("Must not clean any files", 0,
|
||||
getCleanStat(hoodieCleanStatsTwo, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
|
||||
assertEquals("Must not clean any files", 0,
|
||||
getCleanStat(hoodieCleanStatsTwo, DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().size());
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file2P0C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_SECOND_PARTITION_PATH, "001", file2P1C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_SECOND_PARTITION_PATH, "000", file1P1C0));
|
||||
|
||||
// make next commit, with 2 updates to existing files, and 1 insert
|
||||
HoodieTestUtils.createCommitFiles(basePath, "002");
|
||||
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
|
||||
config);
|
||||
|
||||
HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
|
||||
HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
|
||||
String file3P0C2 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "002");
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsThree = table.clean(jsc);
|
||||
assertEquals("Must not clean any file. We have to keep 1 version before the latest commit time to keep", 0,
|
||||
getCleanStat(hoodieCleanStatsThree, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
|
||||
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0));
|
||||
|
||||
// make next commit, with 2 updates to existing files, and 1 insert
|
||||
HoodieTestUtils.createCommitFiles(basePath, "003");
|
||||
table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
|
||||
config);
|
||||
|
||||
HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
|
||||
HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update
|
||||
String file4P0C3 = HoodieTestUtils.createNewDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "003");
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsFour = table.clean(jsc);
|
||||
assertEquals("Must not clean one old file", 1,
|
||||
getCleanStat(hoodieCleanStatsFour, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
|
||||
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "000", file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file2P0C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "002", file3P0C2));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "003", file4P0C3));
|
||||
|
||||
// No cleaning on partially written file, with no commit.
|
||||
HoodieTestUtils.createDataFile(basePath, DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update
|
||||
List<HoodieCleanStat> hoodieCleanStatsFive = table.clean(jsc);
|
||||
assertEquals("Must not clean any files", 0,
|
||||
getCleanStat(hoodieCleanStatsFive, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, DEFAULT_FIRST_PARTITION_PATH, "001", file2P0C1));
|
||||
}
|
||||
|
||||
  /**
   * Test Cleaning functionality of the table.rollback() API: temporary data files are deleted on
   * rollback only when the temp-folder is enabled for copy-on-write creates.
   */
  @Test
  public void testCleanTemporaryDataFilesOnRollback() throws IOException {
    HoodieTestUtils.createCommitFiles(basePath, "000");
    List<String> tempFiles = createTempFiles("000", 10);
    assertEquals("Some temp files are created.", 10, tempFiles.size());
    assertEquals("Some temp files are created.", tempFiles.size(), getTotalTempFiles());

    // With temp-folder usage disabled, rollback must leave the temp files untouched
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
        .withUseTempFolderCopyOnWriteForCreate(false)
        .withUseTempFolderCopyOnWriteForMerge(false).build();
    HoodieTable table = HoodieTable.getHoodieTable(
        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config);
    table.rollback(jsc, Collections.emptyList());
    assertEquals("Some temp files are created.", tempFiles.size(), getTotalTempFiles());

    // With temp-folder usage enabled for creates, rollback must delete all temp files
    config = HoodieWriteConfig.newBuilder().withPath(basePath).withUseTempFolderCopyOnWriteForCreate(true)
        .withUseTempFolderCopyOnWriteForMerge(false).build();
    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
        config);
    table.rollback(jsc, Collections.emptyList());
    assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
  }
|
||||
|
||||
/**
|
||||
* Test CLeaner Stat when there are no partition paths.
|
||||
*/
|
||||
@Test
|
||||
public void testCleaningWithZeroPartitonPaths() throws IOException {
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withCleanerPolicy(
|
||||
HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()).build();
|
||||
|
||||
// Make a commit, although there are no partitionPaths.
|
||||
// Example use-case of this is when a client wants to create a table
|
||||
// with just some commit metadata, but no data/partitionPaths.
|
||||
HoodieTestUtils.createCommitFiles(basePath, "000");
|
||||
|
||||
HoodieTable table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
|
||||
assertTrue("HoodieCleanStats should be empty for a table with empty partitionPaths", hoodieCleanStatsOne.isEmpty());
|
||||
}
|
||||
|
||||
/**
 * Test Clean-by-commits behavior in the presence of skewed partitions.
 *
 * <p>Creates 4 commits (000..003) where one partition has 100 files and two partitions
 * have 10 files each, then verifies that the clean work is spread evenly across tasks
 * rather than one task handling all files of the large partition.
 */
@Test
public void testCleaningSkewedPartitons() throws IOException {
  HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
      .withCompactionConfig(HoodieCompactionConfig.newBuilder().withCleanerPolicy(
          HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()).build();
  // taskId -> number of shuffle-read records that task processed in stage 1.
  Map<Long, Long> stageOneShuffleReadTaskRecordsCountMap = new HashMap<>();

  // Since clean involves repartition in order to uniformly distribute data,
  // we can inspect the number of records read by various tasks in stage 1.
  // There should not be skew in the number of records read in the task.

  // SparkListener below listens to the stage end events and captures number of
  // records read by various tasks in stage-1.
  // NOTE(review): assumes the repartitioned clean work lands in stage id 1 — confirm if
  // the clean implementation's Spark DAG ever changes.
  jsc.sc().addSparkListener(new SparkListener() {

    @Override
    public void onTaskEnd(SparkListenerTaskEnd taskEnd) {

      Iterator<AccumulatorV2<?, ?>> iterator = taskEnd.taskMetrics().accumulators().iterator();
      while (iterator.hasNext()) {
        AccumulatorV2 accumulator = iterator.next();
        // Only record the shuffle-read counter of stage-1 tasks.
        if (taskEnd.stageId() == 1 && accumulator.isRegistered() && accumulator.name().isDefined()
            && accumulator.name().get().equals("internal.metrics.shuffle.read.recordsRead")) {
          stageOneShuffleReadTaskRecordsCountMap.put(taskEnd.taskInfo().taskId(), (Long) accumulator.value());
        }
      }
    }
  });

  // make 1 commit, with 100 files in one partition and 10 in other two
  HoodieTestUtils.createCommitFiles(basePath, "000");
  List<String> filesP0C0 = createFilesInPartition(DEFAULT_FIRST_PARTITION_PATH, "000", 100);
  List<String> filesP1C0 = createFilesInPartition(DEFAULT_SECOND_PARTITION_PATH, "000", 10);
  List<String> filesP2C0 = createFilesInPartition(DEFAULT_THIRD_PARTITION_PATH, "000", 10);

  // Three more commits, each updating every file in every partition.
  HoodieTestUtils.createCommitFiles(basePath, "001");
  updateAllFilesInPartition(filesP0C0, DEFAULT_FIRST_PARTITION_PATH, "001");
  updateAllFilesInPartition(filesP1C0, DEFAULT_SECOND_PARTITION_PATH, "001");
  updateAllFilesInPartition(filesP2C0, DEFAULT_THIRD_PARTITION_PATH, "001");

  HoodieTestUtils.createCommitFiles(basePath, "002");
  updateAllFilesInPartition(filesP0C0, DEFAULT_FIRST_PARTITION_PATH, "002");
  updateAllFilesInPartition(filesP1C0, DEFAULT_SECOND_PARTITION_PATH, "002");
  updateAllFilesInPartition(filesP2C0, DEFAULT_THIRD_PARTITION_PATH, "002");

  HoodieTestUtils.createCommitFiles(basePath, "003");
  updateAllFilesInPartition(filesP0C0, DEFAULT_FIRST_PARTITION_PATH, "003");
  updateAllFilesInPartition(filesP1C0, DEFAULT_SECOND_PARTITION_PATH, "003");
  updateAllFilesInPartition(filesP2C0, DEFAULT_THIRD_PARTITION_PATH, "003");

  HoodieTable table = HoodieTable.getHoodieTable(
      new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config);
  List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);

  // With retainCommits(2), one old file version per file group is cleaned:
  // 100 in the large partition, 10 in each of the small ones.
  assertEquals(100, getCleanStat(hoodieCleanStats, DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles().size());
  assertEquals(10, getCleanStat(hoodieCleanStats, DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles().size());
  assertEquals(10, getCleanStat(hoodieCleanStats, DEFAULT_THIRD_PARTITION_PATH).getSuccessDeleteFiles().size());

  // 3 tasks are expected since the number of partitions is 3
  assertEquals(3, stageOneShuffleReadTaskRecordsCountMap.keySet().size());
  // Sum of all records processed = total number of files to clean
  assertEquals(120,
      stageOneShuffleReadTaskRecordsCountMap.values().stream().reduce((a, b) -> a + b).get().intValue());
  assertTrue("The skew in handling files to clean is not removed. "
          + "Each task should handle more records than the partitionPath with least files "
          + "and less records than the partitionPath with most files.",
      stageOneShuffleReadTaskRecordsCountMap.values().stream().filter(a -> a > 10 && a < 100).count() == 3);
}
|
||||
|
||||
/**
|
||||
* Utility method to create temporary data files
|
||||
*
|
||||
* @param commitTime Commit Timestamp
|
||||
* @param numFiles Number for files to be generated
|
||||
* @return generated files
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
private List<String> createTempFiles(String commitTime, int numFiles) throws IOException {
|
||||
List<String> files = new ArrayList<>();
|
||||
for (int i = 0; i < numFiles; i++) {
|
||||
files.add(HoodieTestUtils.createNewDataFile(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, commitTime));
|
||||
}
|
||||
return files;
|
||||
}
|
||||
|
||||
/***
|
||||
* Helper method to return temporary files count
|
||||
* @return Number of temporary files found
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
private int getTotalTempFiles() throws IOException {
|
||||
return fs.listStatus(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)).length;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,320 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import com.uber.hoodie.common.HoodieTestDataGenerator;
|
||||
import com.uber.hoodie.common.model.HoodieCleaningPolicy;
|
||||
import com.uber.hoodie.common.model.HoodieDataFile;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.TableFileSystemView;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieInstant;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.config.HoodieCompactionConfig;
|
||||
import com.uber.hoodie.config.HoodieIndexConfig;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.exception.HoodieRollbackException;
|
||||
import com.uber.hoodie.index.HoodieIndex;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test Cases for rollback of snapshots and commits
|
||||
*/
|
||||
public class TestClientRollback extends TestHoodieClientBase {
|
||||
|
||||
/**
|
||||
* Test case for rollback-savepoint interaction
|
||||
*/
|
||||
@Test
|
||||
public void testSavepointAndRollback() throws Exception {
|
||||
HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(
|
||||
HoodieCompactionConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1)
|
||||
.build()).build();
|
||||
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
|
||||
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
|
||||
|
||||
/**
|
||||
* Write 1 (only inserts)
|
||||
*/
|
||||
String newCommitTime = "001";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
|
||||
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
/**
|
||||
* Write 2 (updates)
|
||||
*/
|
||||
newCommitTime = "002";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
records = dataGen.generateUpdates(newCommitTime, records);
|
||||
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
client.savepoint("hoodie-unit-test", "test");
|
||||
|
||||
/**
|
||||
* Write 3 (updates)
|
||||
*/
|
||||
newCommitTime = "003";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
records = dataGen.generateUpdates(newCommitTime, records);
|
||||
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(),
|
||||
getConfig().shouldAssumeDatePartitioning());
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
|
||||
final TableFileSystemView.ReadOptimizedView view1 = table.getROFileSystemView();
|
||||
|
||||
List<HoodieDataFile> dataFiles = partitionPaths.stream().flatMap(s -> {
|
||||
return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("003"));
|
||||
}).collect(Collectors.toList());
|
||||
assertEquals("The data files for commit 003 should be present", 3, dataFiles.size());
|
||||
|
||||
dataFiles = partitionPaths.stream().flatMap(s -> {
|
||||
return view1.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
|
||||
}).collect(Collectors.toList());
|
||||
assertEquals("The data files for commit 002 should be present", 3, dataFiles.size());
|
||||
|
||||
/**
|
||||
* Write 4 (updates)
|
||||
*/
|
||||
newCommitTime = "004";
|
||||
client.startCommitWithTime(newCommitTime);
|
||||
|
||||
records = dataGen.generateUpdates(newCommitTime, records);
|
||||
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||
table = HoodieTable.getHoodieTable(metaClient, getConfig());
|
||||
final TableFileSystemView.ReadOptimizedView view2 = table.getROFileSystemView();
|
||||
|
||||
dataFiles = partitionPaths.stream().flatMap(s -> {
|
||||
return view2.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("004"));
|
||||
}).collect(Collectors.toList());
|
||||
assertEquals("The data files for commit 004 should be present", 3, dataFiles.size());
|
||||
|
||||
// rolling back to a non existent savepoint must not succeed
|
||||
try {
|
||||
client.rollbackToSavepoint("001");
|
||||
fail("Rolling back to non-existent savepoint should not be allowed");
|
||||
} catch (HoodieRollbackException e) {
|
||||
// this is good
|
||||
}
|
||||
|
||||
// rollback to savepoint 002
|
||||
HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
|
||||
client.rollbackToSavepoint(savepoint.getTimestamp());
|
||||
|
||||
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||
table = HoodieTable.getHoodieTable(metaClient, getConfig());
|
||||
final TableFileSystemView.ReadOptimizedView view3 = table.getROFileSystemView();
|
||||
dataFiles = partitionPaths.stream().flatMap(s -> {
|
||||
return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("002"));
|
||||
}).collect(Collectors.toList());
|
||||
assertEquals("The data files for commit 002 be available", 3, dataFiles.size());
|
||||
|
||||
dataFiles = partitionPaths.stream().flatMap(s -> {
|
||||
return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("003"));
|
||||
}).collect(Collectors.toList());
|
||||
assertEquals("The data files for commit 003 should be rolled back", 0, dataFiles.size());
|
||||
|
||||
dataFiles = partitionPaths.stream().flatMap(s -> {
|
||||
return view3.getAllDataFiles(s).filter(f -> f.getCommitTime().equals("004"));
|
||||
}).collect(Collectors.toList());
|
||||
assertEquals("The data files for commit 004 should be rolled back", 0, dataFiles.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Cases for effects of rollbacking completed/inflight commits
|
||||
*/
|
||||
@Test
|
||||
public void testRollbackCommit() throws Exception {
|
||||
// Let's create some commit files and parquet files
|
||||
String commitTime1 = "20160501010101";
|
||||
String commitTime2 = "20160502020601";
|
||||
String commitTime3 = "20160506030611";
|
||||
new File(basePath + "/.hoodie").mkdirs();
|
||||
HoodieTestDataGenerator
|
||||
.writePartitionMetadata(fs, new String[]{"2016/05/01", "2016/05/02", "2016/05/06"}, basePath);
|
||||
|
||||
// Only first two have commit files
|
||||
HoodieTestUtils.createCommitFiles(basePath, commitTime1, commitTime2);
|
||||
// Third one has a .inflight intermediate commit file
|
||||
HoodieTestUtils.createInflightCommitFiles(basePath, commitTime3);
|
||||
|
||||
// Make commit1
|
||||
String file11 = HoodieTestUtils.createDataFile(basePath, "2016/05/01", commitTime1, "id11");
|
||||
String file12 = HoodieTestUtils.createDataFile(basePath, "2016/05/02", commitTime1, "id12");
|
||||
String file13 = HoodieTestUtils.createDataFile(basePath, "2016/05/06", commitTime1, "id13");
|
||||
|
||||
// Make commit2
|
||||
String file21 = HoodieTestUtils.createDataFile(basePath, "2016/05/01", commitTime2, "id21");
|
||||
String file22 = HoodieTestUtils.createDataFile(basePath, "2016/05/02", commitTime2, "id22");
|
||||
String file23 = HoodieTestUtils.createDataFile(basePath, "2016/05/06", commitTime2, "id23");
|
||||
|
||||
// Make commit3
|
||||
String file31 = HoodieTestUtils.createDataFile(basePath, "2016/05/01", commitTime3, "id31");
|
||||
String file32 = HoodieTestUtils.createDataFile(basePath, "2016/05/02", commitTime3, "id32");
|
||||
String file33 = HoodieTestUtils.createDataFile(basePath, "2016/05/06", commitTime3, "id33");
|
||||
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withIndexConfig(
|
||||
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
||||
|
||||
HoodieWriteClient client = new HoodieWriteClient(jsc, config, false);
|
||||
|
||||
// Rollback commit 1 (this should fail, since commit2 is still around)
|
||||
try {
|
||||
client.rollback(commitTime1);
|
||||
assertTrue("Should have thrown an exception ", false);
|
||||
} catch (HoodieRollbackException hrbe) {
|
||||
// should get here
|
||||
}
|
||||
|
||||
// Rollback commit3
|
||||
client.rollback(commitTime3);
|
||||
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime3));
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime3, file31)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime3, file32)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime3, file33));
|
||||
|
||||
// simulate partial failure, where .inflight was not deleted, but data files were.
|
||||
HoodieTestUtils.createInflightCommitFiles(basePath, commitTime3);
|
||||
client.rollback(commitTime3);
|
||||
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime3));
|
||||
|
||||
// Rollback commit2
|
||||
client.rollback(commitTime2);
|
||||
assertFalse(HoodieTestUtils.doesCommitExist(basePath, commitTime2));
|
||||
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime2));
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime2, file21)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime2, file22)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime2, file23));
|
||||
|
||||
// simulate partial failure, where only .commit => .inflight renaming succeeded, leaving a
|
||||
// .inflight commit and a bunch of data files around.
|
||||
HoodieTestUtils.createInflightCommitFiles(basePath, commitTime2);
|
||||
file21 = HoodieTestUtils.createDataFile(basePath, "2016/05/01", commitTime2, "id21");
|
||||
file22 = HoodieTestUtils.createDataFile(basePath, "2016/05/02", commitTime2, "id22");
|
||||
file23 = HoodieTestUtils.createDataFile(basePath, "2016/05/06", commitTime2, "id23");
|
||||
|
||||
client.rollback(commitTime2);
|
||||
assertFalse(HoodieTestUtils.doesCommitExist(basePath, commitTime2));
|
||||
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime2));
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime2, file21)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime2, file22)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime2, file23));
|
||||
|
||||
// Let's rollback commit1, Check results
|
||||
client.rollback(commitTime1);
|
||||
assertFalse(HoodieTestUtils.doesCommitExist(basePath, commitTime1));
|
||||
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime1));
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime1, file11)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime1, file12)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test auto-rollback of commits which are in flight
|
||||
*/
|
||||
@Test
|
||||
public void testAutoRollbackInflightCommit() throws Exception {
|
||||
// Let's create some commit files and parquet files
|
||||
String commitTime1 = "20160501010101";
|
||||
String commitTime2 = "20160502020601";
|
||||
String commitTime3 = "20160506030611";
|
||||
new File(basePath + "/.hoodie").mkdirs();
|
||||
HoodieTestDataGenerator
|
||||
.writePartitionMetadata(fs, new String[]{"2016/05/01", "2016/05/02", "2016/05/06"}, basePath);
|
||||
|
||||
// One good commit
|
||||
HoodieTestUtils.createCommitFiles(basePath, commitTime1);
|
||||
// Two inflight commits
|
||||
HoodieTestUtils.createInflightCommitFiles(basePath, commitTime2, commitTime3);
|
||||
|
||||
// Make commit1
|
||||
String file11 = HoodieTestUtils.createDataFile(basePath, "2016/05/01", commitTime1, "id11");
|
||||
String file12 = HoodieTestUtils.createDataFile(basePath, "2016/05/02", commitTime1, "id12");
|
||||
String file13 = HoodieTestUtils.createDataFile(basePath, "2016/05/06", commitTime1, "id13");
|
||||
|
||||
// Make commit2
|
||||
String file21 = HoodieTestUtils.createDataFile(basePath, "2016/05/01", commitTime2, "id21");
|
||||
String file22 = HoodieTestUtils.createDataFile(basePath, "2016/05/02", commitTime2, "id22");
|
||||
String file23 = HoodieTestUtils.createDataFile(basePath, "2016/05/06", commitTime2, "id23");
|
||||
|
||||
// Make commit3
|
||||
String file31 = HoodieTestUtils.createDataFile(basePath, "2016/05/01", commitTime3, "id31");
|
||||
String file32 = HoodieTestUtils.createDataFile(basePath, "2016/05/02", commitTime3, "id32");
|
||||
String file33 = HoodieTestUtils.createDataFile(basePath, "2016/05/06", commitTime3, "id33");
|
||||
|
||||
// Turn auto rollback off
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withIndexConfig(
|
||||
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
|
||||
|
||||
new HoodieWriteClient(jsc, config, false);
|
||||
|
||||
// Check results, nothing changed
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1));
|
||||
assertTrue(HoodieTestUtils.doesInflightExist(basePath, commitTime2));
|
||||
assertTrue(HoodieTestUtils.doesInflightExist(basePath, commitTime3));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime3, file31)
|
||||
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime3, file32)
|
||||
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime3, file33));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime2, file21)
|
||||
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime2, file22)
|
||||
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime2, file23));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime1, file11)
|
||||
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime1, file12)
|
||||
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13));
|
||||
|
||||
// Turn auto rollback on
|
||||
new HoodieWriteClient(jsc, config, true);
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1));
|
||||
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime2));
|
||||
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime3));
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime3, file31)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime3, file32)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime3, file33));
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime2, file21)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime2, file22)
|
||||
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime2, file23));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime1, file11)
|
||||
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime1, file12)
|
||||
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,436 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie;
|
||||
|
||||
import static com.uber.hoodie.common.HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.uber.hoodie.common.HoodieCleanStat;
|
||||
import com.uber.hoodie.common.HoodieClientTestUtils;
|
||||
import com.uber.hoodie.common.HoodieTestDataGenerator;
|
||||
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.config.HoodieCompactionConfig;
|
||||
import com.uber.hoodie.config.HoodieIndexConfig;
|
||||
import com.uber.hoodie.config.HoodieStorageConfig;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.index.HoodieIndex;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
/**
|
||||
* Base Class providing setup/cleanup and utility methods for testing Hoodie Client facing tests
|
||||
*/
|
||||
public class TestHoodieClientBase implements Serializable {
|
||||
|
||||
protected transient JavaSparkContext jsc = null;
|
||||
protected transient SQLContext sqlContext;
|
||||
protected transient FileSystem fs;
|
||||
protected String basePath = null;
|
||||
protected transient HoodieTestDataGenerator dataGen = null;
|
||||
|
||||
/**
 * Per-test setup: creates a local Spark context, SQL context, a temp-folder base path,
 * a file system handle, an initialized Hoodie table, and a test data generator.
 *
 * @throws IOException in case of error initializing the table or temp folder
 */
@Before
public void init() throws IOException {
  // Initialize a local spark env
  jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieClient"));

  //SQLContext stuff
  sqlContext = new SQLContext(jsc);

  // Create a temp folder as the base path
  // NOTE(review): TemporaryFolder is used without a @Rule, so JUnit will not auto-delete
  // it — confirm cleanup is handled elsewhere (e.g. an @After hook outside this view).
  TemporaryFolder folder = new TemporaryFolder();
  folder.create();
  basePath = folder.getRoot().getAbsolutePath();
  fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
  // Lays down the .hoodie metadata folder so the table exists before each test runs.
  HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
  dataGen = new HoodieTestDataGenerator();
}
|
||||
|
||||
/**
|
||||
* Get Default HoodieWriteConfig for tests
|
||||
*
|
||||
* @return Default Hoodie Write Config for tests
|
||||
*/
|
||||
protected HoodieWriteConfig getConfig() {
|
||||
return getConfigBuilder().build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Config builder with default configs set
|
||||
*
|
||||
* @return Config Builder
|
||||
*/
|
||||
HoodieWriteConfig.Builder getConfigBuilder() {
|
||||
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(2, 2)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
|
||||
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
|
||||
.forTable("test-trip-table")
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert no failures in writing hoodie files
|
||||
*
|
||||
* @param statuses List of Write Status
|
||||
*/
|
||||
void assertNoWriteErrors(List<WriteStatus> statuses) {
|
||||
// Verify there are no errors
|
||||
for (WriteStatus status : statuses) {
|
||||
assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure presence of partition meta-data at known depth
|
||||
*
|
||||
* @param partitionPaths Partition paths to check
|
||||
* @param fs File System
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
|
||||
for (String partitionPath : partitionPaths) {
|
||||
assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
|
||||
HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
|
||||
pmeta.readFromFS();
|
||||
assertEquals(DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure records have location field set
|
||||
*
|
||||
* @param taggedRecords Tagged Records
|
||||
* @param commitTime Commit Timestamp
|
||||
*/
|
||||
void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
|
||||
for (HoodieRecord rec : taggedRecords) {
|
||||
assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
|
||||
assertEquals("All records should have commit time " + commitTime + ", since updates were made",
|
||||
rec.getCurrentLocation().getCommitTime(), commitTime);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that there is no duplicate key at the partition level
|
||||
*
|
||||
* @param records List of Hoodie records
|
||||
*/
|
||||
void assertNodupesWithinPartition(List<HoodieRecord> records) {
|
||||
Map<String, Set<String>> partitionToKeys = new HashMap<>();
|
||||
for (HoodieRecord r : records) {
|
||||
String key = r.getRecordKey();
|
||||
String partitionPath = r.getPartitionPath();
|
||||
if (!partitionToKeys.containsKey(partitionPath)) {
|
||||
partitionToKeys.put(partitionPath, new HashSet<>());
|
||||
}
|
||||
assertTrue("key " + key + " is duplicate within partition " + partitionPath,
|
||||
!partitionToKeys.get(partitionPath).contains(key));
|
||||
partitionToKeys.get(partitionPath).add(key);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records
|
||||
* to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
|
||||
* guaranteed by record-generation function itself.
|
||||
*
|
||||
* @param writeConfig Hoodie Write Config
|
||||
* @param recordGenFunction Records Generation function
|
||||
* @return Wrapped function
|
||||
*/
|
||||
private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
|
||||
final HoodieWriteConfig writeConfig,
|
||||
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
|
||||
return (commit, numRecords) -> {
|
||||
final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
|
||||
List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
|
||||
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
|
||||
HoodieTable.getHoodieTable(metaClient, writeConfig);
|
||||
JavaRDD<HoodieRecord> taggedRecords =
|
||||
index.tagLocation(jsc.parallelize(records, 1), HoodieTable.getHoodieTable(metaClient, writeConfig));
|
||||
return taggedRecords.collect();
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate wrapper for record generation function for testing Prepped APIs
|
||||
*
|
||||
* @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
|
||||
* @param writeConfig Hoodie Write Config
|
||||
* @param wrapped Actual Records Generation function
|
||||
* @return Wrapped Function
|
||||
*/
|
||||
Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(
|
||||
boolean isPreppedAPI,
|
||||
HoodieWriteConfig writeConfig,
|
||||
Function2<List<HoodieRecord>, String, Integer> wrapped) {
|
||||
if (isPreppedAPI) {
|
||||
return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
|
||||
} else {
|
||||
return wrapped;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to insert first batch of records and do regular assertions on the state after successful completion
|
||||
*
|
||||
* @param writeConfig Hoodie Write Config
|
||||
* @param client Hoodie Write Client
|
||||
* @param newCommitTime New Commit Timestamp to be used
|
||||
* @param initCommitTime Begin Timestamp (usually "000")
|
||||
* @param numRecordsInThisCommit Number of records to be added in the new commit
|
||||
* @param writeFn Write Function to be used for insertion
|
||||
* @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
|
||||
* @param assertForCommit Enable Assertion of Writes
|
||||
* @param expRecordsInThisCommit Expected number of records in this commit
|
||||
* @return RDD of write-status
|
||||
* @throws Exception in case of error
|
||||
*/
|
||||
JavaRDD<WriteStatus> insertFirstBatch(
|
||||
HoodieWriteConfig writeConfig,
|
||||
HoodieWriteClient client,
|
||||
String newCommitTime,
|
||||
String initCommitTime,
|
||||
int numRecordsInThisCommit,
|
||||
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
|
||||
boolean isPreppedAPI,
|
||||
boolean assertForCommit,
|
||||
int expRecordsInThisCommit) throws Exception {
|
||||
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
|
||||
generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
|
||||
|
||||
return writeBatch(client, newCommitTime, initCommitTime, Optional.empty(), initCommitTime,
|
||||
numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit,
|
||||
expRecordsInThisCommit, expRecordsInThisCommit, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to upsert batch of records and do regular assertions on the state after successful completion
|
||||
*
|
||||
* @param writeConfig Hoodie Write Config
|
||||
* @param client Hoodie Write Client
|
||||
* @param newCommitTime New Commit Timestamp to be used
|
||||
* @param prevCommitTime Commit Timestamp used in previous commit
|
||||
* @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
|
||||
* @param initCommitTime Begin Timestamp (usually "000")
|
||||
* @param numRecordsInThisCommit Number of records to be added in the new commit
|
||||
* @param writeFn Write Function to be used for upsert
|
||||
* @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
|
||||
* @param assertForCommit Enable Assertion of Writes
|
||||
* @param expRecordsInThisCommit Expected number of records in this commit
|
||||
* @param expTotalRecords Expected number of records when scanned
|
||||
* @param expTotalCommits Expected number of commits (including this commit)
|
||||
* @return RDD of write-status
|
||||
* @throws Exception in case of error
|
||||
*/
|
||||
JavaRDD<WriteStatus> updateBatch(
|
||||
HoodieWriteConfig writeConfig,
|
||||
HoodieWriteClient client,
|
||||
String newCommitTime,
|
||||
String prevCommitTime,
|
||||
Optional<List<String>> commitTimesBetweenPrevAndNew,
|
||||
String initCommitTime,
|
||||
int numRecordsInThisCommit,
|
||||
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
|
||||
boolean isPreppedAPI,
|
||||
boolean assertForCommit,
|
||||
int expRecordsInThisCommit,
|
||||
int expTotalRecords,
|
||||
int expTotalCommits)
|
||||
throws Exception {
|
||||
final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
|
||||
generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
|
||||
|
||||
return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
|
||||
numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit,
|
||||
expRecordsInThisCommit, expTotalRecords, expTotalCommits);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to insert/upsert batch of records and do regular assertions on the state after successful completion
|
||||
*
|
||||
* @param client Hoodie Write Client
|
||||
* @param newCommitTime New Commit Timestamp to be used
|
||||
* @param prevCommitTime Commit Timestamp used in previous commit
|
||||
* @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
|
||||
* @param initCommitTime Begin Timestamp (usually "000")
|
||||
* @param numRecordsInThisCommit Number of records to be added in the new commit
|
||||
* @param recordGenFunction Records Generation Function
|
||||
* @param writeFn Write Function to be used for upsert
|
||||
* @param assertForCommit Enable Assertion of Writes
|
||||
* @param expRecordsInThisCommit Expected number of records in this commit
|
||||
* @param expTotalRecords Expected number of records when scanned
|
||||
* @param expTotalCommits Expected number of commits (including this commit)
|
||||
* @throws Exception in case of error
|
||||
*/
|
||||
/**
 * Helper to insert/upsert a batch of records and run the standard assertions on the resulting
 * dataset state (commit count, latest commit, record counts, incremental reads).
 *
 * @param client Hoodie Write Client
 * @param newCommitTime New Commit Timestamp to be used
 * @param prevCommitTime Commit Timestamp used in previous commit
 * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
 * @param initCommitTime Begin Timestamp (usually "000")
 * @param numRecordsInThisCommit Number of records to be added in the new commit
 * @param recordGenFunction Records Generation Function
 * @param writeFn Write Function to be used for insert/upsert
 * @param assertForCommit Enable Assertion of Writes
 * @param expRecordsInThisCommit Expected number of records in this commit
 * @param expTotalRecords Expected number of records when scanned
 * @param expTotalCommits Expected number of commits (including this commit)
 * @return RDD of write-status
 * @throws Exception in case of error
 */
JavaRDD<WriteStatus> writeBatch(
    HoodieWriteClient client,
    String newCommitTime,
    String prevCommitTime,
    Optional<List<String>> commitTimesBetweenPrevAndNew,
    String initCommitTime,
    int numRecordsInThisCommit,
    Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
    Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
    boolean assertForCommit,
    int expRecordsInThisCommit,
    int expTotalRecords,
    int expTotalCommits)
    throws Exception {

  // Open a new inflight commit on the timeline before writing.
  client.startCommitWithTime(newCommitTime);

  // Generate the batch and write it through the supplied API (insert/upsert/bulk-insert/prepped).
  List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
  JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

  JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
  List<WriteStatus> statuses = result.collect();
  assertNoWriteErrors(statuses);

  // check the partition metadata is written out
  assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs);

  // verify that there is a commit
  HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
  HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();

  if (assertForCommit) {
    // Timeline must now contain exactly the expected number of commits after the init timestamp.
    assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
        timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
    assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
        timeline.lastInstant().get().getTimestamp());
    // Reading only this commit must return exactly the records written in it.
    assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
        HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());

    // Check the entire dataset has all records still
    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
    for (int i = 0; i < fullPartitionPaths.length; i++) {
      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
    }
    assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
        HoodieClientTestUtils.read(basePath, sqlContext, fs, fullPartitionPaths).count());

    // Incremental consumption starting right after prevCommitTime must see exactly the records
    // of the latest commit.
    assertEquals("Incremental consumption from " + prevCommitTime
            + " should give all records in latest commit",
        HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
    // The same must hold for any sampled timestamp strictly between the two commits.
    if (commitTimesBetweenPrevAndNew.isPresent()) {
      commitTimesBetweenPrevAndNew.get().forEach(ct -> {
        assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
            HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
            HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
      });
    }
  }
  return result;
}
|
||||
|
||||
@After
|
||||
public void clean() {
|
||||
if (basePath != null) {
|
||||
new File(basePath).delete();
|
||||
}
|
||||
if (jsc != null) {
|
||||
jsc.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Cleaner state corresponding to a partition path
|
||||
*
|
||||
* @param hoodieCleanStatsTwo List of Clean Stats
|
||||
* @param partitionPath Partition path for filtering
|
||||
* @return Cleaner state corresponding to partition path
|
||||
*/
|
||||
HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
|
||||
return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility to simulate commit touching files in a partition
|
||||
*
|
||||
* @param files List of file-Ids to be touched
|
||||
* @param partitionPath Partition
|
||||
* @param commitTime Commit Timestamp
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
void updateAllFilesInPartition(List<String> files, String partitionPath, String commitTime)
|
||||
throws IOException {
|
||||
for (String fileId : files) {
|
||||
HoodieTestUtils.createDataFile(basePath, partitionPath, commitTime, fileId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper methods to create new data files in a partition
|
||||
*
|
||||
* @param partitionPath Partition
|
||||
* @param commitTime Commit Timestamp
|
||||
* @param numFiles Number of files to be added
|
||||
* @return Created files
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
List<String> createFilesInPartition(String partitionPath, String commitTime, int numFiles)
|
||||
throws IOException {
|
||||
List<String> files = new ArrayList<>();
|
||||
for (int i = 0; i < numFiles; i++) {
|
||||
files.add(HoodieTestUtils.createNewDataFile(basePath, partitionPath, commitTime));
|
||||
}
|
||||
return files;
|
||||
}
|
||||
|
||||
// Functional Interfaces for passing lambda and Hoodie Write API contexts
|
||||
|
||||
/**
 * Two-argument function that may throw {@link IOException}; used to pass record-generation
 * lambdas into the write helpers (the JDK's {@code BiFunction} cannot throw checked exceptions).
 *
 * @param <R> result type
 * @param <T1> first argument type
 * @param <T2> second argument type
 */
@FunctionalInterface
interface Function2<R, T1, T2> {

  R apply(T1 v1, T2 v2) throws IOException;
}
|
||||
|
||||
/**
 * Three-argument function that may throw {@link IOException}; used to pass HoodieWriteClient
 * write-API lambdas (client, records, commitTime) into the test helpers.
 *
 * @param <R> result type
 * @param <T1> first argument type
 * @param <T2> second argument type
 * @param <T3> third argument type
 */
@FunctionalInterface
interface Function3<R, T1, T2, T3> {

  R apply(T1 v1, T2 v2, T3 v3) throws IOException;
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,195 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.config.HoodieWriteConfig;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.api.java.JavaRDD;
import org.junit.Test;
import scala.Option;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
/**
|
||||
* Test-cases for covering HoodieReadClient APIs
|
||||
*/
|
||||
public class TestHoodieReadClient extends TestHoodieClientBase {
|
||||
|
||||
/**
|
||||
* Test ReadFilter API after writing new records using HoodieWriteClient.insert
|
||||
*/
|
||||
@Test
|
||||
public void testReadFilterExistAfterInsert() throws Exception {
|
||||
testReadFilterExist(getConfig(), HoodieWriteClient::insert);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test ReadFilter API after writing new records using HoodieWriteClient.insertPrepped
|
||||
*/
|
||||
@Test
|
||||
public void testReadFilterExistAfterInsertPrepped() throws Exception {
|
||||
testReadFilterExist(getConfig(), HoodieWriteClient::insertPreppedRecords);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test ReadFilter API after writing new records using HoodieWriteClient.bulkInsert
|
||||
*/
|
||||
@Test
|
||||
public void testReadFilterExistAfterBulkInsert() throws Exception {
|
||||
testReadFilterExist(getConfigBuilder().withBulkInsertParallelism(1).build(), HoodieWriteClient::bulkInsert);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test ReadFilter API after writing new records using HoodieWriteClient.bulkInsertPrepped
|
||||
*/
|
||||
@Test
|
||||
public void testReadFilterExistAfterBulkInsertPrepped() throws Exception {
|
||||
testReadFilterExist(getConfigBuilder().withBulkInsertParallelism(1).build(),
|
||||
(writeClient, recordRDD, commitTime) -> {
|
||||
return writeClient.bulkInsertPreppedRecords(recordRDD, commitTime, Option.empty());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to write new records using one of HoodieWriteClient's write API and use ReadClient to test filterExists()
|
||||
* API works correctly
|
||||
*
|
||||
* @param config Hoodie Write Config
|
||||
* @param writeFn Write Function for writing records
|
||||
* @throws Exception in case of error
|
||||
*/
|
||||
private void testReadFilterExist(HoodieWriteConfig config,
|
||||
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
|
||||
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
|
||||
String newCommitTime = writeClient.startCommit();
|
||||
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
|
||||
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
|
||||
|
||||
HoodieReadClient readClient = new HoodieReadClient(jsc, config.getBasePath());
|
||||
JavaRDD<HoodieRecord> filteredRDD = readClient.filterExists(recordsRDD);
|
||||
|
||||
// Should not find any files
|
||||
assertTrue(filteredRDD.collect().size() == 100);
|
||||
|
||||
JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1);
|
||||
// We create three parquet file, each having one record. (3 different partitions)
|
||||
List<WriteStatus> statuses = writeFn.apply(writeClient, smallRecordsRDD, newCommitTime).collect();
|
||||
// Verify there are no errors
|
||||
assertNoWriteErrors(statuses);
|
||||
|
||||
readClient = new HoodieReadClient(jsc, config.getBasePath());
|
||||
filteredRDD = readClient.filterExists(recordsRDD);
|
||||
List<HoodieRecord> result = filteredRDD.collect();
|
||||
// Check results
|
||||
assertTrue(result.size() == 25);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test tagLocation API after insert()
|
||||
*/
|
||||
@Test
|
||||
public void testTagLocationAfterInsert() throws Exception {
|
||||
testTagLocation(getConfig(), HoodieWriteClient::insert,
|
||||
HoodieWriteClient::upsert, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test tagLocation API after insertPrepped()
|
||||
*/
|
||||
@Test
|
||||
public void testTagLocationAfterInsertPrepped() throws Exception {
|
||||
testTagLocation(getConfig(), HoodieWriteClient::insertPreppedRecords,
|
||||
HoodieWriteClient::upsertPreppedRecords, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test tagLocation API after bulk-insert()
|
||||
*/
|
||||
@Test
|
||||
public void testTagLocationAfterBulkInsert() throws Exception {
|
||||
testTagLocation(getConfigBuilder().withBulkInsertParallelism(1).build(), HoodieWriteClient::bulkInsert,
|
||||
HoodieWriteClient::upsert, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test tagLocation API after bulkInsertPrepped()
|
||||
*/
|
||||
@Test
|
||||
public void testTagLocationAfterBulkInsertPrepped() throws Exception {
|
||||
testTagLocation(getConfigBuilder().withBulkInsertParallelism(1).build(),
|
||||
(writeClient, recordRDD, commitTime)
|
||||
-> writeClient.bulkInsertPreppedRecords(recordRDD, commitTime, Option.empty()),
|
||||
HoodieWriteClient::upsertPreppedRecords, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to test tagLocation after using different HoodieWriteClient write APIS
|
||||
*
|
||||
* @param hoodieWriteConfig Write Config
|
||||
* @param insertFn Hoodie Write Client first Insert API
|
||||
* @param updateFn Hoodie Write Client upsert API
|
||||
* @param isPrepped isPrepped flag.
|
||||
* @throws Exception in case of error
|
||||
*/
|
||||
private void testTagLocation(
|
||||
HoodieWriteConfig hoodieWriteConfig,
|
||||
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
|
||||
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> updateFn,
|
||||
boolean isPrepped)
|
||||
throws Exception {
|
||||
HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig);
|
||||
//Write 1 (only inserts)
|
||||
String newCommitTime = "001";
|
||||
String initCommitTime = "000";
|
||||
int numRecords = 200;
|
||||
JavaRDD<WriteStatus> result =
|
||||
insertFirstBatch(hoodieWriteConfig, client, newCommitTime, initCommitTime, numRecords, insertFn, isPrepped,
|
||||
true, numRecords);
|
||||
JavaRDD<HoodieRecord> recordRDD =
|
||||
jsc.parallelize(
|
||||
result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
|
||||
.collect(Collectors.toList()));
|
||||
// Should have 100 records in table (check using Index), all in locations marked at commit
|
||||
HoodieReadClient readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath());
|
||||
List<HoodieRecord> taggedRecords = readClient.tagLocation(recordRDD).collect();
|
||||
checkTaggedRecords(taggedRecords, newCommitTime);
|
||||
|
||||
// Write 2 (updates)
|
||||
String prevCommitTime = newCommitTime;
|
||||
newCommitTime = "004";
|
||||
numRecords = 100;
|
||||
String commitTimeBetweenPrevAndNew = "002";
|
||||
result = updateBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime,
|
||||
Optional.of(Arrays.asList(commitTimeBetweenPrevAndNew)),
|
||||
initCommitTime, numRecords, updateFn, isPrepped,
|
||||
true, numRecords, 200, 2);
|
||||
recordRDD =
|
||||
jsc.parallelize(
|
||||
result.collect().stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
|
||||
.collect(Collectors.toList()));
|
||||
// Index should be able to locate all updates in correct locations.
|
||||
readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath());
|
||||
taggedRecords = readClient.tagLocation(recordRDD).collect();
|
||||
checkTaggedRecords(taggedRecords, newCommitTime);
|
||||
}
|
||||
}
|
||||
@@ -99,7 +99,6 @@ public class HoodieClientTestUtils {
|
||||
}
|
||||
|
||||
public static SparkConf getSparkConfForTest(String appName) {
|
||||
System.out.println("HIII" + "HII2");
|
||||
SparkConf sparkConf = new SparkConf().setAppName(appName)
|
||||
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||
.setMaster("local[1]");
|
||||
@@ -125,6 +124,7 @@ public class HoodieClientTestUtils {
|
||||
try {
|
||||
HashMap<String, String> paths = getLatestFileIDsToFullPath(basePath, commitTimeline,
|
||||
Arrays.asList(commitInstant));
|
||||
System.out.println("Path :" + paths.values());
|
||||
return sqlContext.read().parquet(paths.values().toArray(new String[paths.size()]))
|
||||
.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitTime));
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -28,9 +28,12 @@ import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
@@ -48,7 +51,17 @@ public class HoodieTestDataGenerator {
|
||||
|
||||
// based on examination of sample file, the schema produces the following per record size
|
||||
public static final int SIZE_PER_RECORD = 50 * 1024;
|
||||
public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"};
|
||||
public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15";
|
||||
public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16";
|
||||
public static final String DEFAULT_THIRD_PARTITION_PATH = "2015/03/17";
|
||||
|
||||
public static final String[] DEFAULT_PARTITION_PATHS = {
|
||||
DEFAULT_FIRST_PARTITION_PATH,
|
||||
DEFAULT_SECOND_PARTITION_PATH,
|
||||
DEFAULT_THIRD_PARTITION_PATH
|
||||
};
|
||||
public static final int DEFAULT_PARTITION_DEPTH = 3;
|
||||
|
||||
public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
|
||||
+ "{\"name\": \"timestamp\",\"type\": \"double\"},"
|
||||
+ "{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||
@@ -62,14 +75,14 @@ public class HoodieTestDataGenerator {
|
||||
public static Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));
|
||||
private static Random rand = new Random(46474747);
|
||||
private List<KeyPartition> existingKeysList = new ArrayList<>();
|
||||
private String[] partitionPaths = DEFAULT_PARTITION_PATHS;
|
||||
private String[] partitionPaths;
|
||||
|
||||
public HoodieTestDataGenerator(String[] partitionPaths) {
|
||||
this.partitionPaths = partitionPaths;
|
||||
this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
|
||||
}
|
||||
|
||||
public HoodieTestDataGenerator() {
|
||||
this(new String[] {"2016/03/15", "2015/03/16", "2015/03/17"});
|
||||
this(DEFAULT_PARTITION_PATHS);
|
||||
}
|
||||
|
||||
public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) {
|
||||
@@ -133,7 +146,7 @@ public class HoodieTestDataGenerator {
|
||||
/**
|
||||
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
|
||||
*/
|
||||
public List<HoodieRecord> generateInserts(String commitTime, int n) throws IOException {
|
||||
public List<HoodieRecord> generateInserts(String commitTime, Integer n) throws IOException {
|
||||
List<HoodieRecord> inserts = new ArrayList<>();
|
||||
for (int i = 0; i < n; i++) {
|
||||
String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
|
||||
@@ -149,7 +162,7 @@ public class HoodieTestDataGenerator {
|
||||
return inserts;
|
||||
}
|
||||
|
||||
public List<HoodieRecord> generateDeletes(String commitTime, int n) throws IOException {
|
||||
public List<HoodieRecord> generateDeletes(String commitTime, Integer n) throws IOException {
|
||||
List<HoodieRecord> inserts = generateInserts(commitTime, n);
|
||||
return generateDeletesFromExistingRecords(inserts);
|
||||
}
|
||||
@@ -159,36 +172,77 @@ public class HoodieTestDataGenerator {
|
||||
for (HoodieRecord existingRecord : existingRecords) {
|
||||
HoodieRecord record = generateDeleteRecord(existingRecord);
|
||||
deletes.add(record);
|
||||
|
||||
}
|
||||
return deletes;
|
||||
}
|
||||
|
||||
public HoodieRecord generateDeleteRecord(HoodieRecord existingRecord) throws IOException {
|
||||
HoodieKey key = existingRecord.getKey();
|
||||
return generateDeleteRecord(key);
|
||||
}
|
||||
|
||||
public HoodieRecord generateDeleteRecord(HoodieKey key) throws IOException {
|
||||
TestRawTripPayload payload = new TestRawTripPayload(Optional.empty(), key.getRecordKey(), key.getPartitionPath(),
|
||||
null, true);
|
||||
return new HoodieRecord(key, payload);
|
||||
}
|
||||
|
||||
/**
 * Builds an update record for the given key with a freshly generated random payload stamped with
 * the supplied commit time.
 */
public HoodieRecord generateUpdateRecord(HoodieKey key, String commitTime) throws IOException {
  return new HoodieRecord(key, generateRandomValue(key, commitTime));
}
|
||||
|
||||
public List<HoodieRecord> generateUpdates(String commitTime, List<HoodieRecord> baseRecords) throws IOException {
|
||||
List<HoodieRecord> updates = new ArrayList<>();
|
||||
for (HoodieRecord baseRecord : baseRecords) {
|
||||
HoodieRecord record = new HoodieRecord(baseRecord.getKey(), generateRandomValue(baseRecord.getKey(), commitTime));
|
||||
HoodieRecord record = generateUpdateRecord(baseRecord.getKey(), commitTime);
|
||||
updates.add(record);
|
||||
}
|
||||
return updates;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates new updates, randomly distributed across the keys above.
|
||||
* Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned list
|
||||
* @param commitTime Commit Timestamp
|
||||
* @param n Number of updates (including dups)
|
||||
* @return list of hoodie record updates
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<HoodieRecord> generateUpdates(String commitTime, int n) throws IOException {
|
||||
public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws IOException {
|
||||
List<HoodieRecord> updates = new ArrayList<>();
|
||||
for (int i = 0; i < n; i++) {
|
||||
KeyPartition kp = existingKeysList.get(rand.nextInt(existingKeysList.size() - 1));
|
||||
HoodieRecord record = generateUpdateRecord(kp.key, commitTime);
|
||||
updates.add(record);
|
||||
}
|
||||
return updates;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
|
||||
* @param commitTime Commit Timestamp
|
||||
* @param n Number of unique records
|
||||
* @return list of hoodie record updates
|
||||
* @throws IOException
|
||||
*/
|
||||
public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer n) throws IOException {
|
||||
List<HoodieRecord> updates = new ArrayList<>();
|
||||
Set<KeyPartition> used = new HashSet<>();
|
||||
|
||||
if (n > existingKeysList.size()) {
|
||||
throw new IllegalArgumentException("Requested unique updates is greater than number of available keys");
|
||||
}
|
||||
|
||||
for (int i = 0; i < n; i++) {
|
||||
int index = rand.nextInt(existingKeysList.size() - 1);
|
||||
KeyPartition kp = existingKeysList.get(index);
|
||||
// Find the available keyPartition starting from randomly chosen one.
|
||||
while (used.contains(kp)) {
|
||||
index = (index + 1) % existingKeysList.size();
|
||||
kp = existingKeysList.get(index);
|
||||
}
|
||||
HoodieRecord record = new HoodieRecord(kp.key, generateRandomValue(kp.key, commitTime));
|
||||
updates.add(record);
|
||||
used.add(kp);
|
||||
}
|
||||
return updates;
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat;
|
||||
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
|
||||
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
|
||||
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
|
||||
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
|
||||
import com.uber.hoodie.common.util.AvroUtils;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
@@ -45,6 +46,7 @@ import java.io.Serializable;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@@ -310,4 +312,15 @@ public class HoodieTestUtils {
|
||||
}
|
||||
return returns.toArray(new FileStatus[returns.size()]);
|
||||
}
|
||||
|
||||
public static List<String> monotonicIncreasingCommitTimestamps(int numTimestamps, int startSecsDelta) {
|
||||
Calendar cal = Calendar.getInstance();
|
||||
cal.add(Calendar.SECOND, startSecsDelta);
|
||||
List<String> commits = new ArrayList<>();
|
||||
for (int i = 0; i < numTimestamps; i++) {
|
||||
commits.add(HoodieActiveTimeline.COMMIT_FORMATTER.format(cal.getTime()));
|
||||
cal.add(Calendar.SECOND, 1);
|
||||
}
|
||||
return commits;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user