1
0

[HUDI-3430] Fix Deltastreamer to properly shut down the services upon failure (#4824)

This commit is contained in:
Y Ethan Guo
2022-02-18 05:44:56 -08:00
committed by GitHub
parent de8161ae96
commit fba5822ee3
4 changed files with 44 additions and 16 deletions

View File

@@ -82,10 +82,16 @@ public abstract class AsyncClusteringService extends HoodieAsyncTableService {
} }
LOG.info("Clustering executor shutting down properly"); LOG.info("Clustering executor shutting down properly");
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
hasError = true;
LOG.warn("Clustering executor got interrupted exception! Stopping", ie); LOG.warn("Clustering executor got interrupted exception! Stopping", ie);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Clustering executor failed", e); hasError = true;
LOG.error("Clustering executor failed due to IOException", e);
throw new HoodieIOException(e.getMessage(), e); throw new HoodieIOException(e.getMessage(), e);
} catch (Exception e) {
hasError = true;
LOG.error("Clustering executor failed", e);
throw e;
} }
return true; return true;
}, executor)).toArray(CompletableFuture[]::new)), executor); }, executor)).toArray(CompletableFuture[]::new)), executor);

View File

@@ -92,10 +92,16 @@ public abstract class AsyncCompactService extends HoodieAsyncTableService {
} }
LOG.info("Compactor shutting down properly!!"); LOG.info("Compactor shutting down properly!!");
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
hasError = true;
LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie); LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Compactor executor failed", e); hasError = true;
LOG.error("Compactor executor failed due to IOException", e);
throw new HoodieIOException(e.getMessage(), e); throw new HoodieIOException(e.getMessage(), e);
} catch (Exception e) {
hasError = true;
LOG.error("Compactor executor failed", e);
throw e;
} }
return true; return true;
}, executor)).toArray(CompletableFuture[]::new)), executor); }, executor)).toArray(CompletableFuture[]::new)), executor);

View File

@@ -41,7 +41,10 @@ import java.util.function.Function;
public abstract class HoodieAsyncService implements Serializable { public abstract class HoodieAsyncService implements Serializable {
private static final Logger LOG = LogManager.getLogger(HoodieAsyncService.class); private static final Logger LOG = LogManager.getLogger(HoodieAsyncService.class);
private static final long POLLING_SECONDS = 10;
// Flag indicating whether an error is incurred in the service
protected boolean hasError;
// Flag to track if the service is started. // Flag to track if the service is started.
private boolean started; private boolean started;
// Flag indicating shutdown is externally requested // Flag indicating shutdown is externally requested
@@ -82,6 +85,10 @@ public abstract class HoodieAsyncService implements Serializable {
return shutdown; return shutdown;
} }
public boolean hasError() {
return hasError;
}
/** /**
* Wait till the service shutdown. If the service shutdown with exception, it will be thrown * Wait till the service shutdown. If the service shutdown with exception, it will be thrown
* *
@@ -109,6 +116,7 @@ public abstract class HoodieAsyncService implements Serializable {
public void shutdown(boolean force) { public void shutdown(boolean force) {
if (!shutdownRequested || force) { if (!shutdownRequested || force) {
shutdownRequested = true; shutdownRequested = true;
shutdown = true;
if (executor != null) { if (executor != null) {
if (force) { if (force) {
executor.shutdownNow(); executor.shutdownNow();
@@ -178,8 +186,8 @@ public abstract class HoodieAsyncService implements Serializable {
public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) throws InterruptedException { public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) throws InterruptedException {
try { try {
queueLock.lock(); queueLock.lock();
while (!isShutdown() && (pendingInstants.size() > numPending)) { while (!isShutdown() && !hasError() && (pendingInstants.size() > numPending)) {
consumed.await(); consumed.await(POLLING_SECONDS, TimeUnit.SECONDS);
} }
} finally { } finally {
queueLock.unlock(); queueLock.unlock();
@@ -202,8 +210,8 @@ public abstract class HoodieAsyncService implements Serializable {
* @throws InterruptedException * @throws InterruptedException
*/ */
HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException { HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException {
LOG.info("Waiting for next instant upto 10 seconds"); LOG.info(String.format("Waiting for next instant up to %d seconds", POLLING_SECONDS));
HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS); HoodieInstant instant = pendingInstants.poll(POLLING_SECONDS, TimeUnit.SECONDS);
if (instant != null) { if (instant != null) {
try { try {
queueLock.lock(); queueLock.lock();

View File

@@ -659,6 +659,10 @@ public class HoodieDeltaStreamer implements Serializable {
asyncCompactService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED, asyncCompactService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get())); HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstantAndRDD.get().getLeft().get()));
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions); asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
if (asyncCompactService.get().hasError()) {
error = true;
throw new HoodieException("Async compaction failed. Shutting down Delta Sync...");
}
} }
if (clusteringConfig.isAsyncClusteringEnabled()) { if (clusteringConfig.isAsyncClusteringEnabled()) {
Option<String> clusteringInstant = deltaSync.getClusteringInstantOpt(); Option<String> clusteringInstant = deltaSync.getClusteringInstantOpt();
@@ -666,6 +670,10 @@ public class HoodieDeltaStreamer implements Serializable {
LOG.info("Scheduled async clustering for instant: " + clusteringInstant.get()); LOG.info("Scheduled async clustering for instant: " + clusteringInstant.get());
asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get())); asyncClusteringService.get().enqueuePendingAsyncServiceInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()));
asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering); asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
if (asyncClusteringService.get().hasError()) {
error = true;
throw new HoodieException("Async clustering failed. Shutting down Delta Sync...");
}
} }
} }
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start); long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
@@ -684,6 +692,7 @@ public class HoodieDeltaStreamer implements Serializable {
} }
} finally { } finally {
shutdownAsyncServices(error); shutdownAsyncServices(error);
executor.shutdownNow();
} }
return true; return true;
}, executor), executor); }, executor), executor);
@@ -737,13 +746,12 @@ public class HoodieDeltaStreamer implements Serializable {
HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build(); HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta); List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant)); pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
asyncCompactService.get().start((error) -> { asyncCompactService.get().start(error -> true);
// Shutdown DeltaSync
shutdown(false);
return true;
});
try { try {
asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions); asyncCompactService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingCompactions);
if (asyncCompactService.get().hasError()) {
throw new HoodieException("Async compaction failed during write client initialization.");
}
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new HoodieException(ie); throw new HoodieException(ie);
} }
@@ -762,12 +770,12 @@ public class HoodieDeltaStreamer implements Serializable {
List<HoodieInstant> pending = ClusteringUtils.getPendingClusteringInstantTimes(meta); List<HoodieInstant> pending = ClusteringUtils.getPendingClusteringInstantTimes(meta);
LOG.info(String.format("Found %d pending clustering instants ", pending.size())); LOG.info(String.format("Found %d pending clustering instants ", pending.size()));
pending.forEach(hoodieInstant -> asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant)); pending.forEach(hoodieInstant -> asyncClusteringService.get().enqueuePendingAsyncServiceInstant(hoodieInstant));
asyncClusteringService.get().start((error) -> { asyncClusteringService.get().start(error -> true);
shutdown(false);
return true;
});
try { try {
asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering); asyncClusteringService.get().waitTillPendingAsyncServiceInstantsReducesTo(cfg.maxPendingClustering);
if (asyncClusteringService.get().hasError()) {
throw new HoodieException("Async clustering failed during write client initialization.");
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new HoodieException(e); throw new HoodieException(e);
} }