[HUDI-3675] Adding post write termination strategy to deltastreamer continuous mode (#5073)
- Added a postWriteTerminationStrategy to deltastreamer continuous mode. It can be enabled by setting the desired termination strategy class via DeltastreamerConfig.postWriteTerminationStrategyClass; if none is set, continuous mode is expected to run forever.
- Added one concrete implementation of the termination strategy, NoNewDataTerminationStrategy, which shuts down deltastreamer if there is no new data to consume from the source for N consecutive rounds.
This commit is contained in:
committed by
GitHub
parent
c319ee9cea
commit
52fe1c9fae
@@ -60,6 +60,7 @@ import org.apache.hudi.utilities.HoodieClusteringJob;
|
||||
import org.apache.hudi.utilities.HoodieIndexer;
|
||||
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
|
||||
import org.apache.hudi.utilities.deltastreamer.NoNewDataTerminationStrategy;
|
||||
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
|
||||
@@ -738,18 +739,30 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
|
||||
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsertsMORContinuousMode() throws Exception {
|
||||
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
|
||||
}
|
||||
|
||||
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
|
||||
testUpsertsContinuousMode(tableType, tempDir, false);
|
||||
}
|
||||
|
||||
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
|
||||
String tableBasePath = dfsBasePath + "/" + tempDir;
|
||||
// Keep it higher than batch-size to test continuous mode
|
||||
int totalRecords = 3000;
|
||||
// Initial bulk insert
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
|
||||
cfg.continuousMode = true;
|
||||
if (testShutdownGracefully) {
|
||||
cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
|
||||
}
|
||||
cfg.tableType = tableType.name();
|
||||
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||
cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
|
||||
@@ -763,6 +776,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
}
|
||||
TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
|
||||
TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext);
|
||||
if (testShutdownGracefully) {
|
||||
TestDataSource.returnEmptyBatch = true;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
@@ -781,8 +797,35 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
}
|
||||
});
|
||||
TestHelpers.waitTillCondition(condition, dsFuture, 360);
|
||||
ds.shutdownGracefully();
|
||||
dsFuture.get();
|
||||
if (cfg != null && !cfg.postWriteTerminationStrategyClass.isEmpty()) {
|
||||
awaitDeltaStreamerShutdown(ds);
|
||||
} else {
|
||||
ds.shutdownGracefully();
|
||||
dsFuture.get();
|
||||
}
|
||||
}
|
||||
|
||||
static void awaitDeltaStreamerShutdown(HoodieDeltaStreamer ds) throws InterruptedException {
|
||||
// await until deltastreamer shuts down on its own
|
||||
boolean shutDownRequested = false;
|
||||
int timeSoFar = 0;
|
||||
while (!shutDownRequested) {
|
||||
shutDownRequested = ds.getDeltaSyncService().isShutdownRequested();
|
||||
Thread.sleep(500);
|
||||
timeSoFar += 500;
|
||||
if (timeSoFar > (2 * 60 * 1000)) {
|
||||
Assertions.fail("Deltastreamer should have shutdown by now");
|
||||
}
|
||||
}
|
||||
boolean shutdownComplete = false;
|
||||
while (!shutdownComplete) {
|
||||
shutdownComplete = ds.getDeltaSyncService().isShutdown();
|
||||
Thread.sleep(500);
|
||||
timeSoFar += 500;
|
||||
if (timeSoFar > (2 * 60 * 1000)) {
|
||||
Assertions.fail("Deltastreamer should have shutdown by now");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function<Boolean, Boolean> condition) throws Exception {
|
||||
|
||||
@@ -39,11 +39,14 @@ import java.util.stream.Collectors;
|
||||
public class TestDataSource extends AbstractBaseTestSource {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(TestDataSource.class);
|
||||
public static boolean returnEmptyBatch = false;
|
||||
private static int counter = 0;
|
||||
|
||||
/**
 * Builds the test source by passing all arguments to the parent constructor, calling
 * {@code initDataGen()}, and resetting the static {@code returnEmptyBatch} flag to
 * {@code false}.
 *
 * <p>Because the flag is static, constructing a new source clears any value a previous test set.
 */
public TestDataSource(
    TypedProperties props,
    JavaSparkContext sparkContext,
    SparkSession sparkSession,
    SchemaProvider schemaProvider) {
  super(props, sparkContext, sparkSession, schemaProvider);
  initDataGen();
  // Reset the class-level flag on every construction.
  TestDataSource.returnEmptyBatch = false;
}
|
||||
|
||||
@Override
|
||||
@@ -54,9 +57,13 @@ public class TestDataSource extends AbstractBaseTestSource {
|
||||
LOG.info("Source Limit is set to " + sourceLimit);
|
||||
|
||||
// No new data.
|
||||
if (sourceLimit <= 0) {
|
||||
if (sourceLimit <= 0 || returnEmptyBatch) {
|
||||
LOG.warn("Return no new data from Test Data source " + counter + ", source limit " + sourceLimit);
|
||||
return new InputBatch<>(Option.empty(), lastCheckpointStr.orElse(null));
|
||||
} else {
|
||||
LOG.warn("Returning valid data from Test Data source " + counter + ", source limit " + sourceLimit);
|
||||
}
|
||||
counter++;
|
||||
|
||||
List<GenericRecord> records =
|
||||
fetchNextBatch(props, (int) sourceLimit, instantTime, DEFAULT_PARTITION_NUM).collect(Collectors.toList());
|
||||
|
||||
Reference in New Issue
Block a user