From 63b15607ff6c251c8965f76066b1c882dc356266 Mon Sep 17 00:00:00 2001 From: ForwardXu Date: Sun, 5 Dec 2021 15:51:06 +0800 Subject: [PATCH] =?UTF-8?q?[HUDI-2937]=20Introduce=20a=20pulsar=20implemen?= =?UTF-8?q?tation=20of=20hoodie=20write=20commit=20=E2=80=A6=20(#4217)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback * [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback * [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback * [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback * [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback * [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback * [HUDI-2937] Introduce a pulsar implementation of hoodie write commit callback --- .../hudi/common/util/DateTimeUtils.java | 149 +++++++++++++++ hudi-utilities/pom.xml | 17 ++ .../HoodieWriteCommitPulsarCallback.java | 180 ++++++++++++++++++ ...HoodieWriteCommitPulsarCallbackConfig.java | 118 ++++++++++++ .../utilities/deltastreamer/DeltaSync.java | 15 +- pom.xml | 1 + 6 files changed, 477 insertions(+), 3 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallbackConfig.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java index 1c18a77d6..e52e56609 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java @@ -19,14 +19,26 @@ package org.apache.hudi.common.util; +import java.time.Duration; import java.time.Instant; import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; public class DateTimeUtils { + private static final Map LABEL_TO_UNIT_MAP = + Collections.unmodifiableMap(initMap()); /** * Parse input String to a {@link java.time.Instant}. + * * @param s Input String should be Epoch time in millisecond or ISO-8601 format. */ public static Instant parseDateTime(String s) throws DateTimeParseException { @@ -37,4 +49,141 @@ public class DateTimeUtils { return Instant.parse(s); } } + + /** + * Parse the given string to a java {@link Duration}. The string is in format "{length + * value}{time unit label}", e.g. "123ms", "321 s". If no time unit label is specified, it will + * be considered as milliseconds. + * + *

Supported time unit labels are: + * + *

+ * + * @param text string to parse. + */ + public static Duration parseDuration(String text) { + ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(text)); + + final String trimmed = text.trim(); + ValidationUtils.checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string"); + + final int len = trimmed.length(); + int pos = 0; + + char current; + while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') { + pos++; + } + + final String number = trimmed.substring(0, pos); + final String unitLabel = trimmed.substring(pos).trim().toLowerCase(Locale.US); + + if (number.isEmpty()) { + throw new NumberFormatException("text does not start with a number"); + } + + final long value; + try { + value = Long.parseLong(number); // this throws a NumberFormatException on overflow + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "The value '" + + number + + "' cannot be re represented as 64bit number (numeric overflow)."); + } + + if (unitLabel.isEmpty()) { + return Duration.of(value, ChronoUnit.MILLIS); + } + + ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel); + if (unit != null) { + return Duration.of(value, unit); + } else { + throw new IllegalArgumentException( + "Time interval unit label '" + + unitLabel + + "' does not match any of the recognized units: " + + TimeUnit.getAllUnits()); + } + } + + private static Map initMap() { + Map labelToUnit = new HashMap<>(); + for (TimeUnit timeUnit : TimeUnit.values()) { + for (String label : timeUnit.getLabels()) { + labelToUnit.put(label, timeUnit.getUnit()); + } + } + return labelToUnit; + } + + /** + * Enum which defines time unit, mostly used to parse value from configuration file. + */ + private enum TimeUnit { + DAYS(ChronoUnit.DAYS, singular("d"), plural("day")), + HOURS(ChronoUnit.HOURS, singular("h"), plural("hour")), + MINUTES(ChronoUnit.MINUTES, singular("min"), plural("minute")), + SECONDS(ChronoUnit.SECONDS, singular("s"), plural("sec"), plural("second")), + MILLISECONDS(ChronoUnit.MILLIS, singular("ms"), plural("milli"), plural("millisecond")), + MICROSECONDS(ChronoUnit.MICROS, singular("µs"), plural("micro"), plural("microsecond")), + NANOSECONDS(ChronoUnit.NANOS, singular("ns"), plural("nano"), plural("nanosecond")); + + private static final String PLURAL_SUFFIX = "s"; + + private final List labels; + + private final ChronoUnit unit; + + TimeUnit(ChronoUnit unit, String[]... labels) { + this.unit = unit; + this.labels = + Arrays.stream(labels) + .flatMap(Arrays::stream) + .collect(Collectors.toList()); + } + + /** + * @param label the original label + * @return the singular format of the original label + */ + private static String[] singular(String label) { + return new String[] {label}; + } + + /** + * @param label the original label + * @return both the singular format and plural format of the original label + */ + private static String[] plural(String label) { + return new String[] {label, label + PLURAL_SUFFIX}; + } + + public List getLabels() { + return labels; + } + + public ChronoUnit getUnit() { + return unit; + } + + public static String getAllUnits() { + return Arrays.stream(TimeUnit.values()) + .map(TimeUnit::createTimeUnitString) + .collect(Collectors.joining(", ")); + } + + private static String createTimeUnitString(TimeUnit timeUnit) { + return timeUnit.name() + ": (" + String.join(" | ", timeUnit.getLabels()) + ")"; + } + } } diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index e2bd37979..2e68039c1 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -148,6 +148,23 @@ ${kafka.version} + + + org.apache.pulsar + pulsar-client + ${pulsar.version} + + + org.slf4j + slf4j-api + + + com.google.protobuf + protobuf-java + + + + log4j diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java new file mode 100644 index 000000000..2009c2460 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallback.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.callback.pulsar; + +import org.apache.hudi.callback.HoodieWriteCommitCallback; +import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; +import org.apache.hudi.callback.util.HoodieWriteCommitCallbackUtil; +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; + +import java.io.Closeable; +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.util.DateTimeUtils.parseDuration; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.BROKER_SERVICE_URL; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.CONNECTION_TIMEOUT; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.KEEPALIVE_INTERVAL; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.OPERATION_TIMEOUT; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_BLOCK_QUEUE_FULL; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_PENDING_QUEUE_SIZE; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_PENDING_SIZE; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_ROUTE_MODE; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.PRODUCER_SEND_TIMEOUT; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.REQUEST_TIMEOUT; +import static org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig.TOPIC; + +/** + * Pulsar implementation of {@link HoodieWriteCommitCallback}. + */ +public class HoodieWriteCommitPulsarCallback implements HoodieWriteCommitCallback, Closeable { + + private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitPulsarCallback.class); + + private final String serviceUrl; + private final String topic; + + /** + * The pulsar client. + */ + private final transient PulsarClient client; + + /** + * The pulsar producer. + */ + private final transient Producer producer; + + public HoodieWriteCommitPulsarCallback(HoodieWriteConfig config) throws PulsarClientException { + this.serviceUrl = config.getString(BROKER_SERVICE_URL); + this.topic = config.getString(TOPIC); + this.client = createClient(config); + this.producer = createProducer(config); + } + + @Override + public void call(HoodieWriteCommitCallbackMessage callbackMessage) { + String callbackMsg = HoodieWriteCommitCallbackUtil.convertToJsonString(callbackMessage); + try { + producer.newMessage().key(callbackMessage.getTableName()).value(callbackMsg).send(); + LOG.info("Send callback message succeed"); + } catch (Exception e) { + LOG.error("Send pulsar callback msg failed : ", e); + } + } + + /** + * Method helps to create {@link Producer}. + * + * @param hoodieConfig Pulsar configs + * @return A {@link Producer} + */ + public Producer createProducer(HoodieConfig hoodieConfig) throws PulsarClientException { + MessageRoutingMode routeMode = Enum.valueOf(MessageRoutingMode.class, + PRODUCER_ROUTE_MODE.defaultValue()); + Duration sendTimeout = + parseDuration(hoodieConfig.getString(PRODUCER_SEND_TIMEOUT)); + int pendingQueueSize = + hoodieConfig.getInt(PRODUCER_PENDING_QUEUE_SIZE); + int pendingSize = + hoodieConfig.getInt(PRODUCER_PENDING_SIZE); + boolean blockIfQueueFull = + hoodieConfig.getBoolean(PRODUCER_BLOCK_QUEUE_FULL); + + return client + .newProducer(Schema.STRING) + .topic(topic) + .messageRoutingMode(routeMode) + .sendTimeout((int) sendTimeout.toMillis(), TimeUnit.MILLISECONDS) + .maxPendingMessages(pendingQueueSize) + .maxPendingMessagesAcrossPartitions(pendingSize) + .blockIfQueueFull(blockIfQueueFull) + .create(); + } + + public PulsarClient createClient(HoodieConfig hoodieConfig) throws PulsarClientException { + validatePulsarConfig(); + + Duration operationTimeout = + parseDuration(hoodieConfig.getString(OPERATION_TIMEOUT)); + Duration connectionTimeout = + parseDuration(hoodieConfig.getString(CONNECTION_TIMEOUT)); + Duration requestTimeout = + parseDuration(hoodieConfig.getString(REQUEST_TIMEOUT)); + Duration keepAliveInterval = + parseDuration(hoodieConfig.getString(KEEPALIVE_INTERVAL)); + + ClientConfigurationData clientConfigurationData = + new ClientConfigurationData(); + clientConfigurationData + .setServiceUrl(serviceUrl); + clientConfigurationData + .setOperationTimeoutMs(operationTimeout.toMillis()); + clientConfigurationData + .setConnectionTimeoutMs((int) connectionTimeout.toMillis()); + clientConfigurationData + .setRequestTimeoutMs((int) requestTimeout.toMillis()); + clientConfigurationData + .setKeepAliveIntervalSeconds((int) keepAliveInterval.getSeconds()); + + return new PulsarClientImpl(clientConfigurationData); + } + + /** + * Validate whether both pulsar's brokerServiceUrl and topic are configured. + * Exception will be thrown if anyone of them is not configured. + */ + private void validatePulsarConfig() { + ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(serviceUrl), String.format("Config %s can not be " + + "null or empty", BROKER_SERVICE_URL.key())); + ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(topic), String.format("Config %s can not be null or empty", + TOPIC.key())); + } + + @Override + public void close() throws IOException { + if (producer != null) { + try { + producer.close(); + } catch (Throwable t) { + LOG.warn("Could not properly close the producer.", t); + } + } + + if (client != null) { + try { + client.close(); + } catch (Throwable t) { + LOG.warn("Could not properly close the client.", t); + } + } + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallbackConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallbackConfig.java new file mode 100644 index 000000000..f185e6b51 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/callback/pulsar/HoodieWriteCommitPulsarCallbackConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.callback.pulsar; + +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import static org.apache.hudi.config.HoodieWriteCommitCallbackConfig.CALLBACK_PREFIX; + +/** + * pulsar write callback related config. + */ +@ConfigClassProperty(name = "Write commit pulsar callback configs", + groupName = ConfigGroups.Names.WRITE_CLIENT, + description = "Controls notifications sent to pulsar, on events happening to a hudi table.") +public class HoodieWriteCommitPulsarCallbackConfig extends HoodieConfig { + + public static final ConfigProperty BROKER_SERVICE_URL = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.broker.service.url") + .noDefaultValue() + .sinceVersion("0.11.0") + .withDocumentation("Server's url of pulsar cluster, to be used for publishing commit metadata."); + + public static final ConfigProperty TOPIC = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.topic") + .noDefaultValue() + .sinceVersion("0.11.0") + .withDocumentation("pulsar topic name to publish timeline activity into."); + + public static final ConfigProperty PRODUCER_ROUTE_MODE = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.producer.route-mode") + .defaultValue("RoundRobinPartition") + .sinceVersion("0.11.0") + .withDocumentation("Message routing logic for producers on partitioned topics."); + + public static final ConfigProperty PRODUCER_PENDING_QUEUE_SIZE = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.producer.pending-queue-size") + .defaultValue(1000) + .sinceVersion("0.11.0") + .withDocumentation("The maximum size of a queue holding pending messages."); + + public static final ConfigProperty PRODUCER_PENDING_SIZE = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.producer.pending-total-size") + .defaultValue(50000) + .sinceVersion("0.11.0") + .withDocumentation("The maximum number of pending messages across partitions."); + + public static final ConfigProperty PRODUCER_BLOCK_QUEUE_FULL = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.producer.block-if-queue-full") + .defaultValue(true) + .sinceVersion("0.11.0") + .withDocumentation("When the queue is full, the method is blocked " + + "instead of an exception is thrown."); + + public static final ConfigProperty PRODUCER_SEND_TIMEOUT = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.producer.send-timeout") + .defaultValue("30s") + .sinceVersion("0.11.0") + .withDocumentation("The timeout in each sending to pulsar."); + + public static final ConfigProperty OPERATION_TIMEOUT = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.operation-timeout") + .defaultValue("30s") + .sinceVersion("0.11.0") + .withDocumentation("Duration of waiting for completing an operation."); + + public static final ConfigProperty CONNECTION_TIMEOUT = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.connection-timeout") + .defaultValue("10s") + .sinceVersion("0.11.0") + .withDocumentation("Duration of waiting for a connection to a " + + "broker to be established."); + + public static final ConfigProperty REQUEST_TIMEOUT = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.request-timeout") + .defaultValue("60s") + .sinceVersion("0.11.0") + .withDocumentation("Duration of waiting for completing a request."); + + public static final ConfigProperty KEEPALIVE_INTERVAL = ConfigProperty + .key(CALLBACK_PREFIX + "pulsar.keepalive-interval") + .defaultValue("30s") + .sinceVersion("0.11.0") + .withDocumentation("Duration of keeping alive interval for each " + + "client broker connection."); + + /** + * Set default value for {@link HoodieWriteCommitPulsarCallbackConfig} if needed. + */ + public static void setCallbackPulsarConfigIfNeeded(HoodieConfig config) { + config.setDefaultValue(PRODUCER_ROUTE_MODE); + config.setDefaultValue(OPERATION_TIMEOUT); + config.setDefaultValue(CONNECTION_TIMEOUT); + config.setDefaultValue(REQUEST_TIMEOUT); + config.setDefaultValue(KEEPALIVE_INTERVAL); + config.setDefaultValue(PRODUCER_SEND_TIMEOUT); + config.setDefaultValue(PRODUCER_PENDING_QUEUE_SIZE); + config.setDefaultValue(PRODUCER_PENDING_SIZE); + config.setDefaultValue(PRODUCER_BLOCK_QUEUE_FULL); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 07f37a755..eb553c94e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -60,6 +60,8 @@ import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallbackConfig; +import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback; +import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config; import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; @@ -726,9 +728,16 @@ public class DeltaSync implements Serializable { HoodieWriteConfig config = builder.build(); - // set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed. - if (config.writeCommitCallbackOn() && HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) { - HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config); + if (config.writeCommitCallbackOn()) { + // set default value for {@link HoodieWriteCommitKafkaCallbackConfig} if needed. + if (HoodieWriteCommitKafkaCallback.class.getName().equals(config.getCallbackClass())) { + HoodieWriteCommitKafkaCallbackConfig.setCallbackKafkaConfigIfNeeded(config); + } + + // set default value for {@link HoodieWriteCommitPulsarCallbackConfig} if needed. + if (HoodieWriteCommitPulsarCallback.class.getName().equals(config.getCallbackClass())) { + HoodieWriteCommitPulsarCallbackConfig.setCallbackPulsarConfigIfNeeded(config); + } } HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props); diff --git a/pom.xml b/pom.xml index d0368c0be..16298aa8f 100644 --- a/pom.xml +++ b/pom.xml @@ -93,6 +93,7 @@ 2.7.4 2.10.0 2.0.0 + 2.8.1 5.3.4 2.17 1.10.1