diff --git a/hudi-client/src/test/java/HoodieClientExample.java b/hudi-client/src/test/java/HoodieClientExample.java deleted file mode 100644 index 2bc716ec2..000000000 --- a/hudi-client/src/test/java/HoodieClientExample.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import org.apache.hudi.client.HoodieWriteClient; -import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.HoodieClientTestUtils; -import org.apache.hudi.common.HoodieTestDataGenerator; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieAvroPayload; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieIndexConfig; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.index.HoodieIndex.IndexType; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; - -import java.util.ArrayList; -import java.util.List; - -/** - * Driver program that uses the Hoodie client with synthetic workload, and performs basic operations. - *

- */ -public class HoodieClientExample { - - private static final Logger LOG = LogManager.getLogger(HoodieClientExample.class); - @Parameter(names = {"--help", "-h"}, help = true) - public Boolean help = false; - @Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie sample table") - private String tablePath = "file:///tmp/hoodie/sample-table"; - @Parameter(names = {"--table-name", "-n"}, description = "table name for Hoodie sample table") - private String tableName = "hoodie_rt"; - @Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ") - private String tableType = HoodieTableType.COPY_ON_WRITE.name(); - - public static void main(String[] args) throws Exception { - HoodieClientExample cli = new HoodieClientExample(); - JCommander cmd = new JCommander(cli, null, args); - - if (cli.help) { - cmd.usage(); - System.exit(1); - } - cli.run(); - } - - public void run() throws Exception { - - SparkConf sparkConf = new SparkConf().setAppName("hoodie-client-example"); - sparkConf.setMaster("local[1]"); - sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - sparkConf.set("spark.kryoserializer.buffer.max", "512m"); - JavaSparkContext jsc = new JavaSparkContext(sparkConf); - - // Generator of some records to be loaded in. 
- HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); - - // initialize the table, if not done already - Path path = new Path(tablePath); - FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration()); - if (!fs.exists(path)) { - HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), - tableName, HoodieAvroPayload.class.getName()); - } - - // Create the write client to write some records in - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) - .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); - - List recordsSoFar = new ArrayList<>(); - /** - * Write 1 (only inserts) - */ - String newCommitTime = client.startCommit(); - LOG.info("Starting commit " + newCommitTime); - - List records = dataGen.generateInserts(newCommitTime, 100); - recordsSoFar.addAll(records); - JavaRDD writeRecords = jsc.parallelize(records, 1); - client.upsert(writeRecords, newCommitTime); - - /** - * Write 2 (updates) - */ - newCommitTime = client.startCommit(); - LOG.info("Starting commit " + newCommitTime); - List toBeUpdated = dataGen.generateUpdates(newCommitTime, 100); - records.addAll(toBeUpdated); - recordsSoFar.addAll(toBeUpdated); - writeRecords = jsc.parallelize(records, 1); - client.upsert(writeRecords, newCommitTime); - - /** - * Delete 1 - */ - newCommitTime = client.startCommit(); - LOG.info("Starting commit " + newCommitTime); - List toBeDeleted = HoodieClientTestUtils - .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(recordsSoFar), 10); - JavaRDD deleteRecords = jsc.parallelize(toBeDeleted, 1); - client.delete(deleteRecords, newCommitTime); - - /** - * Schedule a compaction and 
also perform compaction on a MOR table - */ - if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) { - Option instant = client.scheduleCompaction(Option.empty()); - JavaRDD writeStatues = client.compact(instant.get()); - client.commitCompaction(instant.get(), writeStatues, Option.empty()); - } - } - -} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java index 255c8195c..e6a7e3323 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroPayload.java @@ -67,4 +67,9 @@ public class HoodieAvroPayload implements HoodieRecordPayload } return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema)); } + + // for examples + public byte[] getRecordBytes() { + return recordBytes; + } } diff --git a/hudi-examples/README.md b/hudi-examples/README.md new file mode 100644 index 000000000..b17cd9691 --- /dev/null +++ b/hudi-examples/README.md @@ -0,0 +1,35 @@ + + +This directory contains examples code that uses hudi. + +To run the demo: + + 1. Configure your `SPARK_MASTER` env variable, yarn-cluster mode by default. + 2. For hudi write client demo and hudi data source demo, just use spark-submit as common spark app + 3. For hudi delta streamer demo of custom source, run `bin/custom-delta-streamer-example.sh` + 4. For hudi delta streamer demo of dfs source: + 4.1 Prepare dfs data, we have provided `src/main/resources/delta-streamer-config/dfs/source-file.json` for test + 4.2 Run `bin/dfs-delta-streamer-example.sh` + 5. 
For hudi delta streamer demo of kafka source: + 5.1 Start Kafka server + 5.2 Configure your Kafka properties, we have provided `src/main/resources/delta-streamer-config/kafka/kafka-source.properties` for test + 5.3 Run `bin/kafka-delta-streamer-example.sh` + 5.4 continuously write source data to the Kafka topic you configured with `hoodie.deltastreamer.source.kafka.topic` in `kafka-source.properties` + 6. Some notes on the delta streamer demo: + 6.1 The configuration files we provided are just the simplest demo, you can change it according to your specific needs. + 6.2 You could also use Intellij to run the example directly by configuring parameters as "Program arguments" diff --git a/hudi-examples/bin/custom-delta-streamer-example.sh b/hudi-examples/bin/custom-delta-streamer-example.sh new file mode 100755 index 000000000..4f21456ad --- /dev/null +++ b/hudi-examples/bin/custom-delta-streamer-example.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements.  See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.  The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.  You may obtain a copy of the License at +# +#      http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Simple examples of HoodieDeltaStreamer which reads data from a mock HoodieExampleDataGenerator, +# this is an example for developers to define their own custom data source. 
+ +BASE_PATH=$(cd `dirname $0`; pwd) + +${BASE_PATH}/hudi-delta-streamer \ +--hoodie-conf hoodie.datasource.write.recordkey.field=uuid \ +--hoodie-conf hoodie.datasource.write.partitionpath.field=driver \ +--target-base-path /tmp/hoodie/deltastreamertable \ +--table-type MERGE_ON_READ \ +--target-table deltastreamertable \ +--source-ordering-field ts \ +--source-class org.apache.hudi.examples.common.RandomJsonSource \ +--schemaprovider-class org.apache.hudi.examples.common.ExampleDataSchemaProvider \ +--transformer-class org.apache.hudi.examples.common.IdentityTransformer \ +--continuous diff --git a/hudi-examples/bin/dfs-delta-streamer-example.sh b/hudi-examples/bin/dfs-delta-streamer-example.sh new file mode 100755 index 000000000..d440a8116 --- /dev/null +++ b/hudi-examples/bin/dfs-delta-streamer-example.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Simple examples of HoodieDeltaStreamer which read data from JsonDFSSource, +# which will read data from a dfs directory for once, then write data to a hudi table which could be queried. 
+ +BASE_PATH=$(cd `dirname $0`; pwd) + +${BASE_PATH}/hudi-delta-streamer \ +--hoodie-conf hoodie.datasource.write.recordkey.field=uuid \ +--hoodie-conf hoodie.datasource.write.partitionpath.field=driver \ +--hoodie-conf hoodie.deltastreamer.source.dfs.root=hudi-examples/src/main/resources/delta-streamer-config/dfs \ +--target-base-path /tmp/hoodie/deltastreamertable \ +--table-type MERGE_ON_READ \ +--target-table deltastreamertable \ +--source-ordering-field ts \ +--source-class org.apache.hudi.utilities.sources.JsonDFSSource \ +--schemaprovider-class org.apache.hudi.examples.common.ExampleDataSchemaProvider \ +--transformer-class org.apache.hudi.examples.common.IdentityTransformer diff --git a/hudi-examples/bin/hudi-delta-streamer b/hudi-examples/bin/hudi-delta-streamer new file mode 100755 index 000000000..9accd7174 --- /dev/null +++ b/hudi-examples/bin/hudi-delta-streamer @@ -0,0 +1,39 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +EXAMPLES_DIR="$(dirname $(dirname "${BASH_SOURCE[0]}"))" +PROJECT_DIR="$(dirname ${EXAMPLES_DIR})" + +JAR_FILE=`ls ${PROJECT_DIR}/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_*.jar | grep -v sources | grep -v tests.jar` +EXAMPLES_JARS=`ls ${PROJECT_DIR}/hudi-examples/target/hudi-examples-*.jar | grep -v sources | grep -v tests.jar` + +if [ -z "${SPARK_MASTER}" ]; then + SPARK_MASTER="yarn-cluster" +fi + +exec "${SPARK_HOME}"/bin/spark-submit \ +--master ${SPARK_MASTER} \ +--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \ +--conf spark.kryoserializer.buffer.max=128m \ +--conf spark.yarn.queue=root.default \ +--conf spark.yarn.submit.waitAppCompletion=false \ +--packages org.apache.spark:spark-avro_2.11:2.4.4 \ +--jars ${EXAMPLES_JARS} \ +--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ +"${JAR_FILE}" \ +$@ diff --git a/hudi-examples/bin/kafka-delta-streamer-example.sh b/hudi-examples/bin/kafka-delta-streamer-example.sh new file mode 100755 index 000000000..c271bf0ce --- /dev/null +++ b/hudi-examples/bin/kafka-delta-streamer-example.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +# Simple examples of HoodieDeltaStreamer which read data from kafka, +# create the source topic using: kafka-topics.sh --create --zookeeper zk:2181 --replication-factor 3 --partitions 1 --topic hoodie-source-topic +# insert data using: kafka-console-producer.sh --broker-list localhost:9092 --topic hoodie-source-topic +# start the delta-streamer + +BASE_PATH=$(cd `dirname $0`; pwd) + +${BASE_PATH}/hudi-delta-streamer \ +--props hudi-examples/src/main/resources/delta-streamer-config/kafka/kafka-source.properties \ +--target-base-path /tmp/hoodie/deltastreamertable \ +--table-type MERGE_ON_READ \ +--target-table deltastreamertable \ +--source-ordering-field ts \ +--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ +--schemaprovider-class org.apache.hudi.examples.common.ExampleDataSchemaProvider \ +--transformer-class org.apache.hudi.examples.common.IdentityTransformer \ +--continuous diff --git a/hudi-examples/pom.xml b/hudi-examples/pom.xml new file mode 100644 index 000000000..f148b2acf --- /dev/null +++ b/hudi-examples/pom.xml @@ -0,0 +1,199 @@ + + + + + + hudi + org.apache.hudi + 0.6.0-SNAPSHOT + + 4.0.0 + + hudi-examples + jar + + + ${project.parent.basedir} + true + + + + + + src/main/resources + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + + + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.apache.hudi + hudi-common + ${project.version} + + + + org.apache.hudi + hudi-cli + ${project.version} + + + + org.apache.hudi + hudi-client + 
${project.version} + + + + org.apache.hudi + hudi-utilities_${scala.binary.version} + ${project.version} + + + + org.apache.hudi + hudi-spark_${scala.binary.version} + ${project.version} + + + + org.apache.hudi + hudi-hadoop-mr + ${project.version} + + + + org.apache.hudi + hudi-timeline-service + ${project.version} + + + + + org.apache.spark + spark-core_${scala.binary.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + + + org.apache.spark + spark-avro_${scala.binary.version} + + + + + org.apache.parquet + parquet-hadoop + ${parquet.version} + + + + + org.apache.avro + avro + + + + org.apache.parquet + parquet-avro + + + + + ${hive.groupid} + hive-common + + + + diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java b/hudi-examples/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java new file mode 100644 index 000000000..3a07cc597 --- /dev/null +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/common/ExampleDataSchemaProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.examples.common; + +import org.apache.avro.Schema; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.spark.api.java.JavaSparkContext; + + +/** + * the example SchemaProvider of example json data from uber. + */ +public class ExampleDataSchemaProvider extends SchemaProvider { + + public ExampleDataSchemaProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + } + + @Override + public Schema getSourceSchema() { + return org.apache.hudi.examples.common.HoodieExampleDataGenerator.avroSchema; + } +} diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java new file mode 100644 index 000000000..a9847ecc0 --- /dev/null +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.examples.common; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + + +/** + * Class to be used to generate test data. + */ +public class HoodieExampleDataGenerator> { + + public static final String DEFAULT_FIRST_PARTITION_PATH = "2020/01/01"; + public static final String DEFAULT_SECOND_PARTITION_PATH = "2020/01/02"; + public static final String DEFAULT_THIRD_PARTITION_PATH = "2020/01/03"; + + public static final String[] DEFAULT_PARTITION_PATHS = + {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; + public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ " + + "{\"name\": \"ts\",\"type\": \"double\"},{\"name\": \"uuid\", \"type\": \"string\"}," + + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"}," + + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"}," + + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"}," + + "{\"name\":\"fare\",\"type\": \"double\"}]}"; + public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); + + private static Random rand = new Random(46474747); + + 
private final Map existingKeys; + private final String[] partitionPaths; + private int numExistingKeys; + + public HoodieExampleDataGenerator(String[] partitionPaths) { + this(partitionPaths, new HashMap<>()); + } + + public HoodieExampleDataGenerator() { + this(DEFAULT_PARTITION_PATHS); + } + + public HoodieExampleDataGenerator(String[] partitionPaths, Map keyPartitionMap) { + this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length); + this.existingKeys = keyPartitionMap; + } + + /** + * Generates a new avro record of the above schema format, retaining the key if optionally provided. + */ + @SuppressWarnings("unchecked") + public T generateRandomValue(HoodieKey key, String commitTime) { + GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0); + return (T) new HoodieAvroPayload(Option.of(rec)); + } + + public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName, + double timestamp) { + GenericRecord rec = new GenericData.Record(avroSchema); + rec.put("uuid", rowKey); + rec.put("ts", timestamp); + rec.put("rider", riderName); + rec.put("driver", driverName); + rec.put("begin_lat", rand.nextDouble()); + rec.put("begin_lon", rand.nextDouble()); + rec.put("end_lat", rand.nextDouble()); + rec.put("end_lon", rand.nextDouble()); + rec.put("fare", rand.nextDouble() * 100); + return rec; + } + + /** + * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. + */ + public List> generateInserts(String commitTime, Integer n) { + return generateInsertsStream(commitTime, n).collect(Collectors.toList()); + } + + /** + * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. 
+ */ + public Stream> generateInsertsStream(String commitTime, Integer n) { + int currSize = getNumExistingKeys(); + + return IntStream.range(0, n).boxed().map(i -> { + String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)]; + HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath); + KeyPartition kp = new KeyPartition(); + kp.key = key; + kp.partitionPath = partitionPath; + existingKeys.put(currSize + i, kp); + numExistingKeys++; + return new HoodieRecord<>(key, generateRandomValue(key, commitTime)); + }); + } + + /** + * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned + * list + * + * @param commitTime Commit Timestamp + * @param n Number of updates (including dups) + * @return list of hoodie record updates + */ + public List> generateUpdates(String commitTime, Integer n) { + List> updates = new ArrayList<>(); + for (int i = 0; i < n; i++) { + KeyPartition kp = existingKeys.get(rand.nextInt(numExistingKeys - 1)); + HoodieRecord record = generateUpdateRecord(kp.key, commitTime); + updates.add(record); + } + return updates; + } + + public HoodieRecord generateUpdateRecord(HoodieKey key, String commitTime) { + return new HoodieRecord<>(key, generateRandomValue(key, commitTime)); + } + + private Option convertToString(HoodieRecord record) { + try { + String str = HoodieAvroUtils + .bytesToAvro(((HoodieAvroPayload)record.getData()).getRecordBytes(), avroSchema) + .toString(); + str = "{" + str.substring(str.indexOf("\"ts\":")); + return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}")); + } catch (IOException e) { + return Option.empty(); + } + } + + public List convertToStringList(List> records) { + return records.stream().map(this::convertToString).filter(Option::isPresent).map(Option::get) + .collect(Collectors.toList()); + } + + public int getNumExistingKeys() { + return numExistingKeys; + } + + public static class 
KeyPartition implements Serializable { + + HoodieKey key; + String partitionPath; + } + + public void close() { + existingKeys.clear(); + } + +} diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java new file mode 100644 index 000000000..3e7b0e837 --- /dev/null +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleSparkUtils.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.examples.common; + +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; + +import java.util.HashMap; +import java.util.Map; + +/** + * Bunch of util methods. 
+ */ +public class HoodieExampleSparkUtils { + + private static Map defaultConf() { + Map additionalConfigs = new HashMap<>(); + additionalConfigs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + additionalConfigs.put("spark.kryoserializer.buffer.max", "512m"); + return additionalConfigs; + } + + public static SparkConf defaultSparkConf(String appName) { + return buildSparkConf(appName, defaultConf()); + } + + public static SparkConf buildSparkConf(String appName, Map additionalConfigs) { + + SparkConf sparkConf = new SparkConf().setAppName(appName); + additionalConfigs.forEach(sparkConf::set); + return sparkConf; + } + + public static SparkSession defaultSparkSession(String appName) { + return buildSparkSession(appName, defaultConf()); + } + + public static SparkSession buildSparkSession(String appName, Map additionalConfigs) { + + SparkSession.Builder builder = SparkSession.builder().appName(appName); + additionalConfigs.forEach(builder::config); + return builder.getOrCreate(); + } +} diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/IdentityTransformer.java b/hudi-examples/src/main/java/org/apache/hudi/examples/common/IdentityTransformer.java new file mode 100644 index 000000000..1f27c9c9f --- /dev/null +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/common/IdentityTransformer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.examples.common; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.utilities.transform.Transformer; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +/** + * Identity transformer. + */ +public class IdentityTransformer implements Transformer { + + @Override + public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset rowDataset, + TypedProperties properties) { + return rowDataset; + } +} \ No newline at end of file diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java b/hudi-examples/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java new file mode 100644 index 000000000..af755f177 --- /dev/null +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/common/RandomJsonSource.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.examples.common; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.InputBatch; +import org.apache.hudi.utilities.sources.JsonSource; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; + +import java.util.List; + +public class RandomJsonSource extends JsonSource { + private HoodieExampleDataGenerator dataGen; + + public RandomJsonSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { + super(props, sparkContext, sparkSession, schemaProvider); + dataGen = new HoodieExampleDataGenerator<>(); + } + + protected InputBatch> fetchNewData(Option lastCkptStr, long sourceLimit) { + String commitTime = HoodieActiveTimeline.createNewInstantTime(); + List inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20)); + + return new InputBatch<>(Option.of(sparkContext.parallelize(inserts, 1)), commitTime); + } +} \ No newline at end of file diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java new file mode 100644 index 000000000..8e976c307 --- /dev/null +++ 
b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.examples.spark; + +import org.apache.hudi.client.HoodieWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.examples.common.HoodieExampleDataGenerator; +import org.apache.hudi.examples.common.HoodieExampleSparkUtils; +import org.apache.hudi.index.HoodieIndex; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import 
 * Usage: HoodieWriteClientExample <tablePath> <tableName> + * and describe root path of hudi and table name + * for example, `HoodieWriteClientExample file:///tmp/hoodie/sample-table hoodie_rt` + */ +public class HoodieWriteClientExample { + + private static final Logger LOG = LogManager.getLogger(HoodieWriteClientExample.class); + + private static String tableType = HoodieTableType.COPY_ON_WRITE.name(); + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: HoodieWriteClientExample <tablePath> <tableName>"); + System.exit(1);
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>(); + + // initialize the table, if not done already + Path path = new Path(tablePath); + FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration()); + if (!fs.exists(path)) { + HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), + tableName, HoodieAvroPayload.class.getName()); + } + + // Create the write client to write some records in + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build(); + HoodieWriteClient<HoodieAvroPayload> client = new HoodieWriteClient<>(jsc, cfg); + + // inserts + String newCommitTime = client.startCommit(); + LOG.info("Starting commit " + newCommitTime); + + List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10); + List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records); + JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1); + client.upsert(writeRecords, newCommitTime); + + // updates + newCommitTime = client.startCommit(); + LOG.info("Starting commit " + newCommitTime); + List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2); + records.addAll(toBeUpdated); + recordsSoFar.addAll(toBeUpdated); + writeRecords = jsc.parallelize(records, 1); + client.upsert(writeRecords, newCommitTime); + + // Delete + newCommitTime = client.startCommit(); + LOG.info("Starting commit " + newCommitTime); + // just delete half of the records + int numToDelete = recordsSoFar.size() / 2; + List<HoodieKey> toBeDeleted = recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList()); + JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1); + client.delete(deleteRecords, newCommitTime);
compaction + if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) { + Option<String> instant = client.scheduleCompaction(Option.empty()); + JavaRDD<WriteStatus> writeStatues = client.compact(instant.get()); + client.commitCompaction(instant.get(), writeStatues, Option.empty()); + }
"driver-20200113114823", "begin_lat": 0.9356018115236618, "begin_lon": 0.44714286038480855, "end_lat": 0.8473860258521023, "end_lon": 0.1762368947074756, "fare": 79.42627821413218, "partitionpath": "2020/01/02"} +{"ts": 0.0, "uuid": "25c579d6-e480-4373-ae71-b4d524a74142", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.17851135255091155, "begin_lon": 0.39556048623031603, "end_lat": 0.5878106318835439, "end_lon": 0.36519521355305173, "fare": 98.88075495133515, "partitionpath": "2020/01/02"} +{"ts": 0.0, "uuid": "974198b6-507e-4de2-bbe3-997d025e83b9", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.19179139106643606, "begin_lon": 0.8157865134723307, "end_lat": 0.6418467159488594, "end_lon": 0.14516349705850584, "fare": 12.153670568058683, "partitionpath": "2020/01/01"} +{"ts": 0.0, "uuid": "f91c2067-7527-491b-a766-e180d6da1371", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.33922164839486424, "begin_lon": 0.909372837469859, "end_lat": 0.9017656600243008, "end_lon": 0.8236411667430927, "fare": 2.0856583634078385, "partitionpath": "2020/01/03"} +{"ts": 0.0, "uuid": "1b9fb778-3349-4172-b81d-522e181b36e1", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.6662084366450246, "begin_lon": 0.9065078444936647, "end_lat": 0.7124299678100179, "end_lon": 0.05336723040266267, "fare": 38.63372961020515, "partitionpath": "2020/01/02"} +{"ts": 0.0, "uuid": "a4b56c00-f372-414a-9c1c-458c10d648b9", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.4106290929046368, "begin_lon": 0.964603455586492, "end_lat": 0.13957566957654388, "end_lon": 0.45400191464227213, "fare": 81.37564420028626, "partitionpath": "2020/01/02"} \ No newline at end of file diff --git a/hudi-examples/src/main/resources/delta-streamer-config/kafka/kafka-source.properties 
b/hudi-examples/src/main/resources/delta-streamer-config/kafka/kafka-source.properties new file mode 100644 index 000000000..496ab69fd --- /dev/null +++ b/hudi-examples/src/main/resources/delta-streamer-config/kafka/kafka-source.properties @@ -0,0 +1,31 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + + +# Key fields, for kafka example +hoodie.datasource.write.recordkey.field=uuid +hoodie.datasource.write.partitionpath.field=driver + +# Kafka Source topic +hoodie.deltastreamer.source.kafka.topic=hoodie-source-topic + +# Kafka props +# The kafka cluster we want to ingest from +bootstrap.servers=localhost:9092 +auto.offset.reset=earliest +group.id=hoodie-delta-streamer diff --git a/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala b/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala new file mode 100644 index 000000000..27accadeb --- /dev/null +++ b/hudi-examples/src/main/scala/org/apache/hudi/examples/spark/HoodieDataSourceExample.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.examples.spark + +import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME_OPT_KEY, END_INSTANTTIME_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_OPT_KEY} +import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY} +import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs +import org.apache.hudi.common.model.HoodieAvroPayload +import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME +import org.apache.hudi.examples.common.{HoodieExampleDataGenerator, HoodieExampleSparkUtils} +import org.apache.spark.sql.SaveMode.{Append, Overwrite} +import org.apache.spark.sql.SparkSession + +import scala.collection.JavaConversions._ + +/** + * Simple examples of [[org.apache.hudi.DefaultSource]] + * + * To run this example, you should + * 1. For running in IDE, set VM options `-Dspark.master=local[2]` + * 2. For running in shell, using `spark-submit` + * + * Usage: HoodieWriteClientExample . 
System.err.println("Usage: HoodieDataSourceExample <tablePath> <tableName>")
+ load(tablePath + "/*/*/*/*") + + roViewDF.createOrReplaceTempView("hudi_ro_table") + + spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show() + // +-----------------+-------------------+-------------------+---+ + // | fare| begin_lon| begin_lat| ts| + // +-----------------+-------------------+-------------------+---+ + // |98.88075495133515|0.39556048623031603|0.17851135255091155|0.0| + // ... + + spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show() + // +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+ + // |_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare| + // +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+ + // | 20191231181501|31cafb9f-0196-4b1...| 2020/01/02|rider-1577787297889|driver-1577787297889| 98.88075495133515| + // ... + } + + /** + * This is similar to inserting new data. Generate updates to existing trips using the data generator, + * load into a DataFrame and write DataFrame into the hudi dataset. + */ + def updateData(spark: SparkSession, tablePath: String, tableName: String, dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = { + + val commitTime: String = System.currentTimeMillis().toString + val updates = dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10)) + val df = spark.read.json(spark.sparkContext.parallelize(updates, 1)) + df.write.format("org.apache.hudi"). + options(getQuickstartWriteConfigs). + option(PRECOMBINE_FIELD_OPT_KEY, "ts"). + option(RECORDKEY_FIELD_OPT_KEY, "uuid"). + option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). + option(TABLE_NAME, tableName). + mode(Append). 
+ save(tablePath) + } + + /** + * Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. + * This can be achieved using Hudi’s incremental view and providing a begin time from which changes need to be streamed. + * We do not need to specify endTime, if we want all changes after the given commit (as is the common case). + */ + def incrementalQuery(spark: SparkSession, tablePath: String, tableName: String) { + import spark.implicits._ + val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50) + val beginTime = commits(commits.length - 2) // commit time we are interested in + + // incrementally query data + val incViewDF = spark. + read. + format("org.apache.hudi"). + option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). + option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). + load(tablePath) + incViewDF.createOrReplaceTempView("hudi_incr_table") + spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show() + } + + /** + * Lets look at how to query data as of a specific time. + * The specific time can be represented by pointing endTime to a specific commit time + * and beginTime to “000” (denoting earliest possible commit time). + */ + def pointInTimeQuery(spark: SparkSession, tablePath: String, tableName: String) { + import spark.implicits._ + val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50) + val beginTime = "000" // Represents all commits > this time. + val endTime = commits(commits.length - 2) // commit time we are interested in + + //incrementally query data + val incViewDF = spark.read.format("org.apache.hudi"). + option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). + option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). + option(END_INSTANTTIME_OPT_KEY, endTime). 
+ load(tablePath) + incViewDF.createOrReplaceTempView("hudi_incr_table") + spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show() + } +} diff --git a/pom.xml b/pom.xml index 8f1bac468..cc39f4136 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ packaging/hudi-timeline-server-bundle docker/hoodie/hadoop hudi-integ-test + hudi-examples @@ -267,6 +268,15 @@ maven-jar-plugin ${maven-jar-plugin.version} + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + org.apache.maven.plugins + maven-compiler-plugin + org.jacoco jacoco-maven-plugin