diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java new file mode 100644 index 000000000..c302c1db0 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaRegistryProvider.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.schema; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.util.StreamerUtil; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.Collections; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Obtains latest schema from the Confluent/Kafka schema-registry. + *

+ * https://github.com/confluentinc/schema-registry + */ +public class SchemaRegistryProvider extends SchemaProvider { + + private final TypedProperties config; + + + /** + * Configs supported. + */ + public static class Config { + + private static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url"; + private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = + "hoodie.deltastreamer.schemaprovider.registry.targetUrl"; + } + + /** + * The method takes the provided url {@code registryUrl} and gets the schema from the schema registry using that url. + * If the caller provides userInfo credentials in the url (e.g "https://foo:bar@schemaregistry.org") then the credentials + * are extracted the url using the Matcher and the extracted credentials are set on the request as an Authorization + * header. + * + * @param registryUrl + * @return the Schema in String form. + * @throws IOException + */ + public String fetchSchemaFromRegistry(String registryUrl) throws IOException { + URL registry; + HttpURLConnection connection; + Matcher matcher = Pattern.compile("://(.*?)@").matcher(registryUrl); + if (matcher.find()) { + String creds = matcher.group(1); + String urlWithoutCreds = registryUrl.replace(creds + "@", ""); + registry = new URL(urlWithoutCreds); + connection = (HttpURLConnection) registry.openConnection(); + setAuthorizationHeader(matcher.group(1), connection); + } else { + registry = new URL(registryUrl); + connection = (HttpURLConnection) registry.openConnection(); + } + ObjectMapper mapper = new ObjectMapper(); + JsonNode node = mapper.readTree(getStream(connection)); + return node.get("schema").asText(); + } + + protected void setAuthorizationHeader(String creds, HttpURLConnection connection) { + String encodedAuth = Base64.getEncoder().encodeToString(creds.getBytes(StandardCharsets.UTF_8)); + connection.setRequestProperty("Authorization", "Basic " + encodedAuth); + } + + protected InputStream getStream(HttpURLConnection connection) throws IOException { + return connection.getInputStream(); + } + + public SchemaRegistryProvider(TypedProperties props) { + this.config = props; + StreamerUtil.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP)); + } + + private Schema getSchema(String registryUrl) throws IOException { + return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl)); + } + + @Override + public Schema getSourceSchema() { + String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP); + try { + return getSchema(registryUrl); + } catch (IOException ioe) { + throw new HoodieIOException("Error reading source schema from registry :" + registryUrl, ioe); + } + } + + @Override + public Schema getTargetSchema() { + String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP); + String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl); + try { + return getSchema(targetRegistryUrl); + } catch (IOException ioe) { + throw new HoodieIOException("Error reading target schema from registry :" + registryUrl, ioe); + } + } +} diff --git a/hudi-kafka-connect/README.md b/hudi-kafka-connect/README.md index fd0a5d010..b9ba9662d 100644 --- a/hudi-kafka-connect/README.md +++ b/hudi-kafka-connect/README.md @@ -15,32 +15,35 @@ * See the License for the specific language governing permissions and --> -# Quick Start guide for Kafka Connect Sink for Hudi +# Quick Start (demo) guide for Kafka Connect Sink for Hudi This repo contains a sample project that can be used to start off your own source connector for Kafka Connect. -## Building the connector +## Building the Hudi Sink Connector The first thing you need to do to start using this connector is building it. In order to do that, you need to install the following dependencies: - [Java 1.8+](https://openjdk.java.net/) - [Apache Maven](https://maven.apache.org/) +- Install [kcat](https://github.com/edenhill/kcat) -After installing these dependencies, execute the following command: +After installing these dependencies, execute the following commands. This will install all the Hudi dependency jars, +including the fat packaged jar that contains all the dependencies required for a functional Hudi Kafka Connect Sink. ```bash cd $HUDI_DIR -mvn clean package +mvn clean -DskipTests install ``` -## Incremental Builds +Henceforth, incremental builds can be performed as follows. ```bash mvn clean -pl hudi-kafka-connect install -DskipTests mvn clean -pl packaging/hudi-kafka-connect-bundle install ``` -## Put hudi connector in Kafka Connect classpath +Next, we need to make sure that the hudi sink connector bundle jar is in Kafka Connect classpath. Note that the connect +classpath should be same as the one configured in the connector configuration file. ```bash cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.10.0-SNAPSHOT.jar /usr/local/share/java/hudi-kafka-connect/ @@ -52,43 +55,77 @@ After building the package, we need to install the Apache Kafka ### 1 - Starting the environment -Start the ZK and Kafka: +To try out the Connect Sink locally, set up a Kafka broker locally. Download the latest apache kafka from https://kafka.apache.org/downloads. +Once downloaded and built, run the Zookeeper server and Kafka server using the command line tools. ```bash +export KAFKA_HOME=/path/to/kafka_install_dir +cd $KAFKA_KAFKA_HOME ./bin/zookeeper-server-start.sh ./config/zookeeper.properties ./bin/kafka-server-start.sh ./config/server.properties ``` Wait until the kafka cluster is up and running. -### 2 - Create the Hudi Control Topic for Coordination of the transactions +### 2 - Set up the schema registry -The control topic should only have `1` partition +Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema registries, +we use Confluent schema registry. Download the latest confluent schema registry code from https://github.com/confluentinc/schema-registry +and start the schema registry service. ```bash +cd $CONFLUENT_DIR +./bin/schema-registry-start etc/schema-registry/schema-registry.properties +``` + +### 3 - Create the Hudi Control Topic for Coordination of the transactions + +The control topic should only have `1` partition, since its used to coordinate the Hudi write transactions across the multiple Connect tasks. + +```bash +cd $KAFKA_HOME ./bin/kafka-topics.sh --delete --topic hudi-control-topic --bootstrap-server localhost:9092 ./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 ``` -### 3 - Create the Hudi Topic for the Sink and insert data into the topic +### 4 - Create the Hudi Topic for the Sink and insert data into the topic Open a terminal to execute the following command: ```bash -bash runKafkaTrafficGenerator.sh +cd $HUDI_DIR/demo/ +bash setupKafka.sh -n ``` ### 4 - Run the Sink connector worker (multiple workers can be run) -Open a terminal to execute the following command: +The Kafka connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks) +that parallely process the records from the Kafka partitions for the same topic. We provide a properties file with +default properties to start a Hudi connector. + +Note that if multiple workers need to be run, the webserver needs to be reconfigured for subsequent workers to ensure +successful running of the workers. ```bash -./bin/connect-distributed.sh ../hudi-kafka-connect/configs/connect-distributed.properties +cd $KAFKA_HOME +./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties ``` ### 5- To add the Hudi Sink to the Connector (delete it if you want to re-configure) +Once the Connector has started, it will not run the Sink, until the Hudi sink is added using the web api. The following +curl APIs can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for the Hudi Sink, +that can be changed based on the desired properties. + ```bash curl -X DELETE http://localhost:8083/connectors/hudi-sink -curl -X POST -H "Content-Type:application/json" -d @$HUDI-DIR/hudi-kafka-connect/configs/config-sink.json http://localhost:8083/connectors +curl -X POST -H "Content-Type:application/json" -d @$HUDI-DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors +``` + +Now, you should see that the connector is created and tasks are running. + +```bash +curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors +["hudi-sink"] +curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq ``` diff --git a/hudi-kafka-connect/configs/config-sink.json b/hudi-kafka-connect/demo/config-sink.json similarity index 56% rename from hudi-kafka-connect/configs/config-sink.json rename to hudi-kafka-connect/demo/config-sink.json index 4e94bf541..75e6d8448 100644 --- a/hudi-kafka-connect/configs/config-sink.json +++ b/hudi-kafka-connect/demo/config-sink.json @@ -9,11 +9,10 @@ "value.converter.schemas.enable": "false", "topics": "hudi-test-topic", "hoodie.table.name": "hudi-test-topic", - "hoodie.base.path": "file:///tmp/hoodie/sample-table", + "hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic", "hoodie.datasource.write.recordkey.field": "volume", - "hoodie.datasource.write.partitionpath.field": "year", - "hoodie.schemaprovider.class": "org.apache.hudi.schema.FilebasedSchemaProvider", - "hoodie.deltastreamer.schemaprovider.source.schema.file": "file:///tmp/hoodie/schema.avsc", - "hoodie.deltastreamer.schemaprovider.target.schema.file": "file:///tmp/hoodie/schema.avsc" + "hoodie.datasource.write.partitionpath.field": "date", + "hoodie.schemaprovider.class": "org.apache.hudi.schema.SchemaRegistryProvider", + "hoodie.deltastreamer.schemaprovider.registry.url": "http://localhost:8081/subjects/hudi-test-topic/versions/latest" } } diff --git a/hudi-kafka-connect/configs/connect-distributed.properties b/hudi-kafka-connect/demo/connect-distributed.properties similarity index 94% rename from hudi-kafka-connect/configs/connect-distributed.properties rename to hudi-kafka-connect/demo/connect-distributed.properties index d7d453c69..9e3cec149 100644 --- a/hudi-kafka-connect/configs/connect-distributed.properties +++ b/hudi-kafka-connect/demo/connect-distributed.properties @@ -30,4 +30,4 @@ status.storage.replication.factor=1 offset.flush.interval.ms=60000 listeners=HTTP://:8083 -plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, +plugin.path=/usr/local/share/java diff --git a/hudi-kafka-connect/demo/setupKafka.sh b/hudi-kafka-connect/demo/setupKafka.sh new file mode 100644 index 000000000..f2c173596 --- /dev/null +++ b/hudi-kafka-connect/demo/setupKafka.sh @@ -0,0 +1,131 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash + +## Directories +HOME_DIR=~ +HUDI_DIR=${HOME_DIR}/hudi +KAFKA_HOME=${HOME_DIR}/kafka + +######################### +# The command line help # +######################### +usage() { + echo "Usage: $0" + echo " -n |--num-kafka-records, (required) number of kafka records to generate" + echo " -f |--raw-file, (optional) raw file for the kafka records" + echo " -k |--kafka-topic, (optional) Topic name for Kafka" + echo " -m |--num-kafka-partitions, (optional) number of kafka partitions" + echo " -r |--record-key, (optional) field to use as record key" + echo " -l |--num-hudi-partitions, (optional) number of hudi partitions" + echo " -p |--partition-key, (optional) field to use as partition" + echo " -s |--schema-file, (optional) path of the file containing the schema of the records" + exit 1 +} + +case "$1" in + --help) + usage + exit 0 + ;; +esac + +if [ $# -lt 1 ]; then + echo "Illegal number of parameters" + usage + exit 0 +fi + +## defaults +rawDataFile=${HUDI_DIR}/docker/demo/data/batch_1.json +kafkaTopicName=hudi-test-topic +numKafkaPartitions=4 +recordKey=volume +numHudiPartitions=5 +partitionField=date +schemaFile=${HUDI_DIR}/docker/demo/config/schema.avsc + +while getopts ":n:f:k:m:r:l:p:s:-:" opt; do + case $opt in + n) num_records="$OPTARG" + printf "Argument num-kafka-records is %s\n" "$num_records" + ;; + k) rawDataFile="$OPTARG" + printf "Argument raw-file is %s\n" "$rawDataFile" + ;; + f) kafkaTopicName="$OPTARG" + printf "Argument kafka-topic is %s\n" "$kafkaTopicName" + ;; + m) numKafkaPartitions="$OPTARG" + printf "Argument num-kafka-partitions is %s\n" "$numKafkaPartitions" + ;; + r) recordKey="$OPTARG" + printf "Argument record-key is %s\n" "$recordKey" + ;; + l) numHudiPartitions="$OPTARG" + printf "Argument num-hudi-partitions is %s\n" "$numHudiPartitions" + ;; + p) partitionField="$OPTARG" + printf "Argument partition-key is %s\n" "$partitionField" + ;; + p) schemaFile="$OPTARG" + printf "Argument schema-file is %s\n" "$schemaFile" + ;; + -) echo "Invalid option -$OPTARG" >&2 + ;; +esac +done + +# First delete the existing topic +$KAFKA_HOME/bin/kafka-topics.sh --delete --topic ${kafkaTopicName} --bootstrap-server localhost:9092 + +# Create the topic with 4 partitions +$KAFKA_HOME/bin/kafka-topics.sh --create --topic ${kafkaTopicName} --partitions $numKafkaPartitions --replication-factor 1 --bootstrap-server localhost:9092 + + +# Setup the schema registry +export SCHEMA=`sed 's|/\*|\n&|g;s|*/|&\n|g' ${schemaFile} | sed '/\/\*/,/*\//d' | jq tostring` +curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "{\"schema\": $SCHEMA}" http://localhost:8081/subjects/${kafkaTopicName}/versions +curl -X GET http://localhost:8081/subjects/${kafkaTopicName}/versions/latest + + +# Generate kafka messages from raw records +# Each records with unique keys and generate equal messages across each hudi partition +partitions={} +for ((i=0; i<${numHudiPartitions}; i++)) +do + partitions[$i]="partition-"$i; +done + +for ((recordValue=0; recordValue<=${num_records}; )) +do + while IFS= read line + do + for partitionValue in "${partitions[@]}" + do + echo $line | jq --arg recordKey $recordKey --arg recordValue $recordValue --arg partitionField $partitionField --arg partitionValue $partitionValue -c '.[$recordKey] = $recordValue | .[$partitionField] = $partitionValue' | kafkacat -P -b localhost:9092 -t hudi-test-topic; + ((recordValue++)); + if [ $recordValue -gt ${num_records} ]; then + exit 0 + fi + done + + if [ $(( $recordValue % 1000 )) -eq 0 ] + then sleep 1 + fi + done < "$rawDataFile" +done diff --git a/hudi-kafka-connect/scripts/raw.json b/hudi-kafka-connect/scripts/raw.json deleted file mode 100644 index aa2cc7037..000000000 --- a/hudi-kafka-connect/scripts/raw.json +++ /dev/null @@ -1,5 +0,0 @@ -{"volume": 0, "symbol": "TPNL", "ts": "2017-08-31 09:30:00", "month": "08", "high": 6.37, "low": 1.37, "key": "TPNL_2017-08-31 09", "year": 2017, "date": "2017/08/31", "close": 4.44, "open": 1.37, "day": "31"} -{"volume": 0, "symbol": "SPOT", "ts": "2018-08-31 09:30:00", "month": "08", "high": 1.87, "low": 0.37, "key": "TPNL_2018-08-31 09", "year": 2018, "date": "2018/08/31", "close": 1.44, "open": 1.77, "day": "31"} -{"volume": 0, "symbol": "GOOG", "ts": "2019-08-31 09:30:00", "month": "08", "high": 2.1, "low": 1.7, "key": "TPNL_2019-08-31 09", "year": 2019, "date": "2019/08/31", "close": 1.94, "open": 2.0, "day": "31"} -{"volume": 0, "symbol": "MSFT", "ts": "2020-08-31 09:30:00", "month": "08", "high": 3.33, "low": 0.87, "key": "TPNL_2020-08-31 09", "year": 2020, "date": "2020/08/31", "close": 3.33, "open": 3.1, "day": "31"} -{"volume": 0, "symbol": "APPL", "ts": "2021-08-31 09:30:00", "month": "08", "high": 3.17, "low": 2.37, "key": "TPNL_2021-08-31 09", "year": 2021, "date": "2021/08/31", "close": 2.66, "open": 3.1, "day": "31"} diff --git a/hudi-kafka-connect/scripts/runKafkaTrafficGenerator.sh b/hudi-kafka-connect/scripts/runKafkaTrafficGenerator.sh deleted file mode 100644 index cff414070..000000000 --- a/hudi-kafka-connect/scripts/runKafkaTrafficGenerator.sh +++ /dev/null @@ -1,38 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -#!/bin/bash - -# First delete the existing topic -$KAFKA_HOME/bin/kafka-topics.sh --delete --topic hudi-test-topic --bootstrap-server localhost:9092 - -# Create the topic with 4 partitions -$KAFKA_HOME/bin/kafka-topics.sh --create --topic hudi-test-topic --partitions 4 --replication-factor 1 --bootstrap-server localhost:9092 - -# Generate kafka messages from raw records -inputFile="raw.json" -# Generate the records with unique keys -for ((recordKey=0; recordKey<=$1; )) -do - while IFS= read line - do - echo $line | jq --argjson recordKey $recordKey -c '.volume = $recordKey' | kcat -P -b localhost:9092 -t hudi-test-topic - ((recordKey++)) - if [ $(( $recordKey % 1000 )) -eq 0 ] - then sleep 1 - fi - done < "$inputFile" -done diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java index c7dde9a2e..a937a8b82 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/HoodieSinkTask.java @@ -30,11 +30,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -52,7 +54,7 @@ public class HoodieSinkTask extends SinkTask { private static final int COORDINATOR_KAFKA_PARTITION = 0; private final Map transactionCoordinators; - private final Map hudiTransactionParticipants; + private final Map transactionParticipants; private KafkaConnectControlAgent controlKafkaClient; private KafkaConnectConfigs connectConfigs; @@ -60,8 +62,8 @@ public class HoodieSinkTask extends SinkTask { private String connectorName; public HoodieSinkTask() { - transactionCoordinators = new HashMap(); - hudiTransactionParticipants = new HashMap<>(); + transactionCoordinators = new HashMap<>(); + transactionParticipants = new HashMap<>(); } @Override @@ -80,7 +82,6 @@ public class HoodieSinkTask extends SinkTask { controlKafkaClient = KafkaConnectControlAgent.createKafkaControlManager( connectConfigs.getBootstrapServers(), connectConfigs.getControlTopicName()); - bootstrap(context.assignment()); } catch (ConfigException e) { throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e); } catch (ConnectException e) { @@ -98,11 +99,25 @@ public class HoodieSinkTask extends SinkTask { String topic = record.topic(); int partition = record.kafkaPartition(); TopicPartition tp = new TopicPartition(topic, partition); - hudiTransactionParticipants.get(tp).buffer(record); + + TransactionParticipant transactionParticipant = transactionParticipants.get(tp); + if (transactionParticipant != null) { + transactionParticipant.buffer(record); + } } for (TopicPartition partition : context.assignment()) { - hudiTransactionParticipants.get(partition).processRecords(); + if (transactionParticipants.get(partition) == null) { + throw new RetriableException("TransactionParticipant should be created for each assigned partition, " + + "but has not been created for the topic/partition: " + partition.topic() + ":" + partition.partition()); + } + try { + transactionParticipants.get(partition).processRecords(); + } catch (IOException exception) { + throw new RetriableException("Intermittent write errors for Hudi " + + " for the topic/partition: " + partition.topic() + ":" + partition.partition() + + " , ensuring kafka connect will retry ", exception); + } } } @@ -123,12 +138,9 @@ public class HoodieSinkTask extends SinkTask { // committed to Hudi. Map result = new HashMap<>(); for (TopicPartition partition : context.assignment()) { - TransactionParticipant worker = hudiTransactionParticipants.get(partition); - if (worker != null) { - worker.processRecords(); - if (worker.getLastKafkaCommittedOffset() >= 0) { - result.put(partition, new OffsetAndMetadata(worker.getLastKafkaCommittedOffset())); - } + TransactionParticipant worker = transactionParticipants.get(partition); + if (worker != null && worker.getLastKafkaCommittedOffset() >= 0) { + result.put(partition, new OffsetAndMetadata(worker.getLastKafkaCommittedOffset())); } } return result; @@ -158,7 +170,7 @@ public class HoodieSinkTask extends SinkTask { transactionCoordinators.remove(partition); } } - TransactionParticipant worker = hudiTransactionParticipants.remove(partition); + TransactionParticipant worker = transactionParticipants.remove(partition); if (worker != null) { try { LOG.debug("Closing data writer due to task start failure."); @@ -185,7 +197,7 @@ public class HoodieSinkTask extends SinkTask { transactionCoordinators.put(partition, coordinator); } ConnectTransactionParticipant worker = new ConnectTransactionParticipant(connectConfigs, partition, controlKafkaClient, context); - hudiTransactionParticipants.put(partition, worker); + transactionParticipants.put(partition, worker); worker.start(); } catch (HoodieException exception) { LOG.error(String.format("Fatal error initializing task %s for partition %s", taskId, partition.partition()), exception); @@ -195,7 +207,7 @@ public class HoodieSinkTask extends SinkTask { private void cleanup() { for (TopicPartition partition : context.assignment()) { - TransactionParticipant worker = hudiTransactionParticipants.get(partition); + TransactionParticipant worker = transactionParticipants.get(partition); if (worker != null) { try { LOG.debug("Closing data writer due to task start failure."); @@ -205,7 +217,7 @@ public class HoodieSinkTask extends SinkTask { } } } - hudiTransactionParticipants.clear(); + transactionParticipants.clear(); transactionCoordinators.forEach((topic, transactionCoordinator) -> transactionCoordinator.stop()); transactionCoordinators.clear(); } diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java index 13291c827..73a30c610 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionCoordinator.java @@ -131,6 +131,7 @@ public class ConnectTransactionCoordinator implements TransactionCoordinator, Ru @Override public void stop() { kafkaControlClient.deregisterTransactionCoordinator(this); + scheduler.shutdownNow(); hasStarted.set(false); if (executorService != null) { boolean terminated = false; diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java index fe1996e65..c3950717d 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ConnectTransactionParticipant.java @@ -32,7 +32,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -111,7 +110,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant { } @Override - public void processRecords() { + public void processRecords() throws IOException { while (!controlEvents.isEmpty()) { ControlEvent message = controlEvents.poll(); switch (message.getMsgType()) { @@ -153,7 +152,7 @@ public class ConnectTransactionParticipant implements TransactionParticipant { } } - private void handleEndCommit(ControlEvent message) { + private void handleEndCommit(ControlEvent message) throws IOException { if (ongoingTransactionInfo == null) { LOG.warn(String.format("END_COMMIT %s is received while we were NOT in active transaction", message.getCommitTime())); return; @@ -167,28 +166,23 @@ public class ConnectTransactionParticipant implements TransactionParticipant { return; } + context.pause(partition); + ongoingTransactionInfo.commitInitiated(); // send Writer Status Message and wait for ACK_COMMIT in async fashion try { - context.pause(partition); - ongoingTransactionInfo.commitInitiated(); //sendWriterStatus - List writeStatuses = new ArrayList<>(); - try { - writeStatuses = ongoingTransactionInfo.getWriter().close(); - } catch (IOException exception) { - LOG.warn("Error closing the Hudi Writer", exception); - } - - ControlEvent writeStatus = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS, + List writeStatuses = ongoingTransactionInfo.getWriter().close(); + ControlEvent writeStatusEvent = new ControlEvent.Builder(ControlEvent.MsgType.WRITE_STATUS, ControlEvent.SenderType.PARTICIPANT, ongoingTransactionInfo.getCommitTime(), partition) .setParticipantInfo(new ControlEvent.ParticipantInfo( writeStatuses, ongoingTransactionInfo.getLastWrittenKafkaOffset(), ControlEvent.OutcomeType.WRITE_SUCCESS)) .build(); - kafkaControlAgent.publishMessage(writeStatus); + kafkaControlAgent.publishMessage(writeStatusEvent); } catch (Exception exception) { - LOG.warn(String.format("Error ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception); + LOG.error(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception); + throw new IOException(String.format("Error writing records and ending commit %s for partition %s", message.getCommitTime(), partition.partition()), exception); } } diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java index 093064881..5a35e7a16 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/ControlEvent.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; /** * The events sent over the Kafka Control Topic between the @@ -108,7 +109,9 @@ public class ControlEvent implements Serializable { @Override public String toString() { return String.format("%s %s %s %s %s %s", version, msgType.name(), commitTime, - Arrays.toString(senderPartition), coordinatorInfo.toString(), participantInfo.toString()); + Arrays.toString(senderPartition), + (coordinatorInfo == null) ? "" : coordinatorInfo.toString(), + (participantInfo == null) ? "" : participantInfo.toString()); } /** @@ -163,6 +166,13 @@ public class ControlEvent implements Serializable { public Map getGlobalKafkaCommitOffsets() { return (globalKafkaCommitOffsets == null) ? new HashMap<>() : globalKafkaCommitOffsets; } + + @Override + public String toString() { + return String.format("%s", globalKafkaCommitOffsets.keySet().stream() + .map(key -> key + "=" + globalKafkaCommitOffsets.get(key)) + .collect(Collectors.joining(", ", "{", "}"))); + } } /** @@ -199,6 +209,11 @@ public class ControlEvent implements Serializable { public OutcomeType getOutcomeType() { return outcomeType; } + + @Override + public String toString() { + return String.format("%s %s %s", Arrays.toString(writeStatusList), kafkaCommitOffset, outcomeType.name()); + } } /** diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java index 0179f3b71..c19d1b849 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/transaction/TransactionParticipant.java @@ -21,6 +21,8 @@ package org.apache.hudi.connect.transaction; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; +import java.io.IOException; + /** * Interface for the Participant that * manages Writes for a @@ -35,7 +37,7 @@ public interface TransactionParticipant { void buffer(SinkRecord record); - void processRecords(); + void processRecords() throws IOException; TopicPartition getPartition(); diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java index c958b2b48..3d8e5f8ab 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/AbstractConnectWriter.java @@ -81,11 +81,11 @@ public abstract class AbstractConnectWriter implements ConnectWriter close() { + public List close() throws IOException { return flushHudiRecords(); } protected abstract void writeHudiRecord(HoodieRecord record); - protected abstract List flushHudiRecords(); + protected abstract List flushHudiRecords() throws IOException; } diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java index 3319604b5..a60293d00 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/BufferedConnectWriter.java @@ -28,7 +28,6 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.IOUtils; import org.apache.hudi.keygen.KeyGenerator; @@ -94,7 +93,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter { } @Override - public List flushHudiRecords() { + public List flushHudiRecords() throws IOException { try { LOG.info("Number of entries in MemoryBasedMap => " + bufferedRecords.getInMemoryMapNumEntries() @@ -114,7 +113,7 @@ public class BufferedConnectWriter extends AbstractConnectWriter { + writeStatuses); return writeStatuses; } catch (Exception e) { - throw new HoodieException("Write records failed", e); + throw new IOException("Write records failed", e); } } } diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java index 900ba46f7..4e5aaa19b 100644 --- a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java +++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestConnectTransactionParticipant.java @@ -68,44 +68,47 @@ public class TestConnectTransactionParticipant { @EnumSource(value = CoordinatorFailureTestScenarios.class) public void testAllCoordinatorFailureScenarios(CoordinatorFailureTestScenarios testScenario) { int expectedRecordsWritten = 0; - switch (testScenario) { - case REGULAR_SCENARIO: - expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant(); - assertTrue(testKafkaConnect.isPaused()); - break; - case COORDINATOR_FAILED_AFTER_START_COMMIT: - testKafkaConnect.putRecordsToParticipant(); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - // Coordinator Failed - initializeCoordinator(); - break; - case COORDINATOR_FAILED_AFTER_END_COMMIT: - testKafkaConnect.putRecordsToParticipant(); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); - expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant(); - // Coordinator Failed - initializeCoordinator(); - break; - default: - throw new HoodieException("Unknown test scenario " + testScenario); + try { + switch (testScenario) { + case REGULAR_SCENARIO: + expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant(); + assertTrue(testKafkaConnect.isPaused()); + break; + case COORDINATOR_FAILED_AFTER_START_COMMIT: + testKafkaConnect.putRecordsToParticipant(); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + // Coordinator Failed + initializeCoordinator(); + break; + case COORDINATOR_FAILED_AFTER_END_COMMIT: + testKafkaConnect.putRecordsToParticipant(); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); + expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant(); + // Coordinator Failed + initializeCoordinator(); + break; + default: + throw new HoodieException("Unknown test scenario " + testScenario); + } + + // Regular Case or Coordinator Recovery Case + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); + expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant(); + assertTrue(testKafkaConnect.isResumed()); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + assertTrue(testKafkaConnect.isPaused()); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten); + // Ensure Coordinator and participant are in sync in the kafka offsets + assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset()); + } catch (Exception exception) { + throw new HoodieException("Unexpected test failure ", exception); } - - // Regular Case or Coordinator Recovery Case - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); - expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant(); - assertTrue(testKafkaConnect.isResumed()); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - assertTrue(testKafkaConnect.isPaused()); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten); - // Ensure Coordinator and participant are in sync in the kafka offsets - assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset()); - participant.stop(); } @@ -113,59 +116,63 @@ public class TestConnectTransactionParticipant { @EnumSource(value = ParticipantFailureTestScenarios.class) public void testAllParticipantFailureScenarios(ParticipantFailureTestScenarios testScenario) { int expectedRecordsWritten = 0; - switch (testScenario) { - case FAILURE_BEFORE_START_COMMIT: - testKafkaConnect.putRecordsToParticipant(); - // Participant fails - initializeParticipant(); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); - expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant(); - assertTrue(testKafkaConnect.isResumed()); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - assertTrue(testKafkaConnect.isPaused()); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten); - // Ensure Coordinator and participant are in sync in the kafka offsets - assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset()); - break; - case FAILURE_AFTER_START_COMMIT: - testKafkaConnect.putRecordsToParticipant(); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - // Participant fails - initializeParticipant(); - testKafkaConnect.putRecordsToParticipant(); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - assertTrue(testKafkaConnect.isPaused()); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten); - // Ensure Coordinator and participant are in sync in the kafka offsets - assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset()); - break; - case FAILURE_AFTER_END_COMMIT: - testKafkaConnect.putRecordsToParticipant(); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - // Participant fails - initializeParticipant(); - testKafkaConnect.putRecordsToParticipant(); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - assertTrue(testKafkaConnect.isPaused()); - coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT); - testKafkaConnect.putRecordsToParticipant(); - assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten); - // Ensure Coordinator and participant are in sync in the kafka offsets - assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset()); - break; - default: - throw new HoodieException("Unknown test scenario " + testScenario); + try { + switch (testScenario) { + case FAILURE_BEFORE_START_COMMIT: + testKafkaConnect.putRecordsToParticipant(); + // Participant fails + initializeParticipant(); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); + expectedRecordsWritten += testKafkaConnect.putRecordsToParticipant(); + assertTrue(testKafkaConnect.isResumed()); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + assertTrue(testKafkaConnect.isPaused()); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten); + // Ensure Coordinator and participant are in sync in the kafka offsets + assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset()); + break; + case FAILURE_AFTER_START_COMMIT: + testKafkaConnect.putRecordsToParticipant(); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + // Participant fails + initializeParticipant(); + testKafkaConnect.putRecordsToParticipant(); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + assertTrue(testKafkaConnect.isPaused()); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten); + // Ensure Coordinator and participant are in sync in the kafka offsets + assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset()); + break; + case FAILURE_AFTER_END_COMMIT: + testKafkaConnect.putRecordsToParticipant(); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.START_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + // Participant fails + initializeParticipant(); + testKafkaConnect.putRecordsToParticipant(); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.END_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + assertTrue(testKafkaConnect.isPaused()); + coordinator.sendEventFromCoordinator(ControlEvent.MsgType.ACK_COMMIT); + testKafkaConnect.putRecordsToParticipant(); + assertEquals(testHudiWriterProvider.getLatestNumberWrites(), expectedRecordsWritten); + // Ensure Coordinator and participant are in sync in the kafka offsets + assertEquals(participant.getLastKafkaCommittedOffset(), coordinator.getCommittedKafkaOffset()); + break; + default: + throw new HoodieException("Unknown test scenario " + testScenario); + } + } catch (Exception exception) { + throw new HoodieException("Unexpected test failure ", exception); } } diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java index 953080921..6e947de07 100644 --- a/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java +++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/helper/TestKafkaConnect.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; +import java.io.IOException; import java.util.Arrays; import java.util.Map; import java.util.Set; @@ -60,7 +61,7 @@ public class TestKafkaConnect implements SinkTaskContext { return !isPaused; } - public int putRecordsToParticipant() { + public int putRecordsToParticipant() throws IOException { for (int i = 1; i <= NUM_RECORDS_BATCH; i++) { participant.buffer(getNextKafkaRecord()); } diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index 14bc4e4c8..cf81096ee 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -74,6 +74,8 @@ commons-httpclient:commons-httpclient org.apache.htrace:htrace-core org.jamon:jamon-runtime + org.slf4j:* + log4j:log4j jdk.tools:jdk.tools junit:junit