[HUDI-3330] Remove fixture test tables for multi writer tests (#4704)
This commit is contained in:
@@ -1,81 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hudi.common.testutils;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.zip.ZipEntry;
|
|
||||||
import java.util.zip.ZipInputStream;
|
|
||||||
|
|
||||||
public final class FixtureUtils {
|
|
||||||
|
|
||||||
public static Path prepareFixtureTable(URL fixtureResource, Path basePath) throws IOException {
|
|
||||||
File zippedFixtureTable = new File(fixtureResource.getFile());
|
|
||||||
try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zippedFixtureTable))) {
|
|
||||||
byte[] buffer = new byte[1024];
|
|
||||||
ZipEntry zipEntry = zis.getNextEntry();
|
|
||||||
Path tableBasePath = basePath.resolve(Objects.requireNonNull(zipEntry).getName()
|
|
||||||
.replaceAll(File.separator + "$", ""));
|
|
||||||
while (zipEntry != null) {
|
|
||||||
File newFile = newFile(basePath.toFile(), zipEntry);
|
|
||||||
if (zipEntry.isDirectory()) {
|
|
||||||
if (!newFile.isDirectory() && !newFile.mkdirs()) {
|
|
||||||
throw new IOException("Failed to create directory " + newFile);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// fix for Windows-created archives
|
|
||||||
File parent = newFile.getParentFile();
|
|
||||||
if (!parent.isDirectory() && !parent.mkdirs()) {
|
|
||||||
throw new IOException("Failed to create directory " + parent);
|
|
||||||
}
|
|
||||||
|
|
||||||
// write file content
|
|
||||||
try (FileOutputStream fos = new FileOutputStream(newFile)) {
|
|
||||||
int len;
|
|
||||||
while ((len = zis.read(buffer)) > 0) {
|
|
||||||
fos.write(buffer, 0, len);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
zipEntry = zis.getNextEntry();
|
|
||||||
}
|
|
||||||
zis.closeEntry();
|
|
||||||
return tableBasePath;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static File newFile(File destinationDir, ZipEntry zipEntry) throws IOException {
|
|
||||||
File destFile = new File(destinationDir, zipEntry.getName());
|
|
||||||
|
|
||||||
String destDirPath = destinationDir.getCanonicalPath();
|
|
||||||
String destFilePath = destFile.getCanonicalPath();
|
|
||||||
|
|
||||||
if (!destFilePath.startsWith(destDirPath + File.separator)) {
|
|
||||||
throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
return destFile;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.jupiter.api.Tag;
|
import org.junit.jupiter.api.Tag;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.EnumSource;
|
import org.junit.jupiter.params.provider.EnumSource;
|
||||||
@@ -48,15 +49,12 @@ import java.nio.file.Paths;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.ConcurrentModificationException;
|
import java.util.ConcurrentModificationException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable;
|
|
||||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
|
||||||
import static org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE;
|
import static org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE;
|
||||||
import static org.apache.hudi.config.HoodieWriteConfig.BULK_INSERT_SORT_MODE;
|
import static org.apache.hudi.config.HoodieWriteConfig.BULK_INSERT_SORT_MODE;
|
||||||
import static org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE;
|
import static org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE;
|
||||||
@@ -68,31 +66,50 @@ import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.a
|
|||||||
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
|
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
|
||||||
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
|
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
|
||||||
import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
|
import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
|
||||||
import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.DEFAULT_PARTITION_NUM;
|
|
||||||
import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.dataGeneratorMap;
|
|
||||||
import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.initDataGen;
|
|
||||||
|
|
||||||
@Tag("functional")
|
@Tag("functional")
|
||||||
public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {
|
public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {
|
||||||
|
|
||||||
private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE";
|
|
||||||
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
|
private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
|
||||||
|
|
||||||
String basePath;
|
String basePath;
|
||||||
String propsFilePath;
|
String propsFilePath;
|
||||||
String tableBasePath;
|
String tableBasePath;
|
||||||
int totalRecords;
|
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(HoodieTableType.class)
|
@EnumSource(HoodieTableType.class)
|
||||||
void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception {
|
void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception {
|
||||||
// NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
// NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
||||||
setUpTestTable(tableType);
|
basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
|
||||||
|
propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
|
||||||
|
tableBasePath = basePath + "/testtable_" + tableType;
|
||||||
prepareInitialConfigs(fs(), basePath, "foo");
|
prepareInitialConfigs(fs(), basePath, "foo");
|
||||||
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||||
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
|
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
|
||||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
|
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
|
||||||
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
|
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
|
||||||
|
// Keep it higher than batch-size to test continuous mode
|
||||||
|
int totalRecords = 3000;
|
||||||
|
|
||||||
|
HoodieDeltaStreamer.Config prepJobConfig = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
||||||
|
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
|
||||||
|
prepJobConfig.continuousMode = true;
|
||||||
|
prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||||
|
prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
|
||||||
|
HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
|
||||||
|
|
||||||
|
// Prepare base dataset with some commits
|
||||||
|
deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
|
||||||
|
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
|
||||||
|
} else {
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
|
||||||
|
}
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
||||||
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
|
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
|
||||||
@@ -125,12 +142,36 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
|||||||
@EnumSource(HoodieTableType.class)
|
@EnumSource(HoodieTableType.class)
|
||||||
void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception {
|
void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception {
|
||||||
// NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
// NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
||||||
setUpTestTable(tableType);
|
basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
|
||||||
|
propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
|
||||||
|
tableBasePath = basePath + "/testtable_" + tableType;
|
||||||
prepareInitialConfigs(fs(), basePath, "foo");
|
prepareInitialConfigs(fs(), basePath, "foo");
|
||||||
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||||
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
|
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
|
||||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
|
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
|
||||||
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
|
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
|
||||||
|
// Keep it higher than batch-size to test continuous mode
|
||||||
|
int totalRecords = 3000;
|
||||||
|
|
||||||
|
HoodieDeltaStreamer.Config prepJobConfig = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
||||||
|
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
|
||||||
|
prepJobConfig.continuousMode = true;
|
||||||
|
prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||||
|
prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
|
||||||
|
HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
|
||||||
|
|
||||||
|
// Prepare base dataset with some commits
|
||||||
|
deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
|
||||||
|
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
|
||||||
|
} else {
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
|
||||||
|
}
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
|
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
|
||||||
props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||||
@@ -164,26 +205,41 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
|||||||
cfgIngestionJob2, backfillJob2, cfgBackfillJob2, false, "batch2");
|
cfgIngestionJob2, backfillJob2, cfgBackfillJob2, false, "batch2");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Disabled
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
|
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
|
||||||
public void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
|
void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
|
||||||
testCheckpointCarryOver(tableType);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void testCheckpointCarryOver(HoodieTableType tableType) throws Exception {
|
|
||||||
// NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
// NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
||||||
setUpTestTable(tableType);
|
basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
|
||||||
|
propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
|
||||||
|
tableBasePath = basePath + "/testtable_" + tableType;
|
||||||
prepareInitialConfigs(fs(), basePath, "foo");
|
prepareInitialConfigs(fs(), basePath, "foo");
|
||||||
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||||
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
|
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
|
||||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
|
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
|
||||||
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
|
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
|
||||||
|
// Keep it higher than batch-size to test continuous mode
|
||||||
|
int totalRecords = 3000;
|
||||||
|
|
||||||
HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
HoodieDeltaStreamer.Config prepJobConfig = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
||||||
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
|
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
|
||||||
cfgIngestionJob.continuousMode = true;
|
prepJobConfig.continuousMode = true;
|
||||||
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||||
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
|
prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
|
||||||
|
HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
|
||||||
|
|
||||||
|
// Prepare base dataset with some commits
|
||||||
|
deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
|
||||||
|
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
|
||||||
|
} else {
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
|
||||||
|
}
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
|
||||||
|
TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
// create a backfill job with checkpoint from the first instant
|
// create a backfill job with checkpoint from the first instant
|
||||||
HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
||||||
@@ -292,26 +348,6 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
|||||||
return cfg;
|
return cfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Specifically used for {@link TestHoodieDeltaStreamerWithMultiWriter}.
|
|
||||||
*
|
|
||||||
* The fixture test tables have random records generated by
|
|
||||||
* {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator} using
|
|
||||||
* {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator#TRIP_EXAMPLE_SCHEMA}.
|
|
||||||
*
|
|
||||||
* The COW fixture test table has 3000 unique records in 7 commits.
|
|
||||||
* The MOR fixture test table has 3000 unique records in 9 deltacommits and 1 compaction commit.
|
|
||||||
*/
|
|
||||||
private void setUpTestTable(HoodieTableType tableType) throws IOException {
|
|
||||||
basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
|
|
||||||
propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
|
|
||||||
String fixtureName = String.format("fixtures/testUpsertsContinuousModeWithMultipleWriters.%s.zip", tableType.name());
|
|
||||||
tableBasePath = prepareFixtureTable(Objects.requireNonNull(getClass()
|
|
||||||
.getClassLoader().getResource(fixtureName)), Paths.get(basePath)).toString();
|
|
||||||
initDataGen(sqlContext(), tableBasePath + "/*/*.parquet", DEFAULT_PARTITION_NUM);
|
|
||||||
totalRecords = dataGeneratorMap.get(DEFAULT_PARTITION_NUM).getNumExistingKeys(TRIP_EXAMPLE_SCHEMA);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
|
private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
|
||||||
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
|
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
|
||||||
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, String jobId) throws Exception {
|
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, String jobId) throws Exception {
|
||||||
|
|||||||
Binary file not shown.
Reference in New Issue
Block a user