diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java index 7167965ed..57238f70e 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestRepairsCommand.java @@ -23,9 +23,7 @@ import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.commands.RepairsCommand; import org.apache.hudi.cli.commands.TableCommand; import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -33,7 +31,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.SchemaTestUtil; -import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.hudi.testutils.HoodieWriteableTestTable; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileStatus; @@ -43,17 +41,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.shell.core.CommandResult; -import java.io.File; import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; -import static org.apache.spark.sql.functions.lit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -69,10 +62,10 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest { private String repairedOutputPath; @BeforeEach - public void init() throws IOException, URISyntaxException { - String tablePath = basePath + File.separator + "test_table"; - duplicatedPartitionPath = tablePath + File.separator + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; - repairedOutputPath = basePath + File.separator + "tmp"; + public void init() throws Exception { + final String tablePath = Paths.get(basePath, "test_table").toString(); + duplicatedPartitionPath = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString(); + repairedOutputPath = Paths.get(basePath, "tmp").toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); @@ -83,33 +76,19 @@ public class ITTestRepairsCommand extends AbstractShellIntegrationTest { // generate 200 records Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema); - String fileName1 = "1_0_20160401010101.parquet"; - String fileName2 = "2_0_20160401010101.parquet"; - - List hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema); - HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, - fileName1, hoodieRecords1, schema, null, false); - List hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema); - HoodieClientTestUtils.writeParquetFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, - fileName2, hoodieRecords2, schema, null, false); - - // generate commit file - String fileId1 = UUID.randomUUID().toString(); - String testWriteToken = "1-0-1"; - String commitTime = FSUtils.getCommitTime(fileName1); - Files.createFile(Paths.get(duplicatedPartitionPath + "/" - + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime, 1, testWriteToken))); - Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit")); + HoodieRecord[] hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]); + HoodieRecord[] hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]); + testTable.addCommit("20160401010101") + .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1) + .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "2", hoodieRecords2) + .withLogFile(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); // read records and get 10 to generate duplicates - Dataset df = sqlContext.read().parquet(duplicatedPartitionPath); - - String fileName3 = "3_0_20160401010202.parquet"; - commitTime = FSUtils.getCommitTime(fileName3); - df.limit(10).withColumn("_hoodie_commit_time", lit(commitTime)) - .write().parquet(duplicatedPartitionPath + File.separator + fileName3); - Files.createFile(Paths.get(tablePath + "/.hoodie/" + commitTime + ".commit")); + HoodieRecord[] dupRecords = Arrays.copyOf(hoodieRecords1, 10); + testTable.addCommit("20160401010202") + .withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "3", dupRecords); metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index 39ee532ba..aefad87c6 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -20,10 +20,8 @@ package org.apache.hudi.index; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.ConsistencyGuardConfig; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; @@ -39,7 +37,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.hudi.testutils.HoodieWriteableTestTable; import org.apache.hudi.testutils.MetadataMergeWriteStatus; import org.apache.avro.Schema; @@ -50,7 +48,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -67,6 +64,7 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class TestHoodieIndex extends HoodieClientTestHarness { @@ -247,6 +245,8 @@ public class TestHoodieIndex extends HoodieClientTestHarness { @EnumSource(value = IndexType.class, names = {"BLOOM", "SIMPLE",}) public void testTagLocationAndFetchRecordLocations(IndexType indexType) throws Exception { setUp(indexType); + String p1 = "2016/01/31"; + String p2 = "2015/01/31"; String rowKey1 = UUID.randomUUID().toString(); String rowKey2 = UUID.randomUUID().toString(); String rowKey3 = UUID.randomUUID().toString(); @@ -279,12 +279,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness { } // We create three parquet file, each having one record. (two different partitions) - String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, null, true); - String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), SCHEMA, null, true); - String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), SCHEMA, null, true); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA); + String fileId1 = testTable.addCommit("001").withInserts(p1, record1); + String fileId2 = testTable.addCommit("002").withInserts(p1, record2); + String fileId3 = testTable.addCommit("003").withInserts(p2, record4); // We do the tag again metaClient = HoodieTableMetaClient.reload(metaClient); @@ -295,13 +293,13 @@ public class TestHoodieIndex extends HoodieClientTestHarness { // Check results for (HoodieRecord record : taggedRecordRDD.collect()) { if (record.getRecordKey().equals(rowKey1)) { - if (record.getPartitionPath().equals("2015/01/31")) { - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3)); + if (record.getPartitionPath().equals(p2)) { + assertEquals(record.getCurrentLocation().getFileId(), fileId3); } else { - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename1)); + assertEquals(record.getCurrentLocation().getFileId(), fileId1); } } else if (record.getRecordKey().equals(rowKey2)) { - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2)); + assertEquals(record.getCurrentLocation().getFileId(), fileId2); } else if (record.getRecordKey().equals(rowKey3)) { assertFalse(record.isCurrentLocationKnown()); } @@ -312,15 +310,15 @@ public class TestHoodieIndex extends HoodieClientTestHarness { for (Tuple2>> entry : recordLocations.collect()) { if (entry._1.getRecordKey().equals(rowKey1)) { assertTrue(entry._2.isPresent(), "Row1 should have been present "); - if (entry._1.getPartitionPath().equals("2015/01/31")) { + if (entry._1.getPartitionPath().equals(p2)) { assertTrue(entry._2.isPresent(), "Row1 should have been present "); - assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename3)); + assertEquals(entry._2.get().getRight(), fileId3); } else { - assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename1)); + assertEquals(entry._2.get().getRight(), fileId1); } } else if (entry._1.getRecordKey().equals(rowKey2)) { assertTrue(entry._2.isPresent(), "Row2 should have been present "); - assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename2)); + assertEquals(entry._2.get().getRight(), fileId2); } else if (entry._1.getRecordKey().equals(rowKey3)) { assertFalse(entry._2.isPresent(), "Row3 should have been absent "); } @@ -338,12 +336,13 @@ public class TestHoodieIndex extends HoodieClientTestHarness { .build()).build(); writeClient = getHoodieWriteClient(config); index = writeClient.getIndex(); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA); + final String p1 = "2016/01/31"; + final String p2 = "2016/02/28"; // Create the original partition, and put a record, along with the meta file // "2016/01/31": 1 file (1_0_20160131101010.parquet) - new File(basePath + "/2016/01/31").mkdirs(); - new File(basePath + "/2016/01/31/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile(); - // this record will be saved in table and will be tagged to an empty record RawTripTestPayload originalPayload = new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); @@ -359,7 +358,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness { - tag the new partition of the incomingRecord */ RawTripTestPayload incomingPayload = - new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}"); + new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}"); HoodieRecord incomingRecord = new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()), incomingPayload); @@ -376,67 +375,42 @@ public class TestHoodieIndex extends HoodieClientTestHarness { incomingPayloadSamePartition); // We have some records to be tagged (two different partitions) - HoodieClientTestUtils - .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), SCHEMA, null, false); - - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); - - // Add some commits - new File(basePath + "/.hoodie").mkdirs(); + testTable.addCommit("1000").withInserts(p1, originalRecord); // test against incoming record with a different partition JavaRDD recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord)); - JavaRDD taggedRecordRDD = index.tagLocation(recordRDD, jsc, table); + JavaRDD taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable); assertEquals(2, taggedRecordRDD.count()); for (HoodieRecord record : taggedRecordRDD.collect()) { switch (record.getPartitionPath()) { - case "2016/01/31": + case p1: assertEquals("000", record.getRecordKey()); assertTrue(record.getData() instanceof EmptyHoodieRecordPayload); break; - case "2016/02/31": + case p2: assertEquals("000", record.getRecordKey()); assertEquals(incomingPayload.getJsonData(), ((RawTripTestPayload) record.getData()).getJsonData()); break; default: - assertFalse(true, String.format("Should not get partition path: %s", record.getPartitionPath())); + fail(String.format("Should not get partition path: %s", record.getPartitionPath())); } } // test against incoming record with the same partition JavaRDD recordRDDSamePartition = jsc .parallelize(Collections.singletonList(incomingRecordSamePartition)); - JavaRDD taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, table); + JavaRDD taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, hoodieTable); assertEquals(1, taggedRecordRDDSamePartition.count()); HoodieRecord record = taggedRecordRDDSamePartition.first(); assertEquals("000", record.getRecordKey()); - assertEquals("2016/01/31", record.getPartitionPath()); + assertEquals(p1, record.getPartitionPath()); assertEquals(incomingPayloadSamePartition.getJsonData(), ((RawTripTestPayload) record.getData()).getJsonData()); } - /** - * Get Config builder with default configs set. - * - * @return Config Builder - */ - public HoodieWriteConfig.Builder getConfigBuilder() { - return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - } - - HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { - return getConfigBuilder(schemaStr, indexType); - } - - /** - * Get Config builder with default configs set. - * - * @return Config Builder - */ - private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) + private HoodieWriteConfig.Builder getConfigBuilder() { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) .withWriteStatusClass(MetadataMergeWriteStatus.class) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java index f6fd5f461..458432486 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java @@ -21,7 +21,6 @@ package org.apache.hudi.index.bloom; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -33,7 +32,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.HoodieKeyLookupHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.hudi.testutils.HoodieWriteableTestTable; import org.apache.avro.Schema; import org.apache.hadoop.fs.Path; @@ -46,12 +45,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -105,18 +100,17 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) @MethodSource("configParams") - public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws IOException { + public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); HoodieBloomIndex index = new HoodieBloomIndex(config); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA); // Create some partitions, and put some files // "2016/01/21": 0 file // "2016/04/01": 1 file (2_0_20160401010101.parquet) - // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, - // 4_0_20150312101010.parquet) - Files.createDirectories(Paths.get(basePath, "2016", "01", "21")); - Files.createDirectories(Paths.get(basePath, "2016", "04", "01")); - Files.createDirectories(Paths.get(basePath, "2015", "03", "12")); + // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet) + testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12"); RawTripTestPayload rowChange1 = new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); @@ -135,29 +129,17 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); - HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet", new ArrayList<>(), - SCHEMA, null, false); - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet", new ArrayList<>(), - SCHEMA, null, false); - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet", Collections.singletonList(record1), - SCHEMA, null, false); - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet", - Arrays.asList(record2, record3, record4), SCHEMA, null, false); - List partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12"); - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); - List> filesList = index.loadInvolvedFiles(partitions, jsc, table); + List> filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable); // Still 0, as no valid commit assertEquals(0, filesList.size()); - // Add some commits - java.nio.file.Path hoodieDir = Files.createDirectories(Paths.get(basePath, ".hoodie")); - Files.createFile(hoodieDir.resolve("20160401010101.commit")); - Files.createFile(hoodieDir.resolve("20150312101010.commit")); + testTable.addCommit("20160401010101").withInserts("2016/04/01", "2"); + testTable.addCommit("20150312101010").withInserts("2015/03/12", "1") + .withInserts("2015/03/12", "3", record1) + .withInserts("2015/03/12", "4", record2, record3, record4); - table = HoodieTable.create(metaClient, config, hadoopConf); - filesList = index.loadInvolvedFiles(partitions, jsc, table); + filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable); assertEquals(4, filesList.size()); if (rangePruning) { @@ -211,8 +193,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { } @Test - public void testCheckUUIDsAgainstOneFile() throws IOException, InterruptedException { - + public void testCheckUUIDsAgainstOneFile() throws Exception { + final String partition = "2016/01/31"; // Create some records to use String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; @@ -239,8 +221,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // record2, record3). BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); filter.add(record3.getRecordKey()); - String filename = HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Arrays.asList(record1, record2), - SCHEMA, filter, true); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter); + String fileId = testTable.addCommit("000").withInserts(partition, record1, record2); + String filename = testTable.getBaseFileNameById(fileId); // The bloom filter contains 3 records assertTrue(filter.mightContain(record1.getRecordKey())); @@ -254,10 +237,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); - HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, table, - Pair.of("2016/01/31/", FSUtils.getFileId(filename))); + HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, table, Pair.of(partition, fileId)); List results = keyHandle.checkCandidatesAgainstFile(hadoopConf, uuids, - new Path(basePath + "/2016/01/31/" + filename)); + new Path(Paths.get(basePath, partition, filename).toString())); assertEquals(results.size(), 2); assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0") || results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")); @@ -314,12 +296,12 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // Also create the metadata and config HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); - JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); + JavaRDD taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, hoodieTable); // Should not find any files for (HoodieRecord record : taggedRecordRDD.collect()) { @@ -327,29 +309,23 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { } // We create three parquet file, each having one record. (two different partitions) - String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, null, true); - String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), SCHEMA, null, true); - String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), SCHEMA, null, true); + String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1); + String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2); + String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4); // We do the tag again - metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.create(metaClient, config, hadoopConf); - - taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table); + taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, HoodieTable.create(metaClient, config, hadoopConf)); // Check results for (HoodieRecord record : taggedRecordRDD.collect()) { if (record.getRecordKey().equals(rowKey1)) { if (record.getPartitionPath().equals("2015/01/31")) { - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3)); + assertEquals(record.getCurrentLocation().getFileId(), fileId3); } else { - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename1)); + assertEquals(record.getCurrentLocation().getFileId(), fileId1); } } else if (record.getRecordKey().equals(rowKey2)) { - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2)); + assertEquals(record.getCurrentLocation().getFileId(), fileId2); } else if (record.getRecordKey().equals(rowKey3)) { assertFalse(record.isCurrentLocationKnown()); } @@ -385,13 +361,13 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // Also create the metadata and config HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA); // Let's tag HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config); JavaPairRDD>> taggedRecordRDD = - bloomIndex.fetchRecordLocation(keysRDD, jsc, table); + bloomIndex.fetchRecordLocation(keysRDD, jsc, hoodieTable); // Should not find any files for (Tuple2>> record : taggedRecordRDD.collect()) { @@ -399,29 +375,26 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { } // We create three parquet file, each having one record. (two different partitions) - String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, null, true); - String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), SCHEMA, null, true); - String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), SCHEMA, null, true); + String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1); + String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2); + String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4); // We do the tag again metaClient = HoodieTableMetaClient.reload(metaClient); - table = HoodieTable.create(metaClient, config, hadoopConf); - taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table); + hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); + taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, hoodieTable); // Check results for (Tuple2>> record : taggedRecordRDD.collect()) { if (record._1.getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) { assertTrue(record._2.isPresent()); - assertEquals(FSUtils.getFileId(filename1), record._2.get().getRight()); + assertEquals(fileId1, record._2.get().getRight()); } else if (record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { assertTrue(record._2.isPresent()); if (record._1.getPartitionPath().equals("2015/01/31")) { - assertEquals(FSUtils.getFileId(filename3), record._2.get().getRight()); + assertEquals(fileId3, record._2.get().getRight()); } else { - assertEquals(FSUtils.getFileId(filename2), record._2.get().getRight()); + assertEquals(fileId2, record._2.get().getRight()); } } else if (record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) { assertFalse(record._2.isPresent()); @@ -431,7 +404,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) @MethodSource("configParams") - public void testBloomFilterFalseError(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws IOException, InterruptedException { + public void testBloomFilterFalseError(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { // We have two hoodie records String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; @@ -449,8 +422,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); filter.add(record2.getRecordKey()); - String filename = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), SCHEMA, filter, true); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter); + String fileId = testTable.addCommit("000").withInserts("2016/01/31", record1); assertTrue(filter.mightContain(record1.getRecordKey())); assertTrue(filter.mightContain(record2.getRecordKey())); @@ -466,7 +439,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness { // Check results for (HoodieRecord record : taggedRecordRDD.collect()) { if (record.getKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) { - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename)); + assertEquals(record.getCurrentLocation().getFileId(), fileId); } else if (record.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { assertFalse(record.isCurrentLocationKnown()); } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index 3c4600df9..197a3bb1f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -18,18 +18,15 @@ package org.apache.hudi.index.bloom; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.hudi.testutils.HoodieWriteableTestTable; import org.apache.avro.Schema; import org.apache.spark.api.java.JavaPairRDD; @@ -39,10 +36,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -78,21 +71,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { } @Test - public void testLoadInvolvedFiles() throws IOException { + public void testLoadInvolvedFiles() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA); // Create some partitions, and put some files, along with the meta file // "2016/01/21": 0 file // "2016/04/01": 1 file (2_0_20160401010101.parquet) - // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, - // 4_0_20150312101010.parquet) - Path dir1 = Files.createDirectories(Paths.get(basePath, "2016", "01", "21")); - Files.createFile(dir1.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - Path dir2 = Files.createDirectories(Paths.get(basePath, "2016", "04", "01")); - Files.createFile(dir2.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - Path dir3 = Files.createDirectories(Paths.get(basePath, "2015", "03", "12")); - Files.createFile(dir3.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); + // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet) + testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12"); RawTripTestPayload rowChange1 = new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); @@ -111,31 +100,19 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); - HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", "2_0_20160401010101.parquet", new ArrayList<>(), - SCHEMA, null, false); - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "1_0_20150312101010.parquet", new ArrayList<>(), - SCHEMA, null, false); - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "3_0_20150312101010.parquet", Collections.singletonList(record1), - SCHEMA, null, false); - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", "4_0_20150312101010.parquet", - Arrays.asList(record2, record3, record4), SCHEMA, null, false); - // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up List partitions = Arrays.asList("2016/01/21", "2016/04/01"); - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); // partitions will NOT be respected by this loadInvolvedFiles(...) call - List> filesList = index.loadInvolvedFiles(partitions, jsc, table); + List> filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable); // Still 0, as no valid commit assertEquals(0, filesList.size()); - // Add some commits - Path hoodieDir = Files.createDirectories(Paths.get(basePath, ".hoodie")); - Files.createFile(hoodieDir.resolve("20160401010101.commit")); - Files.createFile(hoodieDir.resolve("20150312101010.commit")); + testTable.addCommit("20160401010101").withInserts("2016/04/01", "2"); + testTable.addCommit("20150312101010").withInserts("2015/03/12", "1") + .withInserts("2015/03/12", "3", record1) + .withInserts("2015/03/12", "4", record2, record3, record4); - table = HoodieTable.create(metaClient, config, hadoopConf); - filesList = index.loadInvolvedFiles(partitions, jsc, table); + filesList = index.loadInvolvedFiles(partitions, jsc, hoodieTable); assertEquals(4, filesList.size()); Map filesMap = toFileMap(filesList); @@ -201,18 +178,14 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { public void testTagLocation() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA); // Create some partitions, and put some files, along with the meta file // "2016/01/21": 0 file // "2016/04/01": 1 file (2_0_20160401010101.parquet) - // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, - // 4_0_20150312101010.parquet) - Path dir1 = Files.createDirectories(Paths.get(basePath, "2016", "01", "21")); - Files.createFile(dir1.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - Path dir2 = Files.createDirectories(Paths.get(basePath, "2016", "04", "01")); - Files.createFile(dir2.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - Path dir3 = Files.createDirectories(Paths.get(basePath, "2015", "03", "12")); - Files.createFile(dir3.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); + // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet) + testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12"); RawTripTestPayload rowChange1 = new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); @@ -242,33 +215,23 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5)); - String filename0 = - HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1), SCHEMA, null, false); - String filename1 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", new ArrayList<>(), SCHEMA, null, false); - String filename2 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2), SCHEMA, null, false); - String filename3 = - HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4), SCHEMA, null, false); - // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); - - // Add some commits - Files.createDirectories(Paths.get(basePath, ".hoodie")); + String fileId1 = testTable.addCommit("1000").withInserts("2016/04/01", record1); + String fileId2 = testTable.addCommit("2000").withInserts("2015/03/12"); + String fileId3 = testTable.addCommit("3000").withInserts("2015/03/12", record2); + String fileId4 = testTable.addCommit("4000").withInserts("2015/03/12", record4); // partitions will NOT be respected by this loadInvolvedFiles(...) call - JavaRDD taggedRecordRDD = index.tagLocation(recordRDD, jsc, table); + JavaRDD taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable); for (HoodieRecord record : taggedRecordRDD.collect()) { switch (record.getRecordKey()) { case "000": - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename0)); + assertEquals(record.getCurrentLocation().getFileId(), fileId1); assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange1.getJsonData()); break; case "001": - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2)); + assertEquals(record.getCurrentLocation().getFileId(), fileId3); assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange2.getJsonData()); break; case "002": @@ -276,11 +239,11 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange3.getJsonData()); break; case "003": - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3)); + assertEquals(record.getCurrentLocation().getFileId(), fileId4); assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange5.getJsonData()); break; case "004": - assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3)); + assertEquals(record.getCurrentLocation().getFileId(), fileId4); assertEquals(((RawTripTestPayload) record.getData()).getJsonData(), rowChange4.getJsonData()); break; default: @@ -296,12 +259,13 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { .withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(true).build()) .build(); HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config); + HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA); + final String p1 = "2016/01/31"; + final String p2 = "2016/02/28"; // Create the original partition, and put a record, along with the meta file // "2016/01/31": 1 file (1_0_20160131101010.parquet) - Path dir = Files.createDirectories(Paths.get(basePath, "2016", "01", "31")); - Files.createFile(dir.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - // this record will be saved in table and will be tagged to an empty record RawTripTestPayload originalPayload = new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); @@ -317,7 +281,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { - tag the new partition of the incomingRecord */ RawTripTestPayload incomingPayload = - new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}"); + new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-28T03:16:41.415Z\",\"number\":12}"); HoodieRecord incomingRecord = new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()), incomingPayload); @@ -334,27 +298,20 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()), incomingPayloadSamePartition); - HoodieClientTestUtils - .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), SCHEMA, null, false); - - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); - - // Add some commits - Files.createDirectories(Paths.get(basePath, ".hoodie")); + testTable.addCommit("1000").withInserts(p1, originalRecord); // test against incoming record with a different partition JavaRDD recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord)); - JavaRDD taggedRecordRDD = index.tagLocation(recordRDD, jsc, table); + JavaRDD taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable); assertEquals(2, taggedRecordRDD.count()); for (HoodieRecord record : taggedRecordRDD.collect()) { switch (record.getPartitionPath()) { - case "2016/01/31": + case p1: assertEquals("000", record.getRecordKey()); assertTrue(record.getData() instanceof EmptyHoodieRecordPayload); break; - case "2016/02/31": + case p2: assertEquals("000", record.getRecordKey()); assertEquals(incomingPayload.getJsonData(), ((RawTripTestPayload) record.getData()).getJsonData()); break; @@ -366,17 +323,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { // test against incoming record with the same partition JavaRDD recordRDDSamePartition = jsc .parallelize(Collections.singletonList(incomingRecordSamePartition)); - JavaRDD taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, table); + JavaRDD taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, hoodieTable); assertEquals(1, taggedRecordRDDSamePartition.count()); HoodieRecord record = taggedRecordRDDSamePartition.first(); assertEquals("000", record.getRecordKey()); - assertEquals("2016/01/31", record.getPartitionPath()); + assertEquals(p1, record.getPartitionPath()); assertEquals(incomingPayloadSamePartition.getJsonData(), ((RawTripTestPayload) record.getData()).getJsonData()); } // convert list to map to avoid sorting order dependencies - private Map toFileMap(List> filesList) { + private static Map toFileMap(List> filesList) { Map filesMap = new HashMap<>(); for (Tuple2 t : filesList) { filesMap.put(t._1() + "/" + t._2().getFileId(), t._2()); diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java index ba5d6dd29..ba20f5d10 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java @@ -19,16 +19,12 @@ package org.apache.hudi.io; import org.apache.hudi.common.fs.ConsistencyGuardConfig; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -37,7 +33,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.hudi.testutils.HoodieWriteableTestTable; import org.apache.hudi.testutils.MetadataMergeWriteStatus; import org.apache.spark.api.java.JavaSparkContext; @@ -46,19 +42,18 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.UUID; import scala.Tuple2; import static java.util.stream.Collectors.toList; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.common.testutils.HoodieTestUtils.makeNewCommitTime; import static org.apache.hudi.common.testutils.Transformations.recordsToPartitionRecordsMap; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -88,17 +83,12 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness { @Test public void testFetchHandle() throws Exception { - - String commitTime = "000"; - List records = dataGen.generateInserts(commitTime, 100); + List records = dataGen.generateInserts(makeNewCommitTime(), 100); Map> partitionRecordsMap = recordsToPartitionRecordsMap(records); - - Map, List>> expectedList = writeToParquetAndGetExpectedRecordLocations(partitionRecordsMap); - - metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration()); - - Files.createDirectories(Paths.get(basePath, ".hoodie")); + HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, AVRO_SCHEMA_WITH_METADATA_FIELDS); + Map, List>> expectedList = + writeToParquetAndGetExpectedRecordLocations(partitionRecordsMap, testTable); List> partitionPathFileIdPairs = loadAllFilesForPartitions(new ArrayList<>(partitionRecordsMap.keySet()), jsc, hoodieTable); @@ -112,7 +102,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness { } private Map, List>> writeToParquetAndGetExpectedRecordLocations( - Map> partitionRecordsMap) throws Exception { + Map> partitionRecordsMap, HoodieWriteableTestTable testTable) throws Exception { Map, List>> expectedList = new HashMap<>(); for (Map.Entry> entry : partitionRecordsMap.entrySet()) { int totalRecordsPerPartition = entry.getValue().size(); @@ -138,7 +128,9 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness { } for (List recordsPerSlice : recordsForFileSlices) { - Tuple2 fileIdInstantTimePair = writeToParquet(entry.getKey(), recordsPerSlice); + String instantTime = makeNewCommitTime(); + String fileId = testTable.addCommit(instantTime).withInserts(entry.getKey(), recordsPerSlice.toArray(new HoodieRecord[0])); + Tuple2 fileIdInstantTimePair = new Tuple2<>(fileId, instantTime); List> expectedEntries = new ArrayList<>(); for (HoodieRecord record : recordsPerSlice) { expectedEntries.add(new Tuple2<>(record.getKey(), new HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1))); @@ -149,31 +141,16 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness { return expectedList; } - protected List> loadAllFilesForPartitions(List partitions, final JavaSparkContext jsc, - final HoodieTable hoodieTable) { - + private static List> loadAllFilesForPartitions(List partitions, JavaSparkContext jsc, + HoodieTable hoodieTable) { // Obtain the latest data files from all the partitions. List> partitionPathFileIDList = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, jsc, hoodieTable); return partitionPathFileIDList.stream() .map(pf -> new Tuple2<>(pf.getKey(), pf.getValue())).collect(toList()); } - /** - * Get Config builder with default configs set. - * - * @return Config Builder - */ - public HoodieWriteConfig.Builder getConfigBuilder() { - return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - } - - /** - * Get Config builder with default configs set. - * - * @return Config Builder - */ - private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) + private HoodieWriteConfig.Builder getConfigBuilder() { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) .withWriteStatusClass(MetadataMergeWriteStatus.class) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) @@ -184,15 +161,4 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness { .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); } - - private Tuple2 writeToParquet(String partitionPath, List records) throws Exception { - Thread.sleep(100); - String instantTime = HoodieTestUtils.makeNewCommitTime(); - String fileId = UUID.randomUUID().toString(); - String filename = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId); - HoodieTestUtils.createCommitFiles(basePath, instantTime); - HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, records, AVRO_SCHEMA_WITH_METADATA_FIELDS, null, - true); - return new Tuple2<>(fileId, instantTime); - } } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java index f7ce95c4a..8016acc02 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -527,18 +527,18 @@ public class TestCleaner extends HoodieClientTestBase { : UUID.randomUUID().toString(); String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() : UUID.randomUUID().toString(); - testTable.addCommit("00000000000001").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0); + testTable.addCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); List hoodieCleanStatsOne = runCleaner(config); assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); - assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0)); - assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); // make next commit, with 1 insert & 1 update per partition Map partitionAndFileId002 = testTable.addCommit("00000000000002") - .withUpdates(p0, file1P0C0) - .withUpdates(p1, file1P1C0) - .withInserts(p0, p1); + .withBaseFilesInPartition(p0, file1P0C0) + .withBaseFilesInPartition(p1, file1P1C0) + .withBaseFilesInPartitions(p0, p1); List hoodieCleanStatsTwo = runCleaner(config); // enableBootstrapSourceClean would delete the bootstrap base file as the same time @@ -559,10 +559,10 @@ public class TestCleaner extends HoodieClientTestBase { cleanStat = getCleanStat(hoodieCleanStatsTwo, p1); String file2P0C1 = partitionAndFileId002.get(p0); String file2P1C1 = partitionAndFileId002.get(p1); - assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1)); - assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0)); - assertFalse(testTable.fileExists(p1, "00000000000001", file1P1C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1)); + assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size() + (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); @@ -579,21 +579,21 @@ public class TestCleaner extends HoodieClientTestBase { // make next commit, with 2 updates to existing files, and 1 insert String file3P0C2 = testTable.addCommit("00000000000003") - .withUpdates(p0, file1P0C0, file2P0C1) - .withInserts(p0).get(p0); + .withBaseFilesInPartition(p0, file1P0C0, file2P0C1) + .withBaseFilesInPartitions(p0).get(p0); List hoodieCleanStatsThree = runCleaner(config); assertEquals(2, getCleanStat(hoodieCleanStatsThree, p0) .getSuccessDeleteFiles().size(), "Must clean two files"); - assertFalse(testTable.fileExists(p0, "00000000000002", file1P0C0)); - assertFalse(testTable.fileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2)); + assertFalse(testTable.baseFileExists(p0, "00000000000002", file1P0C0)); + assertFalse(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); // No cleaning on partially written file, with no commit. - testTable.forCommit("00000000000004").withUpdates(p0, file3P0C2); + testTable.forCommit("00000000000004").withBaseFilesInPartition(p0, file3P0C2); List hoodieCleanStatsFour = runCleaner(config); assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); - assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); } /** @@ -613,23 +613,23 @@ public class TestCleaner extends HoodieClientTestBase { String p0 = "2020/01/01"; // Make 3 files, one base file and 2 log files associated with base file - String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0); + String file1P0 = testTable.addDeltaCommit("000").withBaseFilesInPartitions(p0).get(p0); testTable.forDeltaCommit("000") .withLogFile(p0, file1P0, 1) .withLogFile(p0, file1P0, 2); // Make 2 files, one base file and 1 log files associated with base file testTable.addDeltaCommit("001") - .withUpdates(p0, file1P0) + .withBaseFilesInPartition(p0, file1P0) .withLogFile(p0, file1P0, 3); List hoodieCleanStats = runCleaner(config); assertEquals(3, getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() .size(), "Must clean three files, one parquet and 2 log files"); - assertFalse(testTable.fileExists(p0, "000", file1P0)); + assertFalse(testTable.baseFileExists(p0, "000", file1P0)); assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); - assertTrue(testTable.fileExists(p0, "001", file1P0)); + assertTrue(testTable.baseFileExists(p0, "001", file1P0)); assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); } @@ -831,7 +831,7 @@ public class TestCleaner extends HoodieClientTestBase { : UUID.randomUUID().toString(); String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId() : UUID.randomUUID().toString(); - testTable.addInflightCommit("00000000000001").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0); + testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); HoodieCommitMetadata commitMetadata = generateCommitMetadata( Collections.unmodifiableMap(new HashMap>() { @@ -849,14 +849,14 @@ public class TestCleaner extends HoodieClientTestBase { List hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); - assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0)); - assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); // make next commit, with 1 insert & 1 update per partition - Map partitionAndFileId002 = testTable.addInflightCommit("00000000000002").withInserts(p0, p1); + Map partitionAndFileId002 = testTable.addInflightCommit("00000000000002").withBaseFilesInPartitions(p0, p1); String file2P0C1 = partitionAndFileId002.get(p0); String file2P1C1 = partitionAndFileId002.get(p1); - testTable.forCommit("00000000000002").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0); + testTable.forCommit("00000000000002").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); commitMetadata = generateCommitMetadata(new HashMap>() { { put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); @@ -868,16 +868,16 @@ public class TestCleaner extends HoodieClientTestBase { Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); List hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); - assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1)); - assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0)); - assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p1, "00000000000002", file2P1C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); // make next commit, with 2 updates to existing files, and 1 insert String file3P0C2 = testTable.addInflightCommit("00000000000003") - .withUpdates(p0, file1P0C0) - .withUpdates(p0, file2P0C1) - .withInserts(p0).get(p0); + .withBaseFilesInPartition(p0, file1P0C0) + .withBaseFilesInPartition(p0, file2P0C1) + .withBaseFilesInPartitions(p0).get(p0); commitMetadata = generateCommitMetadata(CollectionUtils .createImmutableMap(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); @@ -888,13 +888,13 @@ public class TestCleaner extends HoodieClientTestBase { List hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); assertEquals(0, hoodieCleanStatsThree.size(), "Must not clean any file. We have to keep 1 version before the latest commit time to keep"); - assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); // make next commit, with 2 updates to existing files, and 1 insert String file4P0C3 = testTable.addInflightCommit("00000000000004") - .withUpdates(p0, file1P0C0) - .withUpdates(p0, file2P0C1) - .withInserts(p0).get(p0); + .withBaseFilesInPartition(p0, file1P0C0) + .withBaseFilesInPartition(p0, file2P0C1) + .withBaseFilesInPartitions(p0).get(p0); commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap( p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); metaClient.getActiveTimeline().saveAsComplete( @@ -908,20 +908,20 @@ public class TestCleaner extends HoodieClientTestBase { assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size() + (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0 : partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file"); - assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0)); - assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0)); - assertTrue(testTable.fileExists(p0, "00000000000003", file1P0C0)); - assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.fileExists(p0, "00000000000003", file2P0C1)); - assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2)); - assertTrue(testTable.fileExists(p0, "00000000000004", file4P0C3)); + assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2)); + assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); if (enableBootstrapSourceClean) { assertFalse(Files.exists(Paths.get(bootstrapMapping.get( p0).get(0).getBoostrapFileStatus().getPath().getUri()))); } // No cleaning on partially written file, with no commit. - testTable.forCommit("00000000000005").withUpdates(p0, file3P0C2); + testTable.forCommit("00000000000005").withBaseFilesInPartition(p0, file3P0C2); commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0, CollectionUtils.createImmutableList(file3P0C2))); metaClient.getActiveTimeline().createNewInstant( @@ -932,9 +932,9 @@ public class TestCleaner extends HoodieClientTestBase { List hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry); HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0); assertNull(cleanStat, "Must not clean any files"); - assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0)); - assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1)); - assertTrue(testTable.fileExists(p0, "00000000000005", file3P0C2)); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file1P0C0)); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2)); } /** diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java index 1f638c39f..8b19ac1c1 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java @@ -66,9 +66,9 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @ParameterizedTest @MethodSource("consistencyGuardType") public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2"); - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f2"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f3"); ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000); ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName()) @@ -88,7 +88,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearFailSafe() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays @@ -98,7 +98,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearTimedWait() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -106,7 +106,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearsFailSafe() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); @@ -115,14 +115,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingAppearsTimedWait() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); } @Test public void testCheckFailingDisappearFailSafe() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays @@ -132,7 +132,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearTimedWait() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); @@ -140,8 +140,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearsFailSafe() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); @@ -150,8 +150,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { @Test public void testCheckFailingDisappearsTimedWait() throws Exception { - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); - FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); + FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); } diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index 138f60ec6..2fc9fe8b2 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -75,7 +75,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { .build(); FileCreateUtils.createCommit(basePath, "001"); - FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize); + FileCreateUtils.createBaseFile(basePath, testPartitionPath, "001", "file1", fileSize); metaClient = HoodieTableMetaClient.reload(metaClient); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java index 83e7ea082..8c4da54b2 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java @@ -58,10 +58,10 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { // given: wrote some base files and corresponding markers HoodieTestTable testTable = HoodieTestTable.of(metaClient); String f0 = testTable.addRequestedCommit("000") - .withInserts("partA").get("partA"); + .withBaseFilesInPartitions("partA").get("partA"); String f1 = testTable.addCommit("001") - .withUpdates("partA", f0) - .withInserts("partB").get("partB"); + .withBaseFilesInPartition("partA", f0) + .withBaseFilesInPartitions("partB").get("partB"); String f2 = "f2"; testTable.forCommit("001") .withMarkerFile("partA", f0, IOType.MERGE) @@ -89,10 +89,10 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { // given: wrote some base + log files and corresponding markers HoodieTestTable testTable = HoodieTestTable.of(metaClient); String f2 = testTable.addRequestedDeltaCommit("000") - .withInserts("partA").get("partA"); + .withBaseFilesInPartitions("partA").get("partA"); String f1 = testTable.addDeltaCommit("001") .withLogFile("partA", f2) - .withInserts("partB").get("partB"); + .withBaseFilesInPartitions("partB").get("partB"); String f3 = "f3"; String f4 = "f4"; testTable.forDeltaCommit("001") diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index c4c67fa16..307e06867 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -19,12 +19,7 @@ package org.apache.hudi.testutils; import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.client.HoodieReadClient; -import org.apache.hudi.client.SparkTaskContextSupplier; -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.BloomFilterFactory; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -36,11 +31,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; -import org.apache.hudi.common.testutils.HoodieTestUtils; -import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.storage.HoodieAvroParquetConfig; -import org.apache.hudi.io.storage.HoodieParquetWriter; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -52,9 +43,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.parquet.avro.AvroSchemaConverter; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -62,13 +50,11 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import java.io.IOException; - import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -214,54 +200,4 @@ public class HoodieClientTestUtils { return valuesAsList.stream(); } - /** - * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}. - */ - public static String writeParquetFile(String basePath, String partitionPath, String filename, - List records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException { - - if (filter == null) { - filter = BloomFilterFactory - .createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); - } - HoodieAvroWriteSupport writeSupport = - new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter); - String instantTime = FSUtils.getCommitTime(filename); - HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, - ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, - HoodieTestUtils.getDefaultHadoopConf(), Double.valueOf(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO)); - HoodieParquetWriter writer = - new HoodieParquetWriter(instantTime, new Path(basePath + "/" + partitionPath + "/" + filename), config, - schema, new SparkTaskContextSupplier()); - int seqId = 1; - for (HoodieRecord record : records) { - GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); - HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, instantTime, "" + seqId++); - HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), filename); - writer.writeAvro(record.getRecordKey(), avroRecord); - filter.add(record.getRecordKey()); - } - writer.close(); - - if (createCommitTime) { - HoodieTestUtils.createMetadataFolder(basePath); - HoodieTestUtils.createCommitFiles(basePath, instantTime); - } - return filename; - } - - /** - * TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}. - */ - public static String writeParquetFile(String basePath, String partitionPath, List records, - Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException { - Thread.sleep(1000); - String instantTime = HoodieTestUtils.makeNewCommitTime(); - String fileId = UUID.randomUUID().toString(); - String filename = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId); - HoodieTestUtils.createCommitFiles(basePath, instantTime); - return HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, records, schema, filter, - createCommitTime); - } - } diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java new file mode 100644 index 000000000..c2faa83c8 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.testutils; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.FileCreateUtils; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.io.storage.HoodieAvroParquetConfig; +import org.apache.hudi.io.storage.HoodieParquetWriter; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +import java.nio.file.Paths; +import java.util.UUID; + +import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; + +public class HoodieWriteableTestTable extends HoodieTestTable { + + private final Schema schema; + private final BloomFilter filter; + + private HoodieWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) { + super(basePath, fs, metaClient); + this.schema = schema; + this.filter = filter; + } + + public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) { + return new HoodieWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter); + } + + public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) { + BloomFilter filter = BloomFilterFactory + .createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); + return of(metaClient, schema, filter); + } + + public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema schema) { + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + return of(metaClient, schema); + } + + public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema schema, BloomFilter filter) { + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + return of(metaClient, schema, filter); + } + + @Override + public HoodieWriteableTestTable addCommit(String instantTime) throws Exception { + return (HoodieWriteableTestTable) super.addCommit(instantTime); + } + + @Override + public HoodieWriteableTestTable forCommit(String instantTime) { + return (HoodieWriteableTestTable) super.forCommit(instantTime); + } + + public String withInserts(String partition) throws Exception { + return withInserts(partition, new HoodieRecord[0]); + } + + public String withInserts(String partition, HoodieRecord... records) throws Exception { + String fileId = UUID.randomUUID().toString(); + withInserts(partition, fileId, records); + return fileId; + } + + public HoodieWriteableTestTable withInserts(String partition, String fileId) throws Exception { + return withInserts(partition, fileId, new HoodieRecord[0]); + } + + public HoodieWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception { + FileCreateUtils.createPartitionMetaFile(basePath, partition); + String fileName = baseFileName(currentInstantTime, fileId); + + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport( + new AvroSchemaConverter().convert(schema), schema, filter); + HoodieAvroParquetConfig config = new HoodieAvroParquetConfig(writeSupport, CompressionCodecName.GZIP, + ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, + new Configuration(), Double.parseDouble(HoodieStorageConfig.DEFAULT_STREAM_COMPRESSION_RATIO)); + try (HoodieParquetWriter writer = new HoodieParquetWriter( + currentInstantTime, + new Path(Paths.get(basePath, partition, fileName).toString()), + config, schema, new SparkTaskContextSupplier())) { + int seqId = 1; + for (HoodieRecord record : records) { + GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); + HoodieAvroUtils.addCommitMetadataToRecord(avroRecord, currentInstantTime, String.valueOf(seqId++)); + HoodieAvroUtils.addHoodieKeyToRecord(avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName); + writer.writeAvro(record.getRecordKey(), avroRecord); + filter.add(record.getRecordKey()); + } + } + + return this; + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 987b5679d..f2a1144d7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -21,6 +21,7 @@ package org.apache.hudi.common.testutils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -40,6 +41,32 @@ import java.util.Map; public class FileCreateUtils { + private static final String WRITE_TOKEN = "1-0-1"; + + public static String baseFileName(String instantTime, String fileId) { + return baseFileName(instantTime, fileId, HoodieFileFormat.PARQUET.getFileExtension()); + } + + public static String baseFileName(String instantTime, String fileId, String fileExtension) { + return FSUtils.makeDataFileName(instantTime, WRITE_TOKEN, fileId, fileExtension); + } + + public static String logFileName(String instantTime, String fileId, int version) { + return logFileName(instantTime, fileId, version, HoodieFileFormat.HOODIE_LOG.getFileExtension()); + } + + public static String logFileName(String instantTime, String fileId, int version, String fileExtension) { + return FSUtils.makeLogFileName(fileId, fileExtension, instantTime, version, WRITE_TOKEN); + } + + public static String markerFileName(String instantTime, String fileId, IOType ioType) { + return markerFileName(instantTime, fileId, ioType, HoodieFileFormat.PARQUET.getFileExtension()); + } + + public static String markerFileName(String instantTime, String fileId, IOType ioType, String fileExtension) { + return String.format("%s_%s_%s%s%s.%s", fileId, WRITE_TOKEN, instantTime, fileExtension, HoodieTableMetaClient.MARKER_EXTN, ioType); + } + private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException { Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME); Files.createDirectories(parentPath); @@ -73,45 +100,52 @@ public class FileCreateUtils { createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION); } - public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId) - throws Exception { - createDataFile(basePath, partitionPath, instantTime, fileId, 0); - } - - public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length) - throws Exception { + public static void createPartitionMetaFile(String basePath, String partitionPath) throws IOException { Path parentPath = Paths.get(basePath, partitionPath); Files.createDirectories(parentPath); - Path dataFilePath = parentPath.resolve(FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)); - if (Files.notExists(dataFilePath)) { - Files.createFile(dataFilePath); + Path metaFilePath = parentPath.resolve(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE); + if (Files.notExists(metaFilePath)) { + Files.createFile(metaFilePath); } - new RandomAccessFile(dataFilePath.toFile(), "rw").setLength(length); } - public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version) + public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId) throws Exception { - createLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0); + createBaseFile(basePath, partitionPath, instantTime, fileId, 0); } - public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length) + public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId, long length) throws Exception { Path parentPath = Paths.get(basePath, partitionPath); Files.createDirectories(parentPath); - Path logFilePath = parentPath.resolve(FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1")); + Path baseFilePath = parentPath.resolve(baseFileName(instantTime, fileId)); + if (Files.notExists(baseFilePath)) { + Files.createFile(baseFilePath); + } + new RandomAccessFile(baseFilePath.toFile(), "rw").setLength(length); + } + + public static void createLogFile(String basePath, String partitionPath, String instantTime, String fileId, int version) + throws Exception { + createLogFile(basePath, partitionPath, instantTime, fileId, version, 0); + } + + public static void createLogFile(String basePath, String partitionPath, String instantTime, String fileId, int version, int length) + throws Exception { + Path parentPath = Paths.get(basePath, partitionPath); + Files.createDirectories(parentPath); + Path logFilePath = parentPath.resolve(logFileName(instantTime, fileId, version)); if (Files.notExists(logFilePath)) { Files.createFile(logFilePath); } new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length); } - public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType) + public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileId, IOType ioType) throws IOException { - Path folderPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); - Files.createDirectories(folderPath); - String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, "1-0-1", instantTime, - HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType); - Path markerFilePath = folderPath.resolve(markerFileName); + Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); + Files.createDirectories(parentPath); + Path markerFilePath = parentPath.resolve(markerFileName(instantTime, fileId, ioType)); if (Files.notExists(markerFilePath)) { Files.createFile(markerFilePath); } @@ -119,11 +153,11 @@ public class FileCreateUtils { } public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException { - Path markerDir = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); - if (Files.notExists(markerDir)) { + Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); + if (Files.notExists(parentPath)) { return 0; } - return Files.list(markerDir).filter(p -> p.getFileName().toString() + return Files.list(parentPath).filter(p -> p.getFileName().toString() .endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN, ioType))).count(); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 32f2d4580..c99fe1d0e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -19,8 +19,6 @@ package org.apache.hudi.common.testutils; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; @@ -39,6 +37,7 @@ import java.util.Objects; import java.util.UUID; import java.util.stream.IntStream; +import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit; @@ -46,15 +45,16 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDel import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile; import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit; import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit; +import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName; public class HoodieTestTable { - private final String basePath; - private final FileSystem fs; - private HoodieTableMetaClient metaClient; - private String currentInstantTime; + protected final String basePath; + protected final FileSystem fs; + protected HoodieTableMetaClient metaClient; + protected String currentInstantTime; - private HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) { + protected HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) { ValidationUtils.checkArgument(Objects.equals(basePath, metaClient.getBasePath())); ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs())); this.basePath = basePath; @@ -124,6 +124,13 @@ public class HoodieTestTable { return this; } + public HoodieTestTable withPartitionMetaFiles(String... partitionPaths) throws IOException { + for (String partitionPath : partitionPaths) { + FileCreateUtils.createPartitionMetaFile(basePath, partitionPath); + } + return this; + } + public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException { return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType); } @@ -150,19 +157,19 @@ public class HoodieTestTable { * * @return A {@link Map} of partition and its newly inserted file's id. */ - public Map withInserts(String... partitions) throws Exception { + public Map withBaseFilesInPartitions(String... partitions) throws Exception { Map partitionFileIdMap = new HashMap<>(); for (String p : partitions) { String fileId = UUID.randomUUID().toString(); - FileCreateUtils.createDataFile(basePath, p, currentInstantTime, fileId); + FileCreateUtils.createBaseFile(basePath, p, currentInstantTime, fileId); partitionFileIdMap.put(p, fileId); } return partitionFileIdMap; } - public HoodieTestTable withUpdates(String partition, String... fileIds) throws Exception { + public HoodieTestTable withBaseFilesInPartition(String partition, String... fileIds) throws Exception { for (String f : fileIds) { - FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, f); + FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, f); } return this; } @@ -182,35 +189,37 @@ public class HoodieTestTable { return this; } - public boolean filesExist(Map partitionAndFileId, String instantTime) { + public boolean baseFilesExist(Map partitionAndFileId, String instantTime) { return partitionAndFileId.entrySet().stream().allMatch(entry -> { String partition = entry.getKey(); String fileId = entry.getValue(); - return fileExists(partition, instantTime, fileId); + return baseFileExists(partition, instantTime, fileId); }); } - public boolean filesExist(String partition, String instantTime, String... fileIds) { - return Arrays.stream(fileIds).allMatch(f -> fileExists(partition, instantTime, f)); + public boolean baseFilesExist(String partition, String instantTime, String... fileIds) { + return Arrays.stream(fileIds).allMatch(f -> baseFileExists(partition, instantTime, f)); } - public boolean fileExists(String partition, String instantTime, String fileId) { + public boolean baseFileExists(String partition, String instantTime, String fileId) { try { - return fs.exists(new Path(Paths.get(basePath, partition, - FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)).toString())); + return fs.exists(new Path(Paths.get(basePath, partition, baseFileName(instantTime, fileId)).toString())); } catch (IOException e) { throw new HoodieTestTableException(e); } } + public String getBaseFileNameById(String fileId) { + return baseFileName(currentInstantTime, fileId); + } + public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) { return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v)); } public boolean logFileExists(String partition, String instantTime, String fileId, int version) { try { - return fs.exists(new Path(Paths.get(basePath, partition, - FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version, "1-0-1")).toString())); + return fs.exists(new Path(Paths.get(basePath, partition, logFileName(instantTime, fileId, version)).toString())); } catch (IOException e) { throw new HoodieTestTableException(e); }