diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java b/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java
index 910e62618..b87ab23c9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java
@@ -21,12 +21,11 @@ import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieCommitCallbackException;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.codehaus.jackson.map.ObjectMapper;
-import java.io.IOException;
+import static org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil.convertToJsonString;
/**
* A http implementation of {@link HoodieWriteCommitCallback}.
@@ -44,13 +43,7 @@ public class HoodieWriteCommitHttpCallback implements HoodieWriteCommitCallback
@Override
public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
// convert to json
- ObjectMapper mapper = new ObjectMapper();
- String callbackMsg = null;
- try {
- callbackMsg = mapper.writeValueAsString(callbackMessage);
- } catch (IOException e) {
- throw new HoodieCommitCallbackException("Callback service convert message to json failed", e);
- }
+ String callbackMsg = convertToJsonString(callbackMessage);
LOG.info("Try to send callbackMsg, msg = " + callbackMsg);
client.send(callbackMsg);
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieWriteCommitCallbackUtil.java b/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieWriteCommitCallbackUtil.java
new file mode 100644
index 000000000..c160819f1
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieWriteCommitCallbackUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.util;
+
+import org.apache.hudi.exception.HoodieCommitCallbackException;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Util helps to prepare callback message.
+ */
+public class HoodieWriteCommitCallbackUtil {
+
+ private static ObjectMapper mapper = new ObjectMapper();
+
+ /**
+ * Convert data to json string format.
+ */
+ public static String convertToJsonString(Object obj) {
+ try {
+ return mapper.writeValueAsString(obj);
+ } catch (IOException e) {
+ throw new HoodieCommitCallbackException("Callback service convert data to json failed", e);
+ }
+ }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
index 47a01aa05..126e4f019 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
@@ -29,17 +29,18 @@ import java.util.Properties;
*/
public class HoodieWriteCommitCallbackConfig extends DefaultHoodieConfig {
- public static final String CALLBACK_ON = "hoodie.write.commit.callback.on";
+ public static final String CALLBACK_PREFIX = "hoodie.write.commit.callback.";
+ public static final String CALLBACK_ON = CALLBACK_PREFIX + "on";
public static final boolean DEFAULT_CALLBACK_ON = false;
- public static final String CALLBACK_CLASS_PROP = "hoodie.write.commit.callback.class";
+ public static final String CALLBACK_CLASS_PROP = CALLBACK_PREFIX + "class";
public static final String DEFAULT_CALLBACK_CLASS_PROP = "org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback";
- // ***** REST callback configs *****
- public static final String CALLBACK_HTTP_URL_PROP = "hoodie.write.commit.callback.http.url";
- public static final String CALLBACK_HTTP_API_KEY = "hoodie.write.commit.callback.http.api.key";
+ // ***** HTTP callback configs *****
+ public static final String CALLBACK_HTTP_URL_PROP = CALLBACK_PREFIX + "http.url";
+ public static final String CALLBACK_HTTP_API_KEY = CALLBACK_PREFIX + "http.api.key";
public static final String DEFAULT_CALLBACK_HTTP_API_KEY = "hudi_write_commit_http_callback";
- public static final String CALLBACK_HTTP_TIMEOUT_SECONDS = "hoodie.write.commit.callback.http.timeout.seconds";
+ public static final String CALLBACK_HTTP_TIMEOUT_SECONDS = CALLBACK_PREFIX + "http.timeout.seconds";
public static final int DEFAULT_CALLBACK_HTTP_TIMEOUT_SECONDS = 3;
private HoodieWriteCommitCallbackConfig(Properties props) {
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index fe24839bb..02100a27c 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -125,6 +125,13 @@
+
+
+ org.apache.kafka
+ kafka-clients
+ ${kafka.version}
+
+
log4j
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
new file mode 100644
index 000000000..9133e502d
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallback.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.callback.kafka;
+
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.Properties;
+
+import static org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME;
+import static org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig.CALLBACK_KAFKA_ACKS;
+import static org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig.CALLBACK_KAFKA_BOOTSTRAP_SERVERS;
+import static org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig.CALLBACK_KAFKA_PARTITION;
+import static org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig.CALLBACK_KAFKA_RETRIES;
+import static org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig.CALLBACK_KAFKA_TOPIC;
+
+/**
+ * Kafka implementation of {@link HoodieWriteCommitCallback}.
+ */
+public class HoodieWriteCommitKafkaCallback implements HoodieWriteCommitCallback {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitKafkaCallback.class);
+
+ private Properties props;
+ private String bootstrapServers;
+ private String topic;
+
+ public HoodieWriteCommitKafkaCallback(HoodieWriteConfig config) {
+ this.props = config.getProps();
+ this.bootstrapServers = props.getProperty(CALLBACK_KAFKA_BOOTSTRAP_SERVERS);
+ this.topic = props.getProperty(CALLBACK_KAFKA_TOPIC);
+ validateKafkaConfig();
+ }
+
+ @Override
+ public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
+ String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage);
+ try (KafkaProducer producer = createProducer(props)) {
+ ProducerRecord record = buildProducerRecord(props, callbackMsg);
+ producer.send(record);
+ LOG.info(String.format("Send callback message %s succeed", callbackMsg));
+ } catch (Exception e) {
+ LOG.error("Send kafka callback msg failed : ", e);
+ }
+ }
+
+ /**
+ * Method helps to create {@link KafkaProducer}. Here we set acks = all and retries = 3 by default to ensure no data
+ * loss.
+ *
+ * @param props Kafka configs
+ * @return A {@link KafkaProducer}
+ */
+ public KafkaProducer createProducer(Properties props) {
+ Properties kafkaProducerProps = new Properties();
+ // bootstrap.servers
+ kafkaProducerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ // default "all" to ensure no message loss
+ kafkaProducerProps.setProperty(ProducerConfig.ACKS_CONFIG, props.getProperty(CALLBACK_KAFKA_ACKS));
+ // retries 3 times by default
+ kafkaProducerProps.setProperty(ProducerConfig.RETRIES_CONFIG, props.getProperty(CALLBACK_KAFKA_RETRIES));
+ kafkaProducerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer");
+ kafkaProducerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer");
+
+ LOG.debug("Callback kafka producer init with configs: "
+ + HoodieWriteCommitCallbackUtil.convertToJsonString(kafkaProducerProps));
+ return new KafkaProducer(kafkaProducerProps);
+ }
+
+ /**
+ * Method helps to create a {@link ProducerRecord}. To ensure the order of the callback messages, we should guarantee
+ * that the callback message of the same table will goes to the same partition. Therefore, if user does not specify
+ * the partition, we can use the table name as {@link ProducerRecord} key.
+ *
+ * @param props Kafka configs
+ * @param callbackMsg Callback message
+ * @return Callback {@link ProducerRecord}
+ */
+ private ProducerRecord buildProducerRecord(Properties props, String callbackMsg) {
+ String partition = props.getProperty(CALLBACK_KAFKA_PARTITION);
+ if (null != partition) {
+ return new ProducerRecord(topic, Integer.valueOf(partition), props.getProperty(TABLE_NAME),
+ callbackMsg);
+ } else {
+ return new ProducerRecord(topic, props.getProperty(TABLE_NAME), callbackMsg);
+ }
+ }
+
+ /**
+ * Validate whether both {@code ProducerConfig.BOOTSTRAP_SERVERS_CONFIG} and kafka topic are configured.
+ * Exception will be thrown if anyone of them is not configured.
+ */
+ private void validateKafkaConfig() {
+ ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(bootstrapServers), String.format("Config %s can not be "
+ + "null or empty", CALLBACK_KAFKA_BOOTSTRAP_SERVERS));
+ ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(topic), String.format("Config %s can not be null or empty",
+ CALLBACK_KAFKA_TOPIC));
+ }
+
+ /**
+ * Callback class for this produce operation.
+ */
+ private static class ProducerSendCallback implements Callback {
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ LOG.info(String.format("message offset=%s partition=%s timestamp=%s topic=%s",
+ metadata.offset(), metadata.partition(), metadata.timestamp(), metadata.topic()));
+ }
+ }
+
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallbackConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallbackConfig.java
new file mode 100644
index 000000000..457e0726e
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/kafka/HoodieWriteCommitKafkaCallbackConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.callback.kafka;
+
+import java.util.Properties;
+
+import static org.apache.hudi.common.config.DefaultHoodieConfig.setDefaultOnCondition;
+import static org.apache.hudi.config.HoodieWriteCommitCallbackConfig.CALLBACK_PREFIX;
+
+/**
+ * Kafka write callback related config.
+ */
+public class HoodieWriteCommitKafkaCallbackConfig {
+
+ public static final String CALLBACK_KAFKA_BOOTSTRAP_SERVERS = CALLBACK_PREFIX + "kafka.bootstrap.servers";
+ public static final String CALLBACK_KAFKA_TOPIC = CALLBACK_PREFIX + "kafka.topic";
+ public static final String CALLBACK_KAFKA_PARTITION = CALLBACK_PREFIX + "kafka.partition";
+ public static final String CALLBACK_KAFKA_ACKS = CALLBACK_PREFIX + "kafka.acks";
+ public static final String DEFAULT_CALLBACK_KAFKA_ACKS = "all";
+ public static final String CALLBACK_KAFKA_RETRIES = CALLBACK_PREFIX + "kafka.retries";
+ public static final int DEFAULT_CALLBACK_KAFKA_RETRIES = 3;
+
+ /**
+ * Set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
+ */
+ public static void setCallbackKafkaConfigIfNeeded(Properties props) {
+ setDefaultOnCondition(props, !props.containsKey(CALLBACK_KAFKA_ACKS), CALLBACK_KAFKA_ACKS,
+ DEFAULT_CALLBACK_KAFKA_ACKS);
+ setDefaultOnCondition(props, !props.containsKey(CALLBACK_KAFKA_RETRIES), CALLBACK_KAFKA_RETRIES,
+ String.valueOf(DEFAULT_CALLBACK_KAFKA_RETRIES));
+ }
+
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index cd507d0b6..9fc24b4e6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -45,6 +45,8 @@ import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
+import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
+import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -565,6 +567,11 @@ public class DeltaSync implements Serializable {
}
HoodieWriteConfig config = builder.build();
+ // set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
+ if (config.writeCommitCallbackOn() && HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
+ HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config.getProps());
+ }
+
// Validate what deltastreamer assumes of write-config to be really safe
ValidationUtils.checkArgument(config.isInlineCompaction() == cfg.isInlineCompactionEnabled(),
String.format("%s should be set to %s", INLINE_COMPACT_PROP, cfg.isInlineCompactionEnabled()));
@@ -623,4 +630,3 @@ public class DeltaSync implements Serializable {
return commitTimelineOpt;
}
}
-
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 11b68fa41..eef203c58 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -94,6 +94,7 @@
io.confluent:common-config
io.confluent:common-utils
io.confluent:kafka-schema-registry-client
+ org.apache.kafka:kafka-clients
io.dropwizard.metrics:metrics-core
io.dropwizard.metrics:metrics-graphite
io.prometheus:simpleclient