[HUDI-2864] Fix README and scripts with current limitations of hive sync (#4129)
* Fix README with current limitations of hive sync * Fix README with current limitations of hive sync * Fix dep issue * Fix Copy on Write flow Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>
This commit is contained in:
@@ -18,17 +18,17 @@
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import java.util.Properties;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
|
||||
public abstract class FileIdPrefixProvider {
|
||||
|
||||
private final Properties props;
|
||||
private final TypedProperties props;
|
||||
|
||||
public FileIdPrefixProvider(Properties props) {
|
||||
public FileIdPrefixProvider(TypedProperties props) {
|
||||
this.props = props;
|
||||
}
|
||||
|
||||
public Properties getProps() {
|
||||
public TypedProperties getProps() {
|
||||
return props;
|
||||
}
|
||||
|
||||
|
||||
@@ -18,13 +18,12 @@
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public class RandomFileIdPrefixProvider extends FileIdPrefixProvider {
|
||||
|
||||
public RandomFileIdPrefixProvider(Properties props) {
|
||||
public RandomFileIdPrefixProvider(TypedProperties props) {
|
||||
super(props);
|
||||
}
|
||||
|
||||
|
||||
@@ -29,34 +29,30 @@ The first thing you need to do to start using this connector is building it. In
|
||||
- Install [kcat](https://github.com/edenhill/kcat)
|
||||
- Install jq. `brew install jq`
|
||||
|
||||
After installing these dependencies, execute the following commands. This will install all the Hudi dependency jars,
|
||||
including the fat packaged jar that contains all the dependencies required for a functional Hudi Kafka Connect Sink.
|
||||
|
||||
|
||||
```bash
|
||||
mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
|
||||
```
|
||||
|
||||
Next, we need to make sure that the hudi sink connector bundle jar is in Kafka Connect classpath. Note that the connect
|
||||
classpath should be the same as the one configured in the connector configuration file.
|
||||
|
||||
```bash
|
||||
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/java/hudi-kafka-connect/
|
||||
```
|
||||
|
||||
|
||||
## Trying the connector
|
||||
|
||||
After building the package, we need to install the Apache Kafka
|
||||
After installing these dependencies, follow steps based on your requirement.
|
||||
|
||||
### 1 - Starting the environment
|
||||
|
||||
For runtime dependencies, we encourage using the confluent HDFS connector jars. We have tested our setup with version `10.1.0`.
|
||||
After downloading the connector, copy the jars from the lib folder to the Kafka Connect classpath.
|
||||
Either use confluent-hub to install the connector or download it from [here](https://tinyurl.com/yb472f79).
|
||||
|
||||
Copy the entire folder to the classpath that will be used by the Hudi Kafka Connector.
|
||||
|
||||
```bash
|
||||
confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0
|
||||
cp confluentinc-kafka-connect-hdfs-10.1.0/* /usr/local/share/kafka/plugins/
|
||||
```
|
||||
|
||||
Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. And copy it
|
||||
to the plugin path that contains all the other jars (`/usr/local/share/kafka/plugins/lib`)
|
||||
```bash
|
||||
cd ${HUDI_DIR}
|
||||
mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
|
||||
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
|
||||
```
|
||||
Add `confluentinc-kafka-connect-hdfs-10.1.0/lib` to the plugin.path (comma separated) in $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
|
||||
|
||||
### 2 - Set up the docker containers
|
||||
|
||||
@@ -80,7 +76,7 @@ Essentially, follow the steps listed here:
|
||||
|
||||
Bring up the docker containers
|
||||
```bash
|
||||
cd $HUDI_DIR/docker
|
||||
cd ${HUDI_DIR}/docker
|
||||
./setup_demo.sh
|
||||
```
|
||||
|
||||
@@ -93,7 +89,7 @@ registries, we use Confluent schema registry. Download the
|
||||
latest [confluent platform](https://docs.confluent.io/platform/current/installation/index.html) and run the schema
|
||||
registry service.
|
||||
|
||||
NOTE: You might need to change the port from `8081` to `8082`.
|
||||
NOTE: You must change the port from `8081` (default) to `8082` to avoid conflict.
|
||||
|
||||
```bash
|
||||
cd $CONFLUENT_DIR
|
||||
@@ -137,8 +133,8 @@ Note that if multiple workers need to be run, the webserver needs to be reconfig
|
||||
successful running of the workers.
|
||||
|
||||
```bash
|
||||
cd $KAFKA_HOME
|
||||
./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
|
||||
cd ${KAFKA_HOME}
|
||||
./bin/connect-distributed.sh ${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties
|
||||
```
|
||||
|
||||
### 7 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
|
||||
@@ -155,6 +151,7 @@ curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect
|
||||
Now, you should see that the connector is created and tasks are running.
|
||||
|
||||
```bash
|
||||
mkdir /tmp/hoodie/
|
||||
curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors
|
||||
["hudi-sink"]
|
||||
curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq
|
||||
@@ -191,52 +188,7 @@ total 5168
|
||||
-rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
|
||||
```
|
||||
|
||||
### 8 - Querying via Hive
|
||||
|
||||
```bash
|
||||
docker exec -it adhoc-2 /bin/bash
|
||||
beeline -u jdbc:hive2://hiveserver:10000 \
|
||||
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
|
||||
--hiveconf hive.stats.autogather=false
|
||||
|
||||
|
||||
# List Tables
|
||||
0: jdbc:hive2://hiveserver:10000> show tables;
|
||||
+---------------------+--+
|
||||
| tab_name |
|
||||
+---------------------+--+
|
||||
| huditesttopic_ro |
|
||||
| huditesttopic_rt |
|
||||
+---------------------+--+
|
||||
3 rows selected (1.199 seconds)
|
||||
0: jdbc:hive2://hiveserver:10000>
|
||||
|
||||
|
||||
# Look at partitions that were added
|
||||
0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_rt;
|
||||
+-------------------+--+
|
||||
| partition |
|
||||
+-------------------+--+
|
||||
| date=partition_0 |
|
||||
| date=partition_1 |
|
||||
| date=partition_2 |
|
||||
| date=partition_3 |
|
||||
| date=partition_4 |
|
||||
+-------------------+--+
|
||||
1 row selected (0.24 seconds)
|
||||
|
||||
|
||||
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_rt;
|
||||
+----------------------+---------+----------------------+---------+------------+-----------+--+
|
||||
| _hoodie_commit_time | symbol | ts | volume | open | close |
|
||||
+----------------------+---------+----------------------+---------+------------+-----------+--+
|
||||
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
|
||||
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
|
||||
+----------------------+---------+----------------------+---------+------------+-----------+--+
|
||||
```
|
||||
|
||||
|
||||
### 9 - Run async compaction and clustering if scheduled
|
||||
### 8 - Run async compaction and clustering if scheduled
|
||||
|
||||
When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled when the Sink is
|
||||
running. Inline compaction and clustering are disabled by default due to performance reasons. By default, async
|
||||
@@ -365,3 +317,68 @@ hoodie.write.concurrency.mode=single_writer
|
||||
|
||||
Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled
|
||||
clustering is going to be executed.
|
||||
|
||||
### 9 - Querying via Hive
|
||||
|
||||
In this section we explain how one can test syncing of the Hudi table with Hive server/ Hive Metastore,
|
||||
that enables querying via Hive, Presto, etc.
|
||||
|
||||
First, (re)install a different connector that is configured to write the Hudi table to HDFS instead of the local filesystem.
|
||||
|
||||
```bash
|
||||
curl -X DELETE http://localhost:8083/connectors/hudi-sink
|
||||
curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/demo/config-sink-hive.json http://localhost:8083/connectors
|
||||
```
|
||||
|
||||
After running the connector, you can query the hive server using the following steps:
|
||||
|
||||
```bash
|
||||
docker exec -it adhoc-2 /bin/bash
|
||||
beeline -u jdbc:hive2://hiveserver:10000 \
|
||||
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
|
||||
--hiveconf hive.stats.autogather=false
|
||||
|
||||
|
||||
# List Tables
|
||||
0: jdbc:hive2://hiveserver:10000> show tables;
|
||||
+---------------------+--+
|
||||
| tab_name |
|
||||
+---------------------+--+
|
||||
| huditesttopic_ro |
|
||||
| huditesttopic_rt |
|
||||
+---------------------+--+
|
||||
3 rows selected (1.199 seconds)
|
||||
0: jdbc:hive2://hiveserver:10000>
|
||||
|
||||
|
||||
# Look at partitions that were added
|
||||
0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_ro;
|
||||
+-------------------+--+
|
||||
| partition |
|
||||
+-------------------+--+
|
||||
| date=partition_0 |
|
||||
| date=partition_1 |
|
||||
| date=partition_2 |
|
||||
| date=partition_3 |
|
||||
| date=partition_4 |
|
||||
+-------------------+--+
|
||||
1 row selected (0.24 seconds)
|
||||
|
||||
|
||||
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_ro;
|
||||
+----------------------+---------+----------------------+---------+------------+-----------+--+
|
||||
| _hoodie_commit_time | symbol | ts | volume | open | close |
|
||||
+----------------------+---------+----------------------+---------+------------+-----------+--+
|
||||
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
|
||||
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
|
||||
+----------------------+---------+----------------------+---------+------------+-----------+--+
|
||||
```
|
||||
|
||||
`Current Limitation:` The Hudi Kafka Connect sink uses `Merge-On-Read` by default, and inserts/appends the Kafka records
|
||||
directly to the log file(s). Asynchronously, a compaction service can be executed to merge the log files into base files (Parquet format).
|
||||
Generally, we support both Read-Optimized that reads only parquet base files and Snapshot queries that read and merge
|
||||
records across base and log files. However, currently there is a limitation where we are not able to read records from
|
||||
only log files. Hence, the queries for Hudi Kafka Connect will only work after compaction merges the records into base files. Alternatively,
|
||||
users have the option to reconfigure the table type to `COPY_ON_WRITE` in config-sink.json.
|
||||
|
||||
|
||||
|
||||
31
hudi-kafka-connect/demo/config-sink-hive.json
Normal file
31
hudi-kafka-connect/demo/config-sink-hive.json
Normal file
@@ -0,0 +1,31 @@
|
||||
{
|
||||
"name": "hudi-sink",
|
||||
"config": {
|
||||
"bootstrap.servers": "kafkabroker:9092",
|
||||
"connector.class": "org.apache.hudi.connect.HoodieSinkConnector",
|
||||
"tasks.max": "4",
|
||||
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
|
||||
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
|
||||
"value.converter.schemas.enable": "false",
|
||||
"topics": "hudi-test-topic",
|
||||
"hoodie.table.name": "hudi-test-topic",
|
||||
"hoodie.table.type": "MERGE_ON_READ",
|
||||
"hoodie.metadata.enable": "false",
|
||||
"hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic",
|
||||
"hoodie.datasource.write.recordkey.field": "volume",
|
||||
"hoodie.datasource.write.partitionpath.field": "date",
|
||||
"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
|
||||
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/hudi-test-topic/versions/latest",
|
||||
"hoodie.kafka.commit.interval.secs": 60,
|
||||
"hoodie.meta.sync.enable": "true",
|
||||
"hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool",
|
||||
"hoodie.datasource.hive_sync.table": "huditesttopic",
|
||||
"hoodie.datasource.hive_sync.partition_fields": "date",
|
||||
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
|
||||
"hoodie.datasource.hive_sync.use_jdbc": "false",
|
||||
"hoodie.datasource.hive_sync.mode": "hms",
|
||||
"dfs.client.use.datanode.hostname": "true",
|
||||
"hive.metastore.uris": "thrift://hivemetastore:9083",
|
||||
"hive.metastore.client.socket.timeout": "1500s"
|
||||
}
|
||||
}
|
||||
@@ -11,21 +11,11 @@
|
||||
"hoodie.table.name": "hudi-test-topic",
|
||||
"hoodie.table.type": "MERGE_ON_READ",
|
||||
"hoodie.metadata.enable": "false",
|
||||
"hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic",
|
||||
"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
|
||||
"hoodie.datasource.write.recordkey.field": "volume",
|
||||
"hoodie.datasource.write.partitionpath.field": "date",
|
||||
"hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider",
|
||||
"hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/hudi-test-topic/versions/latest",
|
||||
"hoodie.kafka.commit.interval.secs": 60,
|
||||
"hoodie.meta.sync.enable": "true",
|
||||
"hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool",
|
||||
"hoodie.datasource.hive_sync.table": "huditesttopic",
|
||||
"hoodie.datasource.hive_sync.partition_fields": "date",
|
||||
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
|
||||
"hoodie.datasource.hive_sync.use_jdbc": "false",
|
||||
"hoodie.datasource.hive_sync.mode": "hms",
|
||||
"dfs.client.use.datanode.hostname": "true",
|
||||
"hive.metastore.uris": "thrift://hivemetastore:9083",
|
||||
"hive.metastore.client.socket.timeout": "1500s"
|
||||
"hoodie.kafka.commit.interval.secs": 60
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,4 +30,4 @@ status.storage.replication.factor=1
|
||||
|
||||
offset.flush.interval.ms=60000
|
||||
listeners=HTTP://:8083
|
||||
plugin.path=/usr/local/share/java
|
||||
plugin.path=/usr/local/share/kafka/plugins
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.connect;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.connect.utils.KafkaConnectUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.table.FileIdPrefixProvider;
|
||||
@@ -25,8 +26,6 @@ import org.apache.hudi.table.FileIdPrefixProvider;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
|
||||
|
||||
public static final String KAFKA_CONNECT_PARTITION_ID = "hudi.kafka.connect.partition";
|
||||
@@ -34,7 +33,7 @@ public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider {
|
||||
|
||||
private final String kafkaPartition;
|
||||
|
||||
public KafkaConnectFileIdPrefixProvider(Properties props) {
|
||||
public KafkaConnectFileIdPrefixProvider(TypedProperties props) {
|
||||
super(props);
|
||||
if (!props.containsKey(KAFKA_CONNECT_PARTITION_ID)) {
|
||||
LOG.error("Fatal error due to Kafka Connect Partition Id is not set");
|
||||
|
||||
@@ -80,6 +80,7 @@
|
||||
<include>org.apache.hudi:hudi-sync-common</include>
|
||||
<include>org.apache.hudi:hudi-hadoop-mr</include>
|
||||
<include>org.apache.hudi:hudi-timeline-service</include>
|
||||
<include>org.apache.hudi:hudi-aws</include>
|
||||
|
||||
<!-- NOTE: This is temp (SchemaProvide dep) until PR3162 lands -->
|
||||
<include>org.apache.hudi:hudi-flink_${scala.binary.version}</include>
|
||||
|
||||
Reference in New Issue
Block a user