1
0

[HUDI-2937] Introduce a pulsar implementation of hoodie write commit … (#4217)

* [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback

* [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback

* [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback

* [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback

* [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback

* [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback

* [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback
This commit is contained in:
ForwardXu
2021-12-05 15:51:06 +08:00
committed by GitHub
parent a8fb69656f
commit 63b15607ff
6 changed files with 477 additions and 3 deletions

View File

@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.callback.pulsar;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import static org.apache.hudi.common.util.DateTimeUtils.parseDuration;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.BROKER_SERVICE_URL;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.CONNECTION_TIMEOUT;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.KEEPALIVE_INTERVAL;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.OPERATION_TIMEOUT;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_BLOCK_QUEUE_FULL;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_PENDING_QUEUE_SIZE;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_PENDING_SIZE;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_ROUTE_MODE;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_SEND_TIMEOUT;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.REQUEST_TIMEOUT;
import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.TOPIC;
/**
* Pulsar implementation of {@link HoodieWriteCommitCallback}.
*/
public class HoodieWriteCommitPulsarCallback implements HoodieWriteCommitCallback, Closeable {
private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitPulsarCallback.class);
private final String serviceUrl;
private final String topic;
/**
* The pulsar client.
*/
private final transient PulsarClient client;
/**
* The pulsar producer.
*/
private final transient Producer<String> producer;
public HoodieWriteCommitPulsarCallback(HoodieWriteConfig config) throws PulsarClientException {
this.serviceUrl = config.getString(BROKER_SERVICE_URL);
this.topic = config.getString(TOPIC);
this.client = createClient(config);
this.producer = createProducer(config);
}
@Override
public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage);
try {
producer.newMessage().key(callbackMessage.getTableName()).value(callbackMsg).send();
LOG.info("Send callback message succeed");
} catch (Exception e) {
LOG.error("Send pulsar callback msg failed : ", e);
}
}
/**
* Method helps to create {@link Producer}.
*
* @param hoodieConfig Pulsar configs
* @return A {@link Producer}
*/
public Producer<String> createProducer(HoodieConfig hoodieConfig) throws PulsarClientException {
MessageRoutingMode routeMode = Enum.valueOf(MessageRoutingMode.class,
PRODUCER_ROUTE_MODE.defaultValue());
Duration sendTimeout =
parseDuration(hoodieConfig.getString(PRODUCER_SEND_TIMEOUT));
int pendingQueueSize =
hoodieConfig.getInt(PRODUCER_PENDING_QUEUE_SIZE);
int pendingSize =
hoodieConfig.getInt(PRODUCER_PENDING_SIZE);
boolean blockIfQueueFull =
hoodieConfig.getBoolean(PRODUCER_BLOCK_QUEUE_FULL);
return client
.newProducer(Schema.STRING)
.topic(topic)
.messageRoutingMode(routeMode)
.sendTimeout((int) sendTimeout.toMillis(), TimeUnit.MILLISECONDS)
.maxPendingMessages(pendingQueueSize)
.maxPendingMessagesAcrossPartitions(pendingSize)
.blockIfQueueFull(blockIfQueueFull)
.create();
}
public PulsarClient createClient(HoodieConfig hoodieConfig) throws PulsarClientException {
validatePulsarConfig();
Duration operationTimeout =
parseDuration(hoodieConfig.getString(OPERATION_TIMEOUT));
Duration connectionTimeout =
parseDuration(hoodieConfig.getString(CONNECTION_TIMEOUT));
Duration requestTimeout =
parseDuration(hoodieConfig.getString(REQUEST_TIMEOUT));
Duration keepAliveInterval =
parseDuration(hoodieConfig.getString(KEEPALIVE_INTERVAL));
ClientConfigurationData clientConfigurationData =
new ClientConfigurationData();
clientConfigurationData
.setServiceUrl(serviceUrl);
clientConfigurationData
.setOperationTimeoutMs(operationTimeout.toMillis());
clientConfigurationData
.setConnectionTimeoutMs((int) connectionTimeout.toMillis());
clientConfigurationData
.setRequestTimeoutMs((int) requestTimeout.toMillis());
clientConfigurationData
.setKeepAliveIntervalSeconds((int) keepAliveInterval.getSeconds());
return new PulsarClientImpl(clientConfigurationData);
}
/**
* Validate whether both pulsar's brokerServiceUrl and topic are configured.
* Exception will be thrown if anyone of them is not configured.
*/
private void validatePulsarConfig() {
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(serviceUrl), String.format("Config %s can not be "
+ "null or empty", BROKER_SERVICE_URL.key()));
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(topic), String.format("Config %s can not be null or empty",
TOPIC.key()));
}
@Override
public void close() throws IOException {
if (producer != null) {
try {
producer.close();
} catch (Throwable t) {
LOG.warn("Could not properly close the producer.", t);
}
}
if (client != null) {
try {
client.close();
} catch (Throwable t) {
LOG.warn("Could not properly close the client.", t);
}
}
}
}

View File

@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.callback.pulsar;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import static org.apache.hudi.config.HoodieWriteCommitCallbackConfig.CALLBACK_PREFIX;
/**
* pulsar write callback related config.
*/
@ConfigClassProperty(name = "Write commit pulsar callback configs",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Controls notifications sent to pulsar, on events happening to a hudi table.")
public class HoodieWriteCommitPulsarCallbackConfig extends HoodieConfig {
public static final ConfigProperty<String> BROKER_SERVICE_URL = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.broker.service.url")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("Server's url of pulsar cluster, to be used for publishing commit metadata.");
public static final ConfigProperty<String> TOPIC = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.topic")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("pulsar topic name to publish timeline activity into.");
public static final ConfigProperty<String> PRODUCER_ROUTE_MODE = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.producer.route-mode")
.defaultValue("RoundRobinPartition")
.sinceVersion("0.11.0")
.withDocumentation("Message routing logic for producers on partitioned topics.");
public static final ConfigProperty<Integer> PRODUCER_PENDING_QUEUE_SIZE = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.producer.pending-queue-size")
.defaultValue(1000)
.sinceVersion("0.11.0")
.withDocumentation("The maximum size of a queue holding pending messages.");
public static final ConfigProperty<Integer> PRODUCER_PENDING_SIZE = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.producer.pending-total-size")
.defaultValue(50000)
.sinceVersion("0.11.0")
.withDocumentation("The maximum number of pending messages across partitions.");
public static final ConfigProperty<Boolean> PRODUCER_BLOCK_QUEUE_FULL = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.producer.block-if-queue-full")
.defaultValue(true)
.sinceVersion("0.11.0")
.withDocumentation("When the queue is full, the method is blocked "
+ "instead of an exception is thrown.");
public static final ConfigProperty<String> PRODUCER_SEND_TIMEOUT = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.producer.send-timeout")
.defaultValue("30s")
.sinceVersion("0.11.0")
.withDocumentation("The timeout in each sending to pulsar.");
public static final ConfigProperty<String> OPERATION_TIMEOUT = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.operation-timeout")
.defaultValue("30s")
.sinceVersion("0.11.0")
.withDocumentation("Duration of waiting for completing an operation.");
public static final ConfigProperty<String> CONNECTION_TIMEOUT = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.connection-timeout")
.defaultValue("10s")
.sinceVersion("0.11.0")
.withDocumentation("Duration of waiting for a connection to a "
+ "broker to be established.");
public static final ConfigProperty<String> REQUEST_TIMEOUT = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.request-timeout")
.defaultValue("60s")
.sinceVersion("0.11.0")
.withDocumentation("Duration of waiting for completing a request.");
public static final ConfigProperty<String> KEEPALIVE_INTERVAL = ConfigProperty
.key(CALLBACK_PREFIX + "pulsar.keepalive-interval")
.defaultValue("30s")
.sinceVersion("0.11.0")
.withDocumentation("Duration of keeping alive interval for each "
+ "client broker connection.");
/**
* Set default value for {@link HoodieWriteCommitPulsarCallbackConfig} if needed.
*/
public static void setCallbackPulsarConfigIfNeeded(HoodieConfig config) {
config.setDefaultValue(PRODUCER_ROUTE_MODE);
config.setDefaultValue(OPERATION_TIMEOUT);
config.setDefaultValue(CONNECTION_TIMEOUT);
config.setDefaultValue(REQUEST_TIMEOUT);
config.setDefaultValue(KEEPALIVE_INTERVAL);
config.setDefaultValue(PRODUCER_SEND_TIMEOUT);
config.setDefaultValue(PRODUCER_PENDING_QUEUE_SIZE);
config.setDefaultValue(PRODUCER_PENDING_SIZE);
config.setDefaultValue(PRODUCER_BLOCK_QUEUE_FULL);
}
}

View File

@@ -60,6 +60,8 @@ import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback;
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
@@ -726,9 +728,16 @@ public class DeltaSync implements Serializable {
HoodieWriteConfig config = builder.build();
// set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
if (config.writeCommitCallbackOn() && HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
if (config.writeCommitCallbackOn()) {
// set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed.
if (HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) {
HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config);
}
// set default value for {@link HoodieWriteCommitPulsarCallbackConfig} if needed.
if (HoodieWriteCommitPulsarCallback.class.getName().equals(config.getCallbackClass())) {
HoodieWriteCommitPulsarCallbackConfig.setCallbackPulsarConfigIfNeeded(config);
}
}
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);