1
0

[HUDI-2858] Fixing handling of cluster update reject exception in deltastreamer (#4120)

This commit is contained in:
Sivabalan Narayanan
2021-11-25 14:34:07 -05:00
committed by GitHub
parent f692078d32
commit 6a0f079866
2 changed files with 51 additions and 7 deletions

View File

@@ -46,8 +46,10 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.utilities.HiveIncrementalPuller;
import org.apache.hudi.utilities.IdentitySplitter;
@@ -649,6 +651,8 @@ public class HoodieDeltaStreamer implements Serializable {
+ toSleepMs + " ms.");
Thread.sleep(toSleepMs);
}
} catch (HoodieUpsertException ue) {
handleUpsertException(ue);
} catch (Exception e) {
LOG.error("Shutting down delta-sync due to exception", e);
error = true;
@@ -662,6 +666,21 @@ public class HoodieDeltaStreamer implements Serializable {
}, executor), executor);
}
/**
 * Handles an upsert failure raised by delta-sync. A failure whose cause is a
 * {@link HoodieClusteringUpdateException} (write rejected because it clashes with a
 * pending clustering operation) is treated as transient: log a warning and back off
 * for one minute before the next sync round. Any other upsert failure is rethrown
 * so the regular error path shuts down delta-sync.
 *
 * @param ue the upsert exception thrown by the write path
 */
private void handleUpsertException(HoodieUpsertException ue) {
  if (!(ue.getCause() instanceof HoodieClusteringUpdateException)) {
    // Not a clustering conflict — propagate to the generic error handler.
    throw ue;
  }
  LOG.warn("Write rejected due to conflicts with pending clustering operation. Going to retry after 1 min with the hope "
      + "that clustering will complete by then.", ue);
  try {
    // Intentionally not using cfg.minSyncIntervalSeconds, since it could be too high or it could be 0.
    // Once the delta streamer gets past this clustering update exception, regular syncs will honor cfg.minSyncIntervalSeconds.
    Thread.sleep(60000);
  } catch (InterruptedException e) {
    throw new HoodieException("Deltastreamer interrupted while waiting for next round ", e);
  }
}
/**
* Shutdown async services like compaction/clustering as DeltaSync is shutdown.
*/

View File

@@ -152,7 +152,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, Boolean runSchedule, String scheduleAndExecute, Boolean retryLastFailedClusteringJob) {
HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
clusteringInstantTime, runSchedule, scheduleAndExecute, retryLastFailedClusteringJob);
return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
}
@@ -931,6 +931,31 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
});
}
/**
 * Verifies that when deltastreamer writes clash with a pending clustering operation,
 * the deltastreamer keeps retrying and eventually succeeds (once clustering completes)
 * instead of failing mid way.
 *
 * @throws Exception if the test run fails unexpectedly
 */
@Test
public void testAsyncClusteringServiceWithConflicts() throws Exception {
  String basePath = dfsBasePath + "/asyncClusteringWithConflicts";
  // Keep it higher than batch-size to test continuous mode
  int recordCount = 3000;
  // Initial bulk insert
  HoodieDeltaStreamer.Config streamerConfig = TestHelpers.makeConfig(basePath, WriteOperationType.UPSERT);
  streamerConfig.continuousMode = true;
  streamerConfig.tableType = HoodieTableType.COPY_ON_WRITE.name();
  streamerConfig.configs.addAll(getAsyncServicesConfigs(recordCount, "false", "", "", "true", "2"));
  HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(streamerConfig, jsc);
  deltaStreamerTestRunner(streamer, streamerConfig, (r) -> {
    // Expect at least two regular commits plus two replace-commits (clustering) to have landed.
    TestHelpers.assertAtLeastNCommits(2, basePath, dfs);
    TestHelpers.assertAtLeastNReplaceCommits(2, basePath, dfs);
    return true;
  });
}
@ParameterizedTest
@ValueSource(strings = {"true", "false"})
public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetadata) throws Exception {
@@ -1319,7 +1344,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// Properties used for testing delta-streamer with orc source
orcProps.setProperty("include", "base.properties");
orcProps.setProperty("hoodie.embed.timeline.server","false");
orcProps.setProperty("hoodie.embed.timeline.server", "false");
orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
if (useSchemaProvider) {
@@ -1333,9 +1358,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
testNum++;
@@ -1844,8 +1869,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
private static Stream<Arguments> testORCDFSSource() {
// arg1 boolean useSchemaProvider, arg2 List<String> transformerClassNames
return Stream.of(
arguments(false, null),
arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
arguments(false, null),
arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
);
}