diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java index d06da9b0d..0fc082318 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/FileIdPrefixProvider.java @@ -18,17 +18,17 @@ package org.apache.hudi.table; -import java.util.Properties; +import org.apache.hudi.common.config.TypedProperties; public abstract class FileIdPrefixProvider { - private final Properties props; + private final TypedProperties props; - public FileIdPrefixProvider(Properties props) { + public FileIdPrefixProvider(TypedProperties props) { this.props = props; } - public Properties getProps() { + public TypedProperties getProps() { return props; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java index 89d993460..5ad3eedf4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/RandomFileIdPrefixProvider.java @@ -18,13 +18,12 @@ package org.apache.hudi.table; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; -import java.util.Properties; - public class RandomFileIdPrefixProvider extends FileIdPrefixProvider { - public RandomFileIdPrefixProvider(Properties props) { + public RandomFileIdPrefixProvider(TypedProperties props) { super(props); } diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md index 33adb5b1d..0bf53d6c4 100644 --- a/hudi-kafka-connect/README.md +++ b/hudi-kafka-connect/README.md @@ -29,34 +29,30 @@ The first thing you need to do to start using this connector is building it. In - Install [kcat](https://github.com/edenhill/kcat) = Install jq. `brew install jq` -After installing these dependencies, execute the following commands. This will install all the Hudi dependency jars, -including the fat packaged jar that contains all the dependencies required for a functional Hudi Kafka Connect Sink. - - -```bash -mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am -``` - -Next, we need to make sure that the hudi sink connector bundle jar is in Kafka Connect classpath. Note that the connect -classpath should be same as the one configured in the connector configuration file. - -```bash -cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/java/hudi-kafka-connect/ -``` - + ## Trying the connector -After building the package, we need to install the Apache Kafka +After installing these dependencies, follow steps based on your requirement. ### 1 - Starting the environment For runtime dependencies, we encourage using the confluent HDFS connector jars. We have tested our setup with version `10.1.0`. -After downloading the connector, copy the jars from the lib folder to the Kafka Connect classpath. +Either use confluent-hub to install the connector or download it from [here](https://tinyurl.com/yb472f79). + +Copy the entire folder to the classpath that will be used by the Hudi Kafka Connector. ```bash confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0 +cp confluentinc-kafka-connect-hdfs-10.1.0/* /usr/local/share/kafka/plugins/ +``` + +Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. And copy it +to the plugin path that contains all the other jars (`/usr/local/share/kafka/plugins/lib`) +```bash +cd ${HUDI_DIR} +mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am +cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib ``` -Add `confluentinc-kafka-connect-hdfs-10.1.0/lib` to the plugin.path (comma separated) in $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties ### 2 - Set up the docker containers @@ -80,7 +76,7 @@ Essentially, follow the steps listed here: Bring up the docker containers ```bash -cd $HUDI_DIR/docker +cd ${HUDI_DIR}/docker ./setup_demo.sh ``` @@ -93,7 +89,7 @@ registries, we use Confluent schema registry. Download the latest [confluent platform](https://docs.confluent.io/platform/current/installation/index.html) and run the schema registry service. -NOTE: You might need to change the port from `8081` to `8082`. +NOTE: You must change the port from `8081` (default) to `8082` to avoid conflict. ```bash cd $CONFLUENT_DIR @@ -137,8 +133,8 @@ Note that if multiple workers need to be run, the webserver needs to be reconfig successful running of the workers. ```bash -cd $KAFKA_HOME -./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties +cd ${KAFKA_HOME} +./bin/connect-distributed.sh ${HUDI_DIR}/hudi-kafka-connect/demo/connect-distributed.properties ``` ### 7 - To add the Hudi Sink to the Connector (delete it if you want to re-configure) @@ -155,6 +151,7 @@ curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect Now, you should see that the connector is created and tasks are running. ```bash +mkdir /tmp/hoodie/ curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors ["hudi-sink"] curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq @@ -191,52 +188,7 @@ total 5168 -rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet ``` -### 8- Querying via Hive - -```bash -docker exec -it adhoc-2 /bin/bash -beeline -u jdbc:hive2://hiveserver:10000 \ - --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \ - --hiveconf hive.stats.autogather=false - - -# List Tables -0: jdbc:hive2://hiveserver:10000> show tables; -+---------------------+--+ -| tab_name | -+---------------------+--+ -| huditesttopic_ro | -| huditesttopic_rt | -+---------------------+--+ -3 rows selected (1.199 seconds) -0: jdbc:hive2://hiveserver:10000> - - -# Look at partitions that were added -0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_rt; -+-------------------+--+ -| partition | -+-------------------+--+ -| date=partition_0 | -| date=partition_1 | -| date=partition_2 | -| date=partition_3 | -| date=partition_4 | -+-------------------+--+ -1 row selected (0.24 seconds) - - -0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_rt; -+----------------------+---------+----------------------+---------+------------+-----------+--+ -| _hoodie_commit_time | symbol | ts | volume | open | close | -+----------------------+---------+----------------------+---------+------------+-----------+--+ -| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | -| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 | -+----------------------+---------+----------------------+---------+------------+-----------+--+ -``` - - -### 9 - Run async compaction and clustering if scheduled +### 8 - Run async compaction and clustering if scheduled When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled when the Sink is running. Inline compaction and clustering are disabled by default due to performance reason. By default, async @@ -365,3 +317,68 @@ hoodie.write.concurrency.mode=single_writer Note that you don't have to provide the instant time through `--instant-time`. In that case, the earliest scheduled clustering is going to be executed. + +### 9- Querying via Hive + +In this section we explain how one can test syncing of the Hudi table with Hive server/ Hive Metastore, +that enable querying via Hive, Presto etc. + +Firstly, (re)-install a different connector that is configured to write the Hudi table to Hdfs instead of local filesystem. + +```bash +curl -X DELETE http://localhost:8083/connectors/hudi-sink +curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/demo/config-sink-hive.json http://localhost:8083/connectors +``` + +After running the connector, you can query the hive server using the following steps: + +```bash +docker exec -it adhoc-2 /bin/bash +beeline -u jdbc:hive2://hiveserver:10000 \ + --hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \ + --hiveconf hive.stats.autogather=false + + +# List Tables +0: jdbc:hive2://hiveserver:10000> show tables; ++---------------------+--+ +| tab_name | ++---------------------+--+ +| huditesttopic_ro | +| huditesttopic_rt | ++---------------------+--+ +3 rows selected (1.199 seconds) +0: jdbc:hive2://hiveserver:10000> + + +# Look at partitions that were added +0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_ro; ++-------------------+--+ +| partition | ++-------------------+--+ +| date=partition_0 | +| date=partition_1 | +| date=partition_2 | +| date=partition_3 | +| date=partition_4 | ++-------------------+--+ +1 row selected (0.24 seconds) + + +0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_ro; ++----------------------+---------+----------------------+---------+------------+-----------+--+ +| _hoodie_commit_time | symbol | ts | volume | open | close | ++----------------------+---------+----------------------+---------+------------+-----------+--+ +| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 | +| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 | ++----------------------+---------+----------------------+---------+------------+-----------+--+ +``` + +`Current Limitation:` The Hudi Kafka Connect sink uses `Merge-On-Read` by default, and inserts/ appends the kafka records +directly to the log file(s). Asynchronously, compaction service can be executed to merge the log files into base file (Parquet format). +Generally, we support both Read-Optimized that reads only parquet base files and Snapshot queries that read and merge +records across base and log files. However, currently there is a limitation where we are not able to read records from +only log files. Hence, the queries for Hudi Kafka Connect will only work after compaction merges the records into base files. Alternatively, +users have the option to reconfigure the table type to `COPY_ON_WRITE` in config-sink.json. + + diff --git a/hudi-kafka-connect/demo/config-sink-hive.json b/hudi-kafka-connect/demo/config-sink-hive.json new file mode 100644 index 000000000..bf7e99833 --- /dev/null +++ b/hudi-kafka-connect/demo/config-sink-hive.json @@ -0,0 +1,31 @@ +{ + "name": "hudi-sink", + "config": { + "bootstrap.servers": "kafkabroker:9092", + "connector.class": "org.apache.hudi.connect.HoodieSinkConnector", + "tasks.max": "4", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter.schemas.enable": "false", + "topics": "hudi-test-topic", + "hoodie.table.name": "hudi-test-topic", + "hoodie.table.type": "MERGE_ON_READ", + "hoodie.metadata.enable": "false", + "hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic", + "hoodie.datasource.write.recordkey.field": "volume", + "hoodie.datasource.write.partitionpath.field": "date", + "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider", + "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/hudi-test-topic/versions/latest", + "hoodie.kafka.commit.interval.secs": 60, + "hoodie.meta.sync.enable": "true", + "hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool", + "hoodie.datasource.hive_sync.table": "huditesttopic", + "hoodie.datasource.hive_sync.partition_fields": "date", + "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", + "hoodie.datasource.hive_sync.use_jdbc": "false", + "hoodie.datasource.hive_sync.mode": "hms", + "dfs.client.use.datanode.hostname": "true", + "hive.metastore.uris": "thrift://hivemetastore:9083", + "hive.metastore.client.socket.timeout": "1500s" + } +} diff --git a/hudi-kafka-connect/demo/config-sink.json b/hudi-kafka-connect/demo/config-sink.json index bf7e99833..9d1aedff6 100644 --- a/hudi-kafka-connect/demo/config-sink.json +++ b/hudi-kafka-connect/demo/config-sink.json @@ -11,21 +11,11 @@ "hoodie.table.name": "hudi-test-topic", "hoodie.table.type": "MERGE_ON_READ", "hoodie.metadata.enable": "false", - "hoodie.base.path": "hdfs://namenode:8020/user/hive/warehouse/hudi-test-topic", + "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic", "hoodie.datasource.write.recordkey.field": "volume", "hoodie.datasource.write.partitionpath.field": "date", "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider", "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8082/subjects/hudi-test-topic/versions/latest", - "hoodie.kafka.commit.interval.secs": 60, - "hoodie.meta.sync.enable": "true", - "hoodie.meta.sync.classes": "org.apache.hudi.hive.HiveSyncTool", - "hoodie.datasource.hive_sync.table": "huditesttopic", - "hoodie.datasource.hive_sync.partition_fields": "date", - "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor", - "hoodie.datasource.hive_sync.use_jdbc": "false", - "hoodie.datasource.hive_sync.mode": "hms", - "dfs.client.use.datanode.hostname": "true", - "hive.metastore.uris": "thrift://hivemetastore:9083", - "hive.metastore.client.socket.timeout": "1500s" + "hoodie.kafka.commit.interval.secs": 60 } } diff --git a/hudi-kafka-connect/demo/connect-distributed.properties b/hudi-kafka-connect/demo/connect-distributed.properties index 172e84789..1c28bc60d 100644 --- a/hudi-kafka-connect/demo/connect-distributed.properties +++ b/hudi-kafka-connect/demo/connect-distributed.properties @@ -30,4 +30,4 @@ status.storage.replication.factor=1 offset.flush.interval.ms=60000 listeners=HTTP://:8083 -plugin.path=/usr/local/share/java +plugin.path=/usr/local/share/kafka/plugins diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java index 9c4674706..436366709 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/KafkaConnectFileIdPrefixProvider.java @@ -18,6 +18,7 @@ package org.apache.hudi.connect; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.connect.utils.KafkaConnectUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.table.FileIdPrefixProvider; @@ -25,8 +26,6 @@ import org.apache.hudi.table.FileIdPrefixProvider; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import java.util.Properties; - public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider { public static final String KAFKA_CONNECT_PARTITION_ID = "hudi.kafka.connect.partition"; @@ -34,7 +33,7 @@ public class KafkaConnectFileIdPrefixProvider extends FileIdPrefixProvider { private final String kafkaPartition; - public KafkaConnectFileIdPrefixProvider(Properties props) { + public KafkaConnectFileIdPrefixProvider(TypedProperties props) { super(props); if (!props.containsKey(KAFKA_CONNECT_PARTITION_ID)) { LOG.error("Fatal error due to Kafka Connect Partition Id is not set"); diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index a828132d8..cd7b151ce 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -80,6 +80,7 @@ org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr org.apache.hudi:hudi-timeline-service + org.apache.hudi:hudi-aws org.apache.hudi:hudi-flink_${scala.binary.version}