diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index fd9b1ddb1..dd274a599 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -18,8 +18,6 @@ package org.apache.hudi.metadata; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieWriteResult; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -55,6 +53,9 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieClientTestHarness; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -71,6 +72,7 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; @@ -227,9 +229,14 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { "Must not contain the filtered directory " + filteredDirectoryThree); FileStatus[] statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p1")); - assertTrue(statuses.length == 2); + assertEquals(2, statuses.length); statuses = metadata(client).getAllFilesInPartition(new Path(basePath, "p2")); - assertTrue(statuses.length == 5); + assertEquals(5, statuses.length); + Map partitionsToFilesMap = metadata(client).getAllFilesInPartitions( + Arrays.asList(basePath + "/p1", basePath + "/p2")); + assertEquals(2, partitionsToFilesMap.size()); + assertEquals(2, partitionsToFilesMap.get(basePath + "/p1").length); + assertEquals(5, partitionsToFilesMap.get(basePath + "/p2").length); } } @@ -881,6 +888,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieSparkTable.create(config, engineContext); TableFileSystemView tableView = table.getHoodieView(); + List fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList()); + Map partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths); + assertEquals(fsPartitions.size(), partitionToFilesMap.size()); + fsPartitions.forEach(partition -> { try { Path partitionPath; @@ -899,6 +910,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { Collections.sort(fsFileNames); Collections.sort(metadataFilenames); + assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length); + // File sizes should be valid Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 8669fb3cb..cb778e682 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -52,6 +52,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; @@ -277,13 +278,16 @@ public class FSUtils { } } - public static FileStatus[] getFilesInPartition(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, - String basePathStr, Path partitionPath) { - try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, - metadataConfig, basePathStr, FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue())) { - return tableMetadata.getAllFilesInPartition(partitionPath); - } catch (Exception e) { - throw new HoodieException("Error get files in partition: " + partitionPath, e); + public static Map getFilesInPartitions(HoodieEngineContext engineContext, + HoodieMetadataConfig metadataConfig, + String basePathStr, + String[] partitionPaths, + String spillableMapPath) { + try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr, + spillableMapPath, true)) { + return tableMetadata.getAllFilesInPartitions(Arrays.asList(partitionPaths)); + } catch (Exception ex) { + throw new HoodieException("Error get files in partitions: " + String.join(",", partitionPaths), ex); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 85a4d69b7..e408ad939 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -44,7 +44,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; public abstract class BaseTableMetadata implements HoodieTableMetadata { @@ -134,6 +136,26 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { .getAllFilesInPartition(partitionPath); } + @Override + public Map getAllFilesInPartitions(List partitionPaths) + throws IOException { + if (enabled) { + Map partitionsFilesMap = new HashMap<>(); + + try { + for (String partitionPath : partitionPaths) { + partitionsFilesMap.put(partitionPath, fetchAllFilesInPartition(new Path(partitionPath))); + } + return partitionsFilesMap; + } catch (Exception e) { + throw new HoodieMetadataException("Failed to retrieve files in partition from metadata", e); + } + } + + return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, datasetBasePath, metadataConfig.shouldAssumeDatePartitioning()) + .getAllFilesInPartitions(partitionPaths); + } + /** * Returns a list of all partitions. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 31c74a1f6..ce1cf5502 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -33,8 +33,10 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; public class FileSystemBackedTableMetadata implements HoodieTableMetadata { @@ -105,6 +107,24 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { return partitionPaths; } + @Override + public Map getAllFilesInPartitions(List partitionPaths) + throws IOException { + if (partitionPaths == null || partitionPaths.isEmpty()) { + return Collections.emptyMap(); + } + + int parallelism = Math.min(DEFAULT_LISTING_PARALLELISM, partitionPaths.size()); + + List> partitionToFiles = engineContext.map(partitionPaths, partitionPathStr -> { + Path partitionPath = new Path(partitionPathStr); + FileSystem fs = partitionPath.getFileSystem(hadoopConf.get()); + return Pair.of(partitionPathStr, FSUtils.getAllDataFilesInPartition(fs, partitionPath)); + }, parallelism); + + return partitionToFiles.stream().collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + } + @Override public Option getSyncedInstantTime() { throw new UnsupportedOperationException(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index 9e6222a87..506792125 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import java.io.Serializable; import java.util.List; +import java.util.Map; /** * Interface that supports querying various pieces of metadata about a hudi table. @@ -95,6 +96,11 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable { */ List getAllPartitionPaths() throws IOException; + /** + * Fetch all files for given partition paths. + */ + Map getAllFilesInPartitions(List partitionPaths) throws IOException; + /** * Get the instant time to which the metadata is synced w.r.t data timeline. */ diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java index e44054f3a..9ec793daa 100644 --- a/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java +++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java @@ -18,6 +18,7 @@ package org.apache.hudi.metadata; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieLocalEngineContext; @@ -30,7 +31,10 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.IntStream; public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { @@ -63,8 +67,10 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 0); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath)).length == 10); + Assertions.assertEquals(0, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); + Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath)).length); + Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartitions( + Collections.singletonList(basePath)).get(basePath).length); } /** @@ -86,8 +92,14 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, true); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + DATE_PARTITIONS.get(0))).length == 10); + Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); + Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + DATE_PARTITIONS.get(0))).length); + + List fullPartitionPaths = DATE_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList()); + Map partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths); + for (String p : fullPartitionPaths) { + Assertions.assertEquals(10, partitionToFilesMap.get(p).length); + } } /** @@ -101,7 +113,9 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { // Generate 10 files under each partition DATE_PARTITIONS.stream().forEach(p -> { try { - hoodieTestTable = hoodieTestTable.withBaseFilesInPartition(p, IntStream.range(0, 10).toArray()); + hoodieTestTable = hoodieTestTable + .withPartitionMetaFiles(p) + .withBaseFilesInPartition(p, IntStream.range(0, 10).toArray()); } catch (Exception e) { throw new RuntimeException(e); } @@ -109,7 +123,13 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 0); + Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); + + List fullPartitionPaths = DATE_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList()); + Map partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths); + for (String p : fullPartitionPaths) { + Assertions.assertEquals(10, partitionToFilesMap.get(p).length); + } } @Test @@ -128,8 +148,14 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + ONE_LEVEL_PARTITIONS.get(0))).length == 10); + Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); + Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + ONE_LEVEL_PARTITIONS.get(0))).length); + + List fullPartitionPaths = ONE_LEVEL_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList()); + Map partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths); + for (String p : fullPartitionPaths) { + Assertions.assertEquals(10, partitionToFilesMap.get(p).length); + } } @Test @@ -148,8 +174,14 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length == 10); + Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); + Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length); + + List fullPartitionPaths = MULTI_LEVEL_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList()); + Map partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths); + for (String p : fullPartitionPaths) { + Assertions.assertEquals(10, partitionToFilesMap.get(p).length); + } } @Test @@ -167,8 +199,14 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllPartitionPaths().size() == 3); - Assertions.assertTrue(fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length == 0); + Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); + Assertions.assertEquals(0, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length); + + List fullPartitionPaths = MULTI_LEVEL_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList()); + Map partitionToFilesMap = fileSystemBackedTableMetadata.getAllFilesInPartitions(fullPartitionPaths); + for (String p : fullPartitionPaths) { + Assertions.assertEquals(0, partitionToFilesMap.get(p).length); + } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 710db943b..fa1a1daa1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -17,11 +17,13 @@ package org.apache.hudi +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.common.config.ConfigProperty import org.apache.hudi.common.fs.ConsistencyGuardConfig import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor} import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator} @@ -471,4 +473,16 @@ object DataSourceOptionsHelper { }) translatedOpt.toMap } + + def parametersWithReadDefaults(parameters: Map[String, String]): Map[String, String] = { + // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool, + // or else use query type from QUERY_TYPE_OPT_KEY. + val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) + .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL) + .getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue())) + + Map( + QUERY_TYPE_OPT_KEY.key -> queryType + ) ++ translateConfigurations(parameters) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 146971e18..04e6b01a4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -72,7 +72,7 @@ class DefaultSource extends RelationProvider optParams: Map[String, String], schema: StructType): BaseRelation = { // Add default options for unspecified read options keys. - val parameters = DataSourceOptionsHelper.translateConfigurations(optParams) + val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(optParams) val path = parameters.get("path") val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY.key) @@ -106,12 +106,7 @@ class DefaultSource extends RelationProvider val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent val tableType = metaClient.getTableType - - // First check if the ConfigUtils.IS_QUERY_AS_RO_TABLE has set by HiveSyncTool, - // or else use query type from QUERY_TYPE_OPT_KEY. - val queryType = parameters.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) - .map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else QUERY_TYPE_SNAPSHOT_OPT_VAL) - .getOrElse(parameters.getOrElse(QUERY_TYPE_OPT_KEY.key, QUERY_TYPE_OPT_KEY.defaultValue())) + val queryType = parameters(QUERY_TYPE_OPT_KEY.key) log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 3c8bac749..7253143f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -21,14 +21,14 @@ import java.util.Properties import scala.collection.JavaConverters._ import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} -import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} +import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.common.table.view.HoodieTableFileSystemView -import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{InternalRow, expressions} @@ -76,6 +76,11 @@ case class HoodieFileIndex( private val basePath = metaClient.getBasePath @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required")) + + private val queryType = options(QUERY_TYPE_OPT_KEY.key()) + + private val tableType = metaClient.getTableType + /** * Get the schema of the table. */ @@ -106,16 +111,35 @@ case class HoodieFileIndex( } } - private lazy val metadataConfig = { + private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) + + private lazy val configProperties = { + val sqlConf: SQLConf = spark.sessionState.conf val properties = new Properties() + + // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users + // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing. + properties.put(HoodieMetadataConfig.METADATA_ENABLE_PROP, + sqlConf.getConfString(HoodieMetadataConfig.METADATA_ENABLE_PROP.key(), + HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS.toString)) + properties.put(HoodieMetadataConfig.METADATA_VALIDATE_PROP, + sqlConf.getConfString(HoodieMetadataConfig.METADATA_VALIDATE_PROP.key(), + HoodieMetadataConfig.METADATA_VALIDATE_PROP.defaultValue().toString)) properties.putAll(options.asJava) - HoodieMetadataConfig.newBuilder.fromProperties(properties).build() + properties } + private lazy val fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder() + .fromProperties(configProperties) + .build() + + private lazy val metadataConfig = HoodieMetadataConfig.newBuilder + .fromProperties(configProperties) + .build() + @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _ - @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _ + @transient @volatile private var cachedAllInputFileSlices: Map[PartitionRowPath, Seq[FileSlice]] = _ @transient @volatile private var cachedFileSize: Long = 0L - @transient @volatile private var cachedAllPartitionPaths: Seq[PartitionRowPath] = _ @volatile private var queryAsNonePartitionedTable: Boolean = _ @@ -123,24 +147,58 @@ case class HoodieFileIndex( override def rootPaths: Seq[Path] = queryPath :: Nil + /** + * Invoked by Spark to fetch list of latest base files per partition. + * + * @param partitionFilters partition column filters + * @param dataFilters data columns filters + * @return list of PartitionDirectory containing partition to base files mapping + */ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table. Seq(PartitionDirectory(InternalRow.empty, allFiles)) } else { // Prune the partition path by the partition filters - val prunedPartitions = prunePartition(cachedAllPartitionPaths, partitionFilters) + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) prunedPartitions.map { partition => - val fileStatues = fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator() - .asScala.toSeq - .map(_.getFileStatus) - PartitionDirectory(partition.values, fileStatues) + val baseFileStatuses = cachedAllInputFileSlices(partition).map(fileSlice => { + if (fileSlice.getBaseFile.isPresent) { + fileSlice.getBaseFile.get().getFileStatus + } else { + null + } + }).filterNot(_ == null) + + PartitionDirectory(partition.values, baseFileStatuses) } } } + /** + * Fetch list of latest base files and log files per partition. + * + * @param partitionFilters partition column filters + * @param dataFilters data column filters + * @return mapping from string partition paths to its base/log files + */ + def listFileSlices(partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = { + if (queryAsNonePartitionedTable) { + // Read as Non-Partitioned table. + cachedAllInputFileSlices.map(entry => (entry._1.partitionPath, entry._2)) + } else { + // Prune the partition path by the partition filters + val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters) + prunedPartitions.map(partition => { + (partition.partitionPath, cachedAllInputFileSlices(partition)) + }).toMap + } + } + override def inputFiles: Array[String] = { - cachedAllInputFiles.map(_.getFileStatus.getPath.toString) + val fileStatusList = allFiles + fileStatusList.map(_.getPath.toString).toArray } override def refresh(): Unit = { @@ -157,13 +215,36 @@ case class HoodieFileIndex( metaClient.reloadActiveTimeline() val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles) - cachedAllInputFiles = fileSystemView.getLatestBaseFiles.iterator().asScala.toArray - cachedAllPartitionPaths = partitionFiles.keys.toSeq - cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum + + (tableType, queryType) match { + case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) => + // Fetch and store latest base and log files, and their sizes + cachedAllInputFileSlices = partitionFiles.map(p => { + val latestSlices = if (activeInstants.lastInstant().isPresent) { + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(p._1.partitionPath, + activeInstants.lastInstant().get().getTimestamp).iterator().asScala.toSeq + } else { + Seq() + } + (p._1, latestSlices) + }) + cachedFileSize = cachedAllInputFileSlices.values.flatten.map(fileSlice => { + if (fileSlice.getBaseFile.isPresent) { + fileSlice.getBaseFile.get().getFileLen + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum + } else { + fileSlice.getLogFiles.iterator().asScala.map(_.getFileSize).sum + } + }).sum + case (_, _) => + // Fetch and store latest base files and its sizes + cachedAllInputFileSlices = partitionFiles.map(p => { + (p._1, fileSystemView.getLatestFileSlices(p._1.partitionPath).iterator().asScala.toSeq) + }) + cachedFileSize = cachedAllInputFileSlices.values.flatten.map(_.getBaseFile.get().getFileLen).sum + } // If the partition value contains InternalRow.empty, we query it as a non-partitioned table. - queryAsNonePartitionedTable = cachedAllPartitionPaths - .exists(p => p.values == InternalRow.empty) + queryAsNonePartitionedTable = partitionFiles.keys.exists(p => p.values == InternalRow.empty) val flushSpend = System.currentTimeMillis() - startTime logInfo(s"Refresh for table ${metaClient.getTableConfig.getTableName}," + s" spend: $flushSpend ms") @@ -192,7 +273,20 @@ case class HoodieFileIndex( StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name))) } - def allFiles: Seq[FileStatus] = cachedAllInputFiles.map(_.getFileStatus) + /** + * Returns the FileStatus for all the base files (excluding log files). This should be used only for + * cases where Spark directly fetches the list of files via HoodieFileIndex or for read optimized query logic + * implemented internally within Hudi like HoodieBootstrapRelation. This helps avoid the use of path filter + * to filter out log files within Spark. + * + * @return List of FileStatus for base files + */ + def allFiles: Seq[FileStatus] = { + cachedAllInputFileSlices.values.flatten + .filter(_.getBaseFile.isPresent) + .map(_.getBaseFile.get().getFileStatus) + .toSeq + } /** * Prune the partition by the filter.This implementation is fork from @@ -229,12 +323,12 @@ case class HoodieFileIndex( } def getAllQueryPartitionPaths: Seq[PartitionRowPath] = { - val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) // Load all the partition path from the basePath, and filter by the query partition path. // TODO load files from the queryPartitionPath directly. - val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala + val partitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath).asScala .filter(_.startsWith(queryPartitionPath)) + val partitionSchema = _partitionSchemaFromProperties val timeZoneId = CaseInsensitiveMap(options) .get(DateTimeUtils.TIMEZONE_OPTION) @@ -307,13 +401,6 @@ case class HoodieFileIndex( * Load all partition paths and it's files under the query table path. */ private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { - val properties = new Properties() - properties.putAll(options.asJava) - val writeConfig = HoodieWriteConfig.newBuilder() - .withPath(basePath).withProperties(properties).build() - - val maxListParallelism = writeConfig.getFileListingParallelism - val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf()) val partitionRowPaths = getAllQueryPartitionPaths // List files in all of the partition path. val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]() @@ -327,28 +414,25 @@ case class HoodieFileIndex( case None => pathToFetch.append(partitionRowPath) } } - // Fetch the rest from the file system. - val fetchedPartition2Files = + + val fetchedPartitionToFiles = if (pathToFetch.nonEmpty) { - spark.sparkContext.parallelize(pathToFetch, Math.min(pathToFetch.size, maxListParallelism)) - .map { partitionRowPath => - // Here we use a LocalEngineContext to get the files in the partition. - // We can do this because the TableMetadata.getAllFilesInPartition only rely on the - // hadoopConf of the EngineContext. - val engineContext = new HoodieLocalEngineContext(serializableConf.get()) - val filesInPartition = FSUtils.getFilesInPartition(engineContext, metadataConfig, - basePath, partitionRowPath.fullPartitionPath(basePath)) - (partitionRowPath, filesInPartition) - }.collect().map(f => f._1 -> f._2).toMap + val fullPartitionPathsToFetch = pathToFetch.map(p => (p, p.fullPartitionPath(basePath).toString)).toMap + val partitionToFilesMap = FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath, + fullPartitionPathsToFetch.values.toArray, fileSystemStorageConfig.getSpillableDir) + fullPartitionPathsToFetch.map(p => { + (p._1, partitionToFilesMap.get(p._2)) + }) } else { Map.empty[PartitionRowPath, Array[FileStatus]] } + // Update the fileStatusCache - fetchedPartition2Files.foreach { + fetchedPartitionToFiles.foreach { case (partitionRowPath, filesInPartition) => fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition) } - cachePartitionToFiles.toMap ++ fetchedPartition2Files + cachePartitionToFiles.toMap ++ fetchedPartitionToFiles } /** diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 07213feac..7fffb7e8a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -19,6 +19,7 @@ package org.apache.hudi import org.apache.avro.Schema +import org.apache.hudi.common.model.HoodieLogFile import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils @@ -142,15 +143,45 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, } def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = { - // Get all partition paths - val partitionPaths = if (globPaths.isDefined) { + if (globPaths.isDefined) { // Load files from the global paths if it has defined to be compatible with the original mode val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get) val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline .filterCompletedInstants, inMemoryFileIndex.allFiles().toArray) - fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent) - } else { // Load partition path by the HoodieFileIndex. + val partitionPaths = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus.getPath.getParent) + + + if (partitionPaths.isEmpty) { // If this an empty table, return an empty split list. + List.empty[HoodieMergeOnReadFileSplit] + } else { + val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant() + if (!lastInstant.isPresent) { // Return empty list if the table has no commit + List.empty + } else { + val latestCommit = lastInstant.get().getTimestamp + val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala + val fileSplits = baseAndLogsList.map(kv => { + val baseFile = kv.getLeft + val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList) + + val baseDataPath = if (baseFile.isPresent) { + Some(PartitionedFile( + InternalRow.empty, + MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath), + 0, baseFile.get.getFileLen) + ) + } else { + None + } + HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit, + metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) + }).toList + fileSplits + } + } + } else { + // Load files by the HoodieFileIndex. val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) @@ -160,36 +191,34 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, val partitionFilterExpression = HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema) - val allPartitionPaths = hoodieFileIndex.getAllQueryPartitionPaths // If convert success to catalyst expression, use the partition prune - hoodieFileIndex.prunePartition(allPartitionPaths, partitionFilterExpression.map(Seq(_)).getOrElse(Seq.empty)) - .map(_.fullPartitionPath(metaClient.getBasePath)) - } - - if (partitionPaths.isEmpty) { // If this an empty table, return an empty split list. - List.empty[HoodieMergeOnReadFileSplit] - } else { - val lastInstant = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants.lastInstant() - if (!lastInstant.isPresent) { // Return empty list if the table has no commit - List.empty + val fileSlices = if (partitionFilterExpression.isDefined) { + hoodieFileIndex.listFileSlices(Seq(partitionFilterExpression.get), Seq.empty) } else { - val latestCommit = lastInstant.get().getTimestamp - val baseAndLogsList = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, partitionPaths.asJava).asScala - val fileSplits = baseAndLogsList.map(kv => { - val baseFile = kv.getLeft - val logPaths = if (kv.getRight.isEmpty) Option.empty else Option(kv.getRight.asScala.toList) + hoodieFileIndex.listFileSlices(Seq.empty, Seq.empty) + } - val baseDataPath = if (baseFile.isPresent) { - Some(PartitionedFile( - InternalRow.empty, - MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath), - 0, baseFile.get.getFileLen) - ) + if (fileSlices.isEmpty) { + // If this an empty table, return an empty split list. + List.empty[HoodieMergeOnReadFileSplit] + } else { + val fileSplits = fileSlices.values.flatten.map(fileSlice => { + val latestCommit = metaClient.getActiveTimeline.getCommitsTimeline + .filterCompletedInstants.lastInstant().get().getTimestamp + + val partitionedFile = if (fileSlice.getBaseFile.isPresent) { + val baseFile = fileSlice.getBaseFile.get() + val baseFilePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath) + Option(PartitionedFile(InternalRow.empty, baseFilePath, 0, baseFile.getFileLen)) } else { - None + Option.empty } - HoodieMergeOnReadFileSplit(baseDataPath, logPaths, latestCommit, - metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) + + val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala + .map(logFile => MergeOnReadSnapshotRelation.getFilePath(logFile.getPath)).toList + val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths) + HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, latestCommit, metaClient.getBasePath, + maxCompactionMemoryInBytes, mergeType) }).toList fileSplits } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 070d1e8d4..c1f8bd132 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -53,6 +53,10 @@ class TestHoodieFileIndex extends HoodieClientTestBase { HoodieWriteConfig.TABLE_NAME.key -> "hoodie_test" ) + var queryOpts = Map( + DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL + ) + @BeforeEach override def setUp() { initPath() initSparkContexts() @@ -60,6 +64,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase { initTestDataGenerator() initFileSystem() initMetaClient() + + queryOpts = queryOpts ++ Map("path" -> basePath) } @ParameterizedTest @@ -74,7 +80,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase { .mode(SaveMode.Overwrite) .save(basePath) metaClient = HoodieTableMetaClient.reload(metaClient) - val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts) assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(",")) } @@ -96,7 +102,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase { .mode(SaveMode.Overwrite) .save(basePath) metaClient = HoodieTableMetaClient.reload(metaClient) - val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts) assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(",")) } @@ -115,7 +121,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase { .mode(SaveMode.Overwrite) .save(basePath) metaClient = HoodieTableMetaClient.reload(metaClient) - val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts) assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(",")) } @@ -133,7 +139,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase { .mode(SaveMode.Overwrite) .save(basePath) metaClient = HoodieTableMetaClient.reload(metaClient) - val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + val fileIndex = HoodieFileIndex(spark, metaClient, None, queryOpts) val partitionFilter1 = EqualTo(attribute("partition"), literal("2021/03/08")) val partitionName = if (partitionEncode) PartitionPathEncodeUtils.escapePathName("2021/03/08") @@ -176,7 +182,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase { .mode(SaveMode.Overwrite) .save(basePath) metaClient = HoodieTableMetaClient.reload(metaClient) - val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + val fileIndex = HoodieFileIndex(spark, metaClient, None, + queryOpts ++ Map(HoodieMetadataConfig.METADATA_ENABLE_PROP.key -> useMetaFileList.toString)) val partitionFilter1 = And( EqualTo(attribute("dt"), literal("2021-03-01")), @@ -190,7 +197,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase { assertEquals(partitionValues.toSeq(Seq(StringType)).mkString(","), "2021-03-01,10") assertEquals(getFileCountInPartitionPath("2021-03-01/10"), filesAfterPrune.size) - val readDF1 = spark.read.format("hudi").load(basePath) + val readDF1 = spark.read.format("hudi") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key(), useMetaFileList) + .load(basePath) assertEquals(10, readDF1.count()) assertEquals(5, readDF1.filter("dt = '2021-03-01' and hh = '10'").count()) @@ -206,6 +215,7 @@ class TestHoodieFileIndex extends HoodieClientTestBase { .option(PARTITIONPATH_FIELD_OPT_KEY.key, "dt,hh") .option(KEYGENERATOR_CLASS_OPT_KEY.key, classOf[ComplexKeyGenerator].getName) .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, "false") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key(), useMetaFileList) .mode(SaveMode.Overwrite) .save(basePath) @@ -224,7 +234,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase { // The returned file size should equal to the whole file size in all the partition paths. assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"), filesAfterPrune2.length) - val readDF2 = spark.read.format("hudi").load(basePath) + val readDF2 = spark.read.format("hudi") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, useMetaFileList) + .load(basePath) assertEquals(10, readDF2.count()) // There are 5 rows in the dt = 2021/03/01 and hh = 10 diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 0cdb16a9e..99905e0ec 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -41,7 +41,7 @@ import org.joda.time.format.DateTimeFormat import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.{CsvSource, ValueSource} /** @@ -160,9 +160,7 @@ class TestCOWDataSource extends HoodieClientTestBase { @ParameterizedTest - //TODO(metadata): Needs HUDI-1459 to be fixed - //@ValueSource(booleans = Array(true, false)) - @ValueSource(booleans = Array(false)) + @ValueSource(booleans = Array(true, false)) def testCopyOnWriteStorage(isMetadataEnabled: Boolean) { // Insert Operation val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList @@ -180,7 +178,7 @@ class TestCOWDataSource extends HoodieClientTestBase { // Snapshot query val snapshotDF1 = spark.read.format("org.apache.hudi") .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) - .load(basePath + "/*/*/*/*") + .load(basePath + "/*/*/*") assertEquals(100, snapshotDF1.count()) // Upsert based on the written table with Hudi metadata columns @@ -189,11 +187,14 @@ class TestCOWDataSource extends HoodieClientTestBase { updateDf.write.format("org.apache.hudi") .options(commonOpts) + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .mode(SaveMode.Append) .save(basePath) val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath) - val snapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*/*/*") + val snapshotDF2 = spark.read.format("hudi") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) + .load(basePath + "/*/*/*") assertEquals(100, snapshotDF2.count()) assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0)) @@ -214,7 +215,7 @@ class TestCOWDataSource extends HoodieClientTestBase { // Snapshot Query val snapshotDF3 = spark.read.format("org.apache.hudi") .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) - .load(basePath + "/*/*/*/*") + .load(basePath + "/*/*/*") assertEquals(100, snapshotDF3.count()) // still 100, since we only updated // Read Incremental Query @@ -666,8 +667,8 @@ class TestCOWDataSource extends HoodieClientTestBase { } @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = { + @CsvSource(Array("true,false", "true,true", "false,true", "false,false")) + def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean, isMetadataEnabled: Boolean): Unit = { val N = 20 // Test query with partition prune if URL_ENCODE_PARTITIONING_OPT_KEY has enable val records1 = dataGen.generateInsertsContainsAllPartitions("000", N) @@ -676,6 +677,7 @@ class TestCOWDataSource extends HoodieClientTestBase { .options(commonOpts) .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, partitionEncode) + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .mode(SaveMode.Overwrite) .save(basePath) val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) @@ -683,6 +685,7 @@ class TestCOWDataSource extends HoodieClientTestBase { val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15") // query the partition by filter val count1 = spark.read.format("hudi") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .load(basePath) .filter("partition = '2016/03/15'") .count() @@ -691,6 +694,7 @@ class TestCOWDataSource extends HoodieClientTestBase { // query the partition by path val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15" val count2 = spark.read.format("hudi") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .load(basePath + s"/$partitionPath") .count() assertEquals(countIn20160315, count2) @@ -702,6 +706,7 @@ class TestCOWDataSource extends HoodieClientTestBase { .options(commonOpts) .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, partitionEncode) + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .mode(SaveMode.Append) .save(basePath) // Incremental query without "*" in path diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index aacc46ae8..dc299d115 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path import scala.collection.JavaConverters._ import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY} +import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} import org.apache.hudi.common.table.HoodieTableMetaClient @@ -75,7 +76,9 @@ class TestMORDataSource extends HoodieClientTestBase { cleanupFileSystem() } - @Test def testMergeOnReadStorage() { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testMergeOnReadStorage(isMetadataEnabled: Boolean) { val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) // Bulk Insert Operation @@ -86,6 +89,7 @@ class TestMORDataSource extends HoodieClientTestBase { .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .mode(SaveMode.Overwrite) .save(basePath) @@ -94,7 +98,9 @@ class TestMORDataSource extends HoodieClientTestBase { // Read RO View val hudiRODF1 = spark.read.format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - .load(basePath + "/*/*/*/*") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) + .load(basePath + "/*/*/*") + assertEquals(100, hudiRODF1.count()) // still 100, since we only updated val insertCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath) val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList @@ -105,6 +111,7 @@ class TestMORDataSource extends HoodieClientTestBase { val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) inputDF2.write.format("org.apache.hudi") .options(commonOpts) + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .mode(SaveMode.Append) .save(basePath) @@ -112,7 +119,9 @@ class TestMORDataSource extends HoodieClientTestBase { val updateCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath) val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) - .load(basePath + "/*/*/*/*") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) + .load(basePath + "/*/*/*") + val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList assertEquals(List(updateCommitTime), updateCommitTimes) @@ -122,10 +131,13 @@ class TestMORDataSource extends HoodieClientTestBase { inputDF3.write.format("org.apache.hudi") .options(commonOpts) + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .mode(SaveMode.Append) .save(basePath) - val hudiSnapshotDF3 = spark.read.format("hudi").load(basePath + "/*/*/*/*") + val hudiSnapshotDF3 = spark.read.format("hudi") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) + .load(basePath + "/*/*/*") assertEquals(100, hudiSnapshotDF3.count()) assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0)) } @@ -580,8 +592,8 @@ class TestMORDataSource extends HoodieClientTestBase { } @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testQueryMORWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = { + @CsvSource(Array("true,false", "true,true", "false,true", "false,false")) + def testQueryMORWithBasePathAndFileIndex(partitionEncode: Boolean, isMetadataEnabled: Boolean): Unit = { val N = 20 // Test query with partition prune if URL_ENCODE_PARTITIONING_OPT_KEY has enable val records1 = dataGen.generateInsertsContainsAllPartitions("000", N) @@ -591,6 +603,7 @@ class TestMORDataSource extends HoodieClientTestBase { .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, partitionEncode) + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .mode(SaveMode.Overwrite) .save(basePath) val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) @@ -598,6 +611,7 @@ class TestMORDataSource extends HoodieClientTestBase { val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15") // query the partition by filter val count1 = spark.read.format("hudi") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .load(basePath) .filter("partition = '2016/03/15'") .count() @@ -606,6 +620,7 @@ class TestMORDataSource extends HoodieClientTestBase { // query the partition by path val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15" val count2 = spark.read.format("hudi") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .load(basePath + s"/$partitionPath") .count() assertEquals(countIn20160315, count2) @@ -618,6 +633,7 @@ class TestMORDataSource extends HoodieClientTestBase { .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY.key, partitionEncode) + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP.key, isMetadataEnabled) .mode(SaveMode.Append) .save(basePath) // Incremental query without "*" in path