1
0

Split insert() into insert() & bulkInsert() (#69)

- Behaviour change for existing insert() users
 - Made the current insert() implementation, as something to use for bulkInsert()
 - Normal inserts now share a lot of code with upsert, which provides benefits like small file handling
 - Refactored/Cleaned up code in HoodieWriteClient for reuse
 - Added a unit test, switching few tests to call bulkInsert() and few to call insert()
This commit is contained in:
vinoth chandar
2017-01-27 10:51:00 -08:00
committed by prazanna
parent 54409b07ea
commit 8e72ed69b4
6 changed files with 309 additions and 159 deletions

View File

@@ -31,7 +31,7 @@ import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieInsertException;
import com.uber.hoodie.exception.HoodieRollbackException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.InsertMapFunction;
import com.uber.hoodie.func.BulkInsertMapFunction;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.HoodieCleaner;
import com.uber.hoodie.io.HoodieCommitArchiveLog;
@@ -72,12 +72,11 @@ import scala.Option;
import scala.Tuple2;
/**
* Hoodie Write Client helps you build datasets on HDFS [insert()] and then
* perform efficient mutations on a HDFS dataset [upsert()]
*
* Note that, at any given time, there can only be one Spark job performing
* these operatons on a Hoodie dataset.
* Hoodie Write Client helps you build datasets on HDFS [insert()] and then perform efficient
* mutations on a HDFS dataset [upsert()]
*
* Note that, at any given time, there can only be one Spark job performing these operatons on a
* Hoodie dataset.
*/
public class HoodieWriteClient<T extends HoodieRecordPayload> implements Serializable {
@@ -142,72 +141,18 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
final HoodieTableMetadata metadata =
new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
writeContext = metrics.getCommitCtx();
final HoodieTable table =
HoodieTable.getHoodieTable(metadata.getTableType(), commitTime, config, metadata);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeUpsert(), records,
config.getUpsertShuffleParallelism());
combineOnCondition(config.shouldCombineBeforeUpsert(), records,
config.getUpsertShuffleParallelism());
// perform index loop up to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, metadata);
// Cache the tagged records, so we don't end up computing both
taggedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
WorkloadProfile profile = null;
if (table.isWorkloadProfileNeeded()) {
profile = new WorkloadProfile(taggedRecords);
logger.info("Workload profile :" + profile);
}
// obtain the upsert partitioner, and the run the tagger records through that & get a partitioned RDD.
final Partitioner upsertPartitioner = table.getUpsertPartitioner(profile);
JavaRDD<HoodieRecord<T>> partitionedRecords = taggedRecords.mapToPair(
new PairFunction<HoodieRecord<T>, Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>() {
@Override
public Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> call(
HoodieRecord<T> record) throws Exception {
return new Tuple2<>(new Tuple2<>(record.getKey(),
Option.apply(record.getCurrentLocation())), record);
}
}).partitionBy(upsertPartitioner).map(
new Function<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(
Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> tuple)
throws Exception {
return tuple._2();
}
});
// Perform the actual writing.
JavaRDD<WriteStatus> upsertStatusRDD = partitionedRecords.mapPartitionsWithIndex(
new Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>>() {
@Override
public Iterator<List<WriteStatus>> call(Integer partition,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
return table.handleUpsertPartition(partition, recordItr, upsertPartitioner);
}
}, true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses;
}
});
// Update the index back.
JavaRDD<WriteStatus> resultRDD = index.updateLocation(upsertStatusRDD, metadata);
resultRDD = resultRDD.persist(config.getWriteStatusStorageLevel());
commitOnAutoCommit(commitTime, resultRDD);
return resultRDD;
return upsertRecordsInternal(taggedRecords, commitTime, metadata, true);
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
@@ -216,8 +161,38 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}
/**
* Inserts the given HoodieRecords, into the table. This API is intended to be used for normal
* writes.
*
* This implementation skips the index check & is able to leverage benefits such as
* small file handling/blocking alignment, as with upsert(), by profiling the workload
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
final HoodieTableMetadata metadata =
new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
writeContext = metrics.getCommitCtx();
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records,
config.getInsertShuffleParallelism());
return upsertRecordsInternal(dedupedRecords, commitTime, metadata, false);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e);
}
}
private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD) {
if(config.shouldAutoCommit()) {
if (config.shouldAutoCommit()) {
logger.info("Auto commit enabled: Committing " + commitTime);
boolean commitResult = commit(commitTime, resultRDD);
if (!commitResult) {
@@ -229,64 +204,146 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition,
JavaRDD<HoodieRecord<T>> records, int parallelism) {
if(condition) {
JavaRDD<HoodieRecord<T>> records, int parallelism) {
if (condition) {
return deduplicateRecords(records, parallelism);
}
return records;
}
private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
return dedupedRecords.mapToPair(
new PairFunction<HoodieRecord<T>, Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>() {
@Override
public Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> call(
HoodieRecord<T> record) throws Exception {
return new Tuple2<>(new Tuple2<>(record.getKey(),
Option.apply(record.getCurrentLocation())), record);
}
}).partitionBy(partitioner).map(
new Function<Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>, HoodieRecord<T>>() {
@Override
public HoodieRecord<T> call(
Tuple2<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> tuple)
throws Exception {
return tuple._2();
}
});
}
private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, WorkloadProfile profile) {
if (isUpsert) {
return table.getUpsertPartitioner(profile);
} else {
return table.getInsertPartitioner(profile);
}
}
private JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD,
HoodieTableMetadata metadata,
String commitTime) {
// Update the index back
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, metadata);
// Trigger the insert and collect statuses
statuses = statuses.persist(config.getWriteStatusStorageLevel());
commitOnAutoCommit(commitTime, statuses);
return statuses;
}
private JavaRDD<WriteStatus> upsertRecordsInternal(JavaRDD<HoodieRecord<T>> preppedRecords,
String commitTime,
HoodieTableMetadata metadata,
final boolean isUpsert) {
final HoodieTable table =
HoodieTable.getHoodieTable(metadata.getTableType(), commitTime, config, metadata);
// Cache the tagged records, so we don't end up computing both
preppedRecords.persist(StorageLevel.MEMORY_AND_DISK_SER());
WorkloadProfile profile = null;
if (table.isWorkloadProfileNeeded()) {
profile = new WorkloadProfile(preppedRecords);
logger.info("Workload profile :" + profile);
}
// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(table, isUpsert, profile);
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(preppedRecords, partitioner);
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex(
new Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>>() {
@Override
public Iterator<List<WriteStatus>> call(Integer partition,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
if (isUpsert) {
return table.handleUpsertPartition(partition, recordItr, partitioner);
} else {
return table.handleInsertPartition(partition, recordItr, partitioner);
}
}
}, true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses;
}
});
return updateIndexAndCommitIfNeeded(writeStatusRDD, metadata, commitTime);
}
/**
* Loads the given HoodieRecords, as inserts into the table.
* (This implementation uses sortBy and attempts to control the numbers of files with less memory)
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
* loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
* Hoodie).
*
* @param records HoodieRecords to insert
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) &
* attempts to control the numbers of files with less memory compared to the {@link
* HoodieWriteClient#insert(JavaRDD, String)}
*
* @param records HoodieRecords to insert
* @param commitTime Commit Time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
final HoodieTableMetadata metadata =
new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());
writeContext = metrics.getCommitCtx();
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records,
config.getInsertShuffleParallelism());
combineOnCondition(config.shouldCombineBeforeInsert(), records,
config.getInsertShuffleParallelism());
// Now, sort the records and line them up nicely for loading.
JavaRDD<HoodieRecord<T>> sortedRecords =
dedupedRecords.sortBy(new Function<HoodieRecord<T>, String>() {
@Override
public String call(HoodieRecord<T> record) {
// Let's use "partitionPath + key" as the sort key. Spark, will ensure
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String
.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}
}, true, config.getInsertShuffleParallelism());
dedupedRecords.sortBy(new Function<HoodieRecord<T>, String>() {
@Override
public String call(HoodieRecord<T> record) {
// Let's use "partitionPath + key" as the sort key. Spark, will ensure
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String
.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}
}, true, config.getInsertShuffleParallelism());
JavaRDD<WriteStatus> writeStatusRDD = sortedRecords
.mapPartitionsWithIndex(new InsertMapFunction<T>(commitTime, config, metadata),
true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses;
}
});
// Update the index back
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, metadata);
// Trigger the insert and collect statuses
statuses = statuses.persist(config.getWriteStatusStorageLevel());
commitOnAutoCommit(commitTime, statuses);
return statuses;
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, metadata),
true).flatMap(new FlatMapFunction<List<WriteStatus>, WriteStatus>() {
@Override
public Iterable<WriteStatus> call(List<WriteStatus> writeStatuses)
throws Exception {
return writeStatuses;
}
});
return updateIndexAndCommitIfNeeded(writeStatusRDD, metadata, commitTime);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to insert for commit time " + commitTime, e);
throw new HoodieInsertException("Failed to bulk insert for commit time " + commitTime, e);
}
}
@@ -296,7 +353,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses) {
logger.info("Comitting " + commitTime);
Path commitFile =
new Path(config.getBasePath() + "/.hoodie/" + FSUtils.makeCommitFileName(commitTime));
new Path(config.getBasePath() + "/.hoodie/" + FSUtils.makeCommitFileName(commitTime));
try {
if (fs.exists(commitFile)) {
@@ -304,13 +361,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
List<Tuple2<String, HoodieWriteStat>> stats =
writeStatuses.mapToPair(new PairFunction<WriteStatus, String, HoodieWriteStat>() {
@Override
public Tuple2<String, HoodieWriteStat> call(WriteStatus writeStatus)
throws Exception {
return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat());
}
}).collect();
writeStatuses.mapToPair(new PairFunction<WriteStatus, String, HoodieWriteStat>() {
@Override
public Tuple2<String, HoodieWriteStat> call(WriteStatus writeStatus)
throws Exception {
return new Tuple2<>(writeStatus.getPartitionPath(), writeStatus.getStat());
}
}).collect();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
for (Tuple2<String, HoodieWriteStat> stat : stats) {
@@ -319,10 +376,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// open a new file and write the commit metadata in
Path inflightCommitFile = new Path(config.getBasePath() + "/.hoodie/" + FSUtils
.makeInflightCommitFileName(commitTime));
.makeInflightCommitFileName(commitTime));
FSDataOutputStream fsout = fs.create(inflightCommitFile, true);
fsout.writeBytes(new String(metadata.toJsonString().getBytes(StandardCharsets.UTF_8),
StandardCharsets.UTF_8));
StandardCharsets.UTF_8));
fsout.close();
boolean success = fs.rename(inflightCommitFile, commitFile);
@@ -331,10 +388,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
archiveLog.archiveIfRequired();
// Call clean to cleanup if there is anything to cleanup after the commit,
clean();
if(writeContext != null) {
if (writeContext != null) {
long durationInMs = metrics.getDurationInMs(writeContext.stop());
metrics.updateCommitMetrics(FORMATTER.parse(commitTime).getTime(), durationInMs,
metadata);
metadata);
writeContext = null;
}
}
@@ -342,21 +399,18 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
return success;
} catch (IOException e) {
throw new HoodieCommitException(
"Failed to commit " + config.getBasePath() + " at time " + commitTime, e);
"Failed to commit " + config.getBasePath() + " at time " + commitTime, e);
} catch (ParseException e) {
throw new HoodieCommitException(
"Commit time is not of valid format.Failed to commit " + config.getBasePath()
+ " at time " + commitTime, e);
"Commit time is not of valid format.Failed to commit " + config.getBasePath()
+ " at time " + commitTime, e);
}
}
/**
* Rollback the (inflight/committed) record changes with the given commit time.
* Three steps:
* (0) Obtain the commit or rollback file
* (1) clean indexing data,
* (2) clean new generated parquet files.
* (3) Finally delete .commit or .inflight file,
* Rollback the (inflight/committed) record changes with the given commit time. Three steps: (0)
* Obtain the commit or rollback file (1) clean indexing data, (2) clean new generated parquet
* files. (3) Finally delete .commit or .inflight file,
*/
public boolean rollback(final String commitTime) throws HoodieRollbackException {
@@ -450,7 +504,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
/**
* Clean up any stale/old files/data lying around (either on file storage or index storage)
*/
private void clean() throws HoodieIOException {
private void clean() throws HoodieIOException {
try {
logger.info("Cleaner started");
final Timer.Context context = metrics.getCleanCtx();
@@ -459,26 +513,26 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
// shuffle to distribute cleaning work across partitions evenly
Collections.shuffle(partitionsToClean);
logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config.getCleanerPolicy());
if(partitionsToClean.isEmpty()) {
if (partitionsToClean.isEmpty()) {
logger.info("Nothing to clean here mom. It is already clean");
return;
}
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
int numFilesDeleted = jsc.parallelize(partitionsToClean, cleanerParallelism)
.map(new Function<String, Integer>() {
@Override
public Integer call(String partitionPathToClean) throws Exception {
FileSystem fs = FSUtils.getFs();
HoodieCleaner cleaner = new HoodieCleaner(metadata, config, fs);
return cleaner.clean(partitionPathToClean);
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
.map(new Function<String, Integer>() {
@Override
public Integer call(String partitionPathToClean) throws Exception {
FileSystem fs = FSUtils.getFs();
HoodieCleaner cleaner = new HoodieCleaner(metadata, config, fs);
return cleaner.clean(partitionPathToClean);
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
logger.info("Cleaned " + numFilesDeleted + " files");
// Emit metrics (duration, numFilesDeleted) if needed
if (context != null) {
@@ -504,18 +558,18 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
logger.info("Generate a new commit time " + commitTime);
// Create the in-flight commit file
Path inflightCommitFilePath = new Path(
config.getBasePath() + "/.hoodie/" + FSUtils.makeInflightCommitFileName(commitTime));
config.getBasePath() + "/.hoodie/" + FSUtils.makeInflightCommitFileName(commitTime));
try {
if (fs.createNewFile(inflightCommitFilePath)) {
logger.info("Create an inflight commit file " + inflightCommitFilePath);
return;
}
throw new HoodieCommitException(
"Failed to create the inflight commit file " + inflightCommitFilePath);
"Failed to create the inflight commit file " + inflightCommitFilePath);
} catch (IOException e) {
// handled below
throw new HoodieCommitException(
"Failed to create the inflight commit file " + inflightCommitFilePath, e);
"Failed to create the inflight commit file " + inflightCommitFilePath, e);
}
}
@@ -552,7 +606,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
/**
* Cleanup all inflight commits
* @throws IOException
*/
private void rollbackInflightCommits() {
final HoodieTableMetadata metadata = new HoodieTableMetadata(fs, config.getBasePath(), config.getTableName());

View File

@@ -30,15 +30,15 @@ import java.util.List;
/**
* Map function that handles a sorted stream of HoodieRecords
*/
public class InsertMapFunction<T extends HoodieRecordPayload>
public class BulkInsertMapFunction<T extends HoodieRecordPayload>
implements Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<List<WriteStatus>>> {
private String commitTime;
private HoodieWriteConfig config;
private HoodieTableMetadata metadata;
public InsertMapFunction(String commitTime, HoodieWriteConfig config,
HoodieTableMetadata metadata) {
public BulkInsertMapFunction(String commitTime, HoodieWriteConfig config,
HoodieTableMetadata metadata) {
this.commitTime = commitTime;
this.config = config;
this.metadata = metadata;

View File

@@ -25,6 +25,7 @@ import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieTableMetadata;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieInsertException;
import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.func.LazyInsertIterable;
import com.uber.hoodie.io.HoodieUpdateHandle;
@@ -376,7 +377,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
@Override
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
return null;
return getUpsertPartitioner(profile);
}
@Override
@@ -385,7 +386,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
public Iterator<List<WriteStatus>> handleUpdate(String fileLoc, Iterator<HoodieRecord<T>> recordItr) throws Exception {
// these are updates
HoodieUpdateHandle upsertHandle =
@@ -449,4 +449,11 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
throw new HoodieUpsertException(msg, t);
}
}
@Override
public Iterator<List<WriteStatus>> handleInsertPartition(Integer partition,
Iterator recordItr,
Partitioner partitioner) {
return handleUpsertPartition(partition, recordItr, partitioner);
}
}

View File

@@ -84,6 +84,17 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
Iterator<HoodieRecord<T>> recordIterator,
Partitioner partitioner);
/**
* Perform the ultimate IO for a given inserted (RDD) partition
*
* @param partition
* @param recordIterator
* @param partitioner
*/
public abstract Iterator<List<WriteStatus>> handleInsertPartition(Integer partition,
Iterator<HoodieRecord<T>> recordIterator,
Partitioner partitioner);
public static HoodieTable getHoodieTable(HoodieTableType type,
String commitTime,

View File

@@ -137,7 +137,7 @@ public class TestHoodieClient implements Serializable {
JavaRDD<HoodieRecord> smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1);
// We create three parquet file, each having one record. (two different partitions)
List<WriteStatus> statuses = writeClient.insert(smallRecordsRDD, newCommitTime).collect();
List<WriteStatus> statuses = writeClient.bulkInsert(smallRecordsRDD, newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
@@ -158,7 +158,7 @@ public class TestHoodieClient implements Serializable {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> result = client.insert(writeRecords, newCommitTime);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, newCommitTime);
assertFalse("If Autocommit is false, then commit should not be made automatically",
HoodieTestUtils.doesCommitExist(basePath, newCommitTime));
@@ -169,7 +169,7 @@ public class TestHoodieClient implements Serializable {
newCommitTime = "002";
records = dataGen.generateUpdates(newCommitTime, 100);
JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(records, 1);
result = client.upsert(writeRecords, newCommitTime);
result = client.upsert(updateRecords, newCommitTime);
assertFalse("If Autocommit is false, then commit should not be made automatically",
HoodieTestUtils.doesCommitExist(basePath, newCommitTime));
assertTrue("Commit should succeed", client.commit(newCommitTime, result));
@@ -542,24 +542,27 @@ public class TestHoodieClient implements Serializable {
}
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) {
HoodieWriteConfig.Builder builder = getConfigBuilder();
return builder.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 15)
.insertSplitSize(insertSplitSize).build()) // tolerate upto 15 records
.withStorageConfig(HoodieStorageConfig.newBuilder()
.limitFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 20)
.build())
.build();
}
@Test
public void testSmallInsertHandling() throws Exception {
public void testSmallInsertHandlingForUpserts() throws Exception {
HoodieWriteConfig.Builder builder = getConfigBuilder();
FileSystem fs = FSUtils.getFs();
final String TEST_PARTITION_PATH = "2016/09/26";
final int INSERT_SPLIT_LIMIT = 10;
// based on examination of sample file, the schema produces the following per record size
final int SIZE_PER_RECORD = 50 * 1024;
// setup the small file handling params
HoodieWriteConfig config = builder.withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(SIZE_PER_RECORD * 15)
.insertSplitSize(INSERT_SPLIT_LIMIT).build()) // tolerate upto 15 records
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(SIZE_PER_RECORD * 20)
.build()).build(); // hold upto 20 records max
HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max
dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH});
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
@@ -651,6 +654,79 @@ public class TestHoodieClient implements Serializable {
assertEquals("Total inserts in commit3 must add up", keys3.size(), numTotalInsertsInCommit3);
}
@Test
public void testSmallInsertHandlingForInserts() throws Exception {
final String TEST_PARTITION_PATH = "2016/09/26";
final int INSERT_SPLIT_LIMIT = 10;
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(INSERT_SPLIT_LIMIT); // hold upto 20 records max
dataGen = new HoodieTestDataGenerator(new String[] {TEST_PARTITION_PATH});
HoodieWriteClient client = new HoodieWriteClient(jsc, config);
// Inserts => will write file1
String commitTime1 = "001";
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, INSERT_SPLIT_LIMIT); // this writes ~500kb
Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses= client.insert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
assertEquals("Just 1 file needs to be added.", 1, statuses.size());
String file1 = statuses.get(0).getFileId();
assertEquals("file should contain 10 records",
ParquetUtils.readRowKeysFromParquet(new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(),
10);
// Second, set of Inserts should just expand file1
String commitTime2 = "002";
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 4);
Set<String> keys2 = HoodieClientTestUtils.getRecordKeys(inserts2);
JavaRDD<HoodieRecord> insertRecordsRDD2 = jsc.parallelize(inserts2, 1);
statuses = client.insert(insertRecordsRDD2, commitTime2).collect();
assertNoWriteErrors(statuses);
assertEquals("Just 1 file needs to be updated.", 1, statuses.size());
assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit());
Path newFile = new Path(basePath, TEST_PARTITION_PATH + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
assertEquals("file should contain 14 records", ParquetUtils.readRowKeysFromParquet(newFile).size(), 14);
List<GenericRecord> records = ParquetUtils.readAvroRecords(newFile);
for (GenericRecord record: records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
assertTrue("Record expected to be part of commit 1 or commit2", commitTime1.equals(recCommitTime) || commitTime2.equals(recCommitTime));
assertTrue("key expected to be part of commit 1 or commit2", keys2.contains(recordKey) || keys1.contains(recordKey));
}
// Lots of inserts such that file1 is updated and expanded, a new file2 is created.
String commitTime3 = "003";
List<HoodieRecord> insert3 = dataGen.generateInserts(commitTime3, 20);
JavaRDD<HoodieRecord> insertRecordsRDD3 = jsc.parallelize(insert3, 1);
statuses = client.insert(insertRecordsRDD3, commitTime3).collect();
assertNoWriteErrors(statuses);
assertEquals("2 files needs to be committed.", 2, statuses.size());
FileSystem fs = FSUtils.getFs();
HoodieTableMetadata metadata = new HoodieTableMetadata(fs, basePath);
FileStatus[] files = metadata.getLatestVersionInPartition(fs, TEST_PARTITION_PATH, commitTime3);
assertEquals("Total of 2 valid data files", 2, files.length);
int totalInserts = 0;
for (FileStatus file: files) {
assertEquals("All files must be at commit 3", commitTime3, FSUtils.getCommitTime(file.getPath().getName()));
records = ParquetUtils.readAvroRecords(file.getPath());
totalInserts += records.size();
}
assertEquals("Total number of records must add up", totalInserts, inserts1.size() + inserts2.size() + insert3.size());
}
@After
public void clean() {

View File

@@ -62,6 +62,9 @@ public class HoodieTestDataGenerator {
+ "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}";
// based on examination of sample file, the schema produces the following per record size
public static final int SIZE_PER_RECORD = 50 * 1024;
private List<KeyPartition> existingKeysList = new ArrayList<>();
private static Schema avroSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA));