1
0

[HUDI-2077] Fix TestHoodieDeltaStreamerWithMultiWriter (#3849)

Remove the logic of using deltastreamer to prep test table. Use fixture (compressed test table) instead.
This commit is contained in:
Raymond Xu
2021-10-24 21:14:39 -07:00
committed by GitHub
parent 91845e241d
commit d8560377c3
10 changed files with 178 additions and 65 deletions

View File

@@ -176,8 +176,14 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
} }
} }
/**
* To clean up Spark resources after all testcases have run in functional tests.
*
* Spark session and contexts were reused for testcases in the same test class. Some
* testcase may invoke this specifically to clean up in case of repeated test runs.
*/
@AfterAll @AfterAll
public static synchronized void cleanUpAfterAll() { public static synchronized void resetSpark() {
if (spark != null) { if (spark != null) {
spark.close(); spark.close();
spark = null; spark = null;

View File

@@ -39,6 +39,8 @@ public interface SparkProvider extends org.apache.hudi.testutils.providers.Hoodi
SparkConf sparkConf = new SparkConf(); SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.app.name", getClass().getName()); sparkConf.set("spark.app.name", getClass().getName());
sparkConf.set("spark.master", "local[*]"); sparkConf.set("spark.master", "local[*]");
sparkConf.set("spark.default.parallelism", "4");
sparkConf.set("spark.sql.shuffle.partitions", "4");
sparkConf.set("spark.driver.maxResultSize", "2g"); sparkConf.set("spark.driver.maxResultSize", "2g");
sparkConf.set("spark.hadoop.mapred.output.compress", "true"); sparkConf.set("spark.hadoop.mapred.output.compress", "true");
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
@@ -52,4 +54,4 @@ public interface SparkProvider extends org.apache.hudi.testutils.providers.Hoodi
default SparkConf conf() { default SparkConf conf() {
return conf(Collections.emptyMap()); return conf(Collections.emptyMap());
} }
} }

View File

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.testutils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.util.Objects;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public final class FixtureUtils {
public static Path prepareFixtureTable(URL fixtureResource, Path basePath) throws IOException {
File zippedFixtureTable = new File(fixtureResource.getFile());
try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zippedFixtureTable))) {
byte[] buffer = new byte[1024];
ZipEntry zipEntry = zis.getNextEntry();
Path tableBasePath = basePath.resolve(Objects.requireNonNull(zipEntry).getName()
.replaceAll(File.separator + "$", ""));
while (zipEntry != null) {
File newFile = newFile(basePath.toFile(), zipEntry);
if (zipEntry.isDirectory()) {
if (!newFile.isDirectory() && !newFile.mkdirs()) {
throw new IOException("Failed to create directory " + newFile);
}
} else {
// fix for Windows-created archives
File parent = newFile.getParentFile();
if (!parent.isDirectory() && !parent.mkdirs()) {
throw new IOException("Failed to create directory " + parent);
}
// write file content
try (FileOutputStream fos = new FileOutputStream(newFile)) {
int len;
while ((len = zis.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
}
}
zipEntry = zis.getNextEntry();
}
zis.closeEntry();
return tableBasePath;
}
}
public static File newFile(File destinationDir, ZipEntry zipEntry) throws IOException {
File destFile = new File(destinationDir, zipEntry.getName());
String destDirPath = destinationDir.getCanonicalPath();
String destFilePath = destFile.getCanonicalPath();
if (!destFilePath.startsWith(destDirPath + File.separator)) {
throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
}
return destFile;
}
}

View File

@@ -160,6 +160,7 @@ public class HoodieTestDataGenerator {
this.existingKeysBySchema = new HashMap<>(); this.existingKeysBySchema = new HashMap<>();
existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap); existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap);
numKeysBySchema = new HashMap<>(); numKeysBySchema = new HashMap<>();
numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size());
} }
/** /**
@@ -844,8 +845,8 @@ public class HoodieTestDataGenerator {
public static class KeyPartition implements Serializable { public static class KeyPartition implements Serializable {
HoodieKey key; public HoodieKey key;
String partitionPath; public String partitionPath;
} }
public void close() { public void close() {

View File

@@ -239,32 +239,32 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
} }
static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) { static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count();
sqlContext.clearCache(); sqlContext.clearCache();
long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count();
assertEquals(expected, recordCount); assertEquals(expected, recordCount);
} }
static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) { static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
sqlContext.clearCache();
List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tablePath) List<Row> rows = sqlContext.read().format("org.apache.hudi").load(tablePath)
.groupBy("_hoodie_commit_time").count() .groupBy("_hoodie_commit_time").count()
.sort("_hoodie_commit_time").collectAsList(); .sort("_hoodie_commit_time").collectAsList();
sqlContext.clearCache();
return rows; return rows;
} }
static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) { static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
sqlContext.clearCache();
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
long recordCount = long recordCount =
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count(); sqlContext.sql("select * from tmp_trips where haversine_distance is not NULL").count();
sqlContext.clearCache();
assertEquals(expected, recordCount); assertEquals(expected, recordCount);
} }
static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) { static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) {
sqlContext.clearCache();
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
long recordCount = long recordCount =
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count(); sqlContext.sql("select * from tmp_trips where haversine_distance = 1.0").count();
sqlContext.clearCache();
assertEquals(expected, recordCount); assertEquals(expected, recordCount);
} }

View File

@@ -27,7 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.sources.TestDataSource; import org.apache.hudi.utilities.sources.TestDataSource;
@@ -41,60 +41,60 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Collections; import java.util.Collections;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.Function; import java.util.function.Function;
import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE;
import static org.apache.hudi.config.HoodieWriteConfig.BULK_INSERT_SORT_MODE;
import static org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE;
import static org.apache.hudi.config.HoodieWriteConfig.INSERT_PARALLELISM_VALUE;
import static org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER; import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName; import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs; import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner; import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.DEFAULT_PARTITION_NUM;
import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.dataGeneratorMap;
import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.initDataGen;
@Tag("functional") @Tag("functional")
public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness { public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {
String basePath;
String propsFilePath;
String tableBasePath;
int totalRecords;
@ParameterizedTest @ParameterizedTest
@EnumSource(HoodieTableType.class) @EnumSource(HoodieTableType.class)
void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception { void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception {
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
final String basePath = basePath().replaceAll("/$", ""); setUpTestTable(tableType);
final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
final String tableBasePath = basePath + "/testtable_" + tableType;
prepareInitialConfigs(fs(), basePath, "foo"); prepareInitialConfigs(fs(), basePath, "foo");
// enable carrying forward latest checkpoint // enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"3"); props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"5000"); props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgIngestionJob.continuousMode = true; cfgIngestionJob.continuousMode = true;
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
} else {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
}
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
return true;
});
// create a backfill job // create a backfill job
HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
@@ -152,37 +152,19 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"}) @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception { void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
final String basePath = basePath().replaceAll("/$", ""); setUpTestTable(tableType);
final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
final String tableBasePath = basePath + "/testtable_" + tableType;
prepareInitialConfigs(fs(), basePath, "foo"); prepareInitialConfigs(fs(), basePath, "foo");
// enable carrying forward latest checkpoint // enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgIngestionJob.continuousMode = true; cfgIngestionJob.continuousMode = true;
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc());
// Prepare base dataset with some commits
deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
} else {
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
}
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
return true;
});
// create a backfill job with checkpoint from the first instant // create a backfill job with checkpoint from the first instant
HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
@@ -245,6 +227,11 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
props.setProperty("hoodie.write.lock.num_retries", "10"); props.setProperty("hoodie.write.lock.num_retries", "10");
props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table"); props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table");
props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test"); props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test");
props.setProperty(INSERT_PARALLELISM_VALUE.key(), "4");
props.setProperty(UPSERT_PARALLELISM_VALUE.key(), "4");
props.setProperty(BULKINSERT_PARALLELISM_VALUE.key(), "4");
props.setProperty(FINALIZE_WRITE_PARALLELISM_VALUE.key(), "4");
props.setProperty(BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name());
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath);
return props; return props;
@@ -264,16 +251,29 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
cfg.propsFilePath = propsFilePath; cfg.propsFilePath = propsFilePath;
cfg.sourceLimit = 1000; cfg.sourceLimit = 1000;
cfg.schemaProviderClassName = defaultSchemaProviderClassName; cfg.schemaProviderClassName = defaultSchemaProviderClassName;
cfg.deltaSyncSchedulingWeight = 1;
cfg.deltaSyncSchedulingMinShare = 1;
cfg.compactSchedulingWeight = 2;
cfg.compactSchedulingMinShare = 1;
cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 10));
cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key(), 10));
cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), 10));
return cfg; return cfg;
} }
/**
* Specifically used for {@link TestHoodieDeltaStreamerWithMultiWriter}.
*
* The fixture test tables have random records generated by
* {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator} using
* {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator#TRIP_EXAMPLE_SCHEMA}.
*
* The COW fixture test table has 3000 unique records in 7 commits.
* The MOR fixture test table has 3000 unique records in 9 deltacommits and 1 compaction commit.
*/
private void setUpTestTable(HoodieTableType tableType) throws IOException {
basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
String fixtureName = String.format("fixtures/testUpsertsContinuousModeWithMultipleWriters.%s.zip", tableType.name());
tableBasePath = prepareFixtureTable(Objects.requireNonNull(getClass()
.getClassLoader().getResource(fixtureName)), Paths.get(basePath)).toString();
initDataGen(sqlContext(), tableBasePath + "/*/*.parquet", DEFAULT_PARTITION_NUM);
totalRecords = dataGeneratorMap.get(DEFAULT_PARTITION_NUM).getNumExistingKeys(TRIP_EXAMPLE_SCHEMA);
}
private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords, private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception { HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception {

View File

@@ -32,7 +32,6 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
@@ -72,11 +71,11 @@ public class TestJdbcbasedSchemaProvider extends SparkClientFunctionalTestHarnes
* Initialize the H2 database and obtain a connection, then create a table as a test. * Initialize the H2 database and obtain a connection, then create a table as a test.
* Based on the characteristics of the H2 in-memory database, we do not need to display the initialized database. * Based on the characteristics of the H2 in-memory database, we do not need to display the initialized database.
* @throws SQLException * @throws SQLException
* @throws IOException
*/ */
private void initH2Database() throws SQLException, IOException { private void initH2Database() throws SQLException {
Connection conn = DriverManager.getConnection("jdbc:h2:mem:test_mem", "sa", ""); try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:test_mem", "sa", "")) {
PreparedStatement ps = conn.prepareStatement(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/triprec.sql")); PreparedStatement ps = conn.prepareStatement(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/triprec.sql"));
ps.executeUpdate(); ps.executeUpdate();
}
} }
} }

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.testutils.sources; package org.apache.hudi.utilities.testutils.sources;
import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.testutils.RawTripTestPayload;
@@ -32,12 +33,18 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
public abstract class AbstractBaseTestSource extends AvroSource { public abstract class AbstractBaseTestSource extends AvroSource {
@@ -47,7 +54,7 @@ public abstract class AbstractBaseTestSource extends AvroSource {
public static final int DEFAULT_PARTITION_NUM = 0; public static final int DEFAULT_PARTITION_NUM = 0;
// Static instance, helps with reuse across a test. // Static instance, helps with reuse across a test.
protected static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<>(); public static transient Map<Integer, HoodieTestDataGenerator> dataGeneratorMap = new HashMap<>();
public static void initDataGen() { public static void initDataGen() {
dataGeneratorMap.putIfAbsent(DEFAULT_PARTITION_NUM, dataGeneratorMap.putIfAbsent(DEFAULT_PARTITION_NUM,
@@ -68,6 +75,23 @@ public abstract class AbstractBaseTestSource extends AvroSource {
} }
} }
public static void initDataGen(SQLContext sqlContext, String globParquetPath, int partition) {
List<Row> rows = sqlContext.read().format("hudi").load(globParquetPath)
.select("_hoodie_record_key", "_hoodie_partition_path")
.collectAsList();
Map<Integer, HoodieTestDataGenerator.KeyPartition> keyPartitionMap = IntStream
.range(0, rows.size()).boxed()
.collect(Collectors.toMap(Function.identity(), i -> {
Row r = rows.get(i);
HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition();
kp.key = new HoodieKey(r.getString(0), r.getString(1));
kp.partitionPath = r.getString(1);
return kp;
}));
dataGeneratorMap.put(partition,
new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, keyPartitionMap));
}
public static void resetDataGen() { public static void resetDataGen() {
for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) { for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) {
dataGenerator.close(); dataGenerator.close();