[HUDI-3043] Adding some test fixes to continuous mode multi writer tests (#4356)
This commit is contained in:
committed by
GitHub
parent
9246b16492
commit
6eba8345cb
@@ -313,7 +313,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
|
||||
static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) {
|
||||
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
|
||||
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
|
||||
HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
|
||||
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
||||
int numDeltaCommits = (int) timeline.getInstants().count();
|
||||
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
|
||||
@@ -336,7 +336,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
boolean ret = false;
|
||||
while (!ret && !dsFuture.isDone()) {
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
Thread.sleep(3000);
|
||||
ret = condition.apply(true);
|
||||
} catch (Throwable error) {
|
||||
LOG.warn("Got error :", error);
|
||||
@@ -713,14 +713,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
||||
}
|
||||
|
||||
static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition) throws Exception {
|
||||
deltaStreamerTestRunner(ds, cfg, condition, "single_ds_job");
|
||||
}
|
||||
|
||||
static void deltaStreamerTestRunner(HoodieDeltaStreamer ds, HoodieDeltaStreamer.Config cfg, Function<Boolean, Boolean> condition, String jobId) throws Exception {
|
||||
Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
|
||||
try {
|
||||
ds.sync();
|
||||
} catch (Exception ex) {
|
||||
LOG.warn("DS continuous job failed, hence not proceeding with condition check for " + jobId);
|
||||
throw new RuntimeException(ex.getMessage(), ex);
|
||||
}
|
||||
});
|
||||
|
||||
TestHelpers.waitTillCondition(condition, dsFuture, 360);
|
||||
ds.shutdownGracefully();
|
||||
dsFuture.get();
|
||||
|
||||
@@ -35,6 +35,8 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
|
||||
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
@@ -50,6 +52,7 @@ import java.util.Objects;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable;
|
||||
@@ -73,6 +76,7 @@ import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource
|
||||
public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {
|
||||
|
||||
private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE";
|
||||
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
|
||||
|
||||
String basePath;
|
||||
String propsFilePath;
|
||||
@@ -85,7 +89,6 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
||||
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
||||
setUpTestTable(tableType);
|
||||
prepareInitialConfigs(fs(), basePath, "foo");
|
||||
// enable carrying forward latest checkpoint
|
||||
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
|
||||
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
|
||||
@@ -104,7 +107,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
||||
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
|
||||
cfgBackfillJob.continuousMode = false;
|
||||
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
|
||||
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
HoodieTimeline timeline = meta.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
|
||||
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
|
||||
@@ -117,7 +120,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
||||
|
||||
// run ingestion & backfill in parallel, create conflict and fail one
|
||||
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
|
||||
cfgIngestionJob, backfillJob, cfgBackfillJob, true);
|
||||
cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch1");
|
||||
|
||||
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
|
||||
props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||
@@ -125,30 +128,30 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
||||
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
|
||||
props.setProperty("hoodie.test.source.generate.inserts", "true");
|
||||
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER);
|
||||
cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
|
||||
HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
|
||||
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
|
||||
cfgBackfillJob.continuousMode = false;
|
||||
cfgBackfillJob2.continuousMode = false;
|
||||
meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
|
||||
timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
|
||||
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
|
||||
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
|
||||
cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
|
||||
cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||
cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
|
||||
|
||||
cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
||||
HoodieDeltaStreamer.Config cfgIngestionJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
||||
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
|
||||
cfgIngestionJob.continuousMode = true;
|
||||
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
|
||||
cfgIngestionJob2.continuousMode = true;
|
||||
cfgIngestionJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||
cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
|
||||
// re-init ingestion job
|
||||
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
|
||||
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc());
|
||||
// re-init backfill job
|
||||
HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
|
||||
HoodieDeltaStreamer backfillJob2 = new HoodieDeltaStreamer(cfgBackfillJob2, jsc());
|
||||
|
||||
// run ingestion & backfill in parallel, avoid conflict and succeed both
|
||||
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3,
|
||||
cfgIngestionJob, backfillJob2, cfgBackfillJob, false);
|
||||
cfgIngestionJob2, backfillJob2, cfgBackfillJob2, false, "batch2");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@@ -301,7 +304,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
||||
|
||||
private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
|
||||
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
|
||||
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {
|
||||
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, String jobId) throws Exception {
|
||||
ExecutorService service = Executors.newFixedThreadPool(2);
|
||||
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
|
||||
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
@@ -318,18 +321,27 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
||||
return true;
|
||||
};
|
||||
|
||||
AtomicBoolean continousFailed = new AtomicBoolean(false);
|
||||
AtomicBoolean backfillFailed = new AtomicBoolean(false);
|
||||
try {
|
||||
Future regularIngestionJobFuture = service.submit(() -> {
|
||||
try {
|
||||
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion);
|
||||
} catch (Exception ex) {
|
||||
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, conditionForRegularIngestion, jobId);
|
||||
} catch (Throwable ex) {
|
||||
continousFailed.set(true);
|
||||
LOG.error("Continuous job failed " + ex.getMessage());
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
});
|
||||
Future backfillJobFuture = service.submit(() -> {
|
||||
try {
|
||||
// trigger backfill atleast after 1 requested entry is added to timline from continuous job. If not, there is a chance that backfill will complete even before
|
||||
// continous job starts.
|
||||
awaitCondition(new GetCommitsAfterInstant(tableBasePath, lastSuccessfulCommit));
|
||||
backfillJob.sync();
|
||||
} catch (Exception ex) {
|
||||
} catch (Throwable ex) {
|
||||
LOG.error("Backfilling job failed " + ex.getMessage());
|
||||
backfillFailed.set(true);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
});
|
||||
@@ -345,10 +357,48 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
||||
*/
|
||||
if (expectConflict && e.getCause().getMessage().contains(ConcurrentModificationException.class.getName())) {
|
||||
// expected ConcurrentModificationException since ingestion & backfill will have overlapping writes
|
||||
if (backfillFailed.get()) {
|
||||
// if backfill job failed, shutdown the continuous job.
|
||||
LOG.warn("Calling shutdown on ingestion job since the backfill job has failed for " + jobId);
|
||||
ingestionJob.shutdownGracefully();
|
||||
}
|
||||
} else {
|
||||
LOG.error("Conflict happened, but not expected " + e.getCause().getMessage());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class GetCommitsAfterInstant {
|
||||
|
||||
String basePath;
|
||||
String lastSuccessfulCommit;
|
||||
HoodieTableMetaClient meta;
|
||||
GetCommitsAfterInstant(String basePath, String lastSuccessfulCommit) {
|
||||
this.basePath = basePath;
|
||||
this.lastSuccessfulCommit = lastSuccessfulCommit;
|
||||
meta = HoodieTableMetaClient.builder().setConf(fs().getConf()).setBasePath(basePath).build();
|
||||
}
|
||||
|
||||
long getCommitsAfterInstant() {
|
||||
HoodieTimeline timeline1 = meta.reloadActiveTimeline().getAllCommitsTimeline().findInstantsAfter(lastSuccessfulCommit);
|
||||
// LOG.info("Timeline Instants=" + meta1.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
||||
return timeline1.getInstants().count();
|
||||
}
|
||||
}
|
||||
|
||||
private static void awaitCondition(GetCommitsAfterInstant callback) throws InterruptedException {
|
||||
long startTime = System.currentTimeMillis();
|
||||
long soFar = 0;
|
||||
while (soFar <= 5000) {
|
||||
if (callback.getCommitsAfterInstant() > 0) {
|
||||
break;
|
||||
} else {
|
||||
Thread.sleep(500);
|
||||
soFar += 500;
|
||||
}
|
||||
}
|
||||
LOG.warn("Awaiting completed in " + (System.currentTimeMillis() - startTime));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user