1
0

Add new APIs in HoodieReadClient and HoodieWriteClient

This commit is contained in:
Jian Xu
2018-02-22 11:20:54 -08:00
committed by vinoth chandar
parent 6fec9655a8
commit 5d5c306e64
3 changed files with 173 additions and 65 deletions

View File

@@ -19,11 +19,14 @@ package com.uber.hoodie;
import com.google.common.base.Optional;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import com.uber.hoodie.exception.HoodieIndexException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.table.HoodieTable;
import java.io.Serializable;
import java.util.HashSet;
@@ -45,7 +48,7 @@ import scala.Tuple2;
/**
* Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
*/
public class HoodieReadClient implements Serializable {
public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
private static Logger logger = LogManager.getLogger(HoodieReadClient.class);
@@ -57,7 +60,7 @@ public class HoodieReadClient implements Serializable {
* just with a simple basepath pointing to the dataset. Until, then just always assume a
* BloomIndex
*/
private transient final HoodieBloomIndex index;
private transient final HoodieIndex<T> index;
private final HoodieTimeline commitTimeline;
private HoodieTable hoodieTable;
private transient Optional<SQLContext> sqlContextOpt;
@@ -66,15 +69,12 @@ public class HoodieReadClient implements Serializable {
* @param basePath path to Hoodie dataset
*/
public HoodieReadClient(JavaSparkContext jsc, String basePath) {
this.jsc = jsc;
this.fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
// Create a Hoodie table which encapsulated the commits and files visible
this.hoodieTable = HoodieTable
.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), null);
this.commitTimeline = hoodieTable.getCommitTimeline().filterCompletedInstants();
this.index =
new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc);
this.sqlContextOpt = Optional.absent();
this(jsc, HoodieWriteConfig.newBuilder()
.withPath(basePath)
// by default we use HoodieBloomIndex
.withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.build());
}
/**
@@ -88,6 +88,22 @@ public class HoodieReadClient implements Serializable {
this.sqlContextOpt = Optional.of(sqlContext);
}
/**
* @param clientConfig instance of HoodieWriteConfig
*/
public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
final String basePath = clientConfig.getBasePath();
this.jsc = jsc;
this.fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
// Create a Hoodie table which encapsulated the commits and files visible
this.hoodieTable = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true),
clientConfig);
this.commitTimeline = hoodieTable.getCommitTimeline().filterCompletedInstants();
this.index = HoodieIndex.createIndex(clientConfig, jsc);
this.sqlContextOpt = Optional.absent();
}
/**
* Adds support for accessing Hoodie built tables from SparkSQL, as you normally would.
*
@@ -158,8 +174,20 @@ public class HoodieReadClient implements Serializable {
* @param hoodieRecords Input RDD of Hoodie records.
* @return A subset of hoodieRecords RDD, with existing records filtered out.
*/
public JavaRDD<HoodieRecord> filterExists(JavaRDD<HoodieRecord> hoodieRecords) {
JavaRDD<HoodieRecord> recordsWithLocation = index.tagLocation(hoodieRecords, hoodieTable);
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
JavaRDD<HoodieRecord<T>> recordsWithLocation = tagLocation(hoodieRecords);
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
/**
* Looks up the index and tags each incoming record with a location of a file that contains the
* row (if it is actually present). Input RDD should contain no duplicates if needed.
*
* @param hoodieRecords Input RDD of Hoodie records
* @return Tagged RDD of Hoodie records
*/
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> hoodieRecords)
throws HoodieIndexException {
return index.tagLocation(hoodieRecords, hoodieTable);
}
}

View File

@@ -142,12 +142,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
* Upserts a bunch of new records into the Hoodie table, at the supplied commitTime
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
writeContext = metrics.getCommitCtx();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config);
HoodieTable<T> table = getTableAndInitCtx();
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -165,6 +160,30 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}
/**
* Upserts the given prepared records into the Hoodie table, at the supplied commitTime.
*
* This implementation requires that the input records are already tagged, and de-duped if
* needed.
*
* @param preppedRecords Prepared HoodieRecords to upsert
* @param commitTime Commit Time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords,
final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx();
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, true);
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to upsert prepared records for commit time " +
commitTime, e);
}
}
/**
* Inserts the given HoodieRecords, into the table. This API is intended to be used for normal
* writes.
@@ -177,11 +196,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
writeContext = metrics.getCommitCtx();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config);
HoodieTable<T> table = getTableAndInitCtx();
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -197,6 +212,31 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}
/**
* Inserts the given prepared records into the Hoodie table, at the supplied commitTime.
*
* This implementation skips the index check, skips de-duping and is able to leverage benefits
* such as small file handling/blocking alignment, as with insert(), by profiling the workload.
* The prepared HoodieRecords should be de-duped if needed.
*
* @param preppedRecords HoodieRecords to insert
* @param commitTime Commit Time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords,
final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx();
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, false);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to insert prepared records for commit time " +
commitTime, e);
}
}
/**
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
* loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
@@ -235,40 +275,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records,
final String commitTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
writeContext = metrics.getCommitCtx();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config);
HoodieTable<T> table = getTableAndInitCtx();
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
combineOnCondition(config.shouldCombineBeforeInsert(), records,
config.getInsertShuffleParallelism());
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
if (bulkInsertPartitioner.isDefined()) {
repartitionedRecords =
bulkInsertPartitioner.get().repartitionRecords(dedupedRecords,
config.getBulkInsertShuffleParallelism());
} else {
// Now, sort the records and line them up nicely for loading.
repartitionedRecords = dedupedRecords
.sortBy(record -> {
// Let's use "partitionPath + key" as the sort key. Spark, will ensure
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String
.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}, true, config.getBulkInsertShuffleParallelism());
}
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table),
true)
.flatMap(writeStatuses -> writeStatuses.iterator());
return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
return bulkInsertInternal(dedupedRecords, commitTime, table, bulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
@@ -278,6 +292,67 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
}
}
/**
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk
* loads into a Hoodie table for the very first time (e.g: converting an existing dataset to
* Hoodie). The input records should contain no duplicates if needed.
*
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) and
* attempts to control the numbers of files with less memory compared to the {@link
* HoodieWriteClient#insert(JavaRDD, String)}. Optionally it allows users to specify their own
* partitioner. If specified then it will be used for repartitioning records. See {@link
* UserDefinedBulkInsertPartitioner}.
*
* @param preppedRecords HoodieRecords to insert
* @param commitTime Commit Time handle
* @param bulkInsertPartitioner If specified then it will be used to partition input records
* before they are inserted into hoodie.
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords,
final String commitTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx();
try {
return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner);
} catch (Throwable e) {
if (e instanceof HoodieInsertException) {
throw e;
}
throw new HoodieInsertException("Failed to bulk insert prepared records for commit time " +
commitTime, e);
}
}
private JavaRDD<WriteStatus> bulkInsertInternal(
JavaRDD<HoodieRecord<T>> dedupedRecords,
String commitTime,
HoodieTable<T> table,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
if (bulkInsertPartitioner.isDefined()) {
repartitionedRecords =
bulkInsertPartitioner.get().repartitionRecords(dedupedRecords,
config.getBulkInsertShuffleParallelism());
} else {
// Now, sort the records and line them up nicely for loading.
repartitionedRecords = dedupedRecords
.sortBy(record -> {
// Let's use "partitionPath + key" as the sort key. Spark, will ensure
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String
.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}, true, config.getBulkInsertShuffleParallelism());
}
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table),
true)
.flatMap(writeStatuses -> writeStatuses.iterator());
return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
}
private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD) {
if (config.shouldAutoCommit()) {
logger.info("Auto commit enabled: Committing " + commitTime);
@@ -907,4 +982,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
rollback(commit);
}
}
private HoodieTable getTableAndInitCtx() {
writeContext = metrics.getCommitCtx();
// Create a Hoodie table which encapsulated the commits and files visible
return HoodieTable.getHoodieTable(
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
config);
}
}

View File

@@ -80,6 +80,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.collection.Iterator;
public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
@@ -190,7 +191,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, newCommitTime);
JavaRDD<WriteStatus> result = client.bulkInsertPreppedRecords(writeRecords, newCommitTime,
Option.empty());
assertFalse("If Autocommit is false, then commit should not be made automatically",
HoodieTestUtils.doesCommitExist(basePath, newCommitTime));
@@ -218,7 +220,6 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig) throws Exception {
HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig);
HoodieIndex index = HoodieIndex.createIndex(hoodieWriteConfig, jsc);
/**
* Write 1 (only inserts)
@@ -229,7 +230,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
List<WriteStatus> statuses = client.upsertPreppedRecords(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
// check the partition metadata is written out
@@ -249,10 +250,8 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
records.size(),
HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
// Should have 100 records in table (check using Index), all in locations marked at commit
HoodieTable table = HoodieTable
.getHoodieTable(metaClient, getConfig());
List<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), table)
HoodieReadClient readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath());
List<HoodieRecord> taggedRecords = readClient.tagLocation(jsc.parallelize(records, 1))
.collect();
checkTaggedRecords(taggedRecords, "001");
@@ -282,11 +281,9 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
assertEquals("Latest commit should be 004", timeline.lastInstant().get().getTimestamp(),
newCommitTime);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
table = HoodieTable.getHoodieTable(metaClient, getConfig());
// Index should be able to locate all updates in correct locations.
taggedRecords = index.tagLocation(jsc.parallelize(dedupedRecords, 1), table).collect();
readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath());
taggedRecords = readClient.tagLocation(jsc.parallelize(dedupedRecords, 1)).collect();
checkTaggedRecords(taggedRecords, "004");
// Check the entire dataset has 100 records still
@@ -732,7 +729,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 500);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 5);
List<WriteStatus> statuses = client.insert(writeRecords, newCommitTime).collect();
List<WriteStatus> statuses = client.insertPreppedRecords(writeRecords, newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);