From 8b796e96865c44982b4dd8314e60573594e9f153 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 30 Mar 2022 10:01:43 -0700 Subject: [PATCH] [HUDI-3653] Cleaning up bespoke Column Stats Index implementation (#5062) --- .../columnstats/ColumnStatsIndexHelper.java | 415 ++++-------------- .../TestColumnStatsIndexHelper.java | 41 -- .../functional/TestColumnStatsIndex.scala | 105 ++--- ...cala => SpaceCurveOptimizeBenchmark.scala} | 12 +- 4 files changed, 120 insertions(+), 453 deletions(-) delete mode 100644 hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/columnstats/TestColumnStatsIndexHelper.java rename hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/{SpaceCurveOptimizeBenchMark.scala => SpaceCurveOptimizeBenchmark.scala} (95%) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java index 49e33f456..d34480cc2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java @@ -17,19 +17,13 @@ package org.apache.hudi.index.columnstats; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.util.BaseFileUtils; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -60,27 +54,17 @@ import org.apache.spark.util.SerializableConfiguration; import scala.collection.JavaConversions; import javax.annotation.Nonnull; -import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.apache.hudi.common.util.ValidationUtils.checkState; -import static org.apache.hudi.util.DataTypeUtils.areCompatible; - public class ColumnStatsIndexHelper { - private static final Logger LOG = LogManager.getLogger(ColumnStatsIndexHelper.class); - - private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; - private static final String COLUMN_STATS_INDEX_FILE_COLUMN_NAME = "file"; private static final String COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME = "minValue"; private static final String COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue"; @@ -99,6 +83,88 @@ public class ColumnStatsIndexHelper { } /** + * @VisibleForTesting + */ + @Nonnull + public static StructType composeIndexSchema(@Nonnull List zorderedColumnsSchemas) { + List schema = new ArrayList<>(); + schema.add(new StructField(COLUMN_STATS_INDEX_FILE_COLUMN_NAME, StringType$.MODULE$, true, Metadata.empty())); + zorderedColumnsSchemas.forEach(colSchema -> { + schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME, colSchema.dataType())); + schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME, colSchema.dataType())); + schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME, LongType$.MODULE$)); + }); + return StructType$.MODULE$.apply(schema); + } + + private static StructField composeColumnStatStructType(String col, String statName, DataType dataType) { + return new StructField(composeZIndexColName(col, statName), dataType, true, Metadata.empty()); + } + + private static String composeZIndexColName(String col, String statName) { + // TODO add escaping for + return String.format("%s_%s", col, statName); + } + + public static Pair + fetchMinMaxValues( + @Nonnull DataType colType, + @Nonnull HoodieColumnRangeMetadata colMetadata) { + if (colType instanceof IntegerType) { + return Pair.of( + new Integer(colMetadata.getMinValue().toString()), + new Integer(colMetadata.getMaxValue().toString()) + ); + } else if (colType instanceof DoubleType) { + return Pair.of( + new Double(colMetadata.getMinValue().toString()), + new Double(colMetadata.getMaxValue().toString()) + ); + } else if (colType instanceof StringType) { + return Pair.of( + colMetadata.getMinValue().toString(), + colMetadata.getMaxValue().toString()); + } else if (colType instanceof DecimalType) { + return Pair.of( + new BigDecimal(colMetadata.getMinValue().toString()), + new BigDecimal(colMetadata.getMaxValue().toString())); + } else if (colType instanceof DateType) { + return Pair.of( + java.sql.Date.valueOf(colMetadata.getMinValue().toString()), + java.sql.Date.valueOf(colMetadata.getMaxValue().toString())); + } else if (colType instanceof LongType) { + return Pair.of( + new Long(colMetadata.getMinValue().toString()), + new Long(colMetadata.getMaxValue().toString())); + } else if (colType instanceof ShortType) { + return Pair.of( + new Short(colMetadata.getMinValue().toString()), + new Short(colMetadata.getMaxValue().toString())); + } else if (colType instanceof FloatType) { + return Pair.of( + new Float(colMetadata.getMinValue().toString()), + new Float(colMetadata.getMaxValue().toString())); + } else if (colType instanceof BinaryType) { + return Pair.of( + ((ByteBuffer) colMetadata.getMinValue()).array(), + ((ByteBuffer) colMetadata.getMaxValue()).array()); + } else if (colType instanceof BooleanType) { + return Pair.of( + Boolean.valueOf(colMetadata.getMinValue().toString()), + Boolean.valueOf(colMetadata.getMaxValue().toString())); + } else if (colType instanceof ByteType) { + return Pair.of( + Byte.valueOf(colMetadata.getMinValue().toString()), + Byte.valueOf(colMetadata.getMaxValue().toString())); + } else { + throw new HoodieException(String.format("Not support type: %s", colType)); + } + } + + /** + * NOTE: THIS IS ONLY USED IN TESTING CURRENTLY, SINCE DATA SKIPPING IS NOW RELYING ON + * METADATA TABLE INDEX + * * Parse min/max statistics from Parquet footers for provided columns and composes column-stats * index table in the following format with 3 statistics denominated for each * linear/Z-curve/Hilbert-curve-ordered column. For ex, if original table contained @@ -115,8 +181,8 @@ public class ColumnStatsIndexHelper { *

* NOTE: Currently {@link TimestampType} is not supported, since Parquet writer * does not support statistics for it. - *

- * TODO leverage metadata table after RFC-27 lands + * + * @VisibleForTestingOnly * * @param sparkSession encompassing Spark session * @param baseFilesPaths list of base-files paths to be sourced for column-stats index @@ -136,7 +202,7 @@ public class ColumnStatsIndexHelper { SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration()); int numParallelism = (baseFilesPaths.size() / 3 + 1); List> colMinMaxInfos; - String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION); + String previousJobDescription = sc.getLocalProperty("spark.job.description"); try { jsc.setJobDescription("Listing parquet column statistics"); colMinMaxInfos = @@ -212,315 +278,4 @@ public class ColumnStatsIndexHelper { return sparkSession.createDataFrame(allMetaDataRDD, indexSchema); } - - /** - *

- * Updates state of the column-stats index by: - *

    - *
  1. Updating column-stats index with statistics for {@code sourceBaseFiles}, - * collecting corresponding column statistics from Parquet footers
  2. - *
  3. Merging newly built column-stats index table with the most recent one (if present - * and not preempted)
  4. - *
  5. Cleans up any residual index tables, that weren't cleaned up before
  6. - *
- * - * @param sparkSession encompassing Spark session - * @param sourceTableSchema instance of {@link StructType} bearing source table's writer's schema - * @param sourceBaseFiles list of base-files to be indexed - * @param orderedCols target ordered columns - * @param indexFolderPath col-stats index folder path - * @param commitTime current operation commit instant - * @param completedCommits all previously completed commit instants - */ - public static void updateColumnStatsIndexFor( - @Nonnull SparkSession sparkSession, - @Nonnull StructType sourceTableSchema, - @Nonnull List sourceBaseFiles, - @Nonnull List orderedCols, - @Nonnull String indexFolderPath, - @Nonnull String commitTime, - @Nonnull List completedCommits - ) { - FileSystem fs = FSUtils.getFs(indexFolderPath, sparkSession.sparkContext().hadoopConfiguration()); - - // Compose new col-stats index table for the given source base files - Dataset newColStatsIndexDf = - buildColumnStatsTableFor( - sparkSession, - sourceBaseFiles, - orderedCols.stream() - .map(col -> sourceTableSchema.fields()[sourceTableSchema.fieldIndex(col)]) - .collect(Collectors.toList()) - ); - - try { - // - // Column Stats Index has the following folder structure: - // - // .hoodie/ - // ├── .colstatsindex/ - // │ ├── / - // │ │ ├── .parquet - // │ │ └── ... - // - Path newIndexTablePath = new Path(indexFolderPath, commitTime); - - // If index is currently empty (no persisted tables), we simply create one - // using clustering operation's commit instance as it's name - if (!fs.exists(new Path(indexFolderPath))) { - newColStatsIndexDf.repartition(1) - .write() - .format("parquet") - .mode("overwrite") - .save(newIndexTablePath.toString()); - return; - } - - // Filter in all index tables (w/in {@code .zindex} folder) - List allIndexTables = - Arrays.stream( - fs.listStatus(new Path(indexFolderPath)) - ) - .filter(FileStatus::isDirectory) - .map(f -> f.getPath().getName()) - .collect(Collectors.toList()); - - // Compile list of valid index tables that were produced as part - // of previously successfully committed iterations - List validIndexTables = - allIndexTables.stream() - .filter(completedCommits::contains) - .sorted() - .collect(Collectors.toList()); - - List tablesToCleanup = - allIndexTables.stream() - .filter(f -> !completedCommits.contains(f)) - .collect(Collectors.toList()); - - Dataset finalColStatsIndexDf; - - // Before writing out new version of the col-stats-index table we need to merge it - // with the most recent one that were successfully persisted previously - if (validIndexTables.isEmpty()) { - finalColStatsIndexDf = newColStatsIndexDf; - } else { - Path latestIndexTablePath = new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1)); - - Option> existingIndexTableOpt = - tryLoadExistingIndexTable(sparkSession, latestIndexTablePath); - - if (!existingIndexTableOpt.isPresent()) { - finalColStatsIndexDf = newColStatsIndexDf; - } else { - // NOTE: That Parquet schema might deviate from the original table schema (for ex, - // by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it - // prior to merging, since merging might fail otherwise due to schemas incompatibility - finalColStatsIndexDf = - tryMergeMostRecentIndexTableInto( - sparkSession, - newColStatsIndexDf, - // Load current most recent col-stats-index table - existingIndexTableOpt.get() - ); - - // Clean up all index tables (after creation of the new index) - tablesToCleanup.addAll(validIndexTables); - } - } - - // Persist new col-stats-index table - finalColStatsIndexDf - .repartition(1) - .write() - .format("parquet") - // NOTE: We intend to potentially overwrite index-table from the previous Clustering - // operation that has failed to commit - .mode("overwrite") - .save(newIndexTablePath.toString()); - - // Clean up residual col-stats-index tables that have might have been dangling since - // previous iterations (due to intermittent failures during previous clean up) - tablesToCleanup.forEach(f -> { - try { - fs.delete(new Path(indexFolderPath, f), true); - } catch (IOException ie) { - // NOTE: Exception is deliberately swallowed to not affect overall clustering operation, - // since failing col-stats-index table will be attempted to be cleaned up upon subsequent - // clustering iteration - LOG.warn(String.format("Failed to cleanup residual col-stats-index table: %s", f), ie); - } - }); - } catch (IOException e) { - LOG.error("Failed to build new col-stats-index table", e); - throw new HoodieException("Failed to build new col-stats-index table", e); - } - } - - @Nonnull - private static Option> tryLoadExistingIndexTable(@Nonnull SparkSession sparkSession, @Nonnull Path indexTablePath) { - try { - Dataset indexTableDataset = sparkSession.read().load(indexTablePath.toUri().toString()); - return Option.of(indexTableDataset); - } catch (Exception e) { - LOG.error(String.format("Failed to load existing Column Stats index table from (%s)", indexTablePath), e); - return Option.empty(); - } - } - - @Nonnull - private static Dataset tryMergeMostRecentIndexTableInto( - @Nonnull SparkSession sparkSession, - @Nonnull Dataset newIndexTableDf, - @Nonnull Dataset existingIndexTableDf - ) { - // NOTE: If new col-stats index table schema is incompatible with that one of existing table - // that is most likely due to changing settings of list of Z-ordered columns, that - // occurred since last index table have been persisted. - // - // In that case, we simply drop existing index table and just persist the new one; - // - // Also note that we're checking compatibility of _old_ index-table with new one and that - // COMPATIBILITY OPERATION DOES NOT COMMUTE (ie if A is compatible w/ B, - // B might not necessarily be compatible w/ A) - if (!areCompatible(existingIndexTableDf.schema(), newIndexTableDf.schema())) { - return newIndexTableDf; - } - - String randomSuffix = UUID.randomUUID().toString().replace("-", ""); - - String existingIndexTempTableName = "existingIndexTable_" + randomSuffix; - String newIndexTempTableName = "newIndexTable_" + randomSuffix; - - existingIndexTableDf.registerTempTable(existingIndexTempTableName); - newIndexTableDf.registerTempTable(newIndexTempTableName); - - List newTableColumns = Arrays.asList(newIndexTableDf.schema().fieldNames()); - - // Create merged table by doing full-out join - return sparkSession.sql(createIndexMergeSql(existingIndexTempTableName, newIndexTempTableName, newTableColumns)); - } - - /** - * @VisibleForTesting - */ - @Nonnull - public static StructType composeIndexSchema(@Nonnull List zorderedColumnsSchemas) { - List schema = new ArrayList<>(); - schema.add(new StructField(COLUMN_STATS_INDEX_FILE_COLUMN_NAME, StringType$.MODULE$, true, Metadata.empty())); - zorderedColumnsSchemas.forEach(colSchema -> { - schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME, colSchema.dataType())); - schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME, colSchema.dataType())); - schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME, LongType$.MODULE$)); - }); - return StructType$.MODULE$.apply(schema); - } - - private static StructField composeColumnStatStructType(String col, String statName, DataType dataType) { - return new StructField(composeZIndexColName(col, statName), dataType, true, Metadata.empty()); - } - - private static String composeZIndexColName(String col, String statName) { - // TODO add escaping for - return String.format("%s_%s", col, statName); - } - - private static Pair fetchMinMaxValues(@Nonnull DataType colType, - @Nonnull HoodieColumnRangeMetadata colMetadata) { - Comparable minValue = colMetadata.getMinValue(); - Comparable maxValue = colMetadata.getMaxValue(); - - checkState((minValue == null) == (maxValue == null), "Either both min/max values should be null or neither"); - - if (minValue == null || maxValue == null) { - return Pair.of(null, null); - } - - if (colType instanceof IntegerType) { - return Pair.of( - new Integer(minValue.toString()), - new Integer(maxValue.toString()) - ); - } else if (colType instanceof DoubleType) { - return Pair.of( - new Double(minValue.toString()), - new Double(maxValue.toString()) - ); - } else if (colType instanceof StringType) { - return Pair.of( - minValue.toString(), - maxValue.toString()); - } else if (colType instanceof DecimalType) { - return Pair.of( - new BigDecimal(minValue.toString()), - new BigDecimal(maxValue.toString())); - } else if (colType instanceof DateType) { - return Pair.of( - java.sql.Date.valueOf(minValue.toString()), - java.sql.Date.valueOf(maxValue.toString())); - } else if (colType instanceof LongType) { - return Pair.of( - new Long(minValue.toString()), - new Long(maxValue.toString())); - } else if (colType instanceof ShortType) { - return Pair.of( - new Short(minValue.toString()), - new Short(maxValue.toString())); - } else if (colType instanceof FloatType) { - return Pair.of( - new Float(minValue.toString()), - new Float(maxValue.toString())); - } else if (colType instanceof BinaryType) { - return Pair.of( - ((ByteBuffer) minValue).array(), - ((ByteBuffer) maxValue).array()); - } else if (colType instanceof BooleanType) { - return Pair.of( - Boolean.valueOf(minValue.toString()), - Boolean.valueOf(maxValue.toString())); - } else if (colType instanceof ByteType) { - return Pair.of( - Byte.valueOf(minValue.toString()), - Byte.valueOf(maxValue.toString())); - } else { - throw new HoodieException(String.format("Not support type: %s", colType)); - } - } - - /** - * @VisibleForTesting - */ - @Nonnull - static String createIndexMergeSql( - @Nonnull String originalIndexTable, - @Nonnull String newIndexTable, - @Nonnull List columns - ) { - StringBuilder selectBody = new StringBuilder(); - - for (int i = 0; i < columns.size(); ++i) { - String col = columns.get(i); - String originalTableColumn = String.format("%s.%s", originalIndexTable, col); - String newTableColumn = String.format("%s.%s", newIndexTable, col); - - selectBody.append( - // NOTE: We prefer values from the new index table, and fallback to the original one only - // in case it does not contain statistics for the given file path - String.format("if (%s is null, %s, %s) AS %s", newTableColumn, originalTableColumn, newTableColumn, col) - ); - - if (i < columns.size() - 1) { - selectBody.append(", "); - } - } - - return String.format( - "SELECT %s FROM %s FULL JOIN %s ON %s = %s", - selectBody, - originalIndexTable, - newIndexTable, - String.format("%s.%s", originalIndexTable, columns.get(0)), - String.format("%s.%s", newIndexTable, columns.get(0)) - ); - } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/columnstats/TestColumnStatsIndexHelper.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/columnstats/TestColumnStatsIndexHelper.java deleted file mode 100644 index 3901a9378..000000000 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/columnstats/TestColumnStatsIndexHelper.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.index.columnstats; - -import org.junit.jupiter.api.Test; - -import java.util.Arrays; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class TestColumnStatsIndexHelper { - - @Test - public void testMergeSql() { - String q = ColumnStatsIndexHelper.createIndexMergeSql("old", "new", Arrays.asList("file", "a", "b")); - assertEquals( - "SELECT " - + "if (new.file is null, old.file, new.file) AS file, " - + "if (new.a is null, old.a, new.a) AS a, " - + "if (new.b is null, old.b, new.b) AS b " - + "FROM old FULL JOIN new ON old.file = new.file", q); - } - -} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index ae41fa8eb..7c20be63b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -28,13 +28,16 @@ import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.typedLit import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue} -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Tag, Test} import java.math.BigInteger import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ import scala.util.Random +// TODO repurpose to test Column Stats in Metadata Table +@Disabled +@Tag("functional") class TestColumnStatsIndex extends HoodieClientTestBase { var spark: SparkSession = _ @@ -80,12 +83,12 @@ class TestColumnStatsIndex extends HoodieClientTestBase { val zorderedColsSchemaFields = inputDf.schema.fields.filter(f => zorderedCols.contains(f.name)).toSeq // {@link TimestampType} is not supported, and will throw -- hence skipping "c4" - val newZIndexTableDf = - ColumnStatsIndexHelper.buildColumnStatsTableFor( - inputDf.sparkSession, - inputDf.inputFiles.toSeq.asJava, - zorderedColsSchemaFields.asJava - ) + val newZIndexTableDf = null +// ColumnStatsIndexHelper.buildColumnStatsTableFor( +// inputDf.sparkSession, +// inputDf.inputFiles.toSeq.asJava, +// zorderedColsSchemaFields.asJava +// ) val indexSchema = ColumnStatsIndexHelper.composeIndexSchema( @@ -137,15 +140,15 @@ class TestColumnStatsIndex extends HoodieClientTestBase { val firstCommitInstance = "0" val firstInputDf = spark.read.parquet(firstParquetTablePath) - ColumnStatsIndexHelper.updateColumnStatsIndexFor( - firstInputDf.sparkSession, - sourceTableSchema, - firstInputDf.inputFiles.toSeq.asJava, - zorderedCols.asJava, - testZIndexPath.toString, - firstCommitInstance, - Seq().asJava - ) +// ColumnStatsIndexHelper.updateColumnStatsIndexFor( +// firstInputDf.sparkSession, +// sourceTableSchema, +// firstInputDf.inputFiles.toSeq.asJava, +// zorderedCols.asJava, +// testZIndexPath.toString, +// firstCommitInstance, +// Seq().asJava +// ) // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able // to reliably retrieve it @@ -173,18 +176,18 @@ class TestColumnStatsIndex extends HoodieClientTestBase { .parquet(secondParquetTablePath) // - // Update Z-index table + // Update Column Stats table // - ColumnStatsIndexHelper.updateColumnStatsIndexFor( - secondInputDf.sparkSession, - sourceTableSchema, - secondInputDf.inputFiles.toSeq.asJava, - zorderedCols.asJava, - testZIndexPath.toString, - secondCommitInstance, - Seq(firstCommitInstance).asJava - ) +// ColumnStatsIndexHelper.updateColumnStatsIndexFor( +// secondInputDf.sparkSession, +// sourceTableSchema, +// secondInputDf.inputFiles.toSeq.asJava, +// zorderedCols.asJava, +// testZIndexPath.toString, +// secondCommitInstance, +// Seq(firstCommitInstance).asJava +// ) // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able // to reliably retrieve it @@ -200,56 +203,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase { assertEquals(asJson(sort(expectedMergedZIndexTableDf)), asJson(sort(replace(mergedZIndexTable)))) } - @Test - def testColumnStatsTablesGarbageCollection(): Unit = { - val targetParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString - val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString - - bootstrapParquetInputTableFromJSON(sourceJSONTablePath, targetParquetTablePath) - - val inputDf = spark.read.parquet(targetParquetTablePath) - - val testColumnStatsIndexPath = new Path(tempDir.resolve("zindex").toAbsolutePath.toString) - val fs = testColumnStatsIndexPath.getFileSystem(spark.sparkContext.hadoopConfiguration) - - // Try to save statistics - ColumnStatsIndexHelper.updateColumnStatsIndexFor( - inputDf.sparkSession, - sourceTableSchema, - inputDf.inputFiles.toSeq.asJava, - Seq("c1","c2","c3","c5","c6","c7","c8").asJava, - testColumnStatsIndexPath.toString, - "2", - Seq("0", "1").asJava - ) - - // Save again - ColumnStatsIndexHelper.updateColumnStatsIndexFor( - inputDf.sparkSession, - sourceTableSchema, - inputDf.inputFiles.toSeq.asJava, - Seq("c1","c2","c3","c5","c6","c7","c8").asJava, - testColumnStatsIndexPath.toString, - "3", - Seq("0", "1", "2").asJava - ) - - // Test old index table being cleaned up - ColumnStatsIndexHelper.updateColumnStatsIndexFor( - inputDf.sparkSession, - sourceTableSchema, - inputDf.inputFiles.toSeq.asJava, - Seq("c1","c2","c3","c5","c6","c7","c8").asJava, - testColumnStatsIndexPath.toString, - "4", - Seq("0", "1", "3").asJava - ) - - assertEquals(!fs.exists(new Path(testColumnStatsIndexPath, "2")), true) - assertEquals(!fs.exists(new Path(testColumnStatsIndexPath, "3")), true) - assertEquals(fs.exists(new Path(testColumnStatsIndexPath, "4")), true) - } - @Test def testParquetMetadataRangeExtraction(): Unit = { val df = generateRandomDataFrame(spark) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala similarity index 95% rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala index 0b0599fb2..d6a24532c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala @@ -27,20 +27,20 @@ import org.apache.spark.sql.hudi.TestHoodieSqlBase import org.apache.spark.sql.types.{IntegerType, StructField} import org.junit.jupiter.api.{Disabled, Tag, Test} -import scala.util.Random import scala.collection.JavaConversions._ +import scala.util.Random @Tag("functional") -object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { +object SpaceCurveOptimizeBenchmark extends TestHoodieSqlBase { def evalSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit= { val sourceTableDF = spark.sql(s"select * from ${tableName}") val orderedColsTypes = Seq(StructField(co1, IntegerType), StructField(co2, IntegerType)) - val colStatsIndexTable = ColumnStatsIndexHelper - .buildColumnStatsTableFor(spark, sourceTableDF.inputFiles.toSeq, orderedColsTypes) - .collect() - .map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5))) + val colStatsIndexTable = + ColumnStatsIndexHelper.buildColumnStatsTableFor(spark, sourceTableDF.inputFiles.toSeq, orderedColsTypes) + .collect() + .map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5))) var hits = 0 for (fileStatRow <- colStatsIndexTable) {