From 10e426879237b636f8b5913ac931dd7f8af8b817 Mon Sep 17 00:00:00 2001
From: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
Date: Sat, 1 Aug 2020 05:57:18 -0700
Subject: [PATCH] [HUDI-995] Use Transformations, Assertions and SchemaTestUtil (#1884)

- Consolidate transform functions for tests in Transformations.java
- Consolidate assertion functions for tests in Assertions.java
- Make use of SchemaTestUtil for loading schema from resource
---
 .../TestHoodieClientOnCopyOnWriteStorage.java | 25 +++---
 .../client/TestUpdateSchemaEvolution.java | 9 ++-
 .../apache/hudi/index/TestHoodieIndex.java | 19 ++---
 .../index/bloom/TestHoodieBloomIndex.java | 33 ++++----
 .../bloom/TestHoodieGlobalBloomIndex.java | 29 +++----
 .../table/TestHoodieMergeOnReadTable.java | 3 +-
 .../commit/TestCopyOnWriteActionExecutor.java | 27 +++----
 .../action/commit/TestUpsertPartitioner.java | 9 ++-
 .../hudi/testutils/HoodieClientTestUtils.java | 44 -----------
 .../timeline/TestHoodieActiveTimeline.java | 35 +++++---
 .../hudi/common/testutils/Assertions.java | 41 ++++++++++
 .../common/testutils/HoodieTestUtils.java | 11 ---
 .../common/testutils/RawTripTestPayload.java | 20 +++++
 .../hudi/common/testutils/SchemaTestUtil.java | 25 ++++--
 .../common/testutils/Transformations.java | 75 ++++++++++++++++++
 .../hadoop/TestHoodieParquetInputFormat.java | 7 +-
 .../hadoop/testutils/InputFormatTestUtil.java | 4 -
 hudi-spark/src/test/java/HoodieJavaApp.java | 16 ++--
 .../src/test/java/HoodieJavaGenerateApp.java | 14 ++--
 .../src/test/java/HoodieJavaStreamingApp.java | 7 +-
 .../org/apache/hudi/TestDataSourceUtils.java | 28 +++++-
 .../hudi/testutils/DataSourceTestUtils.java | 76 ------------------
 .../hudi/functional/TestDataSource.scala | 22 +++---
 23 files changed, 302 insertions(+), 277 deletions(-)
 create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/testutils/Assertions.java
 create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/testutils/Transformations.java
 delete mode 100644 hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java

diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index cb3bdf999..b1d1aa8c2 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -83,6 +83,8 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_S
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.NULL_SCHEMA;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
+import static org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
 import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -634,7 +636,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     String commitTime1 = "001";
     client.startCommitWithTime(commitTime1);
     List inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb
-    Set keys1 =
HoodieClientTestUtils.getRecordKeys(inserts1); + Set keys1 = recordsToRecordKeySet(inserts1); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); List statuses = client.upsert(insertRecordsRDD1, commitTime1).collect(); @@ -651,7 +653,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { String commitTime2 = "002"; client.startCommitWithTime(commitTime2); List inserts2 = dataGen.generateInserts(commitTime2, 40); - Set keys2 = HoodieClientTestUtils.getRecordKeys(inserts2); + Set keys2 = recordsToRecordKeySet(inserts2); List insertsAndUpdates2 = new ArrayList<>(); insertsAndUpdates2.addAll(inserts2); insertsAndUpdates2.addAll(dataGen.generateUpdates(commitTime2, inserts1)); @@ -678,7 +680,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { String commitTime3 = "003"; client.startCommitWithTime(commitTime3); List insertsAndUpdates3 = dataGen.generateInserts(commitTime3, 200); - Set keys3 = HoodieClientTestUtils.getRecordKeys(insertsAndUpdates3); + Set keys3 = recordsToRecordKeySet(insertsAndUpdates3); List updates3 = dataGen.generateUpdates(commitTime3, inserts2); insertsAndUpdates3.addAll(updates3); @@ -745,7 +747,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { String commitTime1 = "001"; client.startCommitWithTime(commitTime1); List inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb - Set keys1 = HoodieClientTestUtils.getRecordKeys(inserts1); + Set keys1 = recordsToRecordKeySet(inserts1); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); List statuses = client.insert(insertRecordsRDD1, commitTime1).collect(); @@ -762,7 +764,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { String commitTime2 = "002"; client.startCommitWithTime(commitTime2); List inserts2 = dataGen.generateInserts(commitTime2, 40); - Set keys2 = HoodieClientTestUtils.getRecordKeys(inserts2); + Set keys2 = recordsToRecordKeySet(inserts2); JavaRDD insertRecordsRDD2 = jsc.parallelize(inserts2, 1); statuses = client.insert(insertRecordsRDD2, commitTime2).collect(); assertNoWriteErrors(statuses); @@ -826,7 +828,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { String commitTime1 = "001"; client.startCommitWithTime(commitTime1); List inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb - Set keys1 = HoodieClientTestUtils.getRecordKeys(inserts1); + Set keys1 = recordsToRecordKeySet(inserts1); List keysSoFar = new ArrayList<>(keys1); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); List statuses = client.upsert(insertRecordsRDD1, commitTime1).collect(); @@ -858,8 +860,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { client.startCommitWithTime(commitTime6); List dummyInserts3 = dataGen.generateInserts(commitTime6, 20); - List hoodieKeysToDelete3 = HoodieClientTestUtils - .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(dummyInserts3), 20); + List hoodieKeysToDelete3 = randomSelectAsHoodieKeys(dummyInserts3, 20); JavaRDD deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1); statuses = client.delete(deleteKeys3, commitTime6).collect(); assertNoWriteErrors(statuses); @@ -884,7 +885,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { throws IOException { client.startCommitWithTime(instantTime); List inserts = dataGen.generateInserts(instantTime, sizeToInsertAndUpdate); - Set keys = 
HoodieClientTestUtils.getRecordKeys(inserts); + Set keys = recordsToRecordKeySet(inserts); List insertsAndUpdates = new ArrayList<>(); insertsAndUpdates.addAll(inserts); insertsAndUpdates.addAll(dataGen.generateUpdates(instantTime, inserts)); @@ -908,8 +909,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { String existingFile, String instantTime, int exepctedRecords, List keys) { client.startCommitWithTime(instantTime); - List hoodieKeysToDelete = HoodieClientTestUtils - .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(previousRecords), sizeToDelete); + List hoodieKeysToDelete = randomSelectAsHoodieKeys(previousRecords, sizeToDelete); JavaRDD deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1); List statuses = client.delete(deleteKeys, instantTime).collect(); @@ -958,8 +958,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { client.startCommitWithTime(commitTime1); List dummyInserts = dataGen.generateInserts(commitTime1, 20); - List hoodieKeysToDelete = HoodieClientTestUtils - .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(dummyInserts), 20); + List hoodieKeysToDelete = randomSelectAsHoodieKeys(dummyInserts, 20); JavaRDD deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1); assertThrows(HoodieIOException.class, () -> { client.delete(deleteKeys, commitTime1).collect(); diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index db350694d..c6806b46a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.HoodieCreateHandle; @@ -33,6 +32,7 @@ import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -135,8 +136,8 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness { }).collect().size()); } - private HoodieWriteConfig makeHoodieClientConfig(String schema) throws Exception { - String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream(schema)); - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr).build(); + private HoodieWriteConfig makeHoodieClientConfig(String name) { + Schema schema = getSchemaFromResource(getClass(), name); + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schema.toString()).build(); } } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java 
b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index 8ac53dd1f..39c820140 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -18,7 +18,6 @@ package org.apache.hudi.index; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; @@ -31,7 +30,6 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; @@ -64,6 +62,7 @@ import java.util.UUID; import scala.Tuple2; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -71,18 +70,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieIndex extends HoodieClientTestHarness { + private static final Schema SCHEMA = getSchemaFromResource(TestHoodieIndex.class, "/exampleSchema.txt", true); private final Random random = new Random(); private IndexType indexType; private HoodieIndex index; private HoodieWriteConfig config; - private Schema schema; private void setUp(IndexType indexType) throws Exception { this.indexType = indexType; initResources(); - // We have some records to be tagged (two different partitions) - String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); - schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); config = getConfigBuilder() .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType) .build()).withAutoCommit(false).build(); @@ -284,11 +280,11 @@ public class TestHoodieIndex extends HoodieClientTestHarness { // We create three parquet file, each having one record. 
(two different partitions) String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, null, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, null, true); String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), schema, null, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), SCHEMA, null, true); String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), schema, null, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), SCHEMA, null, true); // We do the tag again metaClient = HoodieTableMetaClient.reload(metaClient); @@ -380,11 +376,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness { incomingPayloadSamePartition); // We have some records to be tagged (two different partitions) - String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); - Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); - HoodieClientTestUtils - .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false); + .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), SCHEMA, null, false); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index f0a116fd5..f6fd5f461 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -18,7 +18,6 @@ package org.apache.hudi.index.bloom; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; @@ -27,7 +26,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieIndexConfig; @@ -64,6 +62,7 @@ import java.util.stream.Stream; import scala.Tuple2; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -73,9 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHoodieBloomIndex extends HoodieClientTestHarness { - private String schemaStr; - private Schema schema; - + private static final Schema SCHEMA = getSchemaFromResource(TestHoodieBloomIndex.class, "/exampleSchema.txt", true); private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}"; public static Stream 
configParams() { @@ -90,8 +87,6 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { initPath(); initFileSystem(); // We have some records to be tagged (two different partitions) - schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); - schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); initMetaClient(); } @@ -141,13 +136,13 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet", new ArrayList<>(), - schema, null, false); + SCHEMA, null, false); HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet", new ArrayList<>(), - schema, null, false); + SCHEMA, null, false); HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet", Collections.singletonList(record1), - schema, null, false); + SCHEMA, null, false); HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet", - Arrays.asList(record2, record3, record4), schema, null, false); + Arrays.asList(record2, record3, record4), SCHEMA, null, false); List partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12"); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -245,7 +240,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); filter.add(record3.getRecordKey()); String filename = HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1, record2), - schema, filter, true); + SCHEMA, filter, true); // The bloom filter contains 3 records assertTrue(filter.mightContain(record1.getRecordKey())); @@ -333,11 +328,11 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // We create three parquet file, each having one record. (two different partitions) String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, null, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, null, true); String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), schema, null, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), SCHEMA, null, true); String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), schema, null, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), SCHEMA, null, true); // We do the tag again metaClient = HoodieTableMetaClient.reload(metaClient); @@ -405,11 +400,11 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // We create three parquet file, each having one record. 
(two different partitions) String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, null, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, null, true); String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), schema, null, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), SCHEMA, null, true); String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), schema, null, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), SCHEMA, null, true); // We do the tag again metaClient = HoodieTableMetaClient.reload(metaClient); @@ -455,7 +450,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { BloomFilterTypeCode.SIMPLE.name()); filter.add(record2.getRecordKey()); String filename = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, filter, true); + HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, filter, true); assertTrue(filter.mightContain(record1.getRecordKey())); assertTrue(filter.mightContain(record2.getRecordKey())); diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index c5f3045a3..3c4600df9 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -18,7 +18,6 @@ package org.apache.hudi.index.bloom; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; @@ -26,7 +25,6 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -55,6 +53,7 @@ import java.util.stream.Collectors; import scala.Tuple2; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -64,18 +63,12 @@ import static org.junit.jupiter.api.Assertions.fail; public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { - private Schema schema; - - public TestHoodieGlobalBloomIndex() { - } + private static final Schema SCHEMA = getSchemaFromResource(TestHoodieGlobalBloomIndex.class, "/exampleSchema.txt", true); @BeforeEach public void setUp() throws Exception { initSparkContexts(); initPath(); - // We have some records to be tagged (two different partitions) - String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); - schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); 
initMetaClient(); } @@ -119,13 +112,13 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet", new ArrayList<>(), - schema, null, false); + SCHEMA, null, false); HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet", new ArrayList<>(), - schema, null, false); + SCHEMA, null, false); HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet", Collections.singletonList(record1), - schema, null, false); + SCHEMA, null, false); HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet", - Arrays.asList(record2, record3, record4), schema, null, false); + Arrays.asList(record2, record3, record4), SCHEMA, null, false); // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up List partitions = Arrays.asList("2016/01/21", "2016/04/01"); @@ -250,13 +243,13 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5)); String filename0 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1), schema, null, false); + HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1), SCHEMA, null, false); String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", new ArrayList<>(), schema, null, false); + HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", new ArrayList<>(), SCHEMA, null, false); String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2), schema, null, false); + HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2), SCHEMA, null, false); String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4), schema, null, false); + HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4), SCHEMA, null, false); // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up metaClient = HoodieTableMetaClient.reload(metaClient); @@ -342,7 +335,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { incomingPayloadSamePartition); HoodieClientTestUtils - .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false); + .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), SCHEMA, null, false); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index d4ac53342..b80eaba2a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -42,6 +42,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.testutils.Transformations; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -1431,7 +1432,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { actionExecutor.getUpsertPartitioner(new WorkloadProfile(deleteRDD)); final List> deleteStatus = jsc.parallelize(Arrays.asList(1)).map(x -> { return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator()); - }).map(x -> (List) HoodieClientTestUtils.collectStatuses(x)).collect(); + }).map(Transformations::flatten).collect(); // Verify there are errors because records are from multiple partitions (but handleUpdate is invoked for // specific partition) diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 7ab147096..cc7868467 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.testutils.Transformations; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; @@ -40,9 +40,9 @@ import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.table.HoodieCopyOnWriteTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestBase; -import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.MetadataMergeWriteStatus; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -68,6 +68,7 @@ import java.util.Map; import java.util.UUID; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords; import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -78,9 +79,10 @@ import static org.mockito.Mockito.when; public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class); + private static final Schema SCHEMA = getSchemaFromResource(TestCopyOnWriteActionExecutor.class, "/exampleSchema.txt"); @Test - public void testMakeNewPath() throws Exception { + public void testMakeNewPath() { String fileName = UUID.randomUUID().toString(); String partitionPath = "2016/05/04"; @@ -102,14 +104,13 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { FSUtils.makeDataFileName(instantTime, 
newPathWithWriteToken.getRight(), fileName)).toString()); } - private HoodieWriteConfig makeHoodieClientConfig() throws Exception { + private HoodieWriteConfig makeHoodieClientConfig() { return makeHoodieClientConfigBuilder().build(); } - private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() throws Exception { + private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() { // Prepare the AvroParquetIO - String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr); + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString()); } // TODO (weiy): Add testcases for crossing file writing. @@ -288,7 +289,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { firstCommitTime, jsc.parallelize(records)); List writeStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> { return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator()); - }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect(); + }).flatMap(Transformations::flattenAsIterator).collect(); Map allWriteStatusMergedMetadataMap = MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(writeStatuses); @@ -331,7 +332,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { instantTime, jsc.parallelize(recs2)); List returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> { return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs2.iterator()); - }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect(); + }).flatMap(Transformations::flattenAsIterator).collect(); // TODO: check the actual files and make sure 11 records, total were written. 
assertEquals(2, returnedStatuses.size()); @@ -352,7 +353,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { instantTime, jsc.parallelize(recs3)); returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> { return newActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs3.iterator()); - }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect(); + }).flatMap(Transformations::flattenAsIterator).collect(); assertEquals(3, returnedStatuses.size()); expectedPartitionNumRecords.clear(); @@ -384,7 +385,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { instantTime, jsc.parallelize(records)); jsc.parallelize(Arrays.asList(1)) .map(i -> actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator())) - .map(x -> HoodieClientTestUtils.collectStatuses(x)).collect(); + .map(Transformations::flatten).collect(); // Check the updated file int counts = 0; @@ -410,7 +411,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { instantTime, jsc.parallelize(inserts)); final List> ws = jsc.parallelize(Arrays.asList(1)).map(x -> { return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator()); - }).map(x -> (List) HoodieClientTestUtils.collectStatuses(x)).collect(); + }).map(Transformations::flatten).collect(); WriteStatus writeStatus = ws.get(0).get(0); String fileId = writeStatus.getFileId(); @@ -423,7 +424,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { instantTime, jsc.parallelize(updates)); final List> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> { return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator()); - }).map(x -> (List) HoodieClientTestUtils.collectStatuses(x)).collect(); + }).map(Transformations::flatten).collect(); assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords()); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index afde632ac..d8bb946da 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -26,7 +26,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieStorageConfig; @@ -37,6 +36,7 @@ import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.avro.Schema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.jupiter.api.Test; @@ -53,6 +53,7 @@ import java.util.Map; import scala.Tuple2; import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertTrue; @@ -63,6 +64,7 @@ import static org.mockito.Mockito.when; public class TestUpsertPartitioner extends HoodieClientTestBase { private static final Logger LOG = LogManager.getLogger(TestUpsertPartitioner.class); + private static final Schema SCHEMA = getSchemaFromResource(TestUpsertPartitioner.class, "/exampleSchema.txt"); private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize, String testPartitionPath, boolean autoSplitInserts) throws Exception { @@ -273,9 +275,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { assertEquals(200.0 / 2400, insertBuckets.get(0).weight, 0.01, "First insert bucket should have weight 0.5"); } - private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() throws Exception { + private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() { // Prepare the AvroParquetIO - String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt")); - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr); + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString()); } } diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index dddc04d7f..2526d953e 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -22,7 +22,6 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkTaskContextSupplier; -import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; @@ -30,7 +29,6 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -65,12 +63,8 @@ import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -80,46 +74,8 @@ import java.util.stream.Collectors; public class HoodieClientTestUtils { private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class); - private static final Random RANDOM = new Random(); public static final String DEFAULT_WRITE_TOKEN = "1-0-1"; - public static List collectStatuses(Iterator> statusListItr) { - List statuses = new ArrayList<>(); - while (statusListItr.hasNext()) { - statuses.addAll(statusListItr.next()); - } - return statuses; - } - - public static Set getRecordKeys(List hoodieRecords) { - Set keys = new HashSet<>(); - for (HoodieRecord rec : hoodieRecords) { - keys.add(rec.getRecordKey()); - } - return keys; - } - - public static List getHoodieKeys(List hoodieRecords) { - List keys = new 
ArrayList<>(); - for (HoodieRecord rec : hoodieRecords) { - keys.add(rec.getKey()); - } - return keys; - } - - public static List getKeysToDelete(List keys, int size) { - List toReturn = new ArrayList<>(); - int counter = 0; - while (counter < size) { - int index = RANDOM.nextInt(keys.size()); - if (!toReturn.contains(keys.get(index))) { - toReturn.add(keys.get(index)); - counter++; - } - } - return toReturn; - } - private static void fakeMetaFile(String basePath, String instantTime, String suffix) throws IOException { String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME; new File(parentPath).mkdirs(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 34c705516..8a23491d4 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; -import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.MockHoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; @@ -44,6 +43,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0; +import static org.apache.hudi.common.testutils.Assertions.assertStreamEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -94,17 +94,19 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { timeline = timeline.reload(); assertEquals(5, timeline.countInstants(), "Total instants should be 5"); - HoodieTestUtils.assertStreamEquals("Check the instants stream", + assertStreamEquals( Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete, instant5), - timeline.getInstants()); - HoodieTestUtils.assertStreamEquals("Check the instants stream", + timeline.getInstants(), "Check the instants stream"); + assertStreamEquals( Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete, instant5), - timeline.getCommitTimeline().getInstants()); - HoodieTestUtils.assertStreamEquals("Check the instants stream", + timeline.getCommitTimeline().getInstants(), "Check the instants stream"); + assertStreamEquals( Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete), - timeline.getCommitTimeline().filterCompletedInstants().getInstants()); - HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream.of(instant5), - timeline.getCommitTimeline().filterPendingExcludingCompaction().getInstants()); + timeline.getCommitTimeline().filterCompletedInstants().getInstants(), + "Check the instants stream"); + assertStreamEquals(Stream.of(instant5), + timeline.getCommitTimeline().filterPendingExcludingCompaction().getInstants(), + "Check the instants stream"); // Backwards compatibility testing for reading compaction plans metaClient = 
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), @@ -150,15 +152,18 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { public void testTimelineOperations() { timeline = new MockHoodieTimeline(Stream.of("01", "03", "05", "07", "09", "11", "13", "15", "17", "19"), Stream.of("21", "23")); - HoodieTestUtils.assertStreamEquals("findInstantsInRange should return 4 instants", Stream.of("05", "07", "09", "11"), + assertStreamEquals(Stream.of("05", "07", "09", "11"), timeline.getCommitTimeline().filterCompletedInstants().findInstantsInRange("04", "11") - .getInstants().map(HoodieInstant::getTimestamp)); - HoodieTestUtils.assertStreamEquals("findInstantsAfter 07 should return 2 instants", Stream.of("09", "11"), + .getInstants().map(HoodieInstant::getTimestamp), + "findInstantsInRange should return 4 instants"); + assertStreamEquals(Stream.of("09", "11"), timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 2) - .getInstants().map(HoodieInstant::getTimestamp)); - HoodieTestUtils.assertStreamEquals("findInstantsBefore 07 should return 3 instants", Stream.of("01", "03", "05"), + .getInstants().map(HoodieInstant::getTimestamp), + "findInstantsAfter 07 should return 2 instants"); + assertStreamEquals(Stream.of("01", "03", "05"), timeline.getCommitTimeline().filterCompletedInstants().findInstantsBefore("07") - .getInstants().map(HoodieInstant::getTimestamp)); + .getInstants().map(HoodieInstant::getTimestamp), + "findInstantsBefore 07 should return 3 instants"); assertFalse(timeline.empty()); assertFalse(timeline.getCommitTimeline().filterPendingExcludingCompaction().empty()); assertEquals(12, timeline.countInstants()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/Assertions.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/Assertions.java new file mode 100644 index 000000000..9aabdc210 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/Assertions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.testutils; + +import java.util.Iterator; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Common assertion functions. 
+ */ +public class Assertions { + + public static void assertStreamEquals(Stream expected, Stream actual, String message) { + Iterator iter1 = expected.iterator(); + Iterator iter2 = actual.iterator(); + while (iter1.hasNext() && iter2.hasNext()) { + assertEquals(iter1.next(), iter2.next(), message); + } + assertTrue(!iter1.hasNext() && !iter2.hasNext(), message); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 9ad904f67..fb283d41c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -83,7 +83,6 @@ import java.util.Calendar; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -93,7 +92,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; /** @@ -388,15 +386,6 @@ public class HoodieTestUtils { } } - public static void assertStreamEquals(String message, Stream expected, Stream actual) { - Iterator iter1 = expected.iterator(); - Iterator iter2 = actual.iterator(); - while (iter1.hasNext() && iter2.hasNext()) { - assertEquals(iter1.next(), iter2.next(), message); - } - assert !iter1.hasNext() && !iter2.hasNext(); - } - public static T serializeDeserialize(T object, Class clazz) { // Using Kyro as the default serializer in Spark Jobs Kryo kryo = new Kryo(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java index 54525cb00..8442aff08 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.testutils; import org.apache.hudi.avro.MercifulJsonConverter; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; @@ -32,7 +33,9 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; @@ -74,6 +77,23 @@ public class RawTripTestPayload implements HoodieRecordPayload recordsToStrings(List records) { + return records.stream().map(RawTripTestPayload::recordToString).filter(Option::isPresent).map(Option::get) + .collect(Collectors.toList()); + } + + public static Option recordToString(HoodieRecord record) { + try { + String str = ((RawTripTestPayload) record.getData()).getJsonData(); + str = "{" + str.substring(str.indexOf("\"timestamp\":")); + // Remove the last } bracket + str = str.substring(0, str.length() - 1); + return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}"); + } catch (IOException e) { + return Option.empty(); + } + } + public String 
getPartitionPath() { return partitionPath; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java index 140d95a88..d680b1268 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java @@ -52,7 +52,9 @@ import java.util.stream.Stream; /** * A utility class for testing schema. */ -public class SchemaTestUtil { +public final class SchemaTestUtil { + + private static final String RESOURCE_SAMPLE_DATA = "/sample.data"; public static Schema getSimpleSchema() throws IOException { return new Schema.Parser().parse(SchemaTestUtil.class.getResourceAsStream("/simple-test.avsc")); @@ -66,12 +68,12 @@ public class SchemaTestUtil { throws IOException, URISyntaxException { GenericDatumReader reader = new GenericDatumReader<>(writerSchema, readerSchema); // Required to register the necessary JAR:// file system - URI resource = SchemaTestUtil.class.getClass().getResource("/sample.data").toURI(); + URI resource = SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI(); Path dataPath; if (resource.toString().contains("!")) { dataPath = uriToPath(resource); } else { - dataPath = Paths.get(SchemaTestUtil.class.getClass().getResource("/sample.data").toURI()); + dataPath = Paths.get(SchemaTestUtil.class.getResource(RESOURCE_SAMPLE_DATA).toURI()); } try (Stream stream = Files.lines(dataPath)) { @@ -79,11 +81,11 @@ public class SchemaTestUtil { try { return reader.read(null, DecoderFactory.get().jsonDecoder(writerSchema, s)); } catch (IOException e) { - throw new HoodieIOException("Could not read data from simple_data.json", e); + throw new HoodieIOException("Could not read data from " + RESOURCE_SAMPLE_DATA, e); } }).collect(Collectors.toList()); } catch (IOException e) { - throw new HoodieIOException("Could not read data from simple_data.json", e); + throw new HoodieIOException("Could not read data from " + RESOURCE_SAMPLE_DATA, e); } } @@ -181,4 +183,17 @@ public class SchemaTestUtil { MercifulJsonConverter converter = new MercifulJsonConverter(); return converter.convert(record.toJsonString(), schema); } + + public static Schema getSchemaFromResource(Class clazz, String name, boolean withHoodieMetadata) { + try { + Schema schema = new Schema.Parser().parse(clazz.getResourceAsStream(name)); + return withHoodieMetadata ? HoodieAvroUtils.addMetadataFields(schema) : schema; + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to get schema from resource `%s` for class `%s`", name, clazz.getName())); + } + } + + public static Schema getSchemaFromResource(Class clazz, String name) { + return getSchemaFromResource(clazz, name, false); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/Transformations.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/Transformations.java new file mode 100644 index 000000000..fc42de0ba --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/Transformations.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.testutils; + +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Common transformations in test cases. + */ +public final class Transformations { + + public static List flatten(Iterator> iteratorOfLists) { + List flattened = new ArrayList<>(); + iteratorOfLists.forEachRemaining(flattened::addAll); + return flattened; + } + + public static Iterator flattenAsIterator(Iterator> iteratorOfLists) { + return flatten(iteratorOfLists).iterator(); + } + + public static Set recordsToRecordKeySet(List records) { + return records.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet()); + } + + public static List recordsToHoodieKeys(List records) { + return records.stream().map(HoodieRecord::getKey).collect(Collectors.toList()); + } + + /** + * Pseudorandom: select even indices first, then select odd ones. + */ + public static List randomSelect(List items, int n) { + int s = items.size(); + if (n < 0 || n > s) { + throw new IllegalArgumentException(String.format("Invalid number of items to select! 
Valid range for n: [0, %s]", s)); + } + List selected = new ArrayList<>(); + for (int i = 0, numSelected = 0; i < s && numSelected < n; i += 2, numSelected++) { + selected.add(items.get(i)); + } + for (int i = 1, numSelected = selected.size(); i < s && numSelected < n; i += 2, numSelected++) { + selected.add(items.get(i)); + } + return selected; + } + + public static List randomSelectAsHoodieKeys(List records, int n) { + return randomSelect(recordsToHoodieKeys(records), n); + } +} diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index 918aade81..27838897d 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapreduce.Job; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -52,6 +53,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -316,10 +318,11 @@ public class TestHoodieParquetInputFormat { ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 200 commit", files, "200", 1); } - // TODO enable this after enabling predicate pushdown + @Disabled("enable this after enabling predicate pushdown") + @Test public void testPredicatePushDown() throws IOException { // initial commit - Schema schema = InputFormatTestUtil.readSchema("/sample1.avsc"); + Schema schema = getSchemaFromResource(getClass(), "/sample1.avsc"); String commit1 = "20160628071126"; File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 10, commit1); InputFormatTestUtil.commit(basePath, commit1); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 0201fac2d..fe0be469d 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -123,10 +123,6 @@ public class InputFormatTestUtil { jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); } - public static Schema readSchema(String location) throws IOException { - return new Schema.Parser().parse(InputFormatTestUtil.class.getResourceAsStream(location)); - } - public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, int numberOfRecords, String commitNumber) throws IOException { HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString()); diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java index 90e133ec9..2cf36f918 100644 --- a/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark/src/test/java/HoodieJavaApp.java @@ -28,8 +28,6 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor; import 
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java
index 90e133ec9..2cf36f918 100644
--- a/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -28,8 +28,6 @@ import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.hive.NonPartitionedExtractor;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
-import org.apache.hudi.testutils.DataSourceTestUtils;
-import org.apache.hudi.testutils.HoodieClientTestUtils;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -45,6 +43,10 @@ import org.apache.spark.sql.SparkSession;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+import static org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
 
 /**
  * Sample program that writes & reads hoodie tables via the Spark datasource.
@@ -123,7 +125,7 @@
      */
     // Generate some input..
     List<HoodieRecord> recordsSoFar = new ArrayList<>(dataGen.generateInserts("001"/* ignore */, 100));
-    List<String> records1 = DataSourceTestUtils.convertToStringList(recordsSoFar);
+    List<String> records1 = recordsToStrings(recordsSoFar);
     Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
 
     // Save as hoodie dataset (copy on write)
@@ -163,7 +165,7 @@
      */
     List<HoodieRecord> recordsToBeUpdated = dataGen.generateUpdates("002"/* ignore */, 100);
     recordsSoFar.addAll(recordsToBeUpdated);
-    List<String> records2 = DataSourceTestUtils.convertToStringList(recordsToBeUpdated);
+    List<String> records2 = recordsToStrings(recordsToBeUpdated);
     Dataset<Row> inputDF2 = spark.read().json(jssc.parallelize(records2, 2));
     writer = inputDF2.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2")
         .option("hoodie.upsert.shuffle.parallelism", "2")
@@ -185,9 +187,9 @@
     /**
      * Commit that Deletes some records
      */
-    List<String> deletes = DataSourceTestUtils.convertKeysToStringList(
-        HoodieClientTestUtils
-            .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(recordsSoFar), 20));
+    List<String> deletes = randomSelectAsHoodieKeys(recordsSoFar, 20).stream()
+        .map(hr -> "{\"_row_key\":\"" + hr.getRecordKey() + "\",\"partition\":\"" + hr.getPartitionPath() + "\"}")
+        .collect(Collectors.toList());
     Dataset<Row> inputDF3 = spark.read().json(jssc.parallelize(deletes, 2));
     writer = inputDF3.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2")
         .option("hoodie.upsert.shuffle.parallelism", "2")
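Each element of the deletes list built above is a minimal JSON document carrying only the record key and partition path, e.g. {"_row_key":"...","partition":"..."}. A small hypothetical helper (not part of the patch) that makes the per-key mapping explicit:

    import org.apache.hudi.common.model.HoodieKey;

    // Hypothetical helper mirroring the inline lambda above.
    final class DeleteKeyJson {
      static String toJson(HoodieKey key) {
        return "{\"_row_key\":\"" + key.getRecordKey() + "\",\"partition\":\"" + key.getPartitionPath() + "\"}";
      }
    }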
diff --git a/hudi-spark/src/test/java/HoodieJavaGenerateApp.java b/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
index 1160f2d47..012134fdb 100644
--- a/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
@@ -16,21 +16,21 @@
  * limitations under the License.
  */
 
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieDataSourceHelpers;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.hive.NonPartitionedExtractor;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
-import org.apache.hudi.testutils.DataSourceTestUtils;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -43,6 +43,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+
 public class HoodieJavaGenerateApp {
   @Parameter(names = {"--table-path", "-p"}, description = "Path for Hoodie sample table")
   private String tablePath = "file:///tmp/hoodie/sample-table";
@@ -152,7 +154,7 @@
     // Generate some input..
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
     List<HoodieRecord> recordsSoFar = new ArrayList<>(dataGen.generateInserts(instantTime/* ignore */, 100));
-    List<String> records1 = DataSourceTestUtils.convertToStringList(recordsSoFar);
+    List<String> records1 = recordsToStrings(recordsSoFar);
     Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
 
     // Save as hoodie dataset (copy on write)
diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index b88eefbbc..977ae09d3 100644
--- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
-import org.apache.hudi.testutils.DataSourceTestUtils;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -45,6 +44,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+
 /**
  * Sample program that writes & reads hoodie tables via the Spark datasource streaming.
 */
@@ -128,10 +129,10 @@ public class HoodieJavaStreamingApp {
     // Generator of some records to be loaded in.
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
 
-    List<String> records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001", 100));
+    List<String> records1 = recordsToStrings(dataGen.generateInserts("001", 100));
     Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
 
-    List<String> records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002", 100));
+    List<String> records2 = recordsToStrings(dataGen.generateUpdates("002", 100));
     Dataset<Row> inputDF2 = spark.read().json(jssc.parallelize(records2, 2));
 
diff --git a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index fe812cb4e..a53685120 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -20,10 +20,11 @@ package org.apache.hudi;
 
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.testutils.DataSourceTestUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -127,23 +128,36 @@ public class TestDataSourceUtils {
 
   @Test
   public void testDoWriteOperationWithUserDefinedBulkInsertPartitioner() throws HoodieException {
-    setAndVerifyHoodieWriteClientWith(DataSourceTestUtils.NoOpBulkInsertPartitioner.class.getName());
+    setAndVerifyHoodieWriteClientWith(NoOpBulkInsertPartitioner.class.getName());
 
     DataSourceUtils.doWriteOperation(hoodieWriteClient, hoodieRecords, "test-time",
-    DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
+        DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL());
 
     verify(hoodieWriteClient, times(1)).bulkInsert(any(hoodieRecords.getClass()), anyString(),
-    optionCaptor.capture());
-    assertThat(optionCaptor.getValue().get(), is(instanceOf(DataSourceTestUtils.NoOpBulkInsertPartitioner.class)));
+        optionCaptor.capture());
+    assertThat(optionCaptor.getValue().get(), is(instanceOf(NoOpBulkInsertPartitioner.class)));
   }
 
   private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
     config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath())
-    .withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
-    .build();
+        .withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
+        .build();
     when(hoodieWriteClient.getConfig()).thenReturn(config);
 
     assertThat(config.getUserDefinedBulkInsertPartitionerClass(), is(equalTo(partitionerClassName)));
   }
+
+  public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
+      implements BulkInsertPartitioner<T> {
+
+    @Override
+    public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
+      return records;
+    }
+
+    @Override
+    public boolean arePartitionRecordsSorted() {
+      return false;
+    }
+  }
 }
diff --git a/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
deleted file mode 100644
index c687352e2..000000000
--- a/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.testutils;
-
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.table.BulkInsertPartitioner;
-
-import org.apache.spark.api.java.JavaRDD;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Test utils for data source tests.
- */
-public class DataSourceTestUtils {
-
-  public static Option<String> convertToString(HoodieRecord record) {
-    try {
-      String str = ((RawTripTestPayload) record.getData()).getJsonData();
-      str = "{" + str.substring(str.indexOf("\"timestamp\":"));
-      // Remove the last } bracket
-      str = str.substring(0, str.length() - 1);
-      return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
-    } catch (IOException e) {
-      return Option.empty();
-    }
-  }
-
-  public static List<String> convertToStringList(List<HoodieRecord> records) {
-    return records.stream().map(DataSourceTestUtils::convertToString).filter(Option::isPresent).map(Option::get)
-        .collect(Collectors.toList());
-  }
-
-  public static List<String> convertKeysToStringList(List<HoodieKey> keys) {
-    return keys.stream()
-        .map(hr -> "{\"_row_key\":\"" + hr.getRecordKey() + "\",\"partition\":\"" + hr.getPartitionPath() + "\"}")
-        .collect(Collectors.toList());
-  }
-
-  public static class NoOpBulkInsertPartitioner<T extends HoodieRecordPayload>
-      implements BulkInsertPartitioner<T> {
-
-    @Override
-    public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
-      return records;
-    }
-
-    @Override
-    public boolean arePartitionRecordsSorted() {
-      return false;
-    }
-  }
-
-}
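RawTripTestPayload.recordsToStrings, which replaces the deleted helpers above throughout the Spark tests, is added elsewhere in this patch and its hunk is not shown in this excerpt. Presumably it carries over the convertToString/convertToStringList logic just removed; a sketch under that assumption (class and method names below are illustrative):

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.testutils.RawTripTestPayload;
    import org.apache.hudi.common.util.Option;

    import java.io.IOException;
    import java.util.List;
    import java.util.stream.Collectors;

    // Sketch only; the real RawTripTestPayload.recordsToStrings may differ in detail.
    final class RecordsToStringsSketch {

      static List<String> recordsToStrings(List<HoodieRecord> records) {
        return records.stream().map(RecordsToStringsSketch::recordToString)
            .filter(Option::isPresent).map(Option::get).collect(Collectors.toList());
      }

      static Option<String> recordToString(HoodieRecord record) {
        try {
          String str = ((RawTripTestPayload) record.getData()).getJsonData();
          // Keep only the fields from "timestamp" onward and re-open the JSON object.
          str = "{" + str.substring(str.indexOf("\"timestamp\":"));
          // Drop the trailing '}' so the partition field can be appended.
          str = str.substring(0, str.length() - 1);
          return Option.of(str + ", \"partition\": \"" + record.getPartitionPath() + "\"}");
        } catch (IOException e) {
          return Option.empty();
        }
      }
    }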
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
index 7dfff554a..07e8704da 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
@@ -20,8 +20,8 @@ package org.apache.hudi.functional
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.col
@@ -66,7 +66,7 @@ class TestDataSource {
   @Test def testShortNameStorage() {
     // Insert Operation
-    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
     inputDF.write.format("hudi")
       .options(commonOpts)
@@ -79,7 +79,7 @@ class TestDataSource {
   @Test def testCopyOnWriteStorage() {
     // Insert Operation
-    val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100)).toList
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
       .options(commonOpts)
@@ -95,7 +95,7 @@ class TestDataSource {
       .load(basePath + "/*/*/*/*");
     assertEquals(100, hoodieROViewDF1.count())
 
-    val records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("001", 100)).toList
+    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
     val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
 
@@ -127,7 +127,7 @@ class TestDataSource {
     assertEquals(firstCommit, countsPerCommit(0).get(0))
 
     // Upsert an empty dataFrame
-    val emptyRecords = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002", 0)).toList
+    val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList
     val emptyDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
     emptyDF.write.format("org.apache.hudi")
       .options(commonOpts)
@@ -156,7 +156,7 @@ class TestDataSource {
   @Test def testMergeOnReadStorage() {
     // Bulk Insert Operation
-    val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001", 100)).toList
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
       .options(commonOpts)
@@ -184,7 +184,7 @@ class TestDataSource {
     val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
     val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
 
-    val records1 = DataSourceTestUtils.convertToStringList(inserts1).toList
+    val records1 = recordsToStrings(inserts1).toList
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
     inputDF1.write.format("org.apache.hudi")
       .options(commonOpts)
@@ -196,9 +196,7 @@ class TestDataSource {
     assertEquals(insert1Cnt, hoodieROViewDF1.count())
     val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
 
-    val records2 = DataSourceTestUtils
-      .convertToStringList(inserts2Dup ++ inserts2New)
-      .toList
+    val records2 = recordsToStrings(inserts2Dup ++ inserts2New).toList
     val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
@@ -224,11 +222,11 @@ class TestDataSource {
     fs.mkdirs(new Path(sourcePath))
 
     // First chunk of data
-    val records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100)).toList
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
     val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
 
     // Second chunk of data
-    val records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("001", 100)).toList
+    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
     val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()