diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index 74ab52daa..aca1d83d4 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -176,8 +176,14 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe } } + /** + * To clean up Spark resources after all testcases have run in functional tests. + * + * Spark session and contexts were reused for testcases in the same test class. Some + * testcase may invoke this specifically to clean up in case of repeated test runs. + */ @AfterAll - public static synchronized void cleanUpAfterAll() { + public static synchronized void resetSpark() { if (spark != null) { spark.close(); spark = null; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java index be15dc85d..92b1f76ac 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java @@ -39,6 +39,8 @@ public interface SparkProvider extends org.apache.hudi.testutils.providers.Hoodi SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.app.name", getClass().getName()); sparkConf.set("spark.master", "local[*]"); + sparkConf.set("spark.default.parallelism", "4"); + sparkConf.set("spark.sql.shuffle.partitions", "4"); sparkConf.set("spark.driver.maxResultSize", "2g"); sparkConf.set("spark.hadoop.mapred.output.compress", "true"); sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true"); @@ -52,4 +54,4 @@ public interface SparkProvider extends org.apache.hudi.testutils.providers.Hoodi default SparkConf conf() { return conf(Collections.emptyMap()); } -} \ No newline at end of file +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FixtureUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FixtureUtils.java new file mode 100644 index 000000000..6dfe0da79 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FixtureUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.testutils; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Objects; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +public final class FixtureUtils { + + public static Path prepareFixtureTable(URL fixtureResource, Path basePath) throws IOException { + File zippedFixtureTable = new File(fixtureResource.getFile()); + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zippedFixtureTable))) { + byte[] buffer = new byte[1024]; + ZipEntry zipEntry = zis.getNextEntry(); + Path tableBasePath = basePath.resolve(Objects.requireNonNull(zipEntry).getName() + .replaceAll(File.separator + "$", "")); + while (zipEntry != null) { + File newFile = newFile(basePath.toFile(), zipEntry); + if (zipEntry.isDirectory()) { + if (!newFile.isDirectory() && !newFile.mkdirs()) { + throw new IOException("Failed to create directory " + newFile); + } + } else { + // fix for Windows-created archives + File parent = newFile.getParentFile(); + if (!parent.isDirectory() && !parent.mkdirs()) { + throw new IOException("Failed to create directory " + parent); + } + + // write file content + try (FileOutputStream fos = new FileOutputStream(newFile)) { + int len; + while ((len = zis.read(buffer)) > 0) { + fos.write(buffer, 0, len); + } + } + } + zipEntry = zis.getNextEntry(); + } + zis.closeEntry(); + return tableBasePath; + } + } + + public static File newFile(File destinationDir, ZipEntry zipEntry) throws IOException { + File destFile = new File(destinationDir, zipEntry.getName()); + + String destDirPath = destinationDir.getCanonicalPath(); + String destFilePath = destFile.getCanonicalPath(); + + if (!destFilePath.startsWith(destDirPath + File.separator)) { + throw new IOException("Entry is outside of the target dir: " + zipEntry.getName()); + } + + return destFile; + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 86ea1f036..e988c9df6 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -160,6 +160,7 @@ public class HoodieTestDataGenerator { this.existingKeysBySchema = new HashMap<>(); existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap); numKeysBySchema = new HashMap<>(); + numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap.size()); } /** @@ -844,8 +845,8 @@ public class HoodieTestDataGenerator { public static class KeyPartition implements Serializable { - HoodieKey key; - String partitionPath; + public HoodieKey key; + public String partitionPath; } public void close() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index f49c14899..86c92f240 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -239,32 +239,32 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) { - long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count(); sqlContext.clearCache(); + long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count(); assertEquals(expected, recordCount); } static List countsPerCommit(String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); List rows = sqlContext.read().format("org.apache.hudi").load(tablePath) .groupBy("_hoodie_commit_time").count() .sort("_hoodie_commit_time").collectAsList(); - sqlContext.clearCache(); return rows; } static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); long recordCount = - sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count(); - sqlContext.clearCache(); + sqlContext.sql("select * from tmp_trips where haversine_distance is not NULL").count(); assertEquals(expected, recordCount); } static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) { + sqlContext.clearCache(); sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips"); long recordCount = - sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count(); - sqlContext.clearCache(); + sqlContext.sql("select * from tmp_trips where haversine_distance = 1.0").count(); assertEquals(expected, recordCount); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index c93b7d998..3cdf5f902 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -27,7 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.sources.TestDataSource; @@ -41,60 +41,60 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; +import java.net.URI; +import java.nio.file.Paths; import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.function.Function; +import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE; +import static org.apache.hudi.config.HoodieWriteConfig.BULK_INSERT_SORT_MODE; +import static org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE; +import static org.apache.hudi.config.HoodieWriteConfig.INSERT_PARALLELISM_VALUE; +import static org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER; import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName; import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs; import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner; +import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.DEFAULT_PARTITION_NUM; +import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.dataGeneratorMap; +import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.initDataGen; @Tag("functional") public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness { + String basePath; + String propsFilePath; + String tableBasePath; + int totalRecords; + @ParameterizedTest @EnumSource(HoodieTableType.class) void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception { // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts - final String basePath = basePath().replaceAll("/$", ""); - final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER; - final String tableBasePath = basePath + "/testtable_" + tableType; + setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); // enable carrying forward latest checkpoint TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,"3"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"5000"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); - // Keep it higher than batch-size to test continuous mode - int totalRecords = 3000; HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); cfgIngestionJob.continuousMode = true; cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); - - // Prepare base dataset with some commits - deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> { - if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs()); - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs()); - } else { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs()); - } - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); - TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); - return true; - }); // create a backfill job HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, @@ -152,37 +152,19 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"}) void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception { // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts - final String basePath = basePath().replaceAll("/$", ""); - final String propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER; - final String tableBasePath = basePath + "/testtable_" + tableType; + setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); // enable carrying forward latest checkpoint TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); - // Keep it higher than batch-size to test continuous mode - int totalRecords = 3000; HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName())); cfgIngestionJob.continuousMode = true; cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords)); cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key())); - HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc()); - - // Prepare base dataset with some commits - deltaStreamerTestRunner(ingestionJob, cfgIngestionJob, (r) -> { - if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs()); - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs()); - } else { - TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs()); - } - TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); - TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext()); - return true; - }); // create a backfill job with checkpoint from the first instant HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, @@ -245,6 +227,11 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona props.setProperty("hoodie.write.lock.num_retries", "10"); props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table"); props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test"); + props.setProperty(INSERT_PARALLELISM_VALUE.key(), "4"); + props.setProperty(UPSERT_PARALLELISM_VALUE.key(), "4"); + props.setProperty(BULKINSERT_PARALLELISM_VALUE.key(), "4"); + props.setProperty(FINALIZE_WRITE_PARALLELISM_VALUE.key(), "4"); + props.setProperty(BULK_INSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name()); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath); return props; @@ -264,16 +251,29 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona cfg.propsFilePath = propsFilePath; cfg.sourceLimit = 1000; cfg.schemaProviderClassName = defaultSchemaProviderClassName; - cfg.deltaSyncSchedulingWeight = 1; - cfg.deltaSyncSchedulingMinShare = 1; - cfg.compactSchedulingWeight = 2; - cfg.compactSchedulingMinShare = 1; - cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 10)); - cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key(), 10)); - cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), 10)); return cfg; } + /** + * Specifically used for {@link TestHoodieDeltaStreamerWithMultiWriter}. + * + * The fixture test tables have random records generated by + * {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator} using + * {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator#TRIP_EXAMPLE_SCHEMA}. + * + * The COW fixture test table has 3000 unique records in 7 commits. + * The MOR fixture test table has 3000 unique records in 9 deltacommits and 1 compaction commit. + */ + private void setUpTestTable(HoodieTableType tableType) throws IOException { + basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString(); + propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER; + String fixtureName = String.format("fixtures/testUpsertsContinuousModeWithMultipleWriters.%s.zip", tableType.name()); + tableBasePath = prepareFixtureTable(Objects.requireNonNull(getClass() + .getClassLoader().getResource(fixtureName)), Paths.get(basePath)).toString(); + initDataGen(sqlContext(), tableBasePath + "/*/*.parquet", DEFAULT_PARTITION_NUM); + totalRecords = dataGeneratorMap.get(DEFAULT_PARTITION_NUM).getNumExistingKeys(TRIP_EXAMPLE_SCHEMA); + } + private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords, HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob, HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict) throws Exception { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java index 7dd8af689..938f71c10 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java @@ -32,7 +32,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -72,11 +71,11 @@ public class TestJdbcbasedSchemaProvider extends SparkClientFunctionalTestHarnes * Initialize the H2 database and obtain a connection, then create a table as a test. * Based on the characteristics of the H2 in-memory database, we do not need to display the initialized database. * @throws SQLException - * @throws IOException */ - private void initH2Database() throws SQLException, IOException { - Connection conn = DriverManager.getConnection("jdbc:h2:mem:test_mem", "sa", ""); - PreparedStatement ps = conn.prepareStatement(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/triprec.sql")); - ps.executeUpdate(); + private void initH2Database() throws SQLException { + try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:test_mem", "sa", "")) { + PreparedStatement ps = conn.prepareStatement(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/triprec.sql")); + ps.executeUpdate(); + } } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java index 524591dd7..5186179c9 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.testutils.sources; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.RawTripTestPayload; @@ -32,12 +33,18 @@ import org.apache.avro.generic.GenericRecord; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import java.io.File; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; public abstract class AbstractBaseTestSource extends AvroSource { @@ -47,7 +54,7 @@ public abstract class AbstractBaseTestSource extends AvroSource { public static final int DEFAULT_PARTITION_NUM = 0; // Static instance, helps with reuse across a test. - protected static transient Map dataGeneratorMap = new HashMap<>(); + public static transient Map dataGeneratorMap = new HashMap<>(); public static void initDataGen() { dataGeneratorMap.putIfAbsent(DEFAULT_PARTITION_NUM, @@ -68,6 +75,23 @@ public abstract class AbstractBaseTestSource extends AvroSource { } } + public static void initDataGen(SQLContext sqlContext, String globParquetPath, int partition) { + List rows = sqlContext.read().format("hudi").load(globParquetPath) + .select("_hoodie_record_key", "_hoodie_partition_path") + .collectAsList(); + Map keyPartitionMap = IntStream + .range(0, rows.size()).boxed() + .collect(Collectors.toMap(Function.identity(), i -> { + Row r = rows.get(i); + HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition(); + kp.key = new HoodieKey(r.getString(0), r.getString(1)); + kp.partitionPath = r.getString(1); + return kp; + })); + dataGeneratorMap.put(partition, + new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, keyPartitionMap)); + } + public static void resetDataGen() { for (HoodieTestDataGenerator dataGenerator : dataGeneratorMap.values()) { dataGenerator.close(); diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip new file mode 100644 index 000000000..48bf278bd Binary files /dev/null and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip differ diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip new file mode 100644 index 000000000..657f83c2d Binary files /dev/null and b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip differ