1
0

[HUDI-3360] Adding retries to deltastreamer for source errors (#4744)

This commit is contained in:
Sivabalan Narayanan
2022-02-07 08:10:06 -05:00
committed by GitHub
parent 538db185ca
commit 24f738fe68
5 changed files with 92 additions and 16 deletions

View File

@@ -66,6 +66,7 @@ import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallback
import org.apache.hudi.utilities.callback.pulsar.HoodieWriteCommitPulsarCallbackConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Config;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SchemaSet;
@@ -377,6 +378,29 @@ public class DeltaSync implements Serializable {
}
LOG.info("Checkpoint to resume from : " + resumeCheckpointStr);
int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
int curRetryCount = 0;
Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> sourceDataToSync = null;
while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
try {
sourceDataToSync = fetchFromSource(resumeCheckpointStr);
} catch (HoodieSourceTimeoutException e) {
if (curRetryCount >= maxRetryCount) {
throw e;
}
try {
LOG.error("Exception thrown while fetching data from source. Msg : " + e.getMessage() + ", class : " + e.getClass() + ", cause : " + e.getCause());
LOG.error("Sleeping for " + (cfg.retryIntervalSecs) + " before retrying again. Current retry count " + curRetryCount + ", max retry count " + cfg.maxRetryCount);
Thread.sleep(cfg.retryIntervalSecs * 1000);
} catch (InterruptedException ex) {
LOG.error("Ignoring InterruptedException while waiting to retry on source failure " + e.getMessage());
}
}
}
return sourceDataToSync;
}
private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSource(Option<String> resumeCheckpointStr) {
final Option<JavaRDD<GenericRecord>> avroRDDOptional;
final String checkpointStr;
SchemaProvider schemaProvider;
@@ -415,7 +439,7 @@ public class DeltaSync implements Serializable {
targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
}
return (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
.orElse(dataAndCheckpoint.getSchemaProvider());
avroRDDOptional = transformed
.map(t -> HoodieSparkUtils.createRdd(
@@ -434,7 +458,7 @@ public class DeltaSync implements Serializable {
if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=("
+ resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
+ resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
return null;
}

View File

@@ -369,6 +369,15 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--bootstrap-index-class"}, description = "subclass of BootstrapIndex")
public String bootstrapIndexClass = HFileBootstrapIndex.class.getName();
@Parameter(names = {"--retry-on-source-failures"}, description = "Retry on any source failures")
public Boolean retryOnSourceFailures = false;
@Parameter(names = {"--retry-interval-seconds"}, description = "the retry interval for source failures if --retry-on-source-failures is enabled")
public Integer retryIntervalSecs = 30;
@Parameter(names = {"--max-retry-count"}, description = "the max retry count if --retry-on-source-failures is enabled")
public Integer maxRetryCount = 3;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.exception;
import org.apache.hudi.exception.HoodieException;
public class HoodieSourceTimeoutException extends HoodieException {
public HoodieSourceTimeoutException(String msg, Throwable e) {
super(msg, e);
}
public HoodieSourceTimeoutException(String msg) {
super(msg);
}
}

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
@@ -89,14 +90,18 @@ public class AvroKafkaSource extends AvroSource {
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
try {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
}
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
}
private JavaRDD<GenericRecord> toRDD(OffsetRange[] offsetRanges) {

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.exception.HoodieSourceTimeoutException;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
@@ -59,14 +60,18 @@ public class JsonKafkaSource extends JsonSource {
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
try {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
} catch (org.apache.kafka.common.errors.TimeoutException e) {
throw new HoodieSourceTimeoutException("Kafka Source timed out " + e.getMessage());
}
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
}
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {