1
0

Cache the RDD to avoid recomputing data ingestion. Return the result RDD after updating the index so that this step is not skipped by chained actions on the same RDD.

This commit is contained in:
venkatr
2019-07-24 17:55:38 -07:00
committed by n3nash
parent 8139ffd94c
commit 86b5fcdd33
7 changed files with 113 additions and 14 deletions

View File

@@ -466,10 +466,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD,
    HoodieTable<T> table, String commitTime) {
  // Persist the incoming statuses before touching the index, so that the work that produced them is not
  // triggered again by RDD actions performed after the index update.
  final JavaRDD<WriteStatus> cachedWriteStatuses =
      writeStatusRDD.persist(config.getWriteStatusStorageLevel());
  // Push the new locations to the index and persist the RDD it hands back; that RDD (not the input) is the one
  // that is committed and returned to the caller.
  final JavaRDD<WriteStatus> indexedStatuses = index.updateLocation(cachedWriteStatuses, jsc, table)
      .persist(config.getWriteStatusStorageLevel());
  commitOnAutoCommit(commitTime, indexedStatuses, table.getMetaClient().getCommitActionType());
  return indexedStatuses;
}
@@ -974,6 +976,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
public void close() {
  try {
    // Stop timeline-server if running
    super.close();
  } finally {
    // Calling this here releases any resources used by your index, so make sure to finish any related
    // operations before this point. It runs in a finally block so that the index resources are still released
    // when the parent close throws.
    this.index.close();
  }
}
/**

View File

@@ -57,6 +57,15 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
// Property key for the bloom index "keys per bucket" setting, followed by its default value (kept as a String).
public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket";
public static final String DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET = "10000000";
// ***** HBase Index Configs *****
// Key for the HBase ZooKeeper quorum; written by Builder.hbaseZkQuorum(String).
public static final String HBASE_ZKQUORUM_PROP = "hoodie.index.hbase.zkquorum";
// Key for the HBase ZooKeeper client port; written by Builder.hbaseZkPort(int) as a decimal string.
public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
// Key for the ZooKeeper znode parent path; written by Builder.hbaseZkZnodeParent(String). The property is
// optional: HBaseIndex only sets "zookeeper.znode.parent" when the configured value is non-null.
public static final String HBASE_ZK_ZNODEPARENT = "hoodie.index.hbase.zknode.path";
// Key for the HBase table name; written by Builder.hbaseTableName(String).
public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
// Keys for the batch sizes of HBase get and put calls.
public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
public static final String HBASE_PUT_BATCH_SIZE_PROP = "hoodie.index.hbase.put.batch.size";
// NOTE(review): presumably the default for both batch-size keys above — confirm where defaults are applied.
public static final String DEFAULT_HBASE_BATCH_SIZE = "100";
// Key for the storage level of the bloom index input; the literal is split across two concatenated parts.
public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL =
"hoodie.bloom.index.input.storage" + ".level";
@@ -109,6 +118,26 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
return this;
}
/**
 * Stores the HBase ZooKeeper quorum under {@code HBASE_ZKQUORUM_PROP}.
 *
 * @param zkString quorum string to record
 * @return this builder, so calls can be chained
 */
public Builder hbaseZkQuorum(String zkString) {
  props.setProperty(HBASE_ZKQUORUM_PROP, zkString);

  return this;
}
/**
 * Stores the HBase ZooKeeper port under {@code HBASE_ZKPORT_PROP}, rendered as a decimal string.
 *
 * @param port port number to record
 * @return this builder, so calls can be chained
 */
public Builder hbaseZkPort(int port) {
  final String portText = Integer.toString(port);
  props.setProperty(HBASE_ZKPORT_PROP, portText);
  return this;
}
/**
 * Stores the ZooKeeper znode parent path under {@code HBASE_ZK_ZNODEPARENT}.
 *
 * @param zkZnodeParent znode parent path to record
 * @return this builder, so calls can be chained
 */
public Builder hbaseZkZnodeParent(String zkZnodeParent) {
  props.setProperty(HBASE_ZK_ZNODEPARENT, zkZnodeParent);

  return this;
}
/**
 * Stores the HBase table name under {@code HBASE_TABLENAME_PROP}.
 *
 * @param tableName table name to record
 * @return this builder, so calls can be chained
 */
public Builder hbaseTableName(String tableName) {
  props.setProperty(HBASE_TABLENAME_PROP, tableName);

  return this;
}
public Builder bloomIndexParallelism(int parallelism) {
props.setProperty(BLOOM_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
return this;

View File

@@ -306,6 +306,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_ZKPORT_PROP));
}
/**
 * Looks up the value stored under {@code HoodieIndexConfig.HBASE_ZK_ZNODEPARENT}.
 *
 * @return the configured znode parent path; presumably {@code null} when the property is absent (HBaseIndex
 *         null-checks the result before using it)
 */
public String getHBaseZkZnodeParent() {
  final String znodeParent = props.getProperty(HoodieIndexConfig.HBASE_ZK_ZNODEPARENT);
  return znodeParent;
}
/**
 * Looks up the value stored under {@code HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP}.
 *
 * @return the configured HBase table name as returned by the property lookup
 */
public String getHbaseTableName() {
  final String tableName = props.getProperty(HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP);
  return tableName;
}

View File

@@ -115,6 +115,10 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
*/
public abstract boolean isImplicitWithStorage();
/**
 * Releases any resources acquired by this index. This default implementation does nothing; each index type
 * should override it with its own cleanup logic.
 */
public void close() {}
public enum IndexType {
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM

View File

@@ -83,6 +83,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
// Logger for this class.
private static Logger logger = LogManager.getLogger(HBaseIndex.class);
// Static, so a single connection reference is shared by every instance of this class; starts out null.
private static Connection hbaseConnection = null;
// Assigned from createQPSResourceAllocator(this.config) and released again in close().
private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
// Assigned from config.getHbaseIndexQPSFraction().
private float qpsFraction;
// Assigned from config.getHbaseIndexMaxQPSPerRegionServer().
private int maxQpsPerRegionServer;
/**
@@ -106,6 +107,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
this.qpsFraction = config.getHbaseIndexQPSFraction();
this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
this.putBatchSizeCalculator = new HbasePutBatchSizeCalculator();
this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
}
@VisibleForTesting
@@ -132,6 +134,10 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
Configuration hbaseConfig = HBaseConfiguration.create();
String quorum = config.getHbaseZkQuorum();
hbaseConfig.set("hbase.zookeeper.quorum", quorum);
String zkZnodeParent = config.getHBaseZkZnodeParent();
if (zkZnodeParent != null) {
hbaseConfig.set("zookeeper.znode.parent", zkZnodeParent);
}
String port = String.valueOf(config.getHbaseZkPort());
hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
try {
@@ -158,6 +164,13 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
});
}
/**
 * Releases the QPS resources held by this index's resource allocator. Call it only once indexing work that
 * depends on those resources has finished.
 */
public void close() {
  hBaseIndexQPSResourceAllocator.releaseQPSResources();
}
private Get generateStatement(String key) throws IOException {
return new Get(Bytes.toBytes(key)).setMaxVersions(1)
.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
@@ -368,21 +381,13 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) {
// NOTE(review): this span looks like a rendered diff hunk in which removed and added lines are interleaved
// (two consecutive multiPutBatchSize log statements, and statements that follow a return), so it does not read
// as one compilable method body — confirm against the real file before changing any logic here.
// Build a QPS resource allocator from the current config for this update.
final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
JavaRDD<WriteStatus> writeStatusResultRDD;
// Work out the put batch size before any puts are issued.
setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
logger.info("multiPutBatchSize: before puts" + multiPutBatchSize);
logger.info("multiPutBatchSize: before hbase puts" + multiPutBatchSize);
// Apply updateLocationFunction() to each partition; the second argument is Spark's preservesPartitioning flag.
JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(
updateLocationFunction(), true);
// Forcing a spark action so HBase puts are triggered before releasing resources
if (this.config.getHBaseIndexShouldComputeQPSDynamically()) {
logger.info("writestatus count: " + writeStatusJavaRDD.count());
// In this branch the input RDD, not the mapped one, is chosen as the result.
writeStatusResultRDD = writeStatusRDD;
} else {
writeStatusResultRDD = writeStatusJavaRDD;
}
// Release QPS resources as HBase puts are done at this point
hBaseIndexQPSResourceAllocator.releaseQPSResources();
return writeStatusResultRDD;
// caching the index updated status RDD
writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel());
return writeStatusJavaRDD;
}
private void setPutBatchSize(JavaRDD<WriteStatus> writeStatusRDD,
@@ -430,7 +435,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
final JavaPairRDD<Long, Integer> insertOnlyWriteStatusRDD =
writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
.mapToPair(w -> new Tuple2<>(w.getStat().getNumInserts(), 1));
return insertOnlyWriteStatusRDD.reduce((w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2));
return insertOnlyWriteStatusRDD.fold(new Tuple2<>(0L, 0), (w, c) -> new Tuple2<>(w._1 + c._1, w._2 + c._2));
}
public static class HbasePutBatchSizeCalculator implements Serializable {

View File

@@ -149,6 +149,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
* Extract old file path, initialize StorageWriter and WriteStatus
*/
private void init(String fileId, String partitionPath, HoodieDataFile dataFileToBeMerged) {
logger.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
this.writtenRecordKeys = new HashSet<>();
writeStatus.setStat(new HoodieWriteStat());
try {

View File

@@ -185,6 +185,40 @@ public class TestHbaseIndex {
}
/**
 * Upserts the same ten generated records twice under one commit time, commits, and then checks that
 * tagLocation reports all ten records as having a known location, ten distinct record keys, and ten records
 * whose current location carries that commit time.
 */
@Test
public void testTagLocationAndDuplicateUpdate() throws Exception {
String newCommitTime = "001";
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
// Single-partition RDD holding the ten generated records
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
// Load to memory
HoodieWriteConfig config = getConfig();
HBaseIndex index = new HBaseIndex(config);
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
writeClient.startCommit();
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
// First upsert of the records
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
// NOTE(review): javaRDD1 is never read again in this test.
JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
// Duplicate upsert and ensure correctness is maintained
writeClient.upsert(writeRecords, newCommitTime);
// Only the statuses returned by the first upsert are checked and committed below
assertNoWriteErrors(writeStatues.collect());
// Now commit this & update location of records inserted and validate no errors
writeClient.commit(newCommitTime, writeStatues);
// Now tagLocation for these records, hbaseIndex should tag them correctly
// Rebuild the meta client and table before tagging again
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
// All ten records must have a known location, distinct keys, and a location at newCommitTime
assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 10);
assertTrue(javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count() == 10);
assertTrue(javaRDD.filter(
record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime()
.equals(newCommitTime))).distinct().count() == 10);
}
@Test
public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
@@ -359,6 +393,23 @@ public class TestHbaseIndex {
Assert.assertEquals(11, hbaseNumPuts);
}
/**
 * Checks that getHBasePutAccessParallelism yields a (0, 0) tuple for the two sample write statuses used here,
 * while the input RDD keeps its ten partitions.
 * NOTE(review): presumably the first argument of getSampleWriteStatus is the insert count — confirm.
 */
@Test
public void testsHBasePutAccessParallelismWithNoInserts() {
  final HoodieWriteConfig writeConfig = getConfig();
  final HBaseIndex hbaseIndex = new HBaseIndex(writeConfig);

  // Two sample statuses spread over ten partitions
  final JavaRDD<WriteStatus> statusesWithoutInserts =
      jsc.parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10);

  final Tuple2<Long, Integer> putStats = hbaseIndex.getHBasePutAccessParallelism(statusesWithoutInserts);
  final int numPuts = Integer.parseInt(putStats._1.toString());
  final int putParallelism = Integer.parseInt(putStats._2.toString());

  Assert.assertEquals(10, statusesWithoutInserts.getNumPartitions());
  Assert.assertEquals(0, putParallelism);
  Assert.assertEquals(0, numPuts);
}
@Test
public void testsHBaseIndexDefaultQPSResourceAllocator() {
HoodieWriteConfig config = getConfig();