From 9f526396a085e52308b85d4dbfc71102005b5366 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Tue, 4 Apr 2017 05:17:38 -0700 Subject: [PATCH] Add support for merge_on_read tables to HoodieClientExample --- .../src/test/java/HoodieClientExample.java | 49 +++++++++++-------- .../common/table/HoodieTableMetaClient.java | 17 +++++++ 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/hoodie-client/src/test/java/HoodieClientExample.java b/hoodie-client/src/test/java/HoodieClientExample.java index 39724f67e..98cf2cba5 100644 --- a/hoodie-client/src/test/java/HoodieClientExample.java +++ b/hoodie-client/src/test/java/HoodieClientExample.java @@ -20,11 +20,15 @@ import com.beust.jcommander.Parameter; import com.uber.hoodie.HoodieWriteClient; import com.uber.hoodie.common.HoodieTestDataGenerator; import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieIndexConfig; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.index.HoodieIndex; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; @@ -32,7 +36,6 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.List; -import java.util.Properties; /** * Driver program that uses the Hoodie client with synthetic workload, and performs basic @@ -41,10 +44,13 @@ import java.util.Properties; public class HoodieClientExample { @Parameter(names={"--table-path", "-p"}, description = "path for Hoodie sample table") - private String inputTablePath = "file:///tmp/hoodie/sample-table"; + private String tablePath = "file:///tmp/hoodie/sample-table"; @Parameter(names={"--table-name", "-n"}, description = "table name for Hoodie sample table") - 
private String inputTableName = "sample-table"; + private String tableName = "sample-table"; + + @Parameter(names={"--table-type", "-t"}, description = "table type") + private String tableType = HoodieTableType.COPY_ON_WRITE.name(); private static Logger logger = LogManager.getLogger(HoodieClientExample.class); @@ -57,7 +63,6 @@ public class HoodieClientExample { public void run() throws Exception { - HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); SparkConf sparkConf = new SparkConf().setAppName("hoodie-client-example"); sparkConf.setMaster("local[1]"); @@ -65,38 +70,42 @@ public class HoodieClientExample { sparkConf.set("spark.kryoserializer.buffer.max", "512m"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); - // generate some records to be loaded in. + // Generator of some records to be loaded in. + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + + // initialize the table, if not done already + Path path = new Path(tablePath); + FileSystem fs = FSUtils.getFs(); + if (!fs.exists(path)) { + HoodieTableMetaClient.initTableType(fs, tablePath, HoodieTableType.valueOf(tableType), tableName); + } + + // Create the write client to write some records in HoodieWriteConfig cfg = - HoodieWriteConfig.newBuilder().withPath(inputTablePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) - .forTable(inputTableName).withIndexConfig( - HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) - .build(); - Properties properties = new Properties(); - properties.put(HoodieWriteConfig.TABLE_NAME, inputTableName); - HoodieTableMetaClient - .initializePathAsHoodieDataset(FSUtils.getFs(), inputTablePath, properties); + HoodieWriteConfig.newBuilder().withPath(tablePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .forTable(tableName).withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .build(); 
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); /** * Write 1 (only inserts) */ - String newCommitTime = "001"; + String newCommitTime = client.startCommit(); logger.info("Starting commit " + newCommitTime); List records = dataGen.generateInserts(newCommitTime, 100); - JavaRDD writeRecords = jsc.parallelize(records, 1); - + JavaRDD writeRecords = jsc.parallelize(records, 1); client.upsert(writeRecords, newCommitTime); /** * Write 2 (updates) */ - newCommitTime = "002"; + newCommitTime = client.startCommit(); logger.info("Starting commit " + newCommitTime); records.addAll(dataGen.generateUpdates(newCommitTime, 100)); - - writeRecords = jsc.parallelize(records, 1); + writeRecords = jsc.parallelize(records, 1); client.upsert(writeRecords, newCommitTime); } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java index d145d21fb..b0387d870 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java @@ -172,6 +172,23 @@ public class HoodieTableMetaClient implements Serializable { return archivedTimeline; } + /** + * Helper method to initialize a given path as a hoodie dataset of the given table type and table name + * + * @param fs filesystem to create the dataset under + * @param basePath base path under which the dataset is initialized + * @param tableType storage type of the table, e.g. COPY_ON_WRITE or MERGE_ON_READ + * @param tableName name to register the table under + * @return {@link HoodieTableMetaClient} for the initialized dataset + * @throws IOException if the table metadata cannot be written to the filesystem + */ + public static HoodieTableMetaClient initTableType(FileSystem fs, String basePath, HoodieTableType tableType, String tableName) throws IOException { + Properties properties = new Properties(); + properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName); + properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); + return HoodieTableMetaClient.initializePathAsHoodieDataset(fs, basePath, properties); + } + /** * Helper method to initialize a
given path as a hoodie dataset with configs passed in as as Properties *