diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala index 36c652e4a..8aa9709b1 100644 --- a/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala +++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/SparkHelpers.scala @@ -25,8 +25,9 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport import org.apache.hudi.client.SparkTaskContextSupplier import org.apache.hudi.common.HoodieJsonPayload import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory} +import org.apache.hudi.common.model.HoodieFileFormat import org.apache.hudi.common.model.HoodieRecord -import org.apache.hudi.common.util.ParquetUtils +import org.apache.hudi.common.util.BaseFileUtils import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig} import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter} import org.apache.parquet.avro.AvroSchemaConverter @@ -40,7 +41,7 @@ import scala.collection.mutable._ object SparkHelpers { @throws[Exception] def skipKeysAndWriteNewFile(instantTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) { - val sourceRecords = ParquetUtils.readAvroRecords(fs.getConf, sourceFile) + val sourceRecords = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(fs.getConf, sourceFile) val schema: Schema = sourceRecords.get(0).getSchema val filter: BloomFilter = BloomFilterFactory.createBloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble, HoodieIndexConfig.DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_INDEX_FILTER_TYPE); @@ -125,7 +126,7 @@ class SparkHelper(sqlContext: SQLContext, fs: FileSystem) { * @return */ def fileKeysAgainstBF(conf: Configuration, sqlContext: SQLContext, file: String): Boolean = { - val bf = ParquetUtils.readBloomFilterFromParquetMetadata(conf, new Path(file)) 
+ val bf = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readBloomFilterFromMetadata(conf, new Path(file)) val foundCount = sqlContext.parquetFile(file) .select(s"`${HoodieRecord.RECORD_KEY_METADATA_FIELD}`") .collect().count(r => !bf.mightContain(r.getString(0))) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java index 8cf2be958..60f130e2d 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java @@ -70,7 +70,7 @@ public class TestRollbacksCommand extends AbstractShellIntegrationTest { tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); - //Create some commits files and parquet files + //Create some commits files and base files Map partitionAndFileId = new HashMap() { { put(DEFAULT_FIRST_PARTITION_PATH, "file-1"); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java index b9aa3f731..5dd556eab 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java @@ -60,7 +60,7 @@ public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest { tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); - //Create some commits files and parquet files + //Create some commits files and base files HoodieTestTable.of(metaClient) 
.withPartitionMetaFiles(DEFAULT_PARTITION_PATHS) .addCommit("100") diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java index adc61457a..b3c5c06be 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java @@ -71,7 +71,7 @@ public class ITTestCommitsCommand extends AbstractShellIntegrationTest { */ @Test public void testRollbackCommit() throws Exception { - //Create some commits files and parquet files + //Create some commits files and base files Map partitionAndFileId = new HashMap() { { put(DEFAULT_FIRST_PARTITION_PATH, "file-1"); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java index a5bc6b2fe..003bfb337 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -48,7 +48,8 @@ public class HoodieKeyLocationFetchHandle> locations() { HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight(); - return ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream() + return 
BaseFileUtils.getInstance(baseFile.getPath()).fetchRecordKeyPartitionPath( + hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream() .map(entry -> Pair.of(entry, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()))); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index 4c1c3e21f..8537e27f6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -135,7 +135,7 @@ public class RollbackUtils { !activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit, 1).empty(); if (higherDeltaCommits) { // Rollback of a compaction action with no higher deltacommit means that the compaction is scheduled - // and has not yet finished. In this scenario we should delete only the newly created parquet files + // and has not yet finished. In this scenario we should delete only the newly created base files // and not corresponding base commit log files created with this as baseCommit since updates would // have been written to the log files. LOG.info("Rolling back compaction. There are higher delta commits. So only deleting data files"); @@ -168,13 +168,13 @@ public class RollbackUtils { // --------------------------------------------------------------------------------------------------- // (B) The following cases are possible if !index.canIndexLogFiles and/or !index.isGlobal // --------------------------------------------------------------------------------------------------- - // (B.1) Failed first commit - Inserts were written to parquet files and HoodieWriteStat has no entries. - // In this scenario, we delete all the parquet files written for the failed commit. 
- // (B.2) Failed recurring commits - Inserts were written to parquet files and updates to log files. In + // (B.1) Failed first commit - Inserts were written to base files and HoodieWriteStat has no entries. + // In this scenario, we delete all the base files written for the failed commit. + // (B.2) Failed recurring commits - Inserts were written to base files and updates to log files. In // this scenario, perform (A.1) and for updates written to log files, write rollback blocks. // (B.3) Rollback triggered for first commit - Same as (B.1) // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files - // as well if the base parquet file gets deleted. + // as well if the base file gets deleted. try { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( table.getMetaClient().getCommitTimeline() @@ -183,7 +183,7 @@ public class RollbackUtils { HoodieCommitMetadata.class); // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or parquet files), + // We do not know fileIds for inserts (first inserts are either log files or base files), // delete all files for the corresponding failed commit, if present (same as COW) partitionRollbackRequests.add( ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); @@ -211,7 +211,7 @@ public class RollbackUtils { // wStat.getPrevCommit() might not give the right commit time in the following // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be // used to write the new log files. In this case, the commit time for the log file is the compaction requested time.
- // But the index (global) might store the baseCommit of the parquet and not the requested, hence get the + // But the index (global) might store the baseCommit of the base file and not the requested, hence get the // baseCommit always by listing the file slice Map fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlices(partitionPath) .collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java index e485c0068..91c5cbd26 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java @@ -221,7 +221,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { HoodieRecord record4 = new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); - // We write record1, record2 to a parquet file, but the bloom filter contains (record1, + // We write record1, record2 to a base file, but the bloom filter contains (record1, // record2, record3). BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); filter.add(record3.getRecordKey()); @@ -311,7 +311,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { assertFalse(record.isCurrentLocationKnown()); } - // We create three parquet file, each having one record. (two different partitions) + // We create three base files, each having one record.
(two different partitions) String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1); String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2); String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4); @@ -385,7 +385,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { assertTrue(!record.isPresent()); } - // We create three parquet file, each having one record. (two different partitions) + // We create three base files, each having one record. (two different partitions) String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1); String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2); String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4); @@ -433,7 +433,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; - // We write record1 to a parquet file, using a bloom filter having both records + // We write record1 to a base file, using a bloom filter having both records RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java index af3f3c2bc..796d7b74a 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java +++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java @@ -31,8 +31,8 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.testutils.Transformations; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -125,6 +125,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase HoodieJavaWriteClient writeClient = getHoodieWriteClient(config); writeClient.startCommitWithTime(firstCommitTime); metaClient = HoodieTableMetaClient.reload(metaClient); + BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); String partitionPath = "2016/01/31"; HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient); @@ -155,14 +156,14 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase assertEquals(1, allFiles.length); // Read out the bloom filter and make sure filter can answer record exist or not - Path parquetFilePath = allFiles[0].getPath(); - BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, parquetFilePath); + Path filePath = allFiles[0].getPath(); + BloomFilter filter = fileUtils.readBloomFilterFromMetadata(hadoopConf, filePath); for (HoodieRecord record : records) { assertTrue(filter.mightContain(record.getRecordKey())); } - // Read the parquet file, check the record content - List fileRecords = ParquetUtils.readAvroRecords(hadoopConf, parquetFilePath); + // Read the base file, check the record content + List fileRecords = fileUtils.readAvroRecords(hadoopConf, 
filePath); GenericRecord newRecord; int index = 0; for (GenericRecord record : fileRecords) { @@ -193,12 +194,12 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase allFiles = getIncrementalFiles(partitionPath, firstCommitTime, -1); assertEquals(1, allFiles.length); // verify new incremental file group is same as the previous one - assertEquals(FSUtils.getFileId(parquetFilePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName())); + assertEquals(FSUtils.getFileId(filePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName())); // Check whether the record has been updated - Path updatedParquetFilePath = allFiles[0].getPath(); + Path updatedFilePath = allFiles[0].getPath(); BloomFilter updatedFilter = - ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, updatedParquetFilePath); + fileUtils.readBloomFilterFromMetadata(hadoopConf, updatedFilePath); for (HoodieRecord record : records) { // No change to the _row_key assertTrue(updatedFilter.mightContain(record.getRecordKey())); @@ -207,7 +208,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase assertTrue(updatedFilter.mightContain(insertedRecord1.getRecordKey())); records.add(insertedRecord1);// add this so it can further check below - ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedParquetFilePath).build(); + ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedFilePath).build(); index = 0; while ((newRecord = (GenericRecord) updatedReader.read()) != null) { assertEquals(newRecord.get("_row_key").toString(), records.get(index).getRecordKey()); @@ -397,7 +398,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase // Check the updated file int counts = 0; for (File file : Paths.get(basePath, "2016/01/31").toFile().listFiles()) { - if (file.getName().endsWith(".parquet") && FSUtils.getCommitTime(file.getName()).equals(instantTime)) { +
if (file.getName().endsWith(table.getBaseFileExtension()) && FSUtils.getCommitTime(file.getName()).equals(instantTime)) { LOG.info(file.getName() + "-" + file.length()); counts++; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java index 0dcd744e5..6e82f4241 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java @@ -47,7 +47,7 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro MessageType parquetSchema = partitions.stream().flatMap(p -> p.getValue().stream()).map(fs -> { try { Path filePath = FileStatusUtils.toPath(fs.getPath()); - return ParquetUtils.readSchema(context.getHadoopConf().get(), filePath); + return new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath); } catch (Exception ex) { return null; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java index b4b5c05df..32ac86896 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java @@ -163,7 +163,7 @@ public class TestClientRollback extends HoodieClientTestBase { */ @Test public void testRollbackCommit() throws Exception { - // Let's create some commit files and parquet files + // Let's create some commit files and base files final String p1 = "2016/05/01"; final String p2 = "2016/05/02"; final String p3 = "2016/05/06"; @@ -251,7 +251,7 @@ public class TestClientRollback extends 
HoodieClientTestBase { */ @Test public void testAutoRollbackInflightCommit() throws Exception { - // Let's create some commit files and parquet files + // Let's create some commit files and base files final String p1 = "2016/05/01"; final String p2 = "2016/05/02"; final String p3 = "2016/05/06"; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index bee5c82c8..4e5aefd95 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -49,9 +49,9 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; @@ -115,7 +115,6 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.NULL_SCHE import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys; import static org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet; -import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet; import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY; import static 
org.apache.hudi.config.HoodieClusteringConfig.DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; @@ -424,23 +423,24 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build(); String basePathStr = basePath; HoodieTable table = getHoodieTable(metaClient, cfg); + String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); jsc.parallelize(Arrays.asList(1)).map(e -> { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(metaClient.getActiveTimeline().getInstantDetails( metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(), HoodieCommitMetadata.class); String filePath = commitMetadata.getPartitionToWriteStats().values().stream() - .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny() + .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(extension)).findAny() .map(ee -> ee.getPath()).orElse(null); String partitionPath = commitMetadata.getPartitionToWriteStats().values().stream() - .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny() + .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(extension)).findAny() .map(ee -> ee.getPartitionPath()).orElse(null); - Path parquetFilePath = new Path(basePathStr, filePath); - HoodieBaseFile baseFile = new HoodieBaseFile(parquetFilePath.toString()); + Path baseFilePath = new Path(basePathStr, filePath); + HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath.toString()); try { HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(), - partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier()); + partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new 
SparkTaskContextSupplier()); WriteStatus writeStatus = new WriteStatus(false, 0.0); writeStatus.setStat(new HoodieWriteStat()); writeStatus.getStat().setNumWrites(0); @@ -454,7 +454,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true"); HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build(); HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(), - partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier()); + partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new SparkTaskContextSupplier()); WriteStatus writeStatus = new WriteStatus(false, 0.0); writeStatus.setStat(new HoodieWriteStat()); writeStatus.getStat().setNumWrites(0); @@ -850,6 +850,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); SparkRDDWriteClient client = getHoodieWriteClient(config); + BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); // Inserts => will write file1 String commitTime1 = "001"; @@ -865,7 +866,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertEquals(1, statuses.size(), "Just 1 file needs to be added."); String file1 = statuses.get(0).getFileId(); assertEquals(100, - readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) + fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); // Update + Inserts such that they just expand file1 @@ -885,10 +886,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertEquals(file1, 
statuses.get(0).getFileId(), "Existing file should be expanded"); assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); - assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(), + assertEquals(140, fileUtils.readRowKeys(hadoopConf, newFile).size(), "file should contain 140 records"); - List records = ParquetUtils.readAvroRecords(hadoopConf, newFile); + List records = fileUtils.readAvroRecords(hadoopConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); assertEquals(commitTime2, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect commit2"); @@ -919,7 +920,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { for (HoodieBaseFile file : files) { if (file.getFileName().contains(file1)) { assertEquals(commitTime3, file.getCommitTime(), "Existing file should be expanded"); - records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())); + records = fileUtils.readAvroRecords(hadoopConf, new Path(file.getPath())); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recordCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); @@ -935,7 +936,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertEquals(0, keys2.size(), "All keys added in commit 2 must be updated in commit3 correctly"); } else { assertEquals(commitTime3, file.getCommitTime(), "New file must be written for commit 3"); - records = ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())); + records = fileUtils.readAvroRecords(hadoopConf, new Path(file.getPath())); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); 
assertEquals(commitTime3, record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), @@ -961,6 +962,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); SparkRDDWriteClient client = getHoodieWriteClient(config); + BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient); // Inserts => will write file1 String commitTime1 = "001"; @@ -974,7 +976,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertEquals(1, statuses.size(), "Just 1 file needs to be added."); String file1 = statuses.get(0).getFileId(); assertEquals(100, - readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) + fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); // Second, set of Inserts should just expand file1 @@ -990,9 +992,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(), "Existing file should be expanded"); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); - assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(), + assertEquals(140, fileUtils.readRowKeys(hadoopConf, newFile).size(), "file should contain 140 records"); - List records = ParquetUtils.readAvroRecords(hadoopConf, newFile); + List records = fileUtils.readAvroRecords(hadoopConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recCommitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(); @@ -1011,8 +1013,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase 
{ assertNoWriteErrors(statuses); assertEquals(2, statuses.size(), "2 files needs to be committed."); assertEquals(340, - readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size() - + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(), + fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size() + + fileUtils.readRowKeys(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(), "file should contain 340 records"); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); @@ -1024,7 +1026,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { int totalInserts = 0; for (HoodieBaseFile file : files) { assertEquals(commitTime3, file.getCommitTime(), "All files must be at commit 3"); - totalInserts += ParquetUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size(); + totalInserts += fileUtils.readAvroRecords(hadoopConf, new Path(file.getPath())).size(); } assertEquals(totalInserts, inserts1.size() + inserts2.size() + inserts3.size(), "Total number of records must add up"); } @@ -1056,7 +1058,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertEquals(1, statuses.size(), "Just 1 file needs to be added."); String file1 = statuses.get(0).getFileId(); assertEquals(100, - readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) + BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())) .size(), "file should contain 100 records"); // Delete 20 among 100 inserted @@ -1356,13 +1358,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { } /** - * Verify data in parquet files matches expected records and commit time. 
+ * Verify data in base files matches expected records and commit time. */ private void verifyRecordsWritten(String commitTime, List expectedRecords, List allStatus) { List records = new ArrayList<>(); for (WriteStatus status : allStatus) { Path filePath = new Path(basePath, status.getStat().getPath()); - records.addAll(ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), filePath)); + records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(), filePath)); } Set expectedKeys = recordsToRecordKeySet(expectedRecords); @@ -1410,7 +1412,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { } private void testDeletes(SparkRDDWriteClient client, List previousRecords, int sizeToDelete, - String existingFile, String instantTime, int exepctedRecords, List keys) { + String existingFile, String instantTime, int expectedRecords, List keys) { client.startCommitWithTime(instantTime); List hoodieKeysToDelete = randomSelectAsHoodieKeys(previousRecords, sizeToDelete); @@ -1427,16 +1429,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { for (int i = 0; i < fullPartitionPaths.length; i++) { fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); } - assertEquals(exepctedRecords, + assertEquals(expectedRecords, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(), - "Must contain " + exepctedRecords + " records"); + "Must contain " + expectedRecords + " records"); Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); - assertEquals(exepctedRecords, - readRowKeysFromParquet(hadoopConf, newFile).size(), + assertEquals(expectedRecords, + BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, newFile).size(), "file should contain 110 records"); - List records = ParquetUtils.readAvroRecords(hadoopConf, newFile); + List records = 
BaseFileUtils.getInstance(metaClient).readAvroRecords(hadoopConf, newFile); for (GenericRecord record : records) { String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime); @@ -1491,7 +1493,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertTrue(testTable.commitExists(instantTime), "After explicit commit, commit file should be created"); - // Get parquet file paths from commit metadata + // Get base file paths from commit metadata String actionType = metaClient.getCommitActionType(); HoodieInstant commitInstant = new HoodieInstant(false, actionType, instantTime); HoodieTimeline commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java index 35ee557ce..1cd7d6ee9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java @@ -113,7 +113,7 @@ public class TestHoodieReadClient extends HoodieClientTestBase { assertEquals(100, filteredRDD.collect().size()); JavaRDD smallRecordsRDD = jsc.parallelize(records.subList(0, 75), 1); - // We create three parquet file, each having one record. (3 different partitions) + // We create three base file, each having one record. 
(3 different partitions) List statuses = writeFn.apply(writeClient, smallRecordsRDD, newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java index 9b53de1d6..fd6581624 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; -import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.io.HoodieCreateHandle; @@ -36,7 +36,6 @@ import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.Path; -import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.ParquetDecodingException; import org.junit.jupiter.api.AfterEach; @@ -123,9 +122,10 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness { Executable executable = () -> { HoodieMergeHandle mergeHandle = new HoodieMergeHandle(updateTable.getConfig(), "101", updateTable, updateRecords.iterator(), updateRecords.get(0).getPartitionPath(), insertResult.getFileId(), supplier); - AvroReadSupport.setAvroReadSchema(updateTable.getHadoopConf(), mergeHandle.getWriterSchemaWithMetafields()); - List oldRecords = 
ParquetUtils.readAvroRecords(updateTable.getHadoopConf(), - new Path(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath())); + List oldRecords = BaseFileUtils.getInstance(updateTable.getBaseFileFormat()) + .readAvroRecords(updateTable.getHadoopConf(), + new Path(updateTable.getConfig().getBasePath() + "/" + insertResult.getStat().getPath()), + mergeHandle.getWriterSchemaWithMetafields()); for (GenericRecord rec : oldRecords) { mergeHandle.write(rec); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 83bb68429..cb37ed4bc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -754,7 +754,7 @@ public class TestCleaner extends HoodieClientTestBase { List hoodieCleanStats = runCleaner(config); assertEquals(3, getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() - .size(), "Must clean three files, one parquet and 2 log files"); + .size(), "Must clean three files, one base and 2 log files"); assertFalse(testTable.baseFileExists(p0, "000", file1P0)); assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); assertTrue(testTable.baseFileExists(p0, "001", file1P0)); @@ -797,7 +797,7 @@ public class TestCleaner extends HoodieClientTestBase { List hoodieCleanStats = runCleaner(config); assertEquals(3, getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() - .size(), "Must clean three files, one parquet and 2 log files"); + .size(), "Must clean three files, one base and 2 log files"); assertFalse(testTable.baseFileExists(p0, "000", file1P0)); assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); assertTrue(testTable.baseFileExists(p0, "001", file1P0)); @@ -935,8 +935,9 @@ public class TestCleaner extends HoodieClientTestBase { String partition1 = 
DEFAULT_PARTITION_PATHS[0]; String partition2 = DEFAULT_PARTITION_PATHS[1]; - String fileName1 = "data1_1_000.parquet"; - String fileName2 = "data2_1_000.parquet"; + String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + String fileName1 = "data1_1_000" + extension; + String fileName2 = "data2_1_000" + extension; String filePath1 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName1; String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2; @@ -1025,8 +1026,9 @@ public class TestCleaner extends HoodieClientTestBase { String partition1 = DEFAULT_PARTITION_PATHS[0]; String partition2 = DEFAULT_PARTITION_PATHS[1]; - String fileName1 = "data1_1_000.parquet"; - String fileName2 = "data2_1_000.parquet"; + String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + String fileName1 = "data1_1_000" + extension; + String fileName2 = "data2_1_000" + extension; Map> filesToBeCleanedPerPartition = new HashMap<>(); filesToBeCleanedPerPartition.put(partition1, Arrays.asList(fileName1)); @@ -1314,7 +1316,7 @@ public class TestCleaner extends HoodieClientTestBase { .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) .build(); // Deletions: - // . FileId Parquet Logs Total Retained Commits + // . FileId Base Logs Total Retained Commits // FileId7 5 10 15 009, 011 // FileId6 5 10 15 009 // FileId5 3 6 9 005 @@ -1338,7 +1340,7 @@ public class TestCleaner extends HoodieClientTestBase { .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build()) .build(); // Deletions: - // . FileId Parquet Logs Total Retained Commits + // . 
FileId Base Logs Total Retained Commits // FileId7 5 10 15 009, 011 // FileId6 4 8 12 007, 009 // FileId5 2 4 6 003 005 diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java index 8b19ac1c1..e7cd5ce04 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.ConsistencyGuard; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FailSafeConsistencyGuard; import org.apache.hudi.common.fs.OptimisticConsistencyGuard; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.testutils.HoodieClientTestHarness; @@ -44,6 +45,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; */ public class TestConsistencyGuard extends HoodieClientTestHarness { + private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension(); + // multiple parameters, uses Collection public static List consistencyGuardType() { return Arrays.asList( @@ -73,17 +76,19 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000); ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName()) ? 
new FailSafeConsistencyGuard(fs, config) : new OptimisticConsistencyGuard(fs, config); - passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); - passing.waitTillFileAppears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet")); + passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION)); + passing.waitTillFileAppears(new Path(basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION)); passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays - .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-1_000.parquet")); + .asList(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION, + basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION)); - fs.delete(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"), false); - fs.delete(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"), false); - passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); - passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet")); + fs.delete(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION), false); + fs.delete(new Path(basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION), false); + passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION)); + passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION)); passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays - .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-1_000.parquet")); + .asList(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION, + basePath + "/partition/path/f2_1-0-1_000" + BASE_FILE_EXTENSION)); } @Test @@ -92,7 +97,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { 
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays - .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); + .asList(basePath + "/partition/path/f1_1-0-2_000" + BASE_FILE_EXTENSION, + basePath + "/partition/path/f2_1-0-2_000" + BASE_FILE_EXTENSION)); }); } @@ -101,7 +107,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays - .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); + .asList(basePath + "/partition/path/f1_1-0-2_000" + BASE_FILE_EXTENSION, + basePath + "/partition/path/f2_1-0-2_000" + BASE_FILE_EXTENSION)); } @Test @@ -109,7 +116,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { - passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); + passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000" + BASE_FILE_EXTENSION)); }); } @@ -117,7 +124,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { public void testCheckFailingAppearsTimedWait() throws Exception { FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); - passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); + passing.waitTillFileAppears(new 
Path(basePath + "/partition/path/f1_1-0-2_000" + BASE_FILE_EXTENSION)); } @Test @@ -126,7 +133,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays - .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); + .asList(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION, + basePath + "/partition/path/f2_1-0-2_000" + BASE_FILE_EXTENSION)); }); } @@ -135,7 +143,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays - .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); + .asList(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION, + basePath + "/partition/path/f2_1-0-2_000" + BASE_FILE_EXTENSION)); } @Test @@ -144,7 +153,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); assertThrows(TimeoutException.class, () -> { - passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); + passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION)); }); } @@ -153,7 +162,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness { FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); FileCreateUtils.createBaseFile(basePath, "partition/path", "000", "f1"); ConsistencyGuard passing = new 
OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); - passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); + passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000" + BASE_FILE_EXTENSION)); } private ConsistencyGuardConfig getConsistencyGuardConfig() { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index a6ac276e6..18ff1368d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -131,7 +132,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { @BeforeEach public void init() throws IOException { - init(HoodieFileFormat.PARQUET); + init(HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT); } @AfterEach @@ -345,7 +346,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { List records004 = dataGen.generateUpdates(updateTime, 100); updateRecords(records004, client, cfg, updateTime); - // verify RO incremental reads - only one parquet file shows up because updates to into log files + // verify RO incremental reads - only one base file shows up because updates to into log files incrementalROFiles = getROIncrementalFiles(partitionPath, false); validateFiles(partitionPath, 1, 
incrementalROFiles, false, roJobConf, 200, commitTime1); assertEquals(firstFilePath, incrementalROFiles[0].getPath()); @@ -358,7 +359,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { String compactionCommitTime = "005"; client.scheduleCompactionAtInstant("005", Option.empty()); - // verify RO incremental reads - only one parquet file shows up because updates go into log files + // verify RO incremental reads - only one base file shows up because updates go into log files incrementalROFiles = getROIncrementalFiles(partitionPath, true); validateFiles(partitionPath,1, incrementalROFiles, false, roJobConf, 200, commitTime1); @@ -436,7 +437,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** - * Write 1 (only inserts, written as parquet file) + * Write 1 (only inserts, written as base file) */ String newCommitTime = "001"; client.startCommitWithTime(newCommitTime); @@ -465,7 +466,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), - "should list the parquet files we wrote in the delta commit"); + "should list the base files we wrote in the delta commit"); /** * Write 2 (only updates, written to .log file) @@ -613,7 +614,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), - "should list the parquet files we wrote in the delta commit"); + "should list the base files we wrote in the delta commit"); /** * Write 2 (inserts + updates - testing failed delta commit) @@ -630,7 +631,7 @@ public 
class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { List dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, basePath); - assertEquals(recordsRead.size(), 200); + assertEquals(200, recordsRead.size()); statuses = secondClient.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect(); // Verify there are no errors @@ -674,7 +675,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // Test successful delta commit rollback thirdClient.rollback(commitTime2); allFiles = listAllBaseFilesInPath(hoodieTable); - // After rollback, there should be no parquet file with the failed commit time + // After rollback, there should be no base file with the failed commit time assertEquals(0, Arrays.stream(allFiles) .filter(file -> file.getPath().getName().contains(commitTime2)).count()); @@ -768,7 +769,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); dataFilesToRead = tableView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), - "Should list the parquet files we wrote in the delta commit"); + "Should list the base files we wrote in the delta commit"); /** * Write 2 (inserts + updates) @@ -901,7 +902,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** - * Write 1 (only inserts, written as parquet file) + * Write 1 (only inserts, written as base file) */ String newCommitTime = "001"; client.startCommitWithTime(newCommitTime); @@ -926,17 +927,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline().filterCompletedInstants(), 
allFiles); Stream dataFilesToRead = roView.getLatestBaseFiles(); - Map parquetFileIdToSize = + Map fileIdToSize = dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize)); roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); dataFilesToRead = roView.getLatestBaseFiles(); List dataFilesList = dataFilesToRead.collect(Collectors.toList()); assertTrue(dataFilesList.size() > 0, - "Should list the parquet files we wrote in the delta commit"); + "Should list the base files we wrote in the delta commit"); /** - * Write 2 (only updates + inserts, written to .log file + correction of existing parquet file size) + * Write 2 (only updates + inserts, written to .log file + correction of existing base file size) */ newCommitTime = "002"; client.startCommitWithTime(newCommitTime); @@ -961,10 +962,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles); dataFilesToRead = roView.getLatestBaseFiles(); List newDataFilesList = dataFilesToRead.collect(Collectors.toList()); - Map parquetFileIdToNewSize = + Map fileIdToNewSize = newDataFilesList.stream().collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize)); - assertTrue(parquetFileIdToNewSize.entrySet().stream().anyMatch(entry -> parquetFileIdToSize.get(entry.getKey()) < entry.getValue())); + assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue())); List dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf, dataFiles, @@ -1082,8 +1083,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // Do a compaction String instantTime = 
writeClient.scheduleCompaction(Option.empty()).get().toString(); statuses = (JavaRDD) writeClient.compact(instantTime); - assertEquals(statuses.map(status -> status.getStat().getPath().contains("parquet")).count(), numLogFiles); - assertEquals(statuses.count(), numLogFiles); + String extension = table.getBaseFileExtension(); + assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count()); + assertEquals(numLogFiles, statuses.count()); writeClient.commitCompaction(instantTime, statuses, Option.empty()); } } @@ -1215,9 +1217,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { // Do a compaction newCommitTime = writeClient.scheduleCompaction(Option.empty()).get().toString(); statuses = (JavaRDD) writeClient.compact(newCommitTime); - // Ensure all log files have been compacted into parquet files - assertEquals(statuses.map(status -> status.getStat().getPath().contains("parquet")).count(), numLogFiles); - assertEquals(statuses.count(), numLogFiles); + // Ensure all log files have been compacted into base files + String extension = table.getBaseFileExtension(); + assertEquals(numLogFiles, statuses.map(status -> status.getStat().getPath().contains(extension)).count()); + assertEquals(numLogFiles, statuses.count()); //writeClient.commitCompaction(newCommitTime, statuses, Option.empty()); // Trigger a rollback of compaction table.getActiveTimeline().reload(); @@ -1463,7 +1466,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** - * Write 1 (only inserts, written as parquet file) + * Write 1 (only inserts, written as base file) */ String newCommitTime = "001"; client.startCommitWithTime(newCommitTime); @@ -1493,7 +1496,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); 
dataFilesToRead = roView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), - "should list the parquet files we wrote in the delta commit"); + "should list the base files we wrote in the delta commit"); /** * Write 2 (only updates, written to .log file) @@ -1603,7 +1606,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles); dataFilesToRead = roView.getLatestBaseFiles(); assertTrue(dataFilesToRead.findAny().isPresent(), - "should list the parquet files we wrote in the delta commit"); + "should list the base files we wrote in the delta commit"); } private void updateRecords(List records, SparkRDDWriteClient client, diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java index b57e1c527..83a6caecd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java @@ -18,6 +18,8 @@ package org.apache.hudi.table.action.bootstrap; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; @@ -47,13 +49,15 @@ public class TestBootstrapUtils extends HoodieClientTestBase { }); // Files inside partitions and marker directories - List files = Arrays.asList( - "2016/04/15/1_1-0-1_20190528120000.parquet", - "2016/04/15/2_1-0-1_20190528120000.parquet", - "2016/05/16/3_1-0-1_20190528120000.parquet", - "2016/05/16/4_1-0-1_20190528120000.parquet", - "2016/04/17/5_1-0-1_20190528120000.parquet", - "2016/04/17/6_1-0-1_20190528120000.parquet"); 
+ List files = Stream.of( + "2016/04/15/1_1-0-1_20190528120000", + "2016/04/15/2_1-0-1_20190528120000", + "2016/05/16/3_1-0-1_20190528120000", + "2016/05/16/4_1-0-1_20190528120000", + "2016/04/17/5_1-0-1_20190528120000", + "2016/04/17/6_1-0-1_20190528120000") + .map(file -> file + metaClient.getTableConfig().getBaseFileFormat().getFileExtension()) + .collect(Collectors.toList()); files.forEach(f -> { try { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 3e03c0536..40df1af89 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -29,8 +29,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.testutils.Transformations; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -154,14 +154,14 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { assertEquals(1, allFiles.length); // Read out the bloom filter and make sure filter can answer record exist or not - Path parquetFilePath = allFiles[0].getPath(); - BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, parquetFilePath); + Path filePath = allFiles[0].getPath(); + BloomFilter filter = BaseFileUtils.getInstance(table.getBaseFileFormat()).readBloomFilterFromMetadata(hadoopConf, 
filePath); for (HoodieRecord record : records) { assertTrue(filter.mightContain(record.getRecordKey())); } - // Read the parquet file, check the record content - List fileRecords = ParquetUtils.readAvroRecords(hadoopConf, parquetFilePath); + // Read the base file, check the record content + List fileRecords = BaseFileUtils.getInstance(table.getBaseFileFormat()).readAvroRecords(hadoopConf, filePath); GenericRecord newRecord; int index = 0; for (GenericRecord record : fileRecords) { @@ -192,12 +192,12 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { allFiles = getIncrementalFiles(partitionPath, firstCommitTime, -1); assertEquals(1, allFiles.length); // verify new incremental file group is same as the previous one - assertEquals(FSUtils.getFileId(parquetFilePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName())); + assertEquals(FSUtils.getFileId(filePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName())); // Check whether the record has been updated - Path updatedParquetFilePath = allFiles[0].getPath(); + Path updatedFilePath = allFiles[0].getPath(); BloomFilter updatedFilter = - ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, updatedParquetFilePath); + BaseFileUtils.getInstance(metaClient).readBloomFilterFromMetadata(hadoopConf, updatedFilePath); for (HoodieRecord record : records) { // No change to the _row_key assertTrue(updatedFilter.mightContain(record.getRecordKey())); @@ -206,7 +206,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { assertTrue(updatedFilter.mightContain(insertedRecord1.getRecordKey())); records.add(insertedRecord1);// add this so it can further check below - ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedParquetFilePath).build(); + ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedFilePath).build(); index = 0; while ((newRecord = (GenericRecord) updatedReader.read()) != null) { 
assertEquals(newRecord.get("_row_key").toString(), records.get(index).getRecordKey()); @@ -393,7 +393,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase { // Check the updated file int counts = 0; for (File file : Paths.get(basePath, "2016/01/31").toFile().listFiles()) { - if (file.getName().endsWith(".parquet") && FSUtils.getCommitTime(file.getName()).equals(instantTime)) { + if (file.getName().endsWith(table.getBaseFileExtension()) && FSUtils.getCommitTime(file.getName()).equals(instantTime)) { LOG.info(file.getName() + "-" + file.length()); counts++; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java index 6c8a54dab..e8e2e5410 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java @@ -135,7 +135,7 @@ public class CompactionTestBase extends HoodieClientTestBase { HoodieTable hoodieTable = getHoodieTable(metaClient, cfg); List dataFilesToRead = getCurrentLatestBaseFiles(hoodieTable); assertTrue(dataFilesToRead.stream().findAny().isPresent(), - "should list the parquet files we wrote in the delta commit"); + "should list the base files we wrote in the delta commit"); validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index faf7e7d43..156f215ca 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -274,7 +275,7 @@ public class TestHoodieCompactionStrategy { private final long size; public TestHoodieBaseFile(long size) { - super("/tmp/XYXYXYXYXYYX_11_20180918020003.parquet"); + super("/tmp/XYXYXYXYXYYX_11_20180918020003" + HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension()); this.size = size; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 030cc3e5d..c7500a000 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -67,7 +67,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT final String p1 = "2015/03/16"; final String p2 = "2015/03/17"; final String p3 = "2016/03/15"; - // Let's create some commit files and parquet files + // Let's create some commit files and base files HoodieTestTable testTable = HoodieTestTable.of(metaClient) .withPartitionMetaFiles(p1, p2, p3) .addCommit("001") diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackUtils.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackUtils.java index 8db2069b0..4f01b34dc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackUtils.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -37,6 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertIterableEquals; public class TestRollbackUtils { + private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension(); private FileStatus generateFileStatus(String filePath) { Path dataFile1Path = new Path(filePath); @@ -62,15 +64,15 @@ public class TestRollbackUtils { String partitionPath2 = "/partitionPath2/"; //prepare HoodieRollbackStat for different partition Map dataFilesOnlyStat1Files = new HashMap<>(); - dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile1.parquet"), true); - dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile2.parquet"), true); + dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile1" + BASE_FILE_EXTENSION), true); + dataFilesOnlyStat1Files.put(generateFileStatus(partitionPath1 + "dataFile2" + BASE_FILE_EXTENSION), true); HoodieRollbackStat dataFilesOnlyStat1 = HoodieRollbackStat.newBuilder() .withPartitionPath(partitionPath1) .withDeletedFileResults(dataFilesOnlyStat1Files).build(); Map dataFilesOnlyStat2Files = new 
HashMap<>(); - dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile1.parquet"), true); - dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile2.parquet"), true); + dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile1" + BASE_FILE_EXTENSION), true); + dataFilesOnlyStat2Files.put(generateFileStatus(partitionPath2 + "dataFile2" + BASE_FILE_EXTENSION), true); HoodieRollbackStat dataFilesOnlyStat2 = HoodieRollbackStat.newBuilder() .withPartitionPath(partitionPath2) .withDeletedFileResults(dataFilesOnlyStat1Files).build(); @@ -83,7 +85,7 @@ public class TestRollbackUtils { //prepare HoodieRollbackStat for failed and block append Map dataFilesOnlyStat3Files = new HashMap<>(); dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile1.log"), true); - dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile3.parquet"), false); + dataFilesOnlyStat3Files.put(generateFileStatus(partitionPath1 + "dataFile3" + BASE_FILE_EXTENSION), false); HoodieRollbackStat dataFilesOnlyStat3 = HoodieRollbackStat.newBuilder() .withPartitionPath(partitionPath1) .withDeletedFileResults(dataFilesOnlyStat3Files).build(); @@ -98,10 +100,10 @@ public class TestRollbackUtils { HoodieRollbackStat dataFilesOnlyStatMerge1 = RollbackUtils.mergeRollbackStat(dataFilesOnlyStat1, dataFilesOnlyStat3); assertEquals(partitionPath1, dataFilesOnlyStatMerge1.getPartitionPath()); - assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3.parquet"), + assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3" + BASE_FILE_EXTENSION), dataFilesOnlyStatMerge1.getFailedDeleteFiles()); - assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1.parquet", - partitionPath1 + "dataFile2.parquet", partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()), + assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 
+ "dataFile1" + BASE_FILE_EXTENSION, + partitionPath1 + "dataFile2" + BASE_FILE_EXTENSION, partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()), dataFilesOnlyStatMerge1.getSuccessDeleteFiles().stream().sorted().collect(Collectors.toList())); assertEquals(0, dataFilesOnlyStatMerge1.getCommandBlocksCount().size()); @@ -109,10 +111,10 @@ public class TestRollbackUtils { HoodieRollbackStat dataFilesOnlyStatMerge2 = RollbackUtils.mergeRollbackStat(dataFilesOnlyStatMerge1, dataFilesOnlyStat4); assertEquals(partitionPath1, dataFilesOnlyStatMerge1.getPartitionPath()); - assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3.parquet").stream().sorted().collect(Collectors.toList()), + assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile3" + BASE_FILE_EXTENSION).stream().sorted().collect(Collectors.toList()), dataFilesOnlyStatMerge2.getFailedDeleteFiles().stream().sorted().collect(Collectors.toList())); - assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1.parquet", - partitionPath1 + "dataFile2.parquet", partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()), + assertIterableEquals(CollectionUtils.createImmutableList(partitionPath1 + "dataFile1" + BASE_FILE_EXTENSION, + partitionPath1 + "dataFile2" + BASE_FILE_EXTENSION, partitionPath1 + "dataFile1.log").stream().sorted().collect(Collectors.toList()), dataFilesOnlyStatMerge2.getSuccessDeleteFiles().stream().sorted().collect(Collectors.toList())); assertEquals(CollectionUtils.createImmutableMap(generateFileStatus(partitionPath1 + "dataFile1.log"), 10L), dataFilesOnlyStatMerge2.getCommandBlocksCount()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java index e391abf0b..78cd6d76c 100644 --- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java @@ -49,7 +49,6 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.util.Properties; -import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME; @@ -120,7 +119,6 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider, Hoodie .setTableName(RAW_TRIPS_TEST_NAME) .setTableType(COPY_ON_WRITE) .setPayloadClass(HoodieAvroPayload.class) - .setBaseFileFormat(PARQUET.toString()) .fromProperties(props) .build(); return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 9b229a37b..9a8e51a96 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; @@ -130,7 +131,8 @@ public class FSUtils { // TODO: this should be removed public static String makeDataFileName(String instantTime, String writeToken, String fileId) { - return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, 
HoodieFileFormat.PARQUET.getFileExtension()); + return String.format("%s_%s_%s%s", fileId, writeToken, instantTime, + HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension()); } public static String makeDataFileName(String instantTime, String writeToken, String fileId, String fileExtension) { @@ -142,7 +144,7 @@ public class FSUtils { } public static String maskWithoutFileId(String instantTime, int taskPartitionId) { - return String.format("*_%s_%s%s", taskPartitionId, instantTime, HoodieFileFormat.PARQUET.getFileExtension()); + return String.format("*_%s_%s%s", taskPartitionId, instantTime, HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension()); } public static String getCommitFromCommitFile(String commitFileName) { @@ -329,7 +331,7 @@ public class FSUtils { } /** - * Check if the file is a parquet file of a log file. Then get the fileId appropriately. + * Check if the file is a base file of a log file. Then get the fileId appropriately. */ public static String getFileIdFromFilePath(Path filePath) { if (FSUtils.isLogFile(filePath)) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java new file mode 100644 index 000000000..c52d70008 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.table.HoodieTableMetaClient; + +public abstract class BaseFileUtils { + + public static BaseFileUtils getInstance(String path) { + if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { + return new ParquetUtils(); + } + throw new UnsupportedOperationException("The format for file " + path + " is not supported yet."); + } + + public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) { + if (HoodieFileFormat.PARQUET.equals(fileFormat)) { + return new ParquetUtils(); + } + throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet."); + } + + public static BaseFileUtils getInstance(HoodieTableMetaClient metaClient) { + return getInstance(metaClient.getTableConfig().getBaseFileFormat()); + } + + public abstract Set readRowKeys(Configuration configuration, Path filePath); + + public abstract Set filterRowKeys(Configuration configuration, Path filePath, Set filter); + + public abstract List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath); + + public abstract Schema readAvroSchema(Configuration configuration, Path filePath); + + 
public abstract BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath); + + public abstract String[] readMinMaxRecordKeys(Configuration configuration, Path filePath); + + public abstract List readAvroRecords(Configuration configuration, Path filePath); + + public abstract List readAvroRecords(Configuration configuration, Path filePath, Schema schema); + + public abstract Map readFooter(Configuration conf, boolean required, Path orcFilePath, + String... footerNames); + + public abstract long getRowCount(Configuration conf, Path filePath); +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index dc444aa21..c7b3a3fce 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -55,7 +55,7 @@ import java.util.function.Function; /** * Utility functions involving with parquet. */ -public class ParquetUtils { +public class ParquetUtils extends BaseFileUtils { /** * Read the rowKey list from the given parquet file. 
@@ -64,8 +64,9 @@ public class ParquetUtils { * @param configuration configuration to build fs object * @return Set Set of row keys */ - public static Set readRowKeysFromParquet(Configuration configuration, Path filePath) { - return filterParquetRowKeys(configuration, filePath, new HashSet<>()); + @Override + public Set readRowKeys(Configuration configuration, Path filePath) { + return filterRowKeys(configuration, filePath, new HashSet<>()); } /** @@ -77,7 +78,8 @@ public class ParquetUtils { * @param filter record keys filter * @return Set Set of row keys matching candidateRecordKeys */ - public static Set filterParquetRowKeys(Configuration configuration, Path filePath, Set filter) { + @Override + public Set filterRowKeys(Configuration configuration, Path filePath, Set filter) { return filterParquetRowKeys(configuration, filePath, filter, HoodieAvroUtils.getRecordKeySchema()); } @@ -128,7 +130,8 @@ public class ParquetUtils { * @param configuration configuration to build fs object * @return {@link List} of {@link HoodieKey}s fetched from the parquet file */ - public static List fetchRecordKeyPartitionPathFromParquet(Configuration configuration, Path filePath) { + @Override + public List fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) { List hoodieKeys = new ArrayList<>(); try { if (!filePath.getFileSystem(configuration).exists(filePath)) { @@ -156,7 +159,7 @@ public class ParquetUtils { return hoodieKeys; } - public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) { + public ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) { ParquetMetadata footer; try { // TODO(vc): Should we use the parallel reading version here? @@ -170,11 +173,12 @@ public class ParquetUtils { /** * Get the schema of the given parquet file. 
*/ - public static MessageType readSchema(Configuration configuration, Path parquetFilePath) { + public MessageType readSchema(Configuration configuration, Path parquetFilePath) { return readMetadata(configuration, parquetFilePath).getFileMetaData().getSchema(); } - private static Map readParquetFooter(Configuration configuration, boolean required, + @Override + public Map readFooter(Configuration configuration, boolean required, Path parquetFilePath, String... footerNames) { Map footerVals = new HashMap<>(); ParquetMetadata footer = readMetadata(configuration, parquetFilePath); @@ -190,16 +194,18 @@ public class ParquetUtils { return footerVals; } - public static Schema readAvroSchema(Configuration configuration, Path parquetFilePath) { + @Override + public Schema readAvroSchema(Configuration configuration, Path parquetFilePath) { return new AvroSchemaConverter(configuration).convert(readSchema(configuration, parquetFilePath)); } /** * Read out the bloom filter from the parquet file meta data. 
*/ - public static BloomFilter readBloomFilterFromParquetMetadata(Configuration configuration, Path parquetFilePath) { + @Override + public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path parquetFilePath) { Map footerVals = - readParquetFooter(configuration, false, parquetFilePath, + readFooter(configuration, false, parquetFilePath, HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE); @@ -220,8 +226,9 @@ public class ParquetUtils { return toReturn; } - public static String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) { - Map minMaxKeys = readParquetFooter(configuration, true, parquetFilePath, + @Override + public String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) { + Map minMaxKeys = readFooter(configuration, true, parquetFilePath, HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER); if (minMaxKeys.size() != 2) { throw new HoodieException( @@ -235,7 +242,8 @@ public class ParquetUtils { /** * NOTE: This literally reads the entire file contents, thus should be used with caution. */ - public static List readAvroRecords(Configuration configuration, Path filePath) { + @Override + public List readAvroRecords(Configuration configuration, Path filePath) { ParquetReader reader = null; List records = new ArrayList<>(); try { @@ -262,13 +270,20 @@ public class ParquetUtils { return records; } + @Override + public List readAvroRecords(Configuration configuration, Path filePath, Schema schema) { + AvroReadSupport.setAvroReadSchema(configuration, schema); + return readAvroRecords(configuration, filePath); + } + /** * Returns the number of records in the parquet file. 
* * @param conf Configuration * @param parquetFilePath path of the file */ - public static long getRowCount(Configuration conf, Path parquetFilePath) { + @Override + public long getRowCount(Configuration conf, Path parquetFilePath) { ParquetMetadata footer; long rowCount = 0; footer = readMetadata(conf, parquetFilePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java index feacbda54..9ead1ac87 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java @@ -27,8 +27,9 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.ParquetReaderIterator; -import org.apache.hudi.common.util.ParquetUtils; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetReader; @@ -36,24 +37,26 @@ import org.apache.parquet.hadoop.ParquetReader; public class HoodieParquetReader implements HoodieFileReader { private Path path; private Configuration conf; + private final BaseFileUtils parquetUtils; public HoodieParquetReader(Configuration configuration, Path path) { this.conf = configuration; this.path = path; + this.parquetUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); } public String[] readMinMaxRecordKeys() { - return ParquetUtils.readMinMaxRecordKeys(conf, path); + return parquetUtils.readMinMaxRecordKeys(conf, path); } @Override public BloomFilter readBloomFilter() { - return ParquetUtils.readBloomFilterFromParquetMetadata(conf, path); + return parquetUtils.readBloomFilterFromMetadata(conf, path); } 
@Override public Set filterRowKeys(Set candidateRowKeys) { - return ParquetUtils.filterParquetRowKeys(conf, path, candidateRowKeys); + return parquetUtils.filterRowKeys(conf, path, candidateRowKeys); } @Override @@ -65,7 +68,7 @@ public class HoodieParquetReader implements HoodieFileR @Override public Schema getSchema() { - return ParquetUtils.readAvroSchema(conf, path); + return parquetUtils.readAvroSchema(conf, path); } @Override @@ -74,6 +77,6 @@ public class HoodieParquetReader implements HoodieFileR @Override public long getTotalRecords() { - return ParquetUtils.getRowCount(conf, path); + return parquetUtils.getRowCount(conf, path); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java index bbe75cf89..593f82bb0 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/bootstrap/TestBootstrapIndex.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.model.BootstrapFileMapping; import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.util.collection.Pair; @@ -62,7 +63,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness { private static final String[] PARTITIONS = {"2020/03/18", "2020/03/19", "2020/03/20", "2020/03/21"}; private static final Set PARTITION_SET = Arrays.stream(PARTITIONS).collect(Collectors.toSet()); - private static final String BOOTSTRAP_BASE_PATH = "/tmp/source/parquet_tables/table1"; + private static final String BOOTSTRAP_BASE_PATH = 
"/tmp/source/data_tables/table1"; @BeforeEach public void init() throws IOException { @@ -168,7 +169,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness { return Arrays.stream(partitions).map(partition -> { return Pair.of(partition, IntStream.range(0, numEntriesPerPartition).mapToObj(idx -> { String hudiFileId = UUID.randomUUID().toString(); - String sourceFileName = idx + ".parquet"; + String sourceFileName = idx + HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension(); HoodieFileStatus sourceFileStatus = HoodieFileStatus.newBuilder() .setPath(HoodiePath.newBuilder().setUri(sourceBasePath + "/" + partition + "/" + sourceFileName).build()) .setLength(256 * 1024 * 1024L) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index 1dc1f5085..37b87ed56 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -21,6 +21,7 @@ package org.apache.hudi.common.fs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; @@ -56,6 +57,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { private final long minCleanToKeep = 10; private static String TEST_WRITE_TOKEN = "1-0-1"; + private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension(); @Rule public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); @@ -69,14 +71,14 @@ public class TestFSUtils extends HoodieCommonTestHarness { public void testMakeDataFileName() { String instantTime = COMMIT_FORMATTER.format(new 
Date()); String fileName = UUID.randomUUID().toString(); - assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + ".parquet"); + assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION); } @Test public void testMaskFileName() { String instantTime = COMMIT_FORMATTER.format(new Date()); int taskPartitionId = 2; - assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + ".parquet"); + assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION); } @Test @@ -100,9 +102,12 @@ public class TestFSUtils extends HoodieCommonTestHarness { }); // Files inside partitions and marker directories - List files = Arrays.asList("2016/04/15/1_1-0-1_20190528120000.parquet", - "2016/05/16/2_1-0-1_20190528120000.parquet", ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000.parquet", - ".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000.parquet"); + List files = Stream.of("2016/04/15/1_1-0-1_20190528120000", + "2016/05/16/2_1-0-1_20190528120000", + ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000", + ".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000") + .map(fileName -> fileName + BASE_FILE_EXTENSION) + .collect(Collectors.toList()); files.forEach(f -> { try { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java index bcd27f375..68f441b04 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieDeltaWriteStat.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.model; +import org.apache.hudi.common.table.HoodieTableConfig; import 
org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -33,7 +34,7 @@ public class TestHoodieDeltaWriteStat { @Test public void testBaseFileAndLogFiles() { HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat(); - String baseFile = "file1.parquet"; + String baseFile = "file1" + HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension(); String logFile1 = ".log1.log"; String logFile2 = ".log2.log"; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index cf7f6d849..939729501 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -284,7 +284,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { HoodieWriteStat stat = new HoodieWriteStat(); stat.setFileId(i + ""); stat.setPartitionPath(Paths.get(basePath, partition).toString()); - stat.setPath(commitTs + "." + i + ".parquet"); + stat.setPath(commitTs + "." + i + metaClient.getTableConfig().getBaseFileFormat().getFileExtension()); commit.addWriteStat(partition, stat); } for (Map.Entry extraEntries : extraMetadata.entrySet()) { @@ -303,7 +303,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { HoodieWriteStat stat = new HoodieWriteStat(); stat.setFileId(i + ""); stat.setPartitionPath(Paths.get(basePath, newFilePartition).toString()); - stat.setPath(commitTs + "." + i + ".parquet"); + stat.setPath(commitTs + "." 
+ i + metaClient.getTableConfig().getBaseFileFormat().getFileExtension()); commit.addWriteStat(newFilePartition, stat); } Map> partitionToReplaceFileIds = new HashMap<>(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index e103427d4..37a7c91c8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -306,7 +306,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { String partitionPath = "2016/05/01"; new File(basePath + "/" + partitionPath).mkdirs(); String fileId = UUID.randomUUID().toString(); - String srcName = "part_0000.parquet"; + String srcName = "part_0000" + metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); HoodieFileStatus srcFileStatus = HoodieFileStatus.newBuilder() .setPath(HoodiePath.newBuilder().setUri(BOOTSTRAP_SOURCE_PATH + partitionPath + "/" + srcName).build()) .setLength(256 * 1024 * 1024L) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java index bae749611..369a0639e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.timeline.HoodieInstant; import 
org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.MockHoodieTimeline; @@ -66,7 +67,8 @@ public class TestPriorityBasedFileSystemView { public void setUp() { fsView = new PriorityBasedFileSystemView(primary, secondary); testBaseFileStream = Stream.of(new HoodieBaseFile("test")); - testFileSliceStream = Stream.of(new FileSlice("2020-01-01", "20:20", "file0001.parquet")); + testFileSliceStream = Stream.of(new FileSlice("2020-01-01", "20:20", + "file0001" + HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension())); } private void resetMocks() { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 9409566b4..96b517180 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -54,9 +55,10 @@ import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serial public class FileCreateUtils { private static final String WRITE_TOKEN = "1-0-1"; + private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension(); public static String baseFileName(String instantTime, String fileId) { - return baseFileName(instantTime, fileId, HoodieFileFormat.PARQUET.getFileExtension()); + return baseFileName(instantTime, fileId, BASE_FILE_EXTENSION); } public 
static String baseFileName(String instantTime, String fileId, String fileExtension) { @@ -72,7 +74,7 @@ public class FileCreateUtils { } public static String markerFileName(String instantTime, String fileId, IOType ioType) { - return markerFileName(instantTime, fileId, ioType, HoodieFileFormat.PARQUET.getFileExtension()); + return markerFileName(instantTime, fileId, ioType, BASE_FILE_EXTENSION); } public static String markerFileName(String instantTime, String fileId, IOType ioType, String fileExtension) { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 98b0f90b7..341c4899f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -437,7 +438,7 @@ public class HoodieTestTable { } public FileStatus[] listAllBaseFiles() throws IOException { - return listAllBaseFiles(HoodieFileFormat.PARQUET.getFileExtension()); + return listAllBaseFiles(HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension()); } public FileStatus[] listAllBaseFiles(String fileExtension) throws IOException { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java index 92c40c739..e3c89a377 100644 --- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java @@ -97,6 +97,8 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { @Test public void testBuildFromFileSlice() { + String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + // Empty File-Slice with no data and log files FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1"); HoodieCompactionOperation op = @@ -106,7 +108,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { // File Slice with data-file but no log files FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1"); - noLogFileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000.parquet")); + noLogFileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000" + extension)); op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Option.of(metricsCaptureFn)); testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0], LATEST_COMPACTION_METADATA_VERSION); @@ -122,7 +124,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { // File Slice with data-file and log files present FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); - fileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000.parquet")); + fileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000" + extension)); fileSlice.addLogFile( new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))); fileSlice.addLogFile( @@ -135,16 +137,18 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { * Generate input for compaction plan tests. 
*/ private Pair>, HoodieCompactionPlan> buildCompactionPlan() { + String extension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + Path fullPartitionPath = new Path(new Path(metaClient.getBasePath()), DEFAULT_PARTITION_PATHS[0]); FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1"); FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); - fileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/data1_1_000.parquet")); + fileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/data1_1_000" + extension)); fileSlice.addLogFile(new HoodieLogFile( new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); fileSlice.addLogFile(new HoodieLogFile( new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))))); FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1"); - noLogFileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/noLog_1_000.parquet")); + noLogFileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/noLog_1_000" + extension)); FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1"); noDataFileSlice.addLogFile(new HoodieLogFile( new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))))); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java index 2bcbcbdab..b99755491 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java @@ -58,6 +58,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ public class TestParquetUtils extends 
HoodieCommonTestHarness { + private ParquetUtils parquetUtils = new ParquetUtils(); + public static List bloomFilterTypeCodes() { return Arrays.asList( Arguments.of(BloomFilterTypeCode.SIMPLE.name()), @@ -83,13 +85,13 @@ public class TestParquetUtils extends HoodieCommonTestHarness { // Read and verify List rowKeysInFile = new ArrayList<>( - ParquetUtils.readRowKeysFromParquet(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath))); + parquetUtils.readRowKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath))); Collections.sort(rowKeysInFile); Collections.sort(rowKeys); assertEquals(rowKeys, rowKeysInFile, "Did not read back the expected list of keys"); BloomFilter filterInFile = - ParquetUtils.readBloomFilterFromParquetMetadata(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)); + parquetUtils.readBloomFilterFromMetadata(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)); for (String rowKey : rowKeys) { assertTrue(filterInFile.mightContain(rowKey), "key should be found in bloom filter"); } @@ -113,7 +115,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness { // Read and verify Set filtered = - ParquetUtils.filterParquetRowKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath), filter); + parquetUtils.filterRowKeys(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath), filter); assertEquals(filter.size(), filtered.size(), "Filtered count does not match"); @@ -140,7 +142,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness { // Read and verify List fetchedRows = - ParquetUtils.fetchRecordKeyPartitionPathFromParquet(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)); + parquetUtils.fetchRecordKeyPartitionPath(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)); assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match"); for (HoodieKey entry : fetchedRows) { @@ -157,7 +159,7 @@ public class TestParquetUtils extends HoodieCommonTestHarness { } 
writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys); - assertEquals(123, ParquetUtils.getRowCount(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath))); + assertEquals(123, parquetUtils.getRowCount(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath))); } private void writeParquetFile(String typeCode, String filePath, List rowKeys) throws Exception { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java index 05031f035..2251a4cad 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -36,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; public final class TestTablePathUtils { + private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension(); @TempDir static File tempDir; @@ -73,9 +75,9 @@ public final class TestTablePathUtils { partitionMetadata2.trySave(2); // Create files - URI filePathURI1 = Paths.get(partitionPathURI1.getPath(), "data1.parquet").toUri(); + URI filePathURI1 = Paths.get(partitionPathURI1.getPath(), "data1" + BASE_FILE_EXTENSION).toUri(); filePath1 = new Path(filePathURI1); - URI filePathURI2 = Paths.get(partitionPathURI2.getPath(), "data2.parquet").toUri(); + URI filePathURI2 = Paths.get(partitionPathURI2.getPath(), "data2" + BASE_FILE_EXTENSION).toUri(); filePath2 = new 
Path(filePathURI2); assertTrue(new File(filePathURI1).createNewFile()); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 79a3f44c9..0f599d2cc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -27,7 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; @@ -221,6 +221,7 @@ public class BucketAssignFunction> private void loadRecords(String partitionPath) throws Exception { LOG.info("Start loading records under partition {} into the index state", partitionPath); HoodieTable hoodieTable = bucketAssigner.getTable(); + BaseFileUtils fileUtils = BaseFileUtils.getInstance(hoodieTable.getBaseFileFormat()); List latestBaseFiles = HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable); final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); @@ -230,7 +231,7 @@ public class BucketAssignFunction> final List hoodieKeys; try { hoodieKeys = - ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath())); + fileUtils.fetchRecordKeyPartitionPath(hadoopConf, new Path(baseFile.getPath())); } catch (Exception e) { // in case there was some empty parquet file when the pipeline // crushes exceptionally. 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index fcfa7cf5f..0ce46980d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -116,7 +116,7 @@ public class TestInputFormat { void testReadBaseAndLogFiles() throws Exception { beforeEach(HoodieTableType.MERGE_ON_READ); - // write parquet first with compaction + // write base first with compaction conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true); conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); TestData.writeData(TestData.DATA_SET_INSERT, conf); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 19568938d..60092f22b 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -172,7 +172,7 @@ public class InputFormatTestUtil { public static File prepareParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, int numberOfRecords, String commitNumber, HoodieTableType tableType) throws IOException { - HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType); + HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET); java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01")); createData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber); return partitionPath.toFile(); @@ -185,7 +185,7 @@ public class InputFormatTestUtil { public static File prepareSimpleParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, int 
numberOfRecords, String commitNumber, HoodieTableType tableType) throws Exception { - HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType); + HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET); java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01")); createSimpleData(schema, partitionPath, numberOfFiles, numberOfRecords, commitNumber); return partitionPath.toFile(); @@ -198,7 +198,7 @@ public class InputFormatTestUtil { public static File prepareNonPartitionedParquetTable(java.nio.file.Path basePath, Schema schema, int numberOfFiles, int numberOfRecords, String commitNumber, HoodieTableType tableType) throws IOException { - HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType); + HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), tableType, HoodieFileFormat.PARQUET); createData(schema, basePath, numberOfFiles, numberOfRecords, commitNumber); return basePath.toFile(); } @@ -207,7 +207,7 @@ public class InputFormatTestUtil { String commitNumber) throws IOException { AvroParquetWriter parquetWriter; for (int i = 0; i < numberOfFiles; i++) { - String fileId = FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i); + String fileId = FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i, HoodieFileFormat.PARQUET.getFileExtension()); parquetWriter = new AvroParquetWriter(new Path(partitionPath.resolve(fileId).toString()), schema); try { for (GenericRecord record : generateAvroRecords(schema, numberOfRecords, commitNumber, fileId)) { @@ -223,7 +223,7 @@ public class InputFormatTestUtil { String commitNumber) throws Exception { AvroParquetWriter parquetWriter; for (int i = 0; i < numberOfFiles; i++) { - String fileId = FSUtils.makeDataFileName(commitNumber, "1", "fileid" + i); + String fileId = FSUtils.makeDataFileName(commitNumber, "1", 
"fileid" + i, HoodieFileFormat.PARQUET.getFileExtension()); parquetWriter = new AvroParquetWriter(new Path(partitionPath.resolve(fileId).toString()), schema); try { List records = SchemaTestUtil.generateTestRecords(0, numberOfRecords); @@ -254,7 +254,8 @@ public class InputFormatTestUtil { int totalNumberOfRecords, int numberOfRecordsToUpdate, String newCommit) throws IOException { File fileToUpdate = Objects.requireNonNull(directory.listFiles((dir, name) -> name.endsWith("parquet")))[0]; String fileId = FSUtils.getFileId(fileToUpdate.getName()); - File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId)); + File dataFile = new File(directory, + FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId, HoodieFileFormat.PARQUET.getFileExtension())); try (AvroParquetWriter parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema)) { for (GenericRecord record : generateAvroRecords(schema, totalNumberOfRecords, originalCommit, fileId)) { if (numberOfRecordsToUpdate > 0) { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java index 5b5282ae1..c8cb15f31 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.checkpointing; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.exception.HoodieException; @@ -33,6 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertThrows; public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness { + private static final String BASE_FILE_EXTENSION = HoodieTableConfig.DEFAULT_BASE_FILE_FORMAT.getFileExtension(); @Test public void testValidKafkaConnectPath() throws Exception { @@ -46,19 +48,19 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness { // kafka connect tmp folder new File(topicPath + "/TMP").mkdirs(); // tmp file that being written - new File(topicPath + "/TMP/" + "topic1+0+301+400.parquet").createNewFile(); - // regular parquet files + new File(topicPath + "/TMP/" + "topic1+0+301+400" + BASE_FILE_EXTENSION).createNewFile(); + // regular base files new File(topicPath + "/year=2016/month=05/day=01/" - + "topic1+0+100+200.parquet").createNewFile(); + + "topic1+0+100+200" + BASE_FILE_EXTENSION).createNewFile(); new File(topicPath + "/year=2016/month=05/day=01/" - + "topic1+1+100+200.parquet").createNewFile(); + + "topic1+1+100+200" + BASE_FILE_EXTENSION).createNewFile(); new File(topicPath + "/year=2016/month=05/day=02/" - + "topic1+0+201+300.parquet").createNewFile(); - // noise parquet file + + "topic1+0+201+300" + BASE_FILE_EXTENSION).createNewFile(); + // noise base file new File(topicPath + "/year=2016/month=05/day=01/" - + "random_snappy_1.parquet").createNewFile(); + + "random_snappy_1" + BASE_FILE_EXTENSION).createNewFile(); new File(topicPath + "/year=2016/month=05/day=02/" - + "random_snappy_2.parquet").createNewFile(); + + "random_snappy_2" + BASE_FILE_EXTENSION).createNewFile(); final TypedProperties props = new TypedProperties(); props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString()); final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props); @@ -73,13 +75,13 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness { // create regular kafka connect hdfs dirs new File(topicPath + "/year=2016/month=05/day=01/").mkdirs(); new 
File(topicPath + "/year=2016/month=05/day=02/").mkdirs(); - // parquet files with missing partition + // base files with missing partition new File(topicPath + "/year=2016/month=05/day=01/" - + "topic1+0+100+200.parquet").createNewFile(); + + "topic1+0+100+200" + BASE_FILE_EXTENSION).createNewFile(); new File(topicPath + "/year=2016/month=05/day=01/" - + "topic1+2+100+200.parquet").createNewFile(); + + "topic1+2+100+200" + BASE_FILE_EXTENSION).createNewFile(); new File(topicPath + "/year=2016/month=05/day=02/" - + "topic1+0+201+300.parquet").createNewFile(); + + "topic1+0+201+300" + BASE_FILE_EXTENSION).createNewFile(); final TypedProperties props = new TypedProperties(); props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString()); final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);