From 24f738fe68d03fa15755d8a53aeb6e25921c6e94 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 7 Feb 2022 08:10:06 -0500 Subject: [PATCH] [HUDI-3360] Adding retries to deltastreamer for source errors (#4744) --- .../utilities/deltastreamer/DeltaSync.java | 28 ++++++++++++++-- .../deltastreamer/HoodieDeltaStreamer.java | 9 +++++ .../HoodieSourceTimeoutException.java | 33 +++++++++++++++++++ .../utilities/sources/AvroKafkaSource.java | 19 +++++++---- .../utilities/sources/JsonKafkaSource.java | 19 +++++++---- 5 files changed, 92 insertions(+), 16 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSourceTimeoutException.java diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 38e862e94..acfdc1bbc 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -66,6 +66,7 @@ import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config; import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; +import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException; import org.apache.hudi.utilities.schema.DelegatingSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SchemaSet; @@ -377,6 +378,29 @@ public class DeltaSync implements Serializable { } LOG.info("Checkpoint to resume from : " + resumeCheckpointStr); + int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1; + int curRetryCount = 0; + Pair>> sourceDataToSync = null; + while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) { + try { + sourceDataToSync = fetchFromSource(resumeCheckpointStr); + } catch (HoodieSourceTimeoutException e) { + if (curRetryCount >= maxRetryCount) { + throw e; + } + try { + LOG.error("Exception thrown while fetching data from source. Msg : " + e.getMessage() + ", class : " + e.getClass() + ", cause : " + e.getCause()); + LOG.error("Sleeping for " + (cfg.retryIntervalSecs) + " before retrying again. Current retry count " + curRetryCount + ", max retry count " + cfg.maxRetryCount); + Thread.sleep(cfg.retryIntervalSecs * 1000); + } catch (InterruptedException ex) { + LOG.error("Ignoring InterruptedException while waiting to retry on source failure " + e.getMessage()); + } + } + } + return sourceDataToSync; + } + + private Pair>> fetchFromSource(Option resumeCheckpointStr) { final Option> avroRDDOptional; final String checkpointStr; SchemaProvider schemaProvider; @@ -415,7 +439,7 @@ public class DeltaSync implements Serializable { targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc); } return (SchemaProvider) new DelegatingSchemaProvider(props, jssc, - dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); }) + dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); }) .orElse(dataAndCheckpoint.getSchemaProvider()); avroRDDOptional = transformed .map(t -> HoodieSparkUtils.createRdd( @@ -434,7 +458,7 @@ public class DeltaSync implements Serializable { if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" - + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"); + + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"); return null; } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index 3e4f00930..8edd45d44 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -369,6 +369,15 @@ public class HoodieDeltaStreamer implements Serializable { @Parameter(names = {"--bootstrap-index-class"}, description = "subclass of BootstrapIndex") public String bootstrapIndexClass = HFileBootstrapIndex.class.getName(); + @Parameter(names = {"--retry-on-source-failures"}, description = "Retry on any source failures") + public Boolean retryOnSourceFailures = false; + + @Parameter(names = {"--retry-interval-seconds"}, description = "the retry interval for source failures if --retry-on-source-failures is enabled") + public Integer retryIntervalSecs = 30; + + @Parameter(names = {"--max-retry-count"}, description = "the max retry count if --retry-on-source-failures is enabled") + public Integer maxRetryCount = 3; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSourceTimeoutException.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSourceTimeoutException.java new file mode 100644 index 000000000..d95f4f4b5 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieSourceTimeoutException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.exception; + +import org.apache.hudi.exception.HoodieException; + +public class HoodieSourceTimeoutException extends HoodieException { + + public HoodieSourceTimeoutException(String msg, Throwable e) { + super(msg, e); + } + + public HoodieSourceTimeoutException(String msg) { + super(msg); + } +} \ No newline at end of file diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index ff8ea5a7a..2e4caa08b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer; +import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.AvroConvertor; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; @@ -89,14 +90,18 @@ public class AvroKafkaSource extends AvroSource { @Override protected InputBatch> fetchNewData(Option lastCheckpointStr, long sourceLimit) { - OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics); - long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); - LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); - if (totalNewMsgs <= 0) { - return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges)); + try { + OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics); + long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); + LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); + if (totalNewMsgs <= 0) { + return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges)); + } + JavaRDD newDataRDD = toRDD(offsetRanges); + return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges)); + } catch (org.apache.kafka.common.errors.TimeoutException e) { + throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage()); } - JavaRDD newDataRDD = toRDD(offsetRanges); - return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges)); } private JavaRDD toRDD(OffsetRange[] offsetRanges) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java index 3dfc61100..200b6450e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java @@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; +import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen; import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils; @@ -59,14 +60,18 @@ public class JsonKafkaSource extends JsonSource { @Override protected InputBatch> fetchNewData(Option lastCheckpointStr, long sourceLimit) { - OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics); - long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); - LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); - if (totalNewMsgs <= 0) { - return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges)); + try { + OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics); + long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges); + LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName()); + if (totalNewMsgs <= 0) { + return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges)); + } + JavaRDD newDataRDD = toRDD(offsetRanges); + return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges)); + } catch (org.apache.kafka.common.errors.TimeoutException e) { + throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage()); } - JavaRDD newDataRDD = toRDD(offsetRanges); - return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges)); } private JavaRDD toRDD(OffsetRange[] offsetRanges) {