1
0

[HUDI-476]: Add hudi-examples module (#1151)

add hoodie delta streamer mock source example and dfs source and kafka source examples

Signed-off-by: dengziming <dengziming1993@gmail.com>

add defaultSparkConf utils method

change version of hudi-examples to 0.5.2-SNAPSHOT
change the artifactId of hudi-spark and hudi-utilities
alter some code to adapt kafka2.0

Update scripts

Add license
This commit is contained in:
dengziming
2020-05-28 01:44:39 +08:00
committed by GitHub
parent 03f136361a
commit bde7a7043e
18 changed files with 1113 additions and 144 deletions

View File

@@ -1,144 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex.IndexType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.ArrayList;
import java.util.List;
/**
 * Driver program that uses the Hoodie client with a synthetic workload and performs basic operations:
 * an insert-only commit, an upsert commit, a delete commit and, for MERGE_ON_READ tables, a compaction.
 * <p>
 * Command line options (parsed with JCommander): {@code --table-path}, {@code --table-name},
 * {@code --table-type} and {@code --help}.
 */
public class HoodieClientExample {

  private static final Logger LOG = LogManager.getLogger(HoodieClientExample.class);

  @Parameter(names = {"--help", "-h"}, help = true)
  public Boolean help = false;

  @Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie sample table")
  private String tablePath = "file:///tmp/hoodie/sample-table";

  @Parameter(names = {"--table-name", "-n"}, description = "table name for Hoodie sample table")
  private String tableName = "hoodie_rt";

  @Parameter(names = {"--table-type", "-t"}, description = "One of COPY_ON_WRITE or MERGE_ON_READ")
  private String tableType = HoodieTableType.COPY_ON_WRITE.name();

  /**
   * Parses the command line into a new {@link HoodieClientExample} and runs it. When {@code --help} is
   * given, prints the usage and exits the JVM instead.
   *
   * @param args command line arguments
   * @throws Exception if the example run fails
   */
  public static void main(String[] args) throws Exception {
    HoodieClientExample cli = new HoodieClientExample();
    JCommander cmd = new JCommander(cli, null, args);
    if (cli.help) {
      cmd.usage();
      System.exit(1);
    }
    cli.run();
  }

  /**
   * Runs the example workload against the configured table on a local, single-threaded Spark context.
   * The Spark context and the write client are always released, even when one of the writes fails.
   *
   * @throws Exception if table initialization or any of the write operations fails
   */
  public void run() throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("hoodie-client-example");
    sparkConf.setMaster("local[1]");
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.kryoserializer.buffer.max", "512m");

    // try-with-resources stops the Spark context on every exit path.
    try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
      // Generator of some records to be loaded in.
      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

      // Initialize the table, if not done already.
      Path path = new Path(tablePath);
      FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration());
      if (!fs.exists(path)) {
        HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType),
            tableName, HoodieAvroPayload.class.getName());
      }

      // Create the write client to write some records in.
      HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
          .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
          .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
          .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
      HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
      try {
        List<HoodieRecord> recordsSoFar = new ArrayList<>();

        // Write 1 (only inserts)
        String newCommitTime = client.startCommit();
        LOG.info("Starting commit " + newCommitTime);
        List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
        recordsSoFar.addAll(records);
        JavaRDD<HoodieRecord> writeRecords = jsc.<HoodieRecord>parallelize(records, 1);
        client.upsert(writeRecords, newCommitTime);

        // Write 2 (updates): the batch re-sends the original inserts together with the updates.
        newCommitTime = client.startCommit();
        LOG.info("Starting commit " + newCommitTime);
        List<HoodieRecord> toBeUpdated = dataGen.generateUpdates(newCommitTime, 100);
        records.addAll(toBeUpdated);
        recordsSoFar.addAll(toBeUpdated);
        writeRecords = jsc.<HoodieRecord>parallelize(records, 1);
        client.upsert(writeRecords, newCommitTime);

        // Delete 1
        newCommitTime = client.startCommit();
        LOG.info("Starting commit " + newCommitTime);
        List<HoodieKey> toBeDeleted = HoodieClientTestUtils
            .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(recordsSoFar), 10);
        JavaRDD<HoodieKey> deleteRecords = jsc.<HoodieKey>parallelize(toBeDeleted, 1);
        client.delete(deleteRecords, newCommitTime);

        // Schedule a compaction and also perform compaction on a MOR table.
        if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
          Option<String> instant = client.scheduleCompaction(Option.empty());
          // Only compact when an instant was actually scheduled; get() on an empty Option would fail.
          if (instant.isPresent()) {
            JavaRDD<WriteStatus> writeStatues = client.compact(instant.get());
            client.commitCompaction(instant.get(), writeStatues, Option.empty());
          } else {
            LOG.info("No compaction was scheduled, skipping compaction");
          }
        }
      } finally {
        // Release the resources held by the write client.
        client.close();
      }
    }
  }
}

View File

@@ -67,4 +67,9 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
}
return Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
}
/**
* Returns the {@code recordBytes} held by this payload, i.e. the same bytes this class passes to
* {@code HoodieAvroUtils.bytesToAvro} together with a schema. Added for use by the examples.
* <p>
* The field itself is returned, not a copy.
*/
public byte[] getRecordBytes() {
return recordBytes;
}
}

35
hudi-examples/README.md Normal file
View File

@@ -0,0 +1,35 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
This directory contains example code that uses Hudi.
To run the demo:
1. Configure your `SPARK_MASTER` env variable, yarn-cluster mode by default.
2. For hudi write client demo and hudi data source demo, just use spark-submit as common spark app
3. For hudi delta streamer demo of custom source, run `bin/custom-delta-streamer-example.sh`
4. For hudi delta streamer demo of dfs source:
4.1 Prepare dfs data, we have provided `src/main/resources/delta-streamer-config/dfs/source-file.json` for test
4.2 Run `bin/dfs-delta-streamer-example.sh`
5. For hudi delta streamer demo of kafka source:
5.1 Start Kafka server
5.2 Configure your Kafka properties, we have provided `src/main/resources/delta-streamer-config/kafka/kafka-source.properties` for test
5.3 Run `bin/kafka-delta-streamer-example.sh`
5.4 Continuously write source data to the Kafka topic you configured with `hoodie.deltastreamer.source.kafka.topic` in `kafka-source.properties`
6. Some notes on the delta streamer demo:
6.1 The configuration files we provide are just the simplest demo; you can change them according to your specific needs.
6.2 You could also use Intellij to run the example directly by configuring parameters as "Program arguments"

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Simple example of HoodieDeltaStreamer reading from the mock RandomJsonSource;
# use it as a starting point for defining your own custom data source.
#
# The script directory is resolved to an absolute path so the sibling
# hudi-delta-streamer launcher is found from any working directory. All
# expansions are quoted so that paths containing spaces keep working.
BASE_PATH="$(cd "$(dirname "$0")" && pwd)"
"${BASE_PATH}/hudi-delta-streamer" \
  --hoodie-conf hoodie.datasource.write.recordkey.field=uuid \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=driver \
  --target-base-path /tmp/hoodie/deltastreamertable \
  --table-type MERGE_ON_READ \
  --target-table deltastreamertable \
  --source-ordering-field ts \
  --source-class org.apache.hudi.examples.common.RandomJsonSource \
  --schemaprovider-class org.apache.hudi.examples.common.ExampleDataSchemaProvider \
  --transformer-class org.apache.hudi.examples.common.IdentityTransformer \
  --continuous

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Simple example of HoodieDeltaStreamer reading from JsonDFSSource: it reads the
# data found under a dfs directory once and writes it to a hudi table that can
# then be queried.
#
# NOTE(review): hoodie.deltastreamer.source.dfs.root below is a relative path,
# so this presumably has to be started from the project root - confirm.
#
# The script directory is resolved to an absolute path so the sibling
# hudi-delta-streamer launcher is found from any working directory. All
# expansions are quoted so that paths containing spaces keep working.
BASE_PATH="$(cd "$(dirname "$0")" && pwd)"
"${BASE_PATH}/hudi-delta-streamer" \
  --hoodie-conf hoodie.datasource.write.recordkey.field=uuid \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=driver \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=hudi-examples/src/main/resources/delta-streamer-config/dfs \
  --target-base-path /tmp/hoodie/deltastreamertable \
  --table-type MERGE_ON_READ \
  --target-table deltastreamertable \
  --source-ordering-field ts \
  --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
  --schemaprovider-class org.apache.hudi.examples.common.ExampleDataSchemaProvider \
  --transformer-class org.apache.hudi.examples.common.IdentityTransformer

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Shared launcher for the delta streamer examples: locates the hudi-utilities
# bundle jar and the hudi-examples jar in the build output, then submits
# HoodieDeltaStreamer through spark-submit, forwarding every argument it was given.

# Absolute path of the directory above this script's directory. Going through
# `cd ... && pwd` keeps the result correct when the script is invoked through a
# relative path (plain `dirname` of `bin/hudi-delta-streamer` would end at `.`).
EXAMPLES_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
PROJECT_DIR="$(dirname "${EXAMPLES_DIR}")"

# Application jar: the bundle itself, not its sources/tests companions.
JAR_FILE="$(ls "${PROJECT_DIR}"/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_*.jar 2>/dev/null | grep -v sources | grep -v tests.jar | head -n 1)"
# spark-submit takes --jars as a single comma-separated value, so join all matches.
EXAMPLES_JARS="$(ls "${PROJECT_DIR}"/hudi-examples/target/hudi-examples-*.jar 2>/dev/null | grep -v sources | grep -v tests.jar | paste -sd, -)"

# Fail with a clear message instead of handing spark-submit a broken command line.
if [ -z "${SPARK_HOME}" ]; then
  echo "SPARK_HOME is not set" >&2
  exit 1
fi
if [ -z "${JAR_FILE}" ] || [ -z "${EXAMPLES_JARS}" ]; then
  echo "Could not find the hudi-utilities-bundle and/or hudi-examples jars under ${PROJECT_DIR}; build the project first" >&2
  exit 1
fi

# Default master when the caller did not export SPARK_MASTER.
SPARK_MASTER="${SPARK_MASTER:-yarn-cluster}"

# "$@" (quoted) forwards each caller argument unchanged, even if it contains spaces.
exec "${SPARK_HOME}/bin/spark-submit" \
  --master "${SPARK_MASTER}" \
  --conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
  --conf spark.kryoserializer.buffer.max=128m \
  --conf spark.yarn.queue=root.default \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --jars "${EXAMPLES_JARS}" \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  "${JAR_FILE}" \
  "$@"

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Simple example of HoodieDeltaStreamer reading from Kafka through JsonKafkaSource.
#
# 1. Create the source topic, e.g.:
#      kafka-topics.sh --create --zookeeper zk:2181 --replication-factor 3 --partitions 1 --topic hoodie-source-topic
# 2. Insert data, e.g.:
#      kafka-console-producer.sh --broker-list localhost:9092 --topic hoodie-source-topic
# 3. Start the delta streamer with this script.
#
# NOTE(review): the --props path below is relative, so this presumably has to be
# started from the project root - confirm.
#
# The script directory is resolved to an absolute path so the sibling
# hudi-delta-streamer launcher is found from any working directory. All
# expansions are quoted so that paths containing spaces keep working.
BASE_PATH="$(cd "$(dirname "$0")" && pwd)"
"${BASE_PATH}/hudi-delta-streamer" \
  --props hudi-examples/src/main/resources/delta-streamer-config/kafka/kafka-source.properties \
  --target-base-path /tmp/hoodie/deltastreamertable \
  --table-type MERGE_ON_READ \
  --target-table deltastreamertable \
  --source-ordering-field ts \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --schemaprovider-class org.apache.hudi.examples.common.ExampleDataSchemaProvider \
  --transformer-class org.apache.hudi.examples.common.IdentityTransformer \
  --continuous

199
hudi-examples/pom.xml Normal file
View File

@@ -0,0 +1,199 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
<version>0.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-examples</artifactId>
<packaging>jar</packaging>
<properties>
<main.basedir>${project.parent.basedir}</main.basedir>
<checkstyle.skip>true</checkstyle.skip>
</properties>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>prepare-package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>true</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
<configuration>
<skip>false</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-utilities_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-timeline-service</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
</dependency>
<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
</dependency>
<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-common</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.examples.common;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * {@link SchemaProvider} used by the example jobs. The source schema it reports is always the
 * Avro schema exposed by {@link HoodieExampleDataGenerator#avroSchema}.
 */
public class ExampleDataSchemaProvider extends SchemaProvider {

  /**
   * Passes both arguments straight through to the {@link SchemaProvider} constructor.
   */
  public ExampleDataSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
  }

  /**
   * @return the schema held in {@link HoodieExampleDataGenerator#avroSchema}
   */
  @Override
  public Schema getSourceSchema() {
    final Schema exampleSchema = HoodieExampleDataGenerator.avroSchema;
    return exampleSchema;
  }
}

View File

@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.examples.common;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
 * Generates synthetic trip records (schema: {@link #TRIP_EXAMPLE_SCHEMA}) for the examples.
 * <p>
 * The generator remembers the key of every record it inserts so that later calls to
 * {@link #generateUpdates(String, Integer)} can produce updates for those keys. Random values come
 * from a single, fixed-seed {@link Random} shared by all instances.
 *
 * @param <T> payload type of the generated records; the generator always creates
 *            {@link HoodieAvroPayload} instances and casts them to {@code T}
 */
public class HoodieExampleDataGenerator<T extends HoodieRecordPayload<T>> {

  public static final String DEFAULT_FIRST_PARTITION_PATH = "2020/01/01";
  public static final String DEFAULT_SECOND_PARTITION_PATH = "2020/01/02";
  public static final String DEFAULT_THIRD_PARTITION_PATH = "2020/01/03";

  public static final String[] DEFAULT_PARTITION_PATHS =
      {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};

  public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
      + "{\"name\": \"ts\",\"type\": \"double\"},{\"name\": \"uuid\", \"type\": \"string\"},"
      + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
      + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
      + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
      + "{\"name\":\"fare\",\"type\": \"double\"}]}";
  public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

  private static Random rand = new Random(46474747);

  // Keys of the records generated so far, indexed 0 .. numExistingKeys - 1.
  private final Map<Integer, KeyPartition> existingKeys;
  private final String[] partitionPaths;
  private int numExistingKeys;

  public HoodieExampleDataGenerator(String[] partitionPaths) {
    this(partitionPaths, new HashMap<>());
  }

  public HoodieExampleDataGenerator() {
    this(DEFAULT_PARTITION_PATHS);
  }

  /**
   * @param partitionPaths  partition paths to spread inserts over (copied defensively)
   * @param keyPartitionMap map used to track generated keys; it is used as-is, not copied.
   *                        NOTE(review): entries already present in the map are not counted in
   *                        {@code numExistingKeys} - confirm whether callers rely on pre-populated maps.
   */
  public HoodieExampleDataGenerator(String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
    this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
    this.existingKeys = keyPartitionMap;
  }

  /**
   * Generates a new avro record of the above schema format for the given key. Rider and driver are
   * named after the commit time and the {@code ts} field is set to 0.0.
   */
  @SuppressWarnings("unchecked")
  public T generateRandomValue(HoodieKey key, String commitTime) {
    GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
    return (T) new HoodieAvroPayload(Option.of(rec));
  }

  /**
   * Builds one record of {@link #avroSchema} from the given values; coordinates and fare are random.
   */
  public GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
                                             double timestamp) {
    GenericRecord rec = new GenericData.Record(avroSchema);
    rec.put("uuid", rowKey);
    rec.put("ts", timestamp);
    rec.put("rider", riderName);
    rec.put("driver", driverName);
    rec.put("begin_lat", rand.nextDouble());
    rec.put("begin_lon", rand.nextDouble());
    rec.put("end_lat", rand.nextDouble());
    rec.put("end_lon", rand.nextDouble());
    rec.put("fare", rand.nextDouble() * 100);
    return rec;
  }

  /**
   * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
   */
  public List<HoodieRecord<T>> generateInserts(String commitTime, Integer n) {
    return generateInsertsStream(commitTime, n).collect(Collectors.toList());
  }

  /**
   * Lazy variant of {@link #generateInserts(String, Integer)}: each record is generated, and its key
   * registered, only when the returned stream is consumed.
   */
  public Stream<HoodieRecord<T>> generateInsertsStream(String commitTime, Integer n) {
    return IntStream.range(0, n).mapToObj(i -> {
      String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
      HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
      KeyPartition kp = new KeyPartition();
      kp.key = key;
      kp.partitionPath = partitionPath;
      // Index by the live counter rather than by a snapshot taken when the stream was created, so
      // that two streams created before either is consumed cannot overwrite each other's keys.
      existingKeys.put(numExistingKeys++, kp);
      return new HoodieRecord<>(key, generateRandomValue(key, commitTime));
    });
  }

  /**
   * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned
   * list
   *
   * @param commitTime Commit Timestamp
   * @param n Number of updates (including dups)
   * @return list of hoodie record updates
   * @throws IllegalArgumentException if {@code n > 0} and no key has been generated yet
   */
  public List<HoodieRecord<T>> generateUpdates(String commitTime, Integer n) {
    List<HoodieRecord<T>> updates = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      // nextInt's bound is exclusive, so numExistingKeys covers every index 0 .. numExistingKeys - 1.
      KeyPartition kp = existingKeys.get(rand.nextInt(numExistingKeys));
      HoodieRecord<T> record = generateUpdateRecord(kp.key, commitTime);
      updates.add(record);
    }
    return updates;
  }

  public HoodieRecord<T> generateUpdateRecord(HoodieKey key, String commitTime) {
    return new HoodieRecord<>(key, generateRandomValue(key, commitTime));
  }

  /**
   * Renders the record's avro payload as a JSON string with an extra {@code partitionpath} field
   * appended; returns an empty Option when the payload bytes cannot be decoded.
   */
  private Option<String> convertToString(HoodieRecord<T> record) {
    try {
      String str = HoodieAvroUtils
          .bytesToAvro(((HoodieAvroPayload) record.getData()).getRecordBytes(), avroSchema)
          .toString();
      str = "{" + str.substring(str.indexOf("\"ts\":"));
      // Splice the extra field in before the final closing brace only. Plain concatenation keeps
      // braces inside field values intact and does not interpret '$' or '\' in the partition path.
      int closingBrace = str.lastIndexOf('}');
      if (closingBrace < 0) {
        return Option.of(str);
      }
      return Option.of(str.substring(0, closingBrace)
          + ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}");
    } catch (IOException e) {
      return Option.empty();
    }
  }

  /**
   * Converts the records to JSON strings, silently dropping those that could not be converted.
   */
  public List<String> convertToStringList(List<HoodieRecord<T>> records) {
    return records.stream().map(this::convertToString).filter(Option::isPresent).map(Option::get)
        .collect(Collectors.toList());
  }

  public int getNumExistingKeys() {
    return numExistingKeys;
  }

  /**
   * Record key together with the partition path it was generated for.
   */
  public static class KeyPartition implements Serializable {
    HoodieKey key;
    String partitionPath;
  }

  /**
   * Forgets all generated keys.
   */
  public void close() {
    existingKeys.clear();
    // Keep the counter in sync with the (now empty) map; otherwise later updates would look up
    // indices that no longer exist.
    numExistingKeys = 0;
  }
}

View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.examples.common;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import java.util.HashMap;
import java.util.Map;
/**
 * Factory helpers that build the {@link SparkConf} and {@link SparkSession} used by the examples.
 */
public class HoodieExampleSparkUtils {

  /**
   * Settings applied by the {@code default*} factories: the Kryo serializer and a 512m Kryo buffer limit.
   * A fresh map is created on every call.
   */
  private static Map<String, String> defaultConf() {
    Map<String, String> defaults = new HashMap<>();
    defaults.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    defaults.put("spark.kryoserializer.buffer.max", "512m");
    return defaults;
  }

  /**
   * @return a {@link SparkConf} named {@code appName} carrying the default settings
   */
  public static SparkConf defaultSparkConf(String appName) {
    Map<String, String> defaults = defaultConf();
    return buildSparkConf(appName, defaults);
  }

  /**
   * @return a {@link SparkConf} named {@code appName} with every entry of {@code additionalConfigs} set on it
   */
  public static SparkConf buildSparkConf(String appName, Map<String, String> additionalConfigs) {
    SparkConf conf = new SparkConf().setAppName(appName);
    for (Map.Entry<String, String> setting : additionalConfigs.entrySet()) {
      conf.set(setting.getKey(), setting.getValue());
    }
    return conf;
  }

  /**
   * @return a {@link SparkSession} named {@code appName} carrying the default settings
   */
  public static SparkSession defaultSparkSession(String appName) {
    Map<String, String> defaults = defaultConf();
    return buildSparkSession(appName, defaults);
  }

  /**
   * @return the session obtained from {@code getOrCreate()} after applying {@code appName} and every
   *         entry of {@code additionalConfigs} to the builder
   */
  public static SparkSession buildSparkSession(String appName, Map<String, String> additionalConfigs) {
    SparkSession.Builder sessionBuilder = SparkSession.builder().appName(appName);
    for (Map.Entry<String, String> setting : additionalConfigs.entrySet()) {
      sessionBuilder.config(setting.getKey(), setting.getValue());
    }
    return sessionBuilder.getOrCreate();
  }
}

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.examples.common;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* Identity transformer: a {@link Transformer} that hands the incoming dataset back unchanged.
*/
public class IdentityTransformer implements Transformer {
/**
* Returns {@code rowDataset} as-is; the other three arguments are not used.
*
* @param jsc not used
* @param sparkSession not used
* @param rowDataset the dataset to "transform"
* @param properties not used
* @return the very same {@code rowDataset} instance
*/
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
return rowDataset;
}
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.examples.common;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.List;
public class RandomJsonSource extends JsonSource {
private HoodieExampleDataGenerator<HoodieAvroPayload> dataGen;
public RandomJsonSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
dataGen = new HoodieExampleDataGenerator<>();
}
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
List<String> inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20));
return new InputBatch<>(Option.of(sparkContext.parallelize(inserts, 1)), commitTime);
}
}

View File

@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.examples.spark;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
import org.apache.hudi.examples.common.HoodieExampleSparkUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Simple examples of {@link HoodieWriteClient}.
 *
 * <p>To run this example, you should
 * <ol>
 *   <li>For running in IDE, set VM options {@code -Dspark.master=local[2]}</li>
 *   <li>For running in shell, use {@code spark-submit}</li>
 * </ol>
 *
 * <p>Usage: {@code HoodieWriteClientExample <tablePath> <tableName>}<br>
 * {@code <tablePath>} and {@code <tableName>} describe the root path of the hudi table and the table name,
 * for example {@code HoodieWriteClientExample file:///tmp/hoodie/sample-table hoodie_rt}.
 */
public class HoodieWriteClientExample {

  private static final Logger LOG = LogManager.getLogger(HoodieWriteClientExample.class);

  // Table type used when the table has to be initialized; it also decides whether the
  // compaction step at the end of main() runs.
  private static final String TABLE_TYPE = HoodieTableType.COPY_ON_WRITE.name();

  /**
   * Runs three commits against the table at {@code args[0]}: an upsert of generated inserts, an upsert
   * that adds generated updates, and a delete of half of the records written so far. For a
   * MERGE_ON_READ table a compaction is scheduled and, if one was actually scheduled, executed.
   *
   * @param args {@code <tablePath> <tableName>}; the process exits with status 1 when fewer are given
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: HoodieWriteClientExample <tablePath> <tableName>");
      System.exit(1);
    }
    String tablePath = args[0];
    String tableName = args[1];
    SparkConf sparkConf = HoodieExampleSparkUtils.defaultSparkConf("hoodie-client-example");

    try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {

      // Generator of some records to be loaded in.
      HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();

      // initialize the table, if not done already
      Path path = new Path(tablePath);
      FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration());
      if (!fs.exists(path)) {
        HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(TABLE_TYPE),
            tableName, HoodieAvroPayload.class.getName());
      }

      // Create the write client to write some records in
      HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
          .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
          .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
          .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();

      // The client is closed when this block exits, whether normally or through an exception.
      try (HoodieWriteClient<HoodieAvroPayload> client = new HoodieWriteClient<>(jsc, cfg)) {

        // inserts
        String newCommitTime = client.startCommit();
        LOG.info("Starting commit " + newCommitTime);

        List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
        List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
        JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1);
        client.upsert(writeRecords, newCommitTime);

        // updates (the batch contains the records of the first commit plus the generated updates)
        newCommitTime = client.startCommit();
        LOG.info("Starting commit " + newCommitTime);
        List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
        records.addAll(toBeUpdated);
        recordsSoFar.addAll(toBeUpdated);
        writeRecords = jsc.parallelize(records, 1);
        client.upsert(writeRecords, newCommitTime);

        // Delete
        newCommitTime = client.startCommit();
        LOG.info("Starting commit " + newCommitTime);
        // just delete half of the records
        int numToDelete = recordsSoFar.size() / 2;
        List<HoodieKey> toBeDeleted =
            recordsSoFar.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
        JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
        client.delete(deleteRecords, newCommitTime);

        // compaction
        if (HoodieTableType.valueOf(TABLE_TYPE) == HoodieTableType.MERGE_ON_READ) {
          Option<String> instant = client.scheduleCompaction(Option.empty());
          // Only compact when an instant was actually handed back; calling get() on an
          // empty Option would fail.
          if (instant.isPresent()) {
            JavaRDD<WriteStatus> writeStatuses = client.compact(instant.get());
            client.commitCompaction(instant.get(), writeStatuses, Option.empty());
          }
        }
      }
    }
  }
}

View File

@@ -0,0 +1,10 @@
{"ts": 0.0, "uuid": "d4b5b1eb-5c4f-484b-9f5b-d9e0c29ab0ba", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.18433112391820694, "begin_lon": 0.4457079093559174, "end_lat": 0.38128402026859787, "end_lon": 0.4528353922784837, "fare": 18.769410203570114, "partitionpath": "2020/01/02"}
{"ts": 0.0, "uuid": "12a6af4a-08e9-4fbd-8942-32525e8ffc25", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.3415828471638285, "begin_lon": 0.35472417786727917, "end_lat": 0.5744827818563615, "end_lon": 0.4645148864505745, "fare": 83.12040940386028, "partitionpath": "2020/01/01"}
{"ts": 0.0, "uuid": "209289a5-fe6d-42ba-92c3-3380c96d382e", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.434923811219014, "begin_lon": 0.834448733526223, "end_lat": 0.2861201538495416, "end_lon": 0.6434040231985722, "fare": 12.289323546303788, "partitionpath": "2020/01/01"}
{"ts": 0.0, "uuid": "2e8a6c33-ecc3-4279-ac4b-9571ecfb2fc3", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.6427696295884016, "begin_lon": 0.23849882507684073, "end_lat": 0.6613489410705939, "end_lon": 0.6220454661413275, "fare": 72.024792576853, "partitionpath": "2020/01/01"}
{"ts": 0.0, "uuid": "2acc3303-86d1-4f73-a062-cd05faf3c46a", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.9356018115236618, "begin_lon": 0.44714286038480855, "end_lat": 0.8473860258521023, "end_lon": 0.1762368947074756, "fare": 79.42627821413218, "partitionpath": "2020/01/02"}
{"ts": 0.0, "uuid": "25c579d6-e480-4373-ae71-b4d524a74142", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.17851135255091155, "begin_lon": 0.39556048623031603, "end_lat": 0.5878106318835439, "end_lon": 0.36519521355305173, "fare": 98.88075495133515, "partitionpath": "2020/01/02"}
{"ts": 0.0, "uuid": "974198b6-507e-4de2-bbe3-997d025e83b9", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.19179139106643606, "begin_lon": 0.8157865134723307, "end_lat": 0.6418467159488594, "end_lon": 0.14516349705850584, "fare": 12.153670568058683, "partitionpath": "2020/01/01"}
{"ts": 0.0, "uuid": "f91c2067-7527-491b-a766-e180d6da1371", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.33922164839486424, "begin_lon": 0.909372837469859, "end_lat": 0.9017656600243008, "end_lon": 0.8236411667430927, "fare": 2.0856583634078385, "partitionpath": "2020/01/03"}
{"ts": 0.0, "uuid": "1b9fb778-3349-4172-b81d-522e181b36e1", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.6662084366450246, "begin_lon": 0.9065078444936647, "end_lat": 0.7124299678100179, "end_lon": 0.05336723040266267, "fare": 38.63372961020515, "partitionpath": "2020/01/02"}
{"ts": 0.0, "uuid": "a4b56c00-f372-414a-9c1c-458c10d648b9", "rider": "rider-20200113114823", "driver": "driver-20200113114823", "begin_lat": 0.4106290929046368, "begin_lon": 0.964603455586492, "end_lat": 0.13957566957654388, "end_lon": 0.45400191464227213, "fare": 81.37564420028626, "partitionpath": "2020/01/02"}

View File

@@ -0,0 +1,31 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=driver
# Kafka Source topic
hoodie.deltastreamer.source.kafka.topic=hoodie-source-topic
# Kafka props
# The kafka cluster we want to ingest from
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
group.id=hoodie-delta-streamer

View File

@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.examples.spark
import org.apache.hudi.DataSourceReadOptions.{BEGIN_INSTANTTIME_OPT_KEY, END_INSTANTTIME_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_OPT_KEY}
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
import org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs
import org.apache.hudi.common.model.HoodieAvroPayload
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.hudi.examples.common.{HoodieExampleDataGenerator, HoodieExampleSparkUtils}
import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConversions._
/**
 * Simple examples of [[org.apache.hudi.DefaultSource]]
 *
 * To run this example, you should
 *   1. For running in IDE, set VM options `-Dspark.master=local[2]`
 *   2. For running in shell, using `spark-submit`
 *
 * Usage: HoodieDataSourceExample <tablePath> <tableName>.
 * <tablePath> and <tableName> describe root path of hudi and table name
 * for example, `HoodieDataSourceExample file:///tmp/hoodie/hudi_cow_table hudi_cow_table`
 */
object HoodieDataSourceExample {

  /**
   * Runs the insert, update, snapshot query, incremental query and point-in-time query steps in
   * that order, then stops the Spark session (also when one of the steps throws).
   *
   * @param args `<tablePath> <tableName>`; the process exits with status 1 when fewer are given
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: HoodieDataSourceExample <tablePath> <tableName>")
      System.exit(1)
    }
    val tablePath = args(0)
    val tableName = args(1)

    val spark = HoodieExampleSparkUtils.defaultSparkSession("Hudi Spark basic example")
    try {
      val dataGen = new HoodieExampleDataGenerator[HoodieAvroPayload]
      insertData(spark, tablePath, tableName, dataGen)
      updateData(spark, tablePath, tableName, dataGen)
      // queryData registers the `hudi_ro_table` temp view that the two queries below read from.
      queryData(spark, tablePath, tableName, dataGen)
      incrementalQuery(spark, tablePath, tableName)
      pointInTimeQuery(spark, tablePath, tableName)
    } finally {
      spark.stop()
    }
  }

  /**
   * Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi dataset as below.
   * The write uses `Overwrite` mode.
   */
  def insertData(spark: SparkSession, tablePath: String, tableName: String, dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = {
    val commitTime: String = System.currentTimeMillis().toString
    val inserts = dataGen.convertToStringList(dataGen.generateInserts(commitTime, 20))
    val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(tablePath)
  }

  /**
   * Load the data files into a DataFrame, register it as the temp view `hudi_ro_table` and run two
   * sample queries against it. `tableName` and `dataGen` are not used by this step.
   */
  def queryData(spark: SparkSession, tablePath: String, tableName: String, dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = {
    val roViewDF = spark.
      read.
      format("org.apache.hudi").
      load(tablePath + "/*/*/*/*")

    roViewDF.createOrReplaceTempView("hudi_ro_table")

    spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()
    //  +-----------------+-------------------+-------------------+---+
    //  |             fare|          begin_lon|          begin_lat| ts|
    //  +-----------------+-------------------+-------------------+---+
    //  |98.88075495133515|0.39556048623031603|0.17851135255091155|0.0|
    //  ...

    spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show()
    //  +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+
    //  |_hoodie_commit_time|  _hoodie_record_key|_hoodie_partition_path|              rider|              driver|              fare|
    //  +-------------------+--------------------+----------------------+-------------------+--------------------+------------------+
    //  |     20191231181501|31cafb9f-0196-4b1...|            2020/01/02|rider-1577787297889|driver-1577787297889| 98.88075495133515|
    //  ...
  }

  /**
   * This is similar to inserting new data. Generate updates to existing trips using the data generator,
   * load into a DataFrame and write DataFrame into the hudi dataset. The write uses `Append` mode.
   */
  def updateData(spark: SparkSession, tablePath: String, tableName: String, dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = {
    val commitTime: String = System.currentTimeMillis().toString
    val updates = dataGen.convertToStringList(dataGen.generateUpdates(commitTime, 10))
    val df = spark.read.json(spark.sparkContext.parallelize(updates, 1))
    df.write.format("org.apache.hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(tablePath)
  }

  /**
   * Hudi also provides capability to obtain a stream of records that changed since given commit timestamp.
   * This can be achieved using Hudi's incremental view and providing a begin time from which changes need to be streamed.
   * We do not need to specify endTime, if we want all changes after the given commit (as is the common case).
   *
   * Reads the commit list from the `hudi_ro_table` temp view, so `queryData` must have run first.
   * `tableName` is not used by this step.
   */
  def incrementalQuery(spark: SparkSession, tablePath: String, tableName: String): Unit = {
    val beginTime = secondToLastCommit(spark) // commit time we are interested in

    // incrementally query data
    val incViewDF = spark.
      read.
      format("org.apache.hudi").
      option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      load(tablePath)
    incViewDF.createOrReplaceTempView("hudi_incr_table")
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
  }

  /**
   * Let's look at how to query data as of a specific time.
   * The specific time can be represented by pointing endTime to a specific commit time
   * and beginTime to "000" (denoting earliest possible commit time).
   *
   * Reads the commit list from the `hudi_ro_table` temp view, so `queryData` must have run first.
   * `tableName` is not used by this step.
   */
  def pointInTimeQuery(spark: SparkSession, tablePath: String, tableName: String): Unit = {
    val beginTime = "000" // Represents all commits > this time.
    val endTime = secondToLastCommit(spark) // commit time we are interested in

    // incrementally query data
    val incViewDF = spark.read.format("org.apache.hudi").
      option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      option(END_INSTANTTIME_OPT_KEY, endTime).
      load(tablePath)
    incViewDF.createOrReplaceTempView("hudi_incr_table")
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_incr_table where fare > 20.0").show()
  }

  /**
   * Returns the second-to-last entry among the (at most 50) distinct commit times, in ascending
   * order, found in the `hudi_ro_table` temp view.
   *
   * Fails with an `IllegalArgumentException` when the view holds fewer than two commits, instead
   * of the index error a blind array access would raise.
   */
  private def secondToLastCommit(spark: SparkSession): String = {
    import spark.implicits._
    val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_ro_table order by commitTime").map(k => k.getString(0)).take(50)
    require(commits.length >= 2, s"Expected at least 2 commits in hudi_ro_table but found ${commits.length}")
    commits(commits.length - 2)
  }
}

10
pom.xml
View File

@@ -51,6 +51,7 @@
<module>packaging/hudi-timeline-server-bundle</module>
<module>docker/hoodie/hadoop</module>
<module>hudi-integ-test</module>
<module>hudi-examples</module>
</modules>
<licenses>
@@ -267,6 +268,15 @@
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${scala-maven-plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>