diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java index 4fdb6a6be..b98893344 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java @@ -80,22 +80,21 @@ public class ColumnStatsIndexHelper { private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; - private static final String Z_INDEX_FILE_COLUMN_NAME = "file"; - - private static final String Z_INDEX_MIN_VALUE_STAT_NAME = "minValue"; - private static final String Z_INDEX_MAX_VALUE_STAT_NAME = "maxValue"; - private static final String Z_INDEX_NUM_NULLS_STAT_NAME = "num_nulls"; + private static final String COLUMN_STATS_INDEX_FILE_COLUMN_NAME = "file"; + private static final String COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME = "minValue"; + private static final String COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue"; + private static final String COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME = "num_nulls"; public static String getMinColumnNameFor(String colName) { - return composeZIndexColName(colName, Z_INDEX_MIN_VALUE_STAT_NAME); + return composeZIndexColName(colName, COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME); } public static String getMaxColumnNameFor(String colName) { - return composeZIndexColName(colName, Z_INDEX_MAX_VALUE_STAT_NAME); + return composeZIndexColName(colName, COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME); } public static String getNumNullsColumnNameFor(String colName) { - return composeZIndexColName(colName, Z_INDEX_NUM_NULLS_STAT_NAME); + return composeZIndexColName(colName, COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME); } /** @@ -407,11 +406,11 @@ public class ColumnStatsIndexHelper { @Nonnull public static StructType composeIndexSchema(@Nonnull List zorderedColumnsSchemas) { List schema = new ArrayList<>(); - schema.add(new StructField(Z_INDEX_FILE_COLUMN_NAME, StringType$.MODULE$, true, Metadata.empty())); + schema.add(new StructField(COLUMN_STATS_INDEX_FILE_COLUMN_NAME, StringType$.MODULE$, true, Metadata.empty())); zorderedColumnsSchemas.forEach(colSchema -> { - schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_MIN_VALUE_STAT_NAME, colSchema.dataType())); - schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_MAX_VALUE_STAT_NAME, colSchema.dataType())); - schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_NUM_NULLS_STAT_NAME, LongType$.MODULE$)); + schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME, colSchema.dataType())); + schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME, colSchema.dataType())); + schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME, LongType$.MODULE$)); }); return StructType$.MODULE$.apply(schema); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java index 8cd68ff83..931714fd2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java @@ -62,10 +62,10 @@ 
public class HoodieMergeOnReadTestUtils { } public static List getRecordsUsingInputFormat(Configuration conf, List inputPaths, - String basePath, JobConf jobConf, boolean realtime, boolean populateMetaFieldsConfigValue) { + String basePath, JobConf jobConf, boolean realtime, boolean populateMetaFields) { Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, schema, - HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>(), populateMetaFieldsConfigValue); + HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>(), populateMetaFields); } public static List getRecordsUsingInputFormat(Configuration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema, @@ -74,14 +74,23 @@ public class HoodieMergeOnReadTestUtils { } public static List getRecordsUsingInputFormat(Configuration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema, - String rawHiveColumnTypes, boolean projectCols, List projectedColumns, boolean populateMetaFieldsConfigValue) { + String rawHiveColumnTypes, boolean projectCols, List projectedColumns, boolean populateMetaFields) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build(); FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(), realtime, jobConf); - Schema schema = HoodieAvroUtils.addMetadataFields(rawSchema); - String hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(rawHiveColumnTypes); - setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, projectCols, projectedColumns, populateMetaFieldsConfigValue); + Schema schema; + String hiveColumnTypes; + + if (populateMetaFields) { + schema = HoodieAvroUtils.addMetadataFields(rawSchema); + hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(rawHiveColumnTypes); + } else { + schema = rawSchema; + hiveColumnTypes = rawHiveColumnTypes; + } + + setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, projectCols, projectedColumns, populateMetaFields); final List fields; if (projectCols) { fields = schema.getFields().stream().filter(f -> projectedColumns.contains(f.name())) diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc b/hudi-common/src/main/avro/HoodieMetadata.avsc index 4037dd0f1..a6807d58e 100644 --- a/hudi-common/src/main/avro/HoodieMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieMetadata.avsc @@ -109,6 +109,14 @@ "string" ] }, + { + "doc": "Column name for which this column statistics applies", + "name": "columnName", + "type": [ + "null", + "string" + ] + }, { "doc": "Minimum value in the range. 
Based on user data table schema, we can convert this to appropriate type", "name": "minValue", diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 4c1eac79d..3de93005d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -83,6 +83,7 @@ public class HoodieTableMetaClient implements Serializable { public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".aux"; public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap"; public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat"; + public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata"; public static final String COLUMN_STATISTICS_INDEX_NAME = ".colstatsindex"; public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR + ".partitions"; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index f2db4d692..a0771d124 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -58,6 +58,8 @@ import java.io.IOException; /** * Helper class to read schema from data files and log files and to convert it between different formats. + * + * TODO(HUDI-3626) cleanup */ public class TableSchemaResolver { @@ -143,7 +145,7 @@ public class TableSchemaResolver { * @throws Exception */ public Schema getTableAvroSchema() throws Exception { - return getTableAvroSchema(true); + return getTableAvroSchema(metaClient.getTableConfig().populateMetaFields()); } /** @@ -197,7 +199,10 @@ public class TableSchemaResolver { * * @return Avro user data schema * @throws Exception + * + * @deprecated use {@link #getTableAvroSchema(boolean)} instead */ + @Deprecated public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception { return getTableAvroSchema(false); } @@ -208,7 +213,9 @@ public class TableSchemaResolver { * @param instant will get the instant data schema * @return Avro user data schema * @throws Exception + * @deprecated use {@link #getTableSchemaFromCommitMetadata} instead */ + @Deprecated public Schema getTableAvroSchemaWithoutMetadataFields(HoodieInstant instant) throws Exception { Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(instant, false); if (schemaFromCommitMetadata.isPresent()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java index d17cd4bc7..9d279d532 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java @@ -50,13 +50,13 @@ public class TablePathUtils { FileStatus fileStatus = fs.getFileStatus(path); Path directory = fileStatus.isFile() ? 
fileStatus.getPath().getParent() : fileStatus.getPath(); - if (TablePathUtils.hasTableMetadataFolder(fs, directory)) { + if (hasTableMetadataFolder(fs, directory)) { // Handle table folder itself return Option.of(directory); } // Handle metadata folder or metadata sub folder path - Option tablePath = getTablePathFromTableMetadataPath(directory); + Option tablePath = getTablePathFromMetaFolderPath(directory); if (tablePath.isPresent()) { return tablePath; } @@ -65,20 +65,20 @@ public class TablePathUtils { return getTablePathFromPartitionPath(fs, directory); } - private static boolean isTableMetadataFolder(String path) { - return path != null && path.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME); + private static boolean isInsideTableMetaFolder(String path) { + return path != null && path.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME); } - private static boolean isInsideTableMetadataFolder(String path) { - return path != null && path.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"); + private static boolean isInsideMetadataTableInMetaFolder(String path) { + return path != null && path.contains("/" + HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH); } - private static Option getTablePathFromTableMetadataPath(Path path) { + private static Option getTablePathFromMetaFolderPath(Path path) { String pathStr = path.toString(); - if (isTableMetadataFolder(pathStr)) { - return Option.of(path.getParent()); - } else if (isInsideTableMetadataFolder(pathStr)) { + // NOTE: Since Metadata Table itself resides w/in the Meta-folder, we need to make sure + // that we don't misinterpret attempt to read MT table itself + if (isInsideTableMetaFolder(pathStr) && !isInsideMetadataTableInMetaFolder(pathStr)) { int index = pathStr.indexOf("/" + HoodieTableMetaClient.METAFOLDER_NAME); return Option.of(new Path(pathStr.substring(0, index))); } @@ -92,12 +92,21 @@ public class TablePathUtils { HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath); metadata.readFromFS(); return Option.of(getNthParent(partitionPath, metadata.getPartitionDepth())); + } else { + // Simply traverse directory structure until found .hoodie folder + Path current = partitionPath; + while (current != null) { + if (hasTableMetadataFolder(fs, current)) { + return Option.of(current); + } + current = current.getParent(); + } + + return Option.empty(); } } catch (IOException ioe) { throw new HoodieException("Error reading partition metadata for " + partitionPath, ioe); } - - return Option.empty(); } private static Path getNthParent(Path path, int n) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index c0ad8b147..f7d45db8b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -108,14 +108,15 @@ public class HoodieMetadataPayload implements HoodieRecordPayload> columnRangeMetadataList = new ArrayList<>(); final Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePathWithPartition); + List> columnRangeMetadataList; if (!isDeleted) { - try { - columnRangeMetadataList = new ParquetUtils().readRangeFromParquetMetadata( - datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex); - } catch (Exception e) { - LOG.error("Failed to read column stats for " + fullFilePath, e); - } + columnRangeMetadataList = new 
ParquetUtils().readRangeFromParquetMetadata( + datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex); } else { + // TODO we should delete records instead of stubbing them columnRangeMetadataList = columnsToIndex.stream().map(entry -> new HoodieColumnRangeMetadata(fileName, entry, null, null, 0, 0, 0, 0)) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java index ab573f0d8..056f2121c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java @@ -93,20 +93,28 @@ public final class TestTablePathUtils { @Test void getTablePathFromMetadataFolderPath() throws IOException { - Path metadataFolder = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME); - Option inferredTablePath = TablePathUtils.getTablePath(fs, metadataFolder); + Path metaFolder = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME); + Option inferredTablePath = TablePathUtils.getTablePath(fs, metaFolder); assertEquals(tablePath, inferredTablePath.get()); } @Test void getTablePathFromMetadataSubFolderPath() throws IOException { Path auxFolder = new Path(tablePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); - Option inferredTablePath = TablePathUtils.getTablePath(fs, auxFolder); - assertEquals(tablePath, inferredTablePath.get()); + assertEquals(tablePath, TablePathUtils.getTablePath(fs, auxFolder).get()); Path bootstrapIndexFolder = new Path(tablePath, HoodieTableMetaClient.BOOTSTRAP_INDEX_ROOT_FOLDER_PATH); - inferredTablePath = TablePathUtils.getTablePath(fs, bootstrapIndexFolder); - assertEquals(tablePath, inferredTablePath.get()); + assertEquals(tablePath, TablePathUtils.getTablePath(fs, bootstrapIndexFolder).get()); + + Path metadataTableFolder = new Path(tablePath, HoodieTableMetaClient.METADATA_TABLE_FOLDER_PATH); + Path metadataTableMetaFolder = new Path(metadataTableFolder, HoodieTableMetaClient.METAFOLDER_NAME); + assertTrue(new File(metadataTableMetaFolder.toUri()).mkdirs()); + + assertEquals(metadataTableFolder, TablePathUtils.getTablePath(fs, metadataTableFolder).get()); + + Path metadataTablePartitionFolder = new Path(metadataTableFolder, "column_stats"); + assertTrue(new File(metadataTablePartitionFolder.toUri()).mkdir()); + assertEquals(metadataTableFolder, TablePathUtils.getTablePath(fs, metadataTablePartitionFolder).get()); } @Test diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java index 2b8dae255..5ae91dc46 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hudi.hadoop; +import org.apache.avro.Schema; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -47,7 +48,6 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; -import org.apache.parquet.schema.MessageType; import javax.annotation.Nonnull; import java.io.IOException; @@ -278,10 +278,13 @@ public class HoodieCopyOnWriteTableInputFormat 
extends HoodieTableInputFormat { TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); try { - MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); - return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), - tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), - parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp()))); + Schema schema = tableSchemaResolver.getTableAvroSchema(); + return Option.of( + new HoodieVirtualKeyInfo( + tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp(), + schema.getField(tableConfig.getRecordKeyFieldProp()).pos(), + schema.getField(tableConfig.getPartitionFieldProp()).pos())); } catch (Exception exception) { throw new HoodieException("Fetching table schema failed with exception ", exception); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 1508babcb..7d558ec8c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -18,21 +18,15 @@ package org.apache.hudi import org.apache.hadoop.fs.Path - import org.apache.hudi.DataSourceReadOptions._ -import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION} import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException -import org.apache.hudi.hadoop.HoodieROTablePathFilter - import org.apache.log4j.LogManager - -import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation} -import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.hudi.streaming.HoodieStreamSource import org.apache.spark.sql.sources._ diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala new file mode 100644 index 000000000..a6c689610 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.spark.sql.DataFrame +import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK + +object HoodieDatasetUtils { + + /** + * Executes provided function while keeping provided [[DataFrame]] instance persisted for the + * duration of the execution + * + * @param df target [[DataFrame]] to be persisted + * @param level desired [[StorageLevel]] of the persistence + * @param f target function to be executed while [[DataFrame]] is kept persisted + * @tparam T return value of the target function + * @return execution outcome of the [[f]] function + */ + def withPersistence[T](df: DataFrame, level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = { + df.persist(level) + try { + f + } finally { + df.unpersist() + } + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 9cdf5cc63..6aa6fbb0e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -18,30 +18,31 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} - -import org.apache.hudi.HoodieFileIndex.getConfigProperties +import org.apache.hudi.HoodieDatasetUtils.withPersistence +import org.apache.hudi.HoodieFileIndex.{collectReferencedColumns, getConfigProperties} import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.util.StringUtils +import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor} import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} - +import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, MetadataPartitionType} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal} import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory} -import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.{AnalysisException, Column, SparkSession} import org.apache.spark.unsafe.types.UTF8String -import scala.collection.JavaConverters._ -import scala.util.{Failure, Success, Try} -import scala.util.control.NonFatal - import java.text.SimpleDateFormat +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} /** * A file index which support partition prune for hoodie snapshot and read-optimized query. 
@@ -84,7 +85,7 @@ case class HoodieFileIndex(spark: SparkSession, override def rootPaths: Seq[Path] = queryPaths.asScala - def enableDataSkipping(): Boolean = { + def isDataSkippingEnabled(): Boolean = { options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean } @@ -112,7 +113,6 @@ case class HoodieFileIndex(spark: SparkSession, * @return list of PartitionDirectory containing partition to base files mapping */ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - val convertedPartitionFilters = HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters) @@ -121,18 +121,14 @@ case class HoodieFileIndex(spark: SparkSession, // - Col-Stats Index is present // - List of predicates (filters) is present val candidateFilesNamesOpt: Option[Set[String]] = - lookupCandidateFilesInColStatsIndex(dataFilters) match { + lookupCandidateFilesInMetadataTable(dataFilters) match { case Success(opt) => opt case Failure(e) => - if (e.isInstanceOf[AnalysisException]) { - logDebug("Failed to relay provided data filters to Z-index lookup", e) - } else { - logError("Failed to lookup candidate files in Z-index", e) - } + logError("Failed to lookup candidate files in Column Stats Index", e) Option.empty } - logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}") + logDebug(s"Overlapping candidate files from Column Stats Index: ${candidateFilesNamesOpt.getOrElse(Set.empty)}") if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table @@ -185,8 +181,8 @@ case class HoodieFileIndex(spark: SparkSession, /** * Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters} - * conditions, by leveraging custom Column Statistics index (col-stats-index) bearing "min", "max", - * "num_nulls" statistics for all clustered columns. + * conditions, by leveraging Metadata Table's Column Statistics index (hereafter referred to as ColStats for brevity) + * bearing "min", "max", "num_nulls" statistics for all columns. * * NOTE: This method has to return complete set of candidate files, since only provided candidates will * ultimately be scanned as part of query execution.
Hence, this method has to maintain the @@ -195,77 +191,102 @@ case class HoodieFileIndex(spark: SparkSession, * @param queryFilters list of original data filters passed down from querying engine * @return list of pruned (data-skipped) candidate base-files' names */ - private def lookupCandidateFilesInColStatsIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try { - val indexPath = metaClient.getColumnStatsIndexPath + private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try { val fs = metaClient.getFs + val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath) - if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) { - // scalastyle:off return - return Success(Option.empty) - // scalastyle:on return + if (!isDataSkippingEnabled() || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) { + Option.empty + } else { + val targetColStatsIndexColumns = Seq( + HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, + HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, + HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, + HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT) + + val requiredMetadataIndexColumns = + (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName => + s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}") + + // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] + val metadataTableDF = spark.read.format("org.apache.hudi") + .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}") + + // TODO filter on (column, partition) prefix + val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull) + .select(requiredMetadataIndexColumns.map(col): _*) + + val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema) + + // Persist DF to avoid re-computing column statistics unraveling + withPersistence(colStatsDF) { + // Metadata Table bears rows in the following format + // + // +---------------------------+------------+------------+------------+-------------+ + // | fileName | columnName | minValue | maxValue | num_nulls | + // +---------------------------+------------+------------+------------+-------------+ + // | one_base_file.parquet | A | 1 | 10 | 0 | + // | another_base_file.parquet | A | -10 | 0 | 5 | + // +---------------------------+------------+------------+------------+-------------+ + // + // While Data Skipping utils are expecting following (transposed) format, where per-column stats are + // essentially transposed (from rows to columns): + // + // +---------------------------+------------+------------+-------------+ + // | file | A_minValue | A_maxValue | A_num_nulls | + // +---------------------------+------------+------------+-------------+ + // | one_base_file.parquet | 1 | 10 | 0 | + // | another_base_file.parquet | -10 | 0 | 5 | + // +---------------------------+------------+------------+-------------+ + // + // NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while + // query at hand might only be referencing a handful of those. 
As such, we collect all the + // column references from the filtering expressions, and only transpose records corresponding to the + // columns referenced in those + val transposedColStatsDF = + queryReferencedColumns.map(colName => + colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName)) + .select(targetColStatsIndexColumns.map(col): _*) + .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName)) + .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName)) + .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName)) + ) + .reduceLeft((left, right) => + left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)) + + // Persist DF to avoid re-computing column statistics unraveling + withPersistence(transposedColStatsDF) { + val indexSchema = transposedColStatsDF.schema + val indexFilter = + queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)) + .reduce(And) + + val allIndexedFileNames = + transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME) + .collect() + .map(_.getString(0)) + .toSet + + val prunedCandidateFileNames = + transposedColStatsDF.where(new Column(indexFilter)) + .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME) + .collect() + .map(_.getString(0)) + .toSet + + // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every + // base-file: since it's bound to clustering, which could occur asynchronously + // at arbitrary point in time, and is not likely to be touching all of the base files. + // + // To close that gap, we manually compute the difference b/w all indexed (by col-stats-index) + // files and all outstanding base-files, and make sure that all base files not + // represented w/in the index are included in the output of this method + val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames) + + Some(prunedCandidateFileNames ++ notIndexedFileNames) + } + } } - - val completedCommits = getActiveTimeline.filterCompletedInstants().getInstants.iterator.asScala.toList.map(_.getTimestamp) - - // Collect all index tables present in `.zindex` folder - val candidateIndexTables = - fs.listStatus(new Path(indexPath)) - .filter(_.isDirectory) - .map(_.getPath.getName) - .filter(completedCommits.contains(_)) - .sortBy(x => x) - - if (candidateIndexTables.isEmpty) { - // scalastyle:off return - return Success(Option.empty) - // scalastyle:on return - } - - val dataFrameOpt = try { - Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString)) - } catch { - case t: Throwable => - logError("Failed to read col-stats index; skipping", t) - None - } - - dataFrameOpt.map(df => { - val indexSchema = df.schema - val indexFilter = - queryFilters.map(createColumnStatsIndexFilterExpr(_, indexSchema)) - .reduce(And) - - logInfo(s"Index filter condition: $indexFilter") - - df.persist() - - val allIndexedFileNames = - df.select("file") - .collect() - .map(_.getString(0)) - .toSet - - val prunedCandidateFileNames = - df.where(new Column(indexFilter)) - .select("file") - .collect() - .map(_.getString(0)) - .toSet - - df.unpersist() - - // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every - // base-file: since it's bound to clustering, which could occur asynchronously - // at arbitrary point in time, and is not likely to be touching all of the base files. 
- // - // To close that gap, we manually compute the difference b/w all indexed (by col-stats-index) - // files and all outstanding base-files, and make sure that all base files not - // represented w/in the index are included in the output of this method - val notIndexedFileNames = - lookupFileNamesMissingFromIndex(allIndexedFileNames) - - prunedCandidateFileNames ++ notIndexedFileNames - }) } override def refresh(): Unit = super.refresh() @@ -282,6 +303,12 @@ case class HoodieFileIndex(spark: SparkSession, object HoodieFileIndex extends Logging { + private def collectReferencedColumns(spark: SparkSession, queryFilters: Seq[Expression], schema: StructType): Seq[String] = { + val resolver = spark.sessionState.analyzer.resolver + val refs = queryFilters.flatMap(_.references) + schema.fieldNames.filter { colName => refs.exists(r => resolver.apply(colName, r.name)) } + } + def getConfigProperties(spark: SparkSession, options: Map[String, String]) = { val sqlConf: SQLConf = spark.sessionState.conf val properties = new TypedProperties() @@ -331,6 +358,9 @@ object HoodieFileIndex extends Logging { } private def getQueryPath(options: Map[String, String]) = { - new Path(options.getOrElse("path", "'path' option required")) + new Path(options.get("path") match { + case Some(p) => p + case None => throw new IllegalArgumentException("'path' option required") + }) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index d4c50b73f..74b8e10c4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -307,7 +307,7 @@ object SparkHoodieTableFileIndex { } private def deduceQueryType(configProperties: TypedProperties): HoodieTableQueryType = { - configProperties.asScala(QUERY_TYPE.key()) match { + configProperties.asScala(QUERY_TYPE.key) match { case QUERY_TYPE_SNAPSHOT_OPT_VAL => HoodieTableQueryType.SNAPSHOT case QUERY_TYPE_INCREMENTAL_OPT_VAL => HoodieTableQueryType.INCREMENTAL case QUERY_TYPE_READ_OPTIMIZED_OPT_VAL => HoodieTableQueryType.READ_OPTIMIZED diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala index e5d6f525b..06b92e204 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala @@ -17,39 +17,40 @@ package org.apache.spark.sql.hudi -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor} import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, 
IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith} -import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.functions.col -import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String -import scala.collection.JavaConverters._ - object DataSkippingUtils extends Logging { /** * Translates provided {@link filterExpr} into corresponding filter-expression for column-stats index table * to filter out candidate files that would hold records matching the original filter * - * @param sourceFilterExpr source table's query's filter expression + * @param dataTableFilterExpr source table's query's filter expression * @param indexSchema index table schema * @return filter for column-stats index's table */ - def createColumnStatsIndexFilterExpr(sourceFilterExpr: Expression, indexSchema: StructType): Expression = { + def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression, indexSchema: StructType): Expression = { + try { + createColumnStatsIndexFilterExprInternal(dataTableFilterExpr, indexSchema) + } catch { + case e: AnalysisException => + logDebug(s"Failed to translate provided data table filter expr into column stats one ($dataTableFilterExpr)", e) + throw e + } + } + + private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr: Expression, indexSchema: StructType): Expression = { // Try to transform original Source Table's filter expression into // Column-Stats Index filter expression - tryComposeIndexFilterExpr(sourceFilterExpr, indexSchema) match { + tryComposeIndexFilterExpr(dataTableFilterExpr, indexSchema) match { case Some(e) => e // NOTE: In case we can't transform source filter expression, we fall back // to {@code TrueLiteral}, to essentially avoid pruning any indexed files from scanning @@ -201,14 +202,14 @@ object DataSkippingUtils extends Logging { ) case or: Or => - val resLeft = createColumnStatsIndexFilterExpr(or.left, indexSchema) - val resRight = createColumnStatsIndexFilterExpr(or.right, indexSchema) + val resLeft = createColumnStatsIndexFilterExprInternal(or.left, indexSchema) + val resRight = createColumnStatsIndexFilterExprInternal(or.right, indexSchema) Option(Or(resLeft, resRight)) case and: And => - val resLeft = createColumnStatsIndexFilterExpr(and.left, indexSchema) - val resRight = createColumnStatsIndexFilterExpr(and.right, indexSchema) + val resLeft = createColumnStatsIndexFilterExprInternal(and.left, indexSchema) + val resRight = createColumnStatsIndexFilterExprInternal(and.right, indexSchema) Option(And(resLeft, resRight)) @@ -219,10 +220,10 @@ object DataSkippingUtils extends Logging { // case Not(And(left: Expression, right: Expression)) => - Option(createColumnStatsIndexFilterExpr(Or(Not(left), Not(right)), indexSchema)) + Option(createColumnStatsIndexFilterExprInternal(Or(Not(left), Not(right)), indexSchema)) case Not(Or(left: Expression, right: Expression)) => - Option(createColumnStatsIndexFilterExpr(And(Not(left), Not(right)), indexSchema)) + Option(createColumnStatsIndexFilterExprInternal(And(Not(left), Not(right)), indexSchema)) case _: Expression => None } @@ -259,34 +260,4 @@ object DataSkippingUtils extends Logging { throw new
AnalysisException(s"convert reference to name failed, Found unsupported expression ${other}") } } - - def getIndexFiles(conf: Configuration, indexPath: String): Seq[FileStatus] = { - val basePath = new Path(indexPath) - basePath.getFileSystem(conf) - .listStatus(basePath).filter(f => f.getPath.getName.endsWith(".parquet")) - } - - /** - * read parquet files concurrently by local. - * this method is mush faster than spark - */ - def readParquetFile(spark: SparkSession, indexFiles: Seq[FileStatus], filters: Seq[Filter] = Nil, schemaOpts: Option[StructType] = None): Set[String] = { - val hadoopConf = spark.sparkContext.hadoopConfiguration - val partitionedFiles = indexFiles.map(f => PartitionedFile(InternalRow.empty, f.getPath.toString, 0, f.getLen)) - - val requiredSchema = new StructType().add("file", StringType, true) - val schema = schemaOpts.getOrElse(requiredSchema) - val parquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(spark - , schema , StructType(Nil), requiredSchema, filters, Map.empty, hadoopConf) - val results = new Array[Iterator[String]](partitionedFiles.size) - partitionedFiles.zipWithIndex.par.foreach { case (pf, index) => - val fileIterator = parquetReader(pf).asInstanceOf[Iterator[Any]] - val rows = fileIterator.flatMap(_ match { - case r: InternalRow => Seq(r) - case b: ColumnarBatch => b.rowIterator().asScala - }).map(r => r.getString(0)) - results(index) = rows - } - results.flatMap(f => f).toSet - } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala index ac866ba3e..6b96472d4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -74,7 +74,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { @MethodSource(Array("testBaseLookupFilterExpressionsSource", "testAdvancedLookupFilterExpressionsSource")) def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = { val resolvedExpr: Expression = HoodieCatalystExpressionUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) - val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) + val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) val spark2 = spark import spark2.implicits._ @@ -94,7 +94,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase { @MethodSource(Array("testStringsLookupFilterExpressionsSource")) def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = { val resolvedExpr = HoodieCatalystExpressionUtils.resolveFilterExpr(spark, sourceExpr, sourceTableSchema) - val lookupFilter = DataSkippingUtils.createColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) + val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema) val spark2 = spark import spark2.implicits._ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index fa07c573f..899fc4cc2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -18,43 +18,43 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration - +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.client.HoodieJavaWriteClient import org.apache.hudi.client.common.HoodieJavaEngineContext import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.engine.EngineType -import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} -import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.{HoodieRecord, HoodieTableQueryType, HoodieTableType} +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils} import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.common.util.StringUtils.isNullOrEmpty -import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig} import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config -import org.apache.hudi.testutils.HoodieClientTestBase - +import org.apache.hudi.metadata.{HoodieTableMetadata, MetadataPartitionType} +import org.apache.hudi.testutils.{HoodieClientTestBase, SparkClientFunctionalTestHarness} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal} -import org.apache.spark.sql.execution.datasources.PartitionDirectory +import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory} import org.apache.spark.sql.functions.{lit, struct} -import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession} - import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.{BeforeEach, Test} +import org.junit.jupiter.api.{BeforeEach, Tag, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource} import java.util.Properties - - import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.util.Random class TestHoodieFileIndex extends HoodieClientTestBase { @@ -333,6 +333,57 @@ class TestHoodieFileIndex extends HoodieClientTestBase { assert(fileIndex.getAllQueryPartitionPaths.get(0).path.equals("c")) } + @Test + def testDataSkippingWhileFileListing(): Unit = { + val r = new Random(0xDEED) + val tuples = for (i <- 1 to 1000) yield (i, 1000 - i, r.nextString(5), r.nextInt(4)) + + val _spark = spark + import _spark.implicits._ + val inputDF = tuples.toDF("id", "inv_id", "str", "rand") + + val opts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> 
"hoodie_test", + RECORDKEY_FIELD.key -> "id", + PRECOMBINE_FIELD.key -> "id", + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + ) + + inputDF.repartition(4) + .write + .format("hudi") + .options(opts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 100 * 1024) + .mode(SaveMode.Overwrite) + .save(basePath) + + metaClient = HoodieTableMetaClient.reload(metaClient) + + val props = Map[String, String]( + "path" -> basePath, + QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL, + DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true" + ) + + val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache) + + val allFilesPartitions = fileIndex.listFiles(Seq(), Seq()) + assertEquals(10, allFilesPartitions.head.files.length) + + // We're selecting a single file that contains "id" == 1 row, which there should be + // strictly 1. Given that 1 is minimal possible value, Data Skipping should be able to + // truncate search space to just a single file + val dataFilter = EqualTo(AttributeReference("id", IntegerType, nullable = false)(), Literal(1)) + val filteredPartitions = fileIndex.listFiles(Seq(), Seq(dataFilter)) + assertEquals(1, filteredPartitions.head.files.length) + } + private def attribute(partition: String): AttributeReference = { AttributeReference(partition, StringType, true)() } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala index 818addaf8..96728f620 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala @@ -18,6 +18,7 @@ package org.apache.hudi.functional +import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings @@ -53,6 +54,8 @@ class TestLayoutOptimization extends HoodieClientTestBase { "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", "hoodie.bulkinsert.shuffle.parallelism" -> "4", + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition", DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",