[HUDI-3188] Update quick start guide for Kafka Connect Sink for Hudi (#4527)

Y Ethan Guo
2022-01-07 04:56:08 -08:00
committed by GitHub
parent 2467c137e4
commit 76a72641f1

@@ -27,7 +27,7 @@ The first thing you need to do to start using this connector is building it. In
- [Java 1.8+](https://openjdk.java.net/) - [Java 1.8+](https://openjdk.java.net/)
- [Apache Maven](https://maven.apache.org/) - [Apache Maven](https://maven.apache.org/)
- Install [kcat](https://github.com/edenhill/kcat) - Install [kcat](https://github.com/edenhill/kcat)
= Install jq. `brew install jq` - Install jq. `brew install jq`
## Trying the connector ## Trying the connector
@@ -42,16 +42,20 @@ Either use confluent-hub to install the connector or download it from [here](htt
Copy the entire folder to the classpath that will be used by the Hudi Kafka Connector. Copy the entire folder to the classpath that will be used by the Hudi Kafka Connector.
```bash ```bash
confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0 export CONFLUENT_DIR=/path/to/confluent_install_dir
cp confluentinc-kafka-connect-hdfs-10.1.0/* /usr/local/share/kafka/plugins/ mkdir -p /usr/local/share/kafka/plugins
$CONFLUENT_DIR/bin/confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0
cp -r $CONFLUENT_DIR/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/* /usr/local/share/kafka/plugins/
``` ```
Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. And copy it Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. And copy it
to the plugin path that contains all the other jars (`/usr/local/share/kafka/plugins/lib`) to the plugin path that contains all the other jars (`/usr/local/share/kafka/plugins/lib`)
```bash ```bash
cd ${HUDI_DIR} cd $HUDI_DIR
mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib mkdir -p /usr/local/share/kafka/plugins/lib
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
``` ```
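The bundle jar name carries the Hudi version (this change moves it from `0.10.0-SNAPSHOT` to `0.11.0-SNAPSHOT`), so the `cp` command needs editing on every version bump. A minimal sketch of a version-agnostic copy follows. The helper name `copy_bundle_jar` is hypothetical and not part of Hudi. It assumes the build has already written the jar to the `target` directory, and it copies every file matching the bundle prefix, which could include extra artifacts if the build produces any.

```bash
#!/usr/bin/env bash
# Hypothetical helper, not part of Hudi: copy the built bundle jar(s) into the
# Kafka Connect plugin path without hard-coding the version number.
copy_bundle_jar() {
  local target_dir="$1"   # e.g. $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target
  local plugin_lib="$2"   # e.g. /usr/local/share/kafka/plugins/lib
  local jar found=0

  mkdir -p "$plugin_lib" || return 1
  for jar in "$target_dir"/hudi-kafka-connect-bundle-*.jar; do
    # If nothing matches, the glob stays literal; skip it.
    [ -f "$jar" ] || continue
    cp "$jar" "$plugin_lib"/ || return 1
    found=1
  done

  if [ "$found" -eq 0 ]; then
    echo "no hudi-kafka-connect-bundle jar found in $target_dir" >&2
    return 1
  fi
}
```

It could be called as `copy_bundle_jar "$HUDI_DIR/packaging/hudi-kafka-connect-bundle/target" /usr/local/share/kafka/plugins/lib` after the Maven build finishes.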
Set up a Kafka broker locally. Download the latest apache kafka from [here](https://kafka.apache.org/downloads). Set up a Kafka broker locally. Download the latest apache kafka from [here](https://kafka.apache.org/downloads).
@@ -59,6 +63,7 @@ Once downloaded and built, run the Zookeeper server and Kafka server using the c
```bash ```bash
export KAFKA_HOME=/path/to/kafka_install_dir export KAFKA_HOME=/path/to/kafka_install_dir
cd $KAFKA_HOME cd $KAFKA_HOME
# Run the following commands in separate terminals to keep them running
./bin/zookeeper-server-start.sh ./config/zookeeper.properties ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties ./bin/kafka-server-start.sh ./config/server.properties
``` ```
@@ -71,11 +76,13 @@ registries, we use Confluent schema registry. Download the
latest [confluent platform](https://docs.confluent.io/platform/current/installation/index.html) and run the schema latest [confluent platform](https://docs.confluent.io/platform/current/installation/index.html) and run the schema
registry service. registry service.
NOTE: You must change the port from `8081` (default) to `8082` to avoid conflict. NOTE: You must change the port from `8081` (default) to `8082` to avoid conflict, i.e.,
using `listeners=http://0.0.0.0:8082` in the properties file `etc/schema-registry/schema-registry.properties`.
```bash ```bash
cd $CONFLUENT_DIR cd $CONFLUENT_DIR
/bin/kafka-configs --zookeeper localhost --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact ./bin/kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact
# Make sure you have changed the listener port as above
./bin/schema-registry-start etc/schema-registry/schema-registry.properties ./bin/schema-registry-start etc/schema-registry/schema-registry.properties
``` ```
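The port change described in the note above can also be scripted instead of edited by hand. This is a sketch only. The helper name `set_listener_port` is hypothetical, and it assumes the properties file uses a single uncommented `listeners=` line, as in the example value `listeners=http://0.0.0.0:8082`.

```bash
#!/usr/bin/env bash
# Hypothetical helper: point the Schema Registry listener at a different port.
set_listener_port() {
  local props="$1"   # e.g. $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
  local port="$2"    # e.g. 8082
  local tmp="${props}.tmp"

  if grep -q '^listeners=' "$props"; then
    # Rewrite the existing listeners line. Writing to a temp file avoids
    # `sed -i`, whose syntax differs between GNU and BSD sed.
    sed "s|^listeners=.*|listeners=http://0.0.0.0:${port}|" "$props" > "$tmp" \
      && mv "$tmp" "$props"
  else
    # No active listeners line: append one on its own line.
    printf '\n%s\n' "listeners=http://0.0.0.0:${port}" >> "$props"
  fi
}
```

For example, `set_listener_port "$CONFLUENT_DIR/etc/schema-registry/schema-registry.properties" 8082` would apply the change before starting the schema registry.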
@@ -85,6 +92,8 @@ The control topic should only have `1` partition, since its used to coordinate t
```bash ```bash
cd $KAFKA_HOME cd $KAFKA_HOME
# The following command is expected to throw an error if the control topic does not exist.
# "Error while executing topic command : Topic 'hudi-control-topic' does not exist as expected"
./bin/kafka-topics.sh --delete --topic hudi-control-topic --bootstrap-server localhost:9092 ./bin/kafka-topics.sh --delete --topic hudi-control-topic --bootstrap-server localhost:9092
./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 ./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
``` ```
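Because the delete command fails when the control topic does not exist yet, a script that runs both commands needs to tolerate that failure. One possible sketch follows. The function name `recreate_control_topic` and the `KAFKA_TOPICS_CMD` and `BOOTSTRAP_SERVER` variables are assumptions for illustration; only the `kafka-topics.sh` flags come from the commands above.

```bash
#!/usr/bin/env bash
# Hypothetical wrapper around the two kafka-topics.sh commands shown above.
# Run it from $KAFKA_HOME, or set KAFKA_TOPICS_CMD to the full script path.
KAFKA_TOPICS_CMD="${KAFKA_TOPICS_CMD:-./bin/kafka-topics.sh}"
BOOTSTRAP_SERVER="${BOOTSTRAP_SERVER:-localhost:9092}"

recreate_control_topic() {
  local topic="$1"
  # Deleting a missing topic returns an error; ignore it so a fresh setup
  # still proceeds to the create step.
  "$KAFKA_TOPICS_CMD" --delete --topic "$topic" \
    --bootstrap-server "$BOOTSTRAP_SERVER" || true
  # The control topic must have exactly one partition.
  "$KAFKA_TOPICS_CMD" --create --topic "$topic" --partitions 1 \
    --replication-factor 1 --bootstrap-server "$BOOTSTRAP_SERVER"
}
```

Calling `recreate_control_topic hudi-control-topic` runs both steps. If the create step reports that the topic still exists, the broker may not have finished the deletion yet, and rerunning the function after a short wait is a reasonable first thing to try.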
@@ -115,8 +124,8 @@ Note that if multiple workers need to be run, the webserver needs to be reconfig
successful running of the workers. successful running of the workers.
```bash ```bash
cd ${KAFKA_HOME} cd $KAFKA_HOME
./bin/connect-distributed.sh ${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties ./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
``` ```
### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure) ### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
@@ -133,21 +142,54 @@ curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect
Now, you should see that the connector is created and tasks are running. Now, you should see that the connector is created and tasks are running.
```bash ```bash
mkdir /tmp/hoodie/ > curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors
curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors
["hudi-sink"] ["hudi-sink"]
curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq
> curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 329 100 329 0 0 21096 0 --:--:-- --:--:-- --:--:-- 36555
{
"name": "hudi-sink",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
{
"id": 3,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
}
],
"type": "sink"
}
``` ```
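Reading the status JSON by eye works for a quick start, but the same check can be automated with `jq`. The sketch below assumes the response has the shape shown above (`connector.state` plus a `tasks` array with a `state` per task). The function name `connector_is_running` is hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: read a connector status document on stdin and succeed
# only if the connector and every one of its tasks report RUNNING.
connector_is_running() {
  # jq -e sets the exit status from the result: 0 for true, 1 for false/null.
  jq -e '
    .connector.state == "RUNNING"
    and (.tasks | length > 0)
    and (.tasks | all(.state == "RUNNING"))
  ' > /dev/null
}
```

A possible use is `curl -s http://localhost:8083/connectors/hudi-sink/status | connector_is_running && echo "hudi-sink is running"`, where `-s` suppresses the curl progress meter seen in the output above.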
And, you should see your Hudi table created, which you can query using Spark/Flink. And, you should see your Hudi table created, which you can query using Spark/Flink. Note: HUDI-2325 tracks Hive sync,
Note: HUDI-2325 tracks Hive sync, which will unlock pretty much every other query engine. which will unlock pretty much every other query engine.
```bash ```bash
ls -a /tmp/hoodie/hudi-test-topic > ls -a /tmp/hoodie/hudi-test-topic
. .hoodie partition_1 partition_3 . .hoodie partition_1 partition_3
.. partition_0 partition_2 partition_4 .. partition_0 partition_2 partition_4
ls -lt /tmp/hoodie/hudi-test-topic/.hoodie > ls -lt /tmp/hoodie/hudi-test-topic/.hoodie
total 72 total 72
-rw-r--r-- 1 user wheel 346 Sep 14 10:32 hoodie.properties -rw-r--r-- 1 user wheel 346 Sep 14 10:32 hoodie.properties
-rw-r--r-- 1 user wheel 0 Sep 13 23:18 20210913231805.inflight -rw-r--r-- 1 user wheel 0 Sep 13 23:18 20210913231805.inflight
@@ -160,7 +202,7 @@ total 72
-rw-r--r-- 1 user wheel 0 Sep 13 21:41 20210913214114.commit.requested -rw-r--r-- 1 user wheel 0 Sep 13 21:41 20210913214114.commit.requested
drwxr-xr-x 2 user wheel 64 Sep 13 21:41 archived drwxr-xr-x 2 user wheel 64 Sep 13 21:41 archived
ls -l /tmp/hoodie/hudi-test-topic/partition_0 > ls -l /tmp/hoodie/hudi-test-topic/partition_0
total 5168 total 5168
-rw-r--r-- 1 user wheel 439332 Sep 13 21:43 2E0E6DB44ACC8479059574A2C71C7A7E-0_0-0-0_20210913214114.parquet -rw-r--r-- 1 user wheel 439332 Sep 13 21:43 2E0E6DB44ACC8479059574A2C71C7A7E-0_0-0-0_20210913214114.parquet
-rw-r--r-- 1 user wheel 440179 Sep 13 21:42 3B56FAAAE2BDD04E480C1CBACD463D3E-0_0-0-0_20210913214114.parquet -rw-r--r-- 1 user wheel 440179 Sep 13 21:42 3B56FAAAE2BDD04E480C1CBACD463D3E-0_0-0-0_20210913214114.parquet
@@ -185,7 +227,7 @@ After the compaction is scheduled, you can see the requested compaction instant
below: below:
``` ```
ls -l /tmp/hoodie/hudi-test-topic/.hoodie > ls -l /tmp/hoodie/hudi-test-topic/.hoodie
total 280 total 280
-rw-r--r-- 1 user wheel 21172 Nov 11 11:09 20211111110807.deltacommit -rw-r--r-- 1 user wheel 21172 Nov 11 11:09 20211111110807.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:08 20211111110807.deltacommit.inflight -rw-r--r-- 1 user wheel 0 Nov 11 11:08 20211111110807.deltacommit.inflight
@@ -245,7 +287,7 @@ Similarly, you can see the requested clustering instant (`20211111111813.replace
by the Sink: by the Sink:
``` ```
ls -l /tmp/hoodie/hudi-test-topic/.hoodie > ls -l /tmp/hoodie/hudi-test-topic/.hoodie
total 736 total 736
-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit -rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.inflight -rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.inflight
@@ -300,7 +342,7 @@ hoodie.write.concurrency.mode=single_writer
Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled
clustering is going to be executed. clustering is going to be executed.
### 8- Querying via Hive ### 8 - Querying via Hive
In this section we explain how one can test syncing of the Hudi table with Hive server/ Hive Metastore, In this section we explain how one can test syncing of the Hudi table with Hive server/ Hive Metastore,
that enable querying via Hive, Presto etc. that enable querying via Hive, Presto etc.
@@ -326,7 +368,7 @@ Essentially, follow the steps listed here:
Bring up the docker containers Bring up the docker containers
```bash ```bash
cd ${HUDI_DIR}/docker cd $HUDI_DIR/docker
./setup_demo.sh ./setup_demo.sh
``` ```
@@ -345,7 +387,6 @@ beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \ --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false --hiveconf hive.stats.autogather=false
# List Tables # List Tables
0: jdbc:hive2://hiveserver:10000> show tables; 0: jdbc:hive2://hiveserver:10000> show tables;
+---------------------+--+ +---------------------+--+
@@ -357,7 +398,6 @@ beeline -u jdbc:hive2://hiveserver:10000 \
3 rows selected (1.199 seconds) 3 rows selected (1.199 seconds)
0: jdbc:hive2://hiveserver:10000> 0: jdbc:hive2://hiveserver:10000>
# Look at partitions that were added # Look at partitions that were added
0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_ro; 0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_ro;
+-------------------+--+ +-------------------+--+
@@ -371,7 +411,6 @@ beeline -u jdbc:hive2://hiveserver:10000 \
+-------------------+--+ +-------------------+--+
1 row selected (0.24 seconds) 1 row selected (0.24 seconds)
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_ro; 0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_ro;
+----------------------+---------+----------------------+---------+------------+-----------+--+ +----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close | | _hoodie_commit_time | symbol | ts | volume | open | close |