diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 747470f1d..a9652a1b3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -77,6 +77,7 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import javax.annotation.Nonnull; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -245,14 +246,17 @@ public abstract class HoodieTable implem public abstract HoodieWriteMetadata insertOverwriteTable(HoodieEngineContext context, String instantTime, I records); /** - * update statistics info for current table. - * to do adaptation, once RFC-27 is finished. + * Updates Metadata Indexes (like Z-Index) + * TODO rebase onto metadata table (post RFC-27) * - * @param context HoodieEngineContext - * @param instantTime Instant time for the replace action - * @param isOptimizeOperation whether current operation is OPTIMIZE type + * @param context instance of {@link HoodieEngineContext} + * @param instantTime instant of the carried operation triggering the update */ - public abstract void updateStatistics(HoodieEngineContext context, List stats, String instantTime, Boolean isOptimizeOperation); + public abstract void updateMetadataIndexes( + @Nonnull HoodieEngineContext context, + @Nonnull List stats, + @Nonnull String instantTime + ) throws Exception; public HoodieWriteConfig getConfig() { return config; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index a2e4962be..9aceffe44 100644 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -66,6 +66,7 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import java.io.IOException; import java.util.Collections; import java.util.Iterator; @@ -244,7 +245,7 @@ public class HoodieFlinkCopyOnWriteTable } @Override - public void updateStatistics(HoodieEngineContext context, List stats, String instantTime, Boolean isOptimizeOperation) { + public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List stats, @Nonnull String instantTime) { throw new HoodieNotSupportedException("update statistics is not supported yet"); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index cead7aa47..62a6980d5 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; import java.util.Iterator; +import javax.annotation.Nonnull; import java.util.List; import java.util.Map; @@ -170,7 +171,7 @@ public class HoodieJavaCopyOnWriteTable } @Override - public void updateStatistics(HoodieEngineContext context, List stats, String instantTime, Boolean isOptimizeOperation) { + public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List stats, @Nonnull String instantTime) { throw new HoodieNotSupportedException("update statistics is not supported yet"); } diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 6672028a6..e31ec9b20 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -396,15 +396,16 @@ public class SparkRDDWriteClient extends this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); finalizeWrite(table, clusteringCommitTime, writeStats); writeTableMetadataForTableServices(table, metadata,clusteringInstant); - // try to save statistics info to hudi - if (config.isDataSkippingEnabled() && config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) { - table.updateStatistics(context, writeStats, clusteringCommitTime, true); + // Update outstanding metadata indexes + if (config.isLayoutOptimizationEnabled() + && !config.getClusteringSortColumns().isEmpty()) { + table.updateMetadataIndexes(context, writeStats, clusteringCommitTime); } LOG.info("Committing Clustering " + clusteringCommitTime + ". 
Finished with result " + metadata); table.getActiveTimeline().transitionReplaceInflightToComplete( HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - } catch (IOException e) { + } catch (Exception e) { throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); } finally { this.txnManager.endTransaction(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java index b809f4243..03fdf5a5b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java @@ -33,7 +33,7 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.spark.ZCurveOptimizeHelper; +import org.apache.hudi.index.zorder.ZOrderingIndexHelper; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -79,10 +79,10 @@ public class RDDSpatialCurveOptimizationSortPartitioner createZIndexedDataFrameByMapValue(Dataset df, List zCols, int fileNum) { + Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e)); + int fieldNum = df.schema().fields().length; + List checkCols = zCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList()); + if (zCols.size() != checkCols.size()) { + return df; + } + // only one col to sort, no need to use z-order + if (zCols.size() == 1) { + return df.repartitionByRange(fieldNum, 
org.apache.spark.sql.functions.col(zCols.get(0))); + } + Map fieldMap = zCols + .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e))); + // z-sort + JavaRDD sortedRdd = df.toJavaRDD().map(row -> { + List zBytesList = fieldMap.entrySet().stream().map(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + DataType dataType = field.dataType(); + if (dataType instanceof LongType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); + } else if (dataType instanceof DoubleType) { + return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); + } else if (dataType instanceof FloatType) { + return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); + } else if (dataType instanceof StringType) { + return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); + } else if (dataType instanceof DateType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); + } else if (dataType instanceof TimestampType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); + } else if (dataType instanceof ByteType) { + return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); + } else if (dataType instanceof ShortType) { + return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); + } else if (dataType instanceof DecimalType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? 
false : row.getBoolean(index); + return ZOrderingUtil.intTo8Byte(value ? 1 : 0); + } else if (dataType instanceof BinaryType) { + return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); + } + return null; + }).filter(f -> f != null).collect(Collectors.toList()); + byte[][] zBytes = new byte[zBytesList.size()][]; + for (int i = 0; i < zBytesList.size(); i++) { + zBytes[i] = zBytesList.get(i); + } + List zVaules = new ArrayList<>(); + zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); + zVaules.add(ZOrderingUtil.interleaving(zBytes, 8)); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules)); + }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); + + // create new StructType + List newFields = new ArrayList<>(); + newFields.addAll(Arrays.asList(df.schema().fields())); + newFields.add(new StructField("zIndex", BinaryType$.MODULE$, true, Metadata.empty())); + + // create new DataFrame + return df.sparkSession().createDataFrame(sortedRdd, StructType$.MODULE$.apply(newFields)).drop("zIndex"); + } + + public static Dataset createZIndexedDataFrameByMapValue(Dataset df, String zCols, int fileNum) { + if (zCols == null || zCols.isEmpty() || fileNum <= 0) { + return df; + } + return createZIndexedDataFrameByMapValue(df, + Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum); + } + + public static Dataset createZIndexedDataFrameBySample(Dataset df, List zCols, int fileNum) { + return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum); + } + + public static Dataset createZIndexedDataFrameBySample(Dataset df, String zCols, int fileNum) { + if (zCols == null || zCols.isEmpty() || fileNum <= 0) { + return df; + } + return createZIndexedDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum); + } + + 
/** + * Parse min/max statistics from Parquet footers for provided columns and composes Z-index + * table in the following format with 3 statistics denominated for each Z-ordered column. + * For ex, if original table contained Z-ordered column {@code A}: + * + *
+   * +---------------------------+------------+------------+-------------+
+   * |          file             | A_minValue | A_maxValue | A_num_nulls |
+   * +---------------------------+------------+------------+-------------+
+   * | one_base_file.parquet     |          1 |         10 |           0 |
+   * | another_base_file.parquet |        -10 |          0 |           5 |
+   * +---------------------------+------------+------------+-------------+
+   * 
+ * + * NOTE: Currently {@link TimestampType} is not supported, since Parquet writer + * does not support statistics for it. + * + * TODO leverage metadata table after RFC-27 lands + * @VisibleForTesting + * + * @param sparkSession encompassing Spark session + * @param baseFilesPaths list of base-files paths to be sourced for Z-index + * @param zorderedColumnSchemas target Z-ordered columns + * @return Spark's {@link Dataset} holding an index table + */ + @Nonnull + public static Dataset buildZIndexTableFor( + @Nonnull SparkSession sparkSession, + @Nonnull List baseFilesPaths, + @Nonnull List zorderedColumnSchemas + ) { + SparkContext sc = sparkSession.sparkContext(); + JavaSparkContext jsc = new JavaSparkContext(sc); + + SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration()); + int numParallelism = (baseFilesPaths.size() / 3 + 1); + List> colMinMaxInfos; + String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION); + try { + jsc.setJobDescription("Listing parquet column statistics"); + colMinMaxInfos = + jsc.parallelize(baseFilesPaths, numParallelism) + .mapPartitions(paths -> { + ParquetUtils utils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + Iterable iterable = () -> paths; + return StreamSupport.stream(iterable.spliterator(), false) + .flatMap(path -> + utils.readRangeFromParquetMetadata( + serializableConfiguration.value(), + new Path(path), + zorderedColumnSchemas.stream() + .map(StructField::name) + .collect(Collectors.toList()) + ) + .stream() + ) + .iterator(); + }) + .collect(); + } finally { + jsc.setJobDescription(previousJobDescription); + } + + // Group column's metadata by file-paths of the files it belongs to + Map>> filePathToColumnMetadataMap = + colMinMaxInfos.stream() + .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getFilePath)); + + JavaRDD allMetaDataRDD = + jsc.parallelize(new ArrayList<>(filePathToColumnMetadataMap.values()), 1) + 
.map(fileColumnsMetadata -> { + int colSize = fileColumnsMetadata.size(); + if (colSize == 0) { + return null; + } + + String filePath = fileColumnsMetadata.get(0).getFilePath(); + + List indexRow = new ArrayList<>(); + + // First columns of the Z-index's row is target file-path + indexRow.add(filePath); + + // For each column + zorderedColumnSchemas.forEach(colSchema -> { + String colName = colSchema.name(); + + HoodieColumnRangeMetadata colMetadata = + fileColumnsMetadata.stream() + .filter(s -> s.getColumnName().trim().equalsIgnoreCase(colName)) + .findFirst() + .orElse(null); + + DataType colType = colSchema.dataType(); + if (colMetadata == null || colType == null) { + throw new HoodieException(String.format("Cannot collect min/max statistics for column (%s)", colSchema)); + } + + Pair minMaxValue = fetchMinMaxValues(colType, colMetadata); + + indexRow.add(minMaxValue.getLeft()); // min + indexRow.add(minMaxValue.getRight()); // max + indexRow.add(colMetadata.getNumNulls()); + }); + + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(indexRow)); + }) + .filter(Objects::nonNull); + + StructType indexSchema = composeIndexSchema(zorderedColumnSchemas); + + return sparkSession.createDataFrame(allMetaDataRDD, indexSchema); + } + + /** + *

+   * <p/>
+   * Updates state of the Z-index by:
+   * <ol>
+   *   <li>Updating Z-index with statistics for {@code sourceBaseFiles}, collecting corresponding
+   *   column statistics from Parquet footers</li>
+   *   <li>Merging newly built Z-index table with the most recent one (if present and not preempted)</li>
+   *   <li>Cleaning up any residual index tables that weren't cleaned up before</li>
+   * </ol>
+ * + * @param sparkSession encompassing Spark session + * @param sourceTableSchema instance of {@link StructType} bearing source table's writer's schema + * @param sourceBaseFiles list of base-files to be indexed + * @param zorderedCols target Z-ordered columns + * @param zindexFolderPath Z-index folder path + * @param commitTime current operation commit instant + * @param completedCommits all previously completed commit instants + */ + public static void updateZIndexFor( + @Nonnull SparkSession sparkSession, + @Nonnull StructType sourceTableSchema, + @Nonnull List sourceBaseFiles, + @Nonnull List zorderedCols, + @Nonnull String zindexFolderPath, + @Nonnull String commitTime, + @Nonnull List completedCommits + ) { + FileSystem fs = FSUtils.getFs(zindexFolderPath, sparkSession.sparkContext().hadoopConfiguration()); + + // Compose new Z-index table for the given source base files + Dataset newZIndexDf = + buildZIndexTableFor( + sparkSession, + sourceBaseFiles, + zorderedCols.stream() + .map(col -> sourceTableSchema.fields()[sourceTableSchema.fieldIndex(col)]) + .collect(Collectors.toList()) + ); + + try { + // + // Z-Index has the following folder structure: + // + // .hoodie/ + // ├── .zindex/ + // │ ├── / + // │ │ ├── .parquet + // │ │ └── ... 
+ // + // If index is currently empty (no persisted tables), we simply create one + // using clustering operation's commit instance as it's name + Path newIndexTablePath = new Path(zindexFolderPath, commitTime); + + if (!fs.exists(new Path(zindexFolderPath))) { + newZIndexDf.repartition(1) + .write() + .format("parquet") + .mode("overwrite") + .save(newIndexTablePath.toString()); + return; + } + + // Filter in all index tables (w/in {@code .zindex} folder) + List allIndexTables = + Arrays.stream( + fs.listStatus(new Path(zindexFolderPath)) + ) + .filter(FileStatus::isDirectory) + .map(f -> f.getPath().getName()) + .collect(Collectors.toList()); + + // Compile list of valid index tables that were produced as part + // of previously successfully committed iterations + List validIndexTables = + allIndexTables.stream() + .filter(completedCommits::contains) + .sorted() + .collect(Collectors.toList()); + + List tablesToCleanup = + allIndexTables.stream() + .filter(f -> !completedCommits.contains(f)) + .collect(Collectors.toList()); + + Dataset finalZIndexDf; + + // Before writing out new version of the Z-index table we need to merge it + // with the most recent one that were successfully persisted previously + if (validIndexTables.isEmpty()) { + finalZIndexDf = newZIndexDf; + } else { + // NOTE: That Parquet schema might deviate from the original table schema (for ex, + // by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it + // prior to merging, since merging might fail otherwise due to schemas incompatibility + finalZIndexDf = + tryMergeMostRecentIndexTableInto( + sparkSession, + newZIndexDf, + // Load current most recent Z-index table + sparkSession.read().load( + new Path(zindexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString() + ) + ); + + // Clean up all index tables (after creation of the new index) + tablesToCleanup.addAll(validIndexTables); + } + + // Persist new Z-index table + finalZIndexDf + .repartition(1) 
+ .write() + .format("parquet") + .save(newIndexTablePath.toString()); + + // Clean up residual Z-index tables that have might have been dangling since + // previous iterations (due to intermittent failures during previous clean up) + tablesToCleanup.forEach(f -> { + try { + fs.delete(new Path(zindexFolderPath, f), true); + } catch (IOException ie) { + // NOTE: Exception is deliberately swallowed to not affect overall clustering operation, + // since failing Z-index table will be attempted to be cleaned up upon subsequent + // clustering iteration + LOG.warn(String.format("Failed to cleanup residual Z-index table: %s", f), ie); + } + }); + } catch (IOException e) { + LOG.error("Failed to build new Z-index table", e); + throw new HoodieException("Failed to build new Z-index table", e); + } + } + + @Nonnull + private static Dataset tryMergeMostRecentIndexTableInto( + @Nonnull SparkSession sparkSession, + @Nonnull Dataset newIndexTableDf, + @Nonnull Dataset existingIndexTableDf + ) { + // NOTE: If new Z-index table schema is incompatible with that one of existing table + // that is most likely due to changing settings of list of Z-ordered columns, that + // occurred since last index table have been persisted. 
+ // + // In that case, we simply drop existing index table and just persist the new one; + // + // Also note that we're checking compatibility of _old_ index-table with new one and that + // COMPATIBILITY OPERATION DOES NOT COMMUTE (ie if A is compatible w/ B, + // B might not necessarily be compatible w/ A) + if (!areCompatible(existingIndexTableDf.schema(), newIndexTableDf.schema())) { + return newIndexTableDf; + } + + String randomSuffix = UUID.randomUUID().toString().replace("-", ""); + + String existingIndexTempTableName = "existingIndexTable_" + randomSuffix; + String newIndexTempTableName = "newIndexTable_" + randomSuffix; + + existingIndexTableDf.registerTempTable(existingIndexTempTableName); + newIndexTableDf.registerTempTable(newIndexTempTableName); + + List newTableColumns = Arrays.asList(newIndexTableDf.schema().fieldNames()); + + // Create merged table by doing full-out join + return sparkSession.sql(createIndexMergeSql(existingIndexTempTableName, newIndexTempTableName, newTableColumns)); + } + + /** + * @VisibleForTesting + */ + @Nonnull + public static StructType composeIndexSchema(@Nonnull List zorderedColumnsSchemas) { + List schema = new ArrayList<>(); + schema.add(new StructField(Z_INDEX_FILE_COLUMN_NAME, StringType$.MODULE$, true, Metadata.empty())); + zorderedColumnsSchemas.forEach(colSchema -> { + schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_MIN_VALUE_STAT_NAME, colSchema.dataType())); + schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_MAX_VALUE_STAT_NAME, colSchema.dataType())); + schema.add(composeColumnStatStructType(colSchema.name(), Z_INDEX_NUM_NULLS_STAT_NAME, LongType$.MODULE$)); + }); + return StructType$.MODULE$.apply(schema); + } + + private static StructField composeColumnStatStructType(String col, String statName, DataType dataType) { + return new StructField(composeZIndexColName(col, statName), dataType, true, Metadata.empty()); + } + + @Nullable + private static String 
mapToSourceTableColumnName(StructField fieldStruct) { + String name = fieldStruct.name(); + int maxStatSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_MAX_VALUE_STAT_NAME)); + if (maxStatSuffixIdx != -1) { + return name.substring(0, maxStatSuffixIdx); + } + + int minStatSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_MIN_VALUE_STAT_NAME)); + if (minStatSuffixIdx != -1) { + return name.substring(0, minStatSuffixIdx); + } + + int numNullsSuffixIdx = name.lastIndexOf(String.format("_%s", Z_INDEX_NUM_NULLS_STAT_NAME)); + if (numNullsSuffixIdx != -1) { + return name.substring(0, numNullsSuffixIdx); + } + + return null; + } + + private static String composeZIndexColName(String col, String statName) { + // TODO add escaping for + return String.format("%s_%s", col, statName); + } + + private static Pair + fetchMinMaxValues( + @Nonnull DataType colType, + @Nonnull HoodieColumnRangeMetadata colMetadata) { + if (colType instanceof IntegerType) { + return Pair.of( + new Integer(colMetadata.getMinValue().toString()), + new Integer(colMetadata.getMaxValue().toString()) + ); + } else if (colType instanceof DoubleType) { + return Pair.of( + new Double(colMetadata.getMinValue().toString()), + new Double(colMetadata.getMaxValue().toString()) + ); + } else if (colType instanceof StringType) { + return Pair.of( + new String(((Binary) colMetadata.getMinValue()).getBytes()), + new String(((Binary) colMetadata.getMaxValue()).getBytes()) + ); + } else if (colType instanceof DecimalType) { + return Pair.of( + new BigDecimal(colMetadata.getMinValue().toString()), + new BigDecimal(colMetadata.getMaxValue().toString())); + } else if (colType instanceof DateType) { + return Pair.of( + java.sql.Date.valueOf(colMetadata.getMinValue().toString()), + java.sql.Date.valueOf(colMetadata.getMaxValue().toString())); + } else if (colType instanceof LongType) { + return Pair.of( + new Long(colMetadata.getMinValue().toString()), + new Long(colMetadata.getMaxValue().toString())); + } 
else if (colType instanceof ShortType) { + return Pair.of( + new Short(colMetadata.getMinValue().toString()), + new Short(colMetadata.getMaxValue().toString())); + } else if (colType instanceof FloatType) { + return Pair.of( + new Float(colMetadata.getMinValue().toString()), + new Float(colMetadata.getMaxValue().toString())); + } else if (colType instanceof BinaryType) { + return Pair.of( + ((Binary) colMetadata.getMinValue()).getBytes(), + ((Binary) colMetadata.getMaxValue()).getBytes()); + } else if (colType instanceof BooleanType) { + return Pair.of( + Boolean.valueOf(colMetadata.getMinValue().toString()), + Boolean.valueOf(colMetadata.getMaxValue().toString())); + } else if (colType instanceof ByteType) { + return Pair.of( + Byte.valueOf(colMetadata.getMinValue().toString()), + Byte.valueOf(colMetadata.getMaxValue().toString())); + } else { + throw new HoodieException(String.format("Not support type: %s", colType)); + } + } + + /** + * @VisibleForTesting + */ + @Nonnull + static String createIndexMergeSql( + @Nonnull String originalIndexTable, + @Nonnull String newIndexTable, + @Nonnull List columns + ) { + StringBuilder selectBody = new StringBuilder(); + + for (int i = 0; i < columns.size(); ++i) { + String col = columns.get(i); + String originalTableColumn = String.format("%s.%s", originalIndexTable, col); + String newTableColumn = String.format("%s.%s", newIndexTable, col); + + selectBody.append( + // NOTE: We prefer values from the new index table, and fallback to the original one only + // in case it does not contain statistics for the given file path + String.format("if (%s is null, %s, %s) AS %s", newTableColumn, originalTableColumn, newTableColumn, col) + ); + + if (i < columns.size() - 1) { + selectBody.append(", "); + } + } + + return String.format( + "SELECT %s FROM %s FULL JOIN %s ON %s = %s", + selectBody, + originalIndexTable, + newIndexTable, + String.format("%s.%s", originalIndexTable, columns.get(0)), + String.format("%s.%s", newIndexTable, 
columns.get(0)) + ); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 74d4718a9..280d24f71 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -18,7 +18,10 @@ package org.apache.hudi.table; +import org.apache.avro.Schema; import org.apache.hadoop.fs.Path; +import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieClusteringPlan; @@ -37,9 +40,11 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -73,11 +78,12 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.ZCurveOptimizeHelper; +import org.apache.hudi.index.zorder.ZOrderingIndexHelper; import org.apache.spark.api.java.JavaRDD; -import scala.collection.JavaConversions; +import javax.annotation.Nonnull; import 
java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -163,29 +169,61 @@ public class HoodieSparkCopyOnWriteTable } @Override - public void updateStatistics(HoodieEngineContext context, List stats, String instantTime, Boolean isOptimizeOperation) { - // deal with z-order/hilbert statistic info - if (isOptimizeOperation) { - updateOptimizeOperationStatistics(context, stats, instantTime); - } + public void updateMetadataIndexes(@Nonnull HoodieEngineContext context, @Nonnull List stats, @Nonnull String instantTime) throws Exception { + // Updates Z-ordering Index + updateZIndex(context, stats, instantTime); } - private void updateOptimizeOperationStatistics(HoodieEngineContext context, List stats, String instantTime) { - String cols = config.getClusteringSortColumns(); + private void updateZIndex( + @Nonnull HoodieEngineContext context, + @Nonnull List updatedFilesStats, + @Nonnull String instantTime + ) throws Exception { + String sortColsList = config.getClusteringSortColumns(); String basePath = metaClient.getBasePath(); String indexPath = metaClient.getZindexPath(); - List validateCommits = metaClient.getCommitsTimeline() - .filterCompletedInstants().getInstants().map(f -> f.getTimestamp()).collect(Collectors.toList()); - List touchFiles = stats.stream().map(s -> new Path(basePath, s.getPath()).toString()).collect(Collectors.toList()); - if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) { - LOG.warn("save nothing to index table"); + + List completedCommits = + metaClient.getCommitsTimeline() + .filterCompletedInstants() + .getInstants() + .map(HoodieInstant::getTimestamp) + .collect(Collectors.toList()); + + List touchedFiles = + updatedFilesStats.stream() + .map(s -> new Path(basePath, s.getPath()).toString()) + .collect(Collectors.toList()); + + if (touchedFiles.isEmpty() || StringUtils.isNullOrEmpty(sortColsList) || StringUtils.isNullOrEmpty(indexPath)) { return; } + + 
LOG.info(String.format("Updating Z-index table (%s)", indexPath)); + + List sortCols = Arrays.stream(sortColsList.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)context; - ZCurveOptimizeHelper.saveStatisticsInfo(sparkEngineContext - .getSqlContext().sparkSession().read().load(JavaConversions.asScalaBuffer(touchFiles)), - cols, indexPath, instantTime, validateCommits); - LOG.info(String.format("save statistic info sucessfully at commitTime: %s", instantTime)); + + // Fetch table schema to appropriately construct Z-index schema + Schema tableWriteSchema = + HoodieAvroUtils.createHoodieWriteSchema( + new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields() + ); + + ZOrderingIndexHelper.updateZIndexFor( + sparkEngineContext.getSqlContext().sparkSession(), + AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema), + touchedFiles, + sortCols, + indexPath, + instantTime, + completedCommits + ); + + LOG.info(String.format("Successfully updated Z-index at instant (%s)", instantTime)); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java new file mode 100644 index 000000000..b934f5f6e --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.util; + +import org.apache.spark.sql.types.ByteType$; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DoubleType$; +import org.apache.spark.sql.types.FloatType$; +import org.apache.spark.sql.types.IntegerType$; +import org.apache.spark.sql.types.LongType$; +import org.apache.spark.sql.types.ShortType$; +import org.apache.spark.sql.types.StringType$; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.VarcharType$; + +import javax.annotation.Nonnull; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class DataTypeUtils { + + private static Map, Set>> sparkPrimitiveTypesCompatibilityMap = + new HashMap, Set>>() {{ + + // Integral types + put(ShortType$.class, + newHashSet(ByteType$.class, ShortType$.class)); + put(IntegerType$.class, + newHashSet(ByteType$.class, ShortType$.class, IntegerType$.class)); + put(LongType$.class, + newHashSet(ByteType$.class, ShortType$.class, IntegerType$.class, LongType$.class)); + + // Float types + put(DoubleType$.class, + newHashSet(FloatType$.class, DoubleType$.class)); + + // String types + put(StringType$.class, + newHashSet(VarcharType$.class, StringType$.class)); + }}; + + /** + * Validates whether one {@link StructType} is compatible w/ the other one. 
+ * Compatibility rules are defined like following: types A and B are considered + * compatible iff + * + *
+ * <ol>
+ *   <li>A and B are identical</li>
+ *   <li>All values comprising A domain are contained w/in B domain (for ex, {@code ShortType}
+ *   in this sense is compatible w/ {@code IntegerType})</li>
+ * </ol>
+ * + * @param left operand + * @param right operand + * @return true if {@code left} instance of {@link StructType} is compatible w/ the {@code right} + */ + public static boolean areCompatible(@Nonnull DataType left, @Nonnull DataType right) { + // First, check if types are equal + if (Objects.equals(left, right)) { + return true; + } + + // If not, check whether both are instances of {@code StructType} that + // should be matched structurally + if (left instanceof StructType && right instanceof StructType) { + return areCompatible((StructType) left, (StructType) right); + } + + // If not, simply check if those data-types constitute compatibility + // relationship outlined above; otherwise return false + return sparkPrimitiveTypesCompatibilityMap.getOrDefault(left.getClass(), Collections.emptySet()) + .contains(right.getClass()); + } + + private static boolean areCompatible(@Nonnull StructType left, @Nonnull StructType right) { + StructField[] oneSchemaFields = left.fields(); + StructField[] anotherSchemaFields = right.fields(); + + if (oneSchemaFields.length != anotherSchemaFields.length) { + return false; + } + + for (int i = 0; i < oneSchemaFields.length; ++i) { + StructField oneField = oneSchemaFields[i]; + StructField anotherField = anotherSchemaFields[i]; + // NOTE: Metadata is deliberately omitted from comparison + if (!Objects.equals(oneField.name(), anotherField.name()) + || !areCompatible(oneField.dataType(), anotherField.dataType()) + || oneField.nullable() != anotherField.nullable()) { + return false; + } + } + + return true; + } + + private static HashSet newHashSet(T... 
ts) { + return new HashSet<>(Arrays.asList(ts)); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java deleted file mode 100644 index 72274c248..000000000 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java +++ /dev/null @@ -1,356 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark; - -import scala.collection.JavaConversions; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hudi.HoodieSparkUtils$; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieColumnRangeMetadata; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.util.BaseFileUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.optimize.ZOrderingUtil; -import org.apache.parquet.io.api.Binary; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.Row$; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.hudi.execution.RangeSampleSort$; -import org.apache.spark.sql.hudi.execution.ZorderingBinarySort; -import org.apache.spark.sql.types.BinaryType; -import org.apache.spark.sql.types.BinaryType$; -import org.apache.spark.sql.types.BooleanType; -import org.apache.spark.sql.types.ByteType; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DateType; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.sql.types.DoubleType; -import org.apache.spark.sql.types.FloatType; -import org.apache.spark.sql.types.IntegerType; -import org.apache.spark.sql.types.LongType; -import org.apache.spark.sql.types.LongType$; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.ShortType; -import org.apache.spark.sql.types.StringType; -import org.apache.spark.sql.types.StringType$; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType$; -import org.apache.spark.sql.types.TimestampType; -import 
org.apache.spark.util.SerializableConfiguration; - -import java.io.IOException; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class ZCurveOptimizeHelper { - - private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; - - /** - * Create z-order DataFrame directly - * first, map all base type data to byte[8], then create z-order DataFrame - * only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte - * this method is more effective than createZIndexDataFrameBySample - * - * @param df a spark DataFrame holds parquet files to be read. - * @param zCols z-sort cols - * @param fileNum spark partition num - * @return a dataFrame sorted by z-order. - */ - public static Dataset createZIndexedDataFrameByMapValue(Dataset df, List zCols, int fileNum) { - Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e)); - int fieldNum = df.schema().fields().length; - List checkCols = zCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList()); - if (zCols.size() != checkCols.size()) { - return df; - } - // only one col to sort, no need to use z-order - if (zCols.size() == 1) { - return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(zCols.get(0))); - } - Map fieldMap = zCols - .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e))); - // z-sort - JavaRDD sortedRdd = df.toJavaRDD().map(row -> { - List zBytesList = fieldMap.entrySet().stream().map(entry -> { - int index = entry.getKey(); - StructField field = entry.getValue(); - DataType dataType = field.dataType(); - if (dataType instanceof LongType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? 
Long.MAX_VALUE : row.getLong(index)); - } else if (dataType instanceof DoubleType) { - return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); - } else if (dataType instanceof IntegerType) { - return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); - } else if (dataType instanceof FloatType) { - return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); - } else if (dataType instanceof StringType) { - return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); - } else if (dataType instanceof DateType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); - } else if (dataType instanceof TimestampType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); - } else if (dataType instanceof ByteType) { - return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); - } else if (dataType instanceof ShortType) { - return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); - } else if (dataType instanceof DecimalType) { - return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); - } else if (dataType instanceof BooleanType) { - boolean value = row.isNullAt(index) ? false : row.getBoolean(index); - return ZOrderingUtil.intTo8Byte(value ? 1 : 0); - } else if (dataType instanceof BinaryType) { - return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? 
new byte[] {0} : (byte[]) row.get(index)); - } - return null; - }).filter(f -> f != null).collect(Collectors.toList()); - byte[][] zBytes = new byte[zBytesList.size()][]; - for (int i = 0; i < zBytesList.size(); i++) { - zBytes[i] = zBytesList.get(i); - } - List zVaules = new ArrayList<>(); - zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); - zVaules.add(ZOrderingUtil.interleaving(zBytes, 8)); - return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules)); - }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); - - // create new StructType - List newFields = new ArrayList<>(); - newFields.addAll(Arrays.asList(df.schema().fields())); - newFields.add(new StructField("zIndex", BinaryType$.MODULE$, true, Metadata.empty())); - - // create new DataFrame - return df.sparkSession().createDataFrame(sortedRdd, StructType$.MODULE$.apply(newFields)).drop("zIndex"); - } - - public static Dataset createZIndexedDataFrameByMapValue(Dataset df, String zCols, int fileNum) { - if (zCols == null || zCols.isEmpty() || fileNum <= 0) { - return df; - } - return createZIndexedDataFrameByMapValue(df, - Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum); - } - - public static Dataset createZIndexedDataFrameBySample(Dataset df, List zCols, int fileNum) { - return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum); - } - - public static Dataset createZIndexedDataFrameBySample(Dataset df, String zCols, int fileNum) { - if (zCols == null || zCols.isEmpty() || fileNum <= 0) { - return df; - } - return createZIndexedDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum); - } - - /** - * Parse min/max statistics stored in parquet footers for z-sort cols. 
- * no support collect statistics from timeStampType, since parquet file has not collect the statistics for timeStampType. - * to do adapt for rfc-27 - * - * @param df a spark DataFrame holds parquet files to be read. - * @param cols z-sort cols - * @return a dataFrame holds all statistics info. - */ - public static Dataset getMinMaxValue(Dataset df, List cols) { - Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType())); - - List scanFiles = Arrays.asList(df.inputFiles()); - SparkContext sc = df.sparkSession().sparkContext(); - JavaSparkContext jsc = new JavaSparkContext(sc); - - SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration()); - int numParallelism = (scanFiles.size() / 3 + 1); - List> colMinMaxInfos; - String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION); - try { - jsc.setJobDescription("Listing parquet column statistics"); - colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> { - Configuration conf = serializableConfiguration.value(); - ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); - List>> results = new ArrayList<>(); - while (paths.hasNext()) { - String path = paths.next(); - results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols)); - } - return results.stream().flatMap(f -> f.stream()).iterator(); - }).collect(); - } finally { - jsc.setJobDescription(previousJobDescription); - } - - Map>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath())); - JavaRDD allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> { - int colSize = f.size(); - if (colSize == 0) { - return null; - } else { - List rows = new ArrayList<>(); - rows.add(f.get(0).getFilePath()); - cols.stream().forEach(col -> { - HoodieColumnRangeMetadata currentColRangeMetaData = - 
f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null); - DataType colType = columnsMap.get(col); - if (currentColRangeMetaData == null || colType == null) { - throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col)); - } - if (colType instanceof IntegerType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - } else if (colType instanceof DoubleType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - } else if (colType instanceof StringType) { - rows.add(currentColRangeMetaData.getMinValueAsString()); - rows.add(currentColRangeMetaData.getMaxValueAsString()); - } else if (colType instanceof DecimalType) { - rows.add(new BigDecimal(currentColRangeMetaData.getMinValueAsString())); - rows.add(new BigDecimal(currentColRangeMetaData.getMaxValueAsString())); - } else if (colType instanceof DateType) { - rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValueAsString())); - rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValueAsString())); - } else if (colType instanceof LongType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - } else if (colType instanceof ShortType) { - rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString())); - rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString())); - } else if (colType instanceof FloatType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - } else if (colType instanceof BinaryType) { - rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes()); - rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes()); - } else if (colType instanceof BooleanType) { - rows.add(currentColRangeMetaData.getMinValue()); - rows.add(currentColRangeMetaData.getMaxValue()); - 
} else if (colType instanceof ByteType) { - rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString())); - rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString())); - } else { - throw new HoodieException(String.format("Not support type: %s", colType)); - } - rows.add(currentColRangeMetaData.getNumNulls()); - }); - return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows)); - } - }).filter(f -> f != null); - List allMetaDataSchema = new ArrayList<>(); - allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty())); - cols.forEach(col -> { - allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty())); - allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty())); - allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty())); - }); - return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema)); - } - - public static Dataset getMinMaxValue(Dataset df, String cols) { - List rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList()); - return getMinMaxValue(df, rawCols); - } - - /** - * Update statistics info. - * this method will update old index table by full out join, - * and save the updated table into a new index table based on commitTime. - * old index table will be cleaned also. - * - * @param df a spark DataFrame holds parquet files to be read. - * @param cols z-sort cols. - * @param indexPath index store path. - * @param commitTime current operation commitTime. - * @param validateCommits all validate commits for current table. 
- * @return - */ - public static void saveStatisticsInfo(Dataset df, String cols, String indexPath, String commitTime, List validateCommits) { - Path savePath = new Path(indexPath, commitTime); - SparkSession spark = df.sparkSession(); - FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration()); - Dataset statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols); - // try to find last validate index table from index path - try { - // If there's currently no index, create one - if (!fs.exists(new Path(indexPath))) { - statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); - return; - } - - // Otherwise, clean up all indexes but the most recent one - - List allIndexTables = Arrays - .stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList()); - List candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList()); - List residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList()); - Option latestIndexData = Option.empty(); - if (!candidateIndexTables.isEmpty()) { - latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString())); - // clean old index table, keep at most 1 index table. 
- candidateIndexTables.remove(candidateIndexTables.size() - 1); - candidateIndexTables.forEach(f -> { - try { - fs.delete(new Path(indexPath, f)); - } catch (IOException ie) { - throw new HoodieException(ie); - } - }); - } - - // clean residualTables - // retried cluster operations at the same instant time is also considered, - // the residual files produced by retried are cleaned up before save statistics - // save statistics info to index table which named commitTime - residualTables.forEach(f -> { - try { - fs.delete(new Path(indexPath, f)); - } catch (IOException ie) { - throw new HoodieException(ie); - } - }); - - if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) { - // update the statistics info - String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); - String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); - latestIndexData.get().registerTempTable(originalTable); - statisticsDF.registerTempTable(updateTable); - // update table by full out join - List columns = Arrays.asList(statisticsDF.schema().fieldNames()); - spark.sql(HoodieSparkUtils$ - .MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString()); - } else { - statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); - } - } catch (IOException e) { - throw new HoodieException(e); - } - } -} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index 11cc49594..2900f08cc 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -20,7 +20,6 @@ package org.apache.hudi import java.nio.ByteBuffer import 
java.sql.{Date, Timestamp} -import java.util import org.apache.avro.Conversions.DecimalConversion import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis} @@ -318,7 +317,7 @@ object AvroConversionHelper { } else { val sourceArray = item.asInstanceOf[Seq[Any]] val sourceArraySize = sourceArray.size - val targetList = new util.ArrayList[Any](sourceArraySize) + val targetList = new java.util.ArrayList[Any](sourceArraySize) var idx = 0 while (idx < sourceArraySize) { targetList.add(elementConverter(sourceArray(idx))) @@ -336,7 +335,7 @@ object AvroConversionHelper { if (item == null) { null } else { - val javaMap = new util.HashMap[String, Any]() + val javaMap = new java.util.HashMap[String, Any]() item.asInstanceOf[Map[String, Any]].foreach { case (key, value) => javaMap.put(key, valueConverter(value)) } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 360a08089..a791edfdf 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -287,43 +287,4 @@ object HoodieSparkUtils extends SparkAdapterSupport { s"${tableSchema.fieldNames.mkString(",")}") AttributeReference(columnName, field.get.dataType, field.get.nullable)() } - - /** - * Create merge sql to merge leftTable and right table. - * - * @param leftTable table name. - * @param rightTable table name. - * @param cols merged cols. - * @return merge sql. 
- */ - def createMergeSql(leftTable: String, rightTable: String, cols: Seq[String]): String = { - var selectsql = "" - for (i <- cols.indices) { - selectsql = selectsql + s" if (${leftTable}.${cols(i)} is null, ${rightTable}.${cols(i)}, ${leftTable}.${cols(i)}) as ${cols(i)} ," - } - "select " + selectsql.dropRight(1) + s" from ${leftTable} full join ${rightTable} on ${leftTable}.${cols(0)} = ${rightTable}.${cols(0)}" - } - - /** - * Collect min/max statistics for candidate cols. - * support all col types. - * - * @param df dataFrame holds read files. - * @param cols candidate cols to collect statistics. - * @return - */ - def getMinMaxValueSpark(df: DataFrame, cols: Seq[String]): DataFrame = { - val sqlContext = df.sparkSession.sqlContext - import sqlContext.implicits._ - - val values = cols.flatMap(c => Seq( min(col(c)).as(c + "_minValue"), max(col(c)).as(c + "_maxValue"), count(c).as(c + "_noNullCount"))) - val valueCounts = count("*").as("totalNum") - val projectValues = Seq(col("file")) ++ cols.flatMap(c => - Seq(col(c + "_minValue"), col(c + "_maxValue"), expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls"))) - - val result = df.select(input_file_name() as "file", col("*")) - .groupBy($"file") - .agg(valueCounts, values: _*).select(projectValues:_*) - result - } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index da993b754..c392f127d 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -18,8 +18,6 @@ package org.apache.spark.sql.hudi.execution -import java.util - import org.apache.hudi.config.HoodieClusteringConfig import org.apache.spark.rdd.{PartitionPruningRDD, RDD} import 
org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow} @@ -197,22 +195,22 @@ class RawDecisionBound[K : Ordering : ClassTag](ordering: Ordering[K]) extends S // For primitive keys, we can use the natural ordering. Otherwise, use the Ordering comparator. classTag[K] match { case ClassTag.Float => - (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float]) + (l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float]) case ClassTag.Double => - (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double]) + (l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double]) case ClassTag.Byte => - (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte]) + (l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte]) case ClassTag.Char => - (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char]) + (l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char]) case ClassTag.Short => - (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short]) + (l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short]) case ClassTag.Int => - (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int]) + (l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int]) case ClassTag.Long => - (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long]) + (l, x) => java.util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long]) case _ => val comparator = ordering.asInstanceOf[java.util.Comparator[Any]] - (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator) + (l, x) => 
java.util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator) } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/zorder/TestZOrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/zorder/TestZOrderingIndexHelper.java new file mode 100644 index 000000000..1fad1dc31 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/zorder/TestZOrderingIndexHelper.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.index.zorder; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestZOrderingIndexHelper { + + @Test + public void testMergeSql() { + String q = ZOrderingIndexHelper.createIndexMergeSql("old", "new", Arrays.asList("file", "a", "b")); + assertEquals( + "SELECT " + + "if (new.file is null, old.file, new.file) AS file, " + + "if (new.a is null, old.a, new.a) AS a, " + + "if (new.b is null, old.b, new.b) AS b " + + "FROM old FULL JOIN new ON old.file = new.file", q); + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java index 33491ff13..ca977ae53 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java @@ -18,6 +18,8 @@ package org.apache.hudi.common.model; +import org.apache.parquet.schema.PrimitiveStringifier; + import java.util.Objects; /** @@ -28,28 +30,16 @@ public class HoodieColumnRangeMetadata { private final String columnName; private final T minValue; private final T maxValue; - private long numNulls; - // For Decimal Type/Date Type, minValue/maxValue cannot represent it's original value. - // eg: when parquet collects column information, the decimal type is collected as int/binary type. - // so we cannot use minValue and maxValue directly, use minValueAsString/maxValueAsString instead. 
- private final String minValueAsString; - private final String maxValueAsString; + private final long numNulls; + private final PrimitiveStringifier stringifier; - public HoodieColumnRangeMetadata( - final String filePath, - final String columnName, - final T minValue, - final T maxValue, - long numNulls, - final String minValueAsString, - final String maxValueAsString) { + public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) { this.filePath = filePath; this.columnName = columnName; this.minValue = minValue; this.maxValue = maxValue; - this.numNulls = numNulls == -1 ? 0 : numNulls; - this.minValueAsString = minValueAsString; - this.maxValueAsString = maxValueAsString; + this.numNulls = numNulls; + this.stringifier = stringifier; } public String getFilePath() { @@ -68,12 +58,8 @@ public class HoodieColumnRangeMetadata { return this.maxValue; } - public String getMaxValueAsString() { - return maxValueAsString; - } - - public String getMinValueAsString() { - return minValueAsString; + public PrimitiveStringifier getStringifier() { + return stringifier; } public long getNumNulls() { @@ -93,14 +79,12 @@ public class HoodieColumnRangeMetadata { && Objects.equals(getColumnName(), that.getColumnName()) && Objects.equals(getMinValue(), that.getMinValue()) && Objects.equals(getMaxValue(), that.getMaxValue()) - && Objects.equals(getNumNulls(), that.getNumNulls()) - && Objects.equals(getMinValueAsString(), that.getMinValueAsString()) - && Objects.equals(getMaxValueAsString(), that.getMaxValueAsString()); + && Objects.equals(getNumNulls(), that.getNumNulls()); } @Override public int hashCode() { - return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls(), getMinValueAsString(), getMaxValueAsString()); + return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls()); } @Override @@ -110,8 +94,6 @@ public class 
HoodieColumnRangeMetadata { + "columnName='" + columnName + '\'' + ", minValue=" + minValue + ", maxValue=" + maxValue - + ", numNulls=" + numNulls - + ", minValueAsString=" + minValueAsString - + ", minValueAsString=" + maxValueAsString + '}'; + + ", numNulls=" + numNulls + '}'; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 985c9788e..136206150 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -18,6 +18,10 @@ package org.apache.hudi.common.util; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; @@ -26,11 +30,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.MetadataNotFoundException; import org.apache.hudi.keygen.BaseKeyGenerator; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.avro.AvroSchemaConverter; @@ -38,12 +37,15 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import 
javax.annotation.Nonnull; import java.io.IOException; +import java.math.BigDecimal; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -51,14 +53,13 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Utility functions involving with parquet. */ public class ParquetUtils extends BaseFileUtils { - private static Object lock = new Object(); - /** * Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will * return all the rowkeys. @@ -286,95 +287,97 @@ public class ParquetUtils extends BaseFileUtils { /** * Parse min/max statistics stored in parquet footers for all columns. - * ParquetRead.readFooter is not a thread safe method. - * - * @param conf hadoop conf. - * @param parquetFilePath file to be read. - * @param cols cols which need to collect statistics. - * @return a HoodieColumnRangeMetadata instance. 
*/ - public Collection> readRangeFromParquetMetadata( - Configuration conf, - Path parquetFilePath, - List cols) { + public List> readRangeFromParquetMetadata( + @Nonnull Configuration conf, + @Nonnull Path parquetFilePath, + @Nonnull List cols + ) { ParquetMetadata metadata = readMetadata(conf, parquetFilePath); - // collect stats from all parquet blocks - Map>> columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> { - return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> { - String minAsString; - String maxAsString; - if (columnChunkMetaData.getPrimitiveType().getOriginalType() == OriginalType.DATE) { - synchronized (lock) { - minAsString = columnChunkMetaData.getStatistics().minAsString(); - maxAsString = columnChunkMetaData.getStatistics().maxAsString(); - } - } else { - minAsString = columnChunkMetaData.getStatistics().minAsString(); - maxAsString = columnChunkMetaData.getStatistics().maxAsString(); - } - return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(), - columnChunkMetaData.getStatistics().genericGetMin(), - columnChunkMetaData.getStatistics().genericGetMax(), - columnChunkMetaData.getStatistics().getNumNulls(), - minAsString, maxAsString); - }); - }).collect(Collectors.groupingBy(e -> e.getColumnName())); + // Collect stats from all individual Parquet blocks + Map>> columnToStatsListMap = metadata.getBlocks().stream().sequential() + .flatMap(blockMetaData -> blockMetaData.getColumns().stream() + .filter(f -> cols.contains(f.getPath().toDotString())) + .map(columnChunkMetaData -> + new HoodieColumnRangeMetadata( + parquetFilePath.getName(), + columnChunkMetaData.getPath().toDotString(), + convertToNativeJavaType( + columnChunkMetaData.getPrimitiveType(), + columnChunkMetaData.getStatistics().genericGetMin()), + convertToNativeJavaType( + columnChunkMetaData.getPrimitiveType(), + 
columnChunkMetaData.getStatistics().genericGetMax()), + columnChunkMetaData.getStatistics().getNumNulls(), + columnChunkMetaData.getPrimitiveType().stringifier())) + ).collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName)); - // we only intend to keep file level statistics. - return new ArrayList<>(columnToStatsListMap.values().stream() - .map(blocks -> getColumnRangeInFile(blocks)) - .collect(Collectors.toList())); + // Combine those into file-level statistics + // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer + // expression type correctly) + Stream> stream = columnToStatsListMap.values() + .stream() + .map(this::getColumnRangeInFile); + + return stream.collect(Collectors.toList()); } - private HoodieColumnRangeMetadata getColumnRangeInFile(final List> blockRanges) { + private > HoodieColumnRangeMetadata getColumnRangeInFile( + @Nonnull List> blockRanges + ) { if (blockRanges.size() == 1) { // only one block in parquet file. we can just return that range. return blockRanges.get(0); - } else { - // there are multiple blocks. Compute min(block_mins) and max(block_maxs) - return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1, b2)).get(); } + + // there are multiple blocks. 
Compute min(block_mins) and max(block_maxs) + return blockRanges.stream() + .sequential() + .reduce(this::combineRanges).get(); } - private HoodieColumnRangeMetadata combineRanges(HoodieColumnRangeMetadata range1, - HoodieColumnRangeMetadata range2) { - final Comparable minValue; - final Comparable maxValue; - final String minValueAsString; - final String maxValueAsString; - if (range1.getMinValue() != null && range2.getMinValue() != null) { - if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) { - minValue = range1.getMinValue(); - minValueAsString = range1.getMinValueAsString(); - } else { - minValue = range2.getMinValue(); - minValueAsString = range2.getMinValueAsString(); - } - } else if (range1.getMinValue() == null) { - minValue = range2.getMinValue(); - minValueAsString = range2.getMinValueAsString(); + private > HoodieColumnRangeMetadata combineRanges( + HoodieColumnRangeMetadata one, + HoodieColumnRangeMetadata another + ) { + final T minValue; + final T maxValue; + if (one.getMinValue() != null && another.getMinValue() != null) { + minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ? one.getMinValue() : another.getMinValue(); + } else if (one.getMinValue() == null) { + minValue = another.getMinValue(); } else { - minValue = range1.getMinValue(); - minValueAsString = range1.getMinValueAsString(); + minValue = one.getMinValue(); } - if (range1.getMaxValue() != null && range2.getMaxValue() != null) { - if (range1.getMaxValue().compareTo(range2.getMaxValue()) < 0) { - maxValue = range2.getMaxValue(); - maxValueAsString = range2.getMaxValueAsString(); - } else { - maxValue = range1.getMaxValue(); - maxValueAsString = range1.getMaxValueAsString(); - } - } else if (range1.getMaxValue() == null) { - maxValue = range2.getMaxValue(); - maxValueAsString = range2.getMaxValueAsString(); + if (one.getMaxValue() != null && another.getMaxValue() != null) { + maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ? 
another.getMaxValue() : one.getMaxValue(); + } else if (one.getMaxValue() == null) { + maxValue = another.getMaxValue(); } else { - maxValue = range1.getMaxValue(); - maxValueAsString = range1.getMaxValueAsString(); + maxValue = one.getMaxValue(); } - return new HoodieColumnRangeMetadata<>(range1.getFilePath(), - range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), minValueAsString, maxValueAsString); + return new HoodieColumnRangeMetadata( + one.getFilePath(), + one.getColumnName(), minValue, maxValue, one.getNumNulls() + another.getNumNulls(), one.getStringifier()); + } + + private static Comparable convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) { + if (primitiveType.getOriginalType() == OriginalType.DECIMAL) { + DecimalMetadata decimalMetadata = primitiveType.getDecimalMetadata(); + return BigDecimal.valueOf((Integer) val, decimalMetadata.getScale()); + } else if (primitiveType.getOriginalType() == OriginalType.DATE) { + // NOTE: This is a workaround to address race-condition in using + // {@code SimpleDataFormat} concurrently (w/in {@code DateStringifier}) + // TODO cleanup after Parquet upgrade to 1.12 + synchronized (primitiveType.stringifier()) { + return java.sql.Date.valueOf( + primitiveType.stringifier().stringify((Integer) val) + ); + } + } + + return val; } } diff --git a/hudi-spark-datasource/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml index 11ebeca39..cf39f0c4c 100644 --- a/hudi-spark-datasource/hudi-spark/pom.xml +++ b/hudi-spark-datasource/hudi-spark/pom.xml @@ -48,6 +48,7 @@ -nobootcp + -target:jvm-1.8 false diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index bc4557f9c..add7fabf6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -30,21 +30,24 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging +import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory} -import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlUtils} +import org.apache.spark.sql.hudi.DataSkippingUtils.createZIndexLookupFilter +import org.apache.spark.sql.hudi.HoodieSqlUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{AnalysisException, Column, SparkSession} import org.apache.spark.unsafe.types.UTF8String import java.util.Properties - -import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.util.{Failure, Success, Try} /** * A file index which support partition prune for hoodie snapshot and read-optimized query. @@ -169,16 +172,16 @@ case class HoodieFileIndex( * ultimately be scanned as part of query execution. Hence, this method has to maintain the * invariant of conservatively including every base-file's name, that is NOT referenced in its index. 
* - * @param dataFilters list of original data filters passed down from querying engine + * @param queryFilters list of original data filters passed down from querying engine * @return list of pruned (data-skipped) candidate base-files' names */ - private def lookupCandidateFilesNamesInZIndex(dataFilters: Seq[Expression]): Option[Set[String]] = { + private def lookupCandidateFilesInZIndex(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try { val indexPath = metaClient.getZindexPath val fs = metaClient.getFs - if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || dataFilters.isEmpty) { + if (!enableDataSkipping() || !fs.exists(new Path(indexPath)) || queryFilters.isEmpty) { // scalastyle:off return - return Option.empty + return Success(Option.empty) // scalastyle:on return } @@ -192,7 +195,7 @@ case class HoodieFileIndex( if (candidateIndexTables.isEmpty) { // scalastyle:off return - return Option.empty + return Success(Option.empty) // scalastyle:on return } @@ -207,7 +210,7 @@ case class HoodieFileIndex( dataFrameOpt.map(df => { val indexSchema = df.schema val indexFilter = - dataFilters.map(DataSkippingUtils.createZIndexLookupFilter(_, indexSchema)) + queryFilters.map(createZIndexLookupFilter(_, indexSchema)) .reduce(And) logInfo(s"Index filter condition: $indexFilter") @@ -221,7 +224,7 @@ case class HoodieFileIndex( .toSet val prunedCandidateFileNames = - df.filter(new Column(indexFilter)) + df.where(new Column(indexFilter)) .select("file") .collect() .map(_.getString(0)) @@ -261,11 +264,22 @@ case class HoodieFileIndex( // - Data-skipping is enabled // - Z-index is present // - List of predicates (filters) is present - val candidateFilesNamesOpt: Option[Set[String]] = lookupCandidateFilesNamesInZIndex(dataFilters) + val candidateFilesNamesOpt: Option[Set[String]] = + lookupCandidateFilesInZIndex(dataFilters) match { + case Success(opt) => opt + case Failure(e) => + if (e.isInstanceOf[AnalysisException]) { + logDebug("Failed to relay 
provided data filters to Z-index lookup", e) + } else { + logError("Failed to lookup candidate files in Z-index", e) + } + Option.empty + } logDebug(s"Overlapping candidate files (from Z-index): ${candidateFilesNamesOpt.getOrElse(Set.empty)}") - if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table. + if (queryAsNonePartitionedTable) { + // Read as Non-Partitioned table // Filter in candidate files based on the Z-index lookup val candidateFiles = allFiles.filter(fileStatus => @@ -273,9 +287,10 @@ case class HoodieFileIndex( candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName)) ) - logInfo(s"Total files : ${allFiles.size}," + - s" candidate files after data skipping: ${candidateFiles.size} " + - s" skipping percent ${if (allFiles.length != 0) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}") + logInfo(s"Total files : ${allFiles.size}; " + + s"candidate files after data skipping: ${candidateFiles.size}; " + + s"skipping percent ${if (allFiles.nonEmpty) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}") + Seq(PartitionDirectory(InternalRow.empty, candidateFiles)) } else { // Prune the partition path by the partition filters @@ -284,27 +299,27 @@ case class HoodieFileIndex( var candidateFileSize = 0 val result = prunedPartitions.map { partition => - val baseFileStatuses = cachedAllInputFileSlices(partition).map(fileSlice => { - if (fileSlice.getBaseFile.isPresent) { - fileSlice.getBaseFile.get().getFileStatus - } else { - null - } - }).filterNot(_ == null) + val baseFileStatuses: Seq[FileStatus] = + cachedAllInputFileSlices(partition) + .map(fs => fs.getBaseFile.orElse(null)) + .filter(_ != null) + .map(_.getFileStatus) // Filter in candidate files based on the Z-index lookup val candidateFiles = - baseFileStatuses.filter(fileStatus => + baseFileStatuses.filter(fs => // NOTE: This predicate is true when {@code Option} is empty - 
candidateFilesNamesOpt.forall(_.contains(fileStatus.getPath.getName))) + candidateFilesNamesOpt.forall(_.contains(fs.getPath.getName))) totalFileSize += baseFileStatuses.size candidateFileSize += candidateFiles.size PartitionDirectory(partition.values, candidateFiles) } - logInfo(s"Total files: ${totalFileSize}," + - s" Candidate files after data skipping : ${candidateFileSize} " + - s"skipping percent ${if (allFiles.length != 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}") + + logInfo(s"Total base files: ${totalFileSize}; " + + s"candidate files after data skipping : ${candidateFileSize}; " + + s"skipping percent ${if (allFiles.nonEmpty) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}") + result } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 63b9c4d61..8ebc896fc 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -54,7 +54,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession} import org.apache.spark.{SPARK_VERSION, SparkContext} -import java.util import java.util.Properties import scala.collection.JavaConversions._ @@ -289,7 +288,7 @@ object HoodieSparkSqlWriter { } def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = { - val fieldsToRemove = new util.ArrayList[String]() + val fieldsToRemove = new java.util.ArrayList[String]() partitionParam.split(",").map(partitionField => partitionField.trim) .filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field)) HoodieAvroUtils.removeFields(schema, fieldsToRemove) @@ -629,7 +628,7 @@ object HoodieSparkSqlWriter { 
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key))) val commitSuccess = client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses, - common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))), + common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap))), tableInstantInfo.commitActionType, writeResult.getPartitionToReplaceFileIds) @@ -643,7 +642,7 @@ object HoodieSparkSqlWriter { val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration()) val compactionInstant: common.util.Option[java.lang.String] = if (asyncCompactionEnabled) { - client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + client.scheduleCompaction(common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap)))) } else { common.util.Option.empty() } @@ -653,7 +652,7 @@ object HoodieSparkSqlWriter { val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters) val clusteringInstant: common.util.Option[java.lang.String] = if (asyncClusteringEnabled) { - client.scheduleClustering(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + client.scheduleClustering(common.util.Option.of(new java.util.HashMap[String, String](mapAsJavaMap(metaMap)))) } else { common.util.Option.empty() } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala index 90980395a..5aebe93d2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala @@ -19,9 +19,11 @@ package org.apache.spark.sql.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} -import 
org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.hudi.index.zorder.ZOrderingIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor} +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith} import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -29,181 +31,230 @@ import org.apache.spark.sql.functions.col import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ -object DataSkippingUtils { +object DataSkippingUtils extends Logging { /** * Translates provided {@link filterExpr} into corresponding filter-expression for Z-index index table * to filter out candidate files that would hold records matching the original filter * - * @param filterExpr original filter from query + * @param sourceFilterExpr original filter from query * @param indexSchema index table schema * @return filter for Z-index table */ - def createZIndexLookupFilter(filterExpr: Expression, indexSchema: StructType): Expression = { - - def rewriteCondition(colName: Seq[String], conditionExpress: Expression): Expression = { - val stats = Set.apply( - UnresolvedAttribute(colName).name + "_minValue", - UnresolvedAttribute(colName).name + "_maxValue", - UnresolvedAttribute(colName).name + 
"_num_nulls" - ) - - if (stats.forall(stat => indexSchema.exists(_.name == stat))) { - conditionExpress - } else { - Literal.TrueLiteral - } + def createZIndexLookupFilter(sourceFilterExpr: Expression, indexSchema: StructType): Expression = { + // Try to transform original Source Table's filter expression into + // Column-Stats Index filter expression + tryComposeIndexFilterExpr(sourceFilterExpr, indexSchema) match { + case Some(e) => e + // NOTE: In case we can't transform source filter expression, we fallback + // to {@code TrueLiteral}, to essentially avoid pruning any indexed files from scanning + case None => TrueLiteral } + } - def refColExpr(colName: Seq[String], statisticValue: String): Expression = - col(UnresolvedAttribute(colName).name + statisticValue).expr + private def tryComposeIndexFilterExpr(sourceExpr: Expression, indexSchema: StructType): Option[Expression] = { + def minValue(colName: String) = col(getMinColumnNameFor(colName)).expr + def maxValue(colName: String) = col(getMaxColumnNameFor(colName)).expr + def numNulls(colName: String) = col(getNumNullsColumnNameFor(colName)).expr - def minValue(colName: Seq[String]) = refColExpr(colName, "_minValue") - def maxValue(colName: Seq[String]) = refColExpr(colName, "_maxValue") - def numNulls(colName: Seq[String]) = refColExpr(colName, "_num_nulls") - - def colContainsValuesEqualToLiteral(colName: Seq[String], value: Literal) = + def colContainsValuesEqualToLiteral(colName: String, value: Literal): Expression = + // Only case when column C contains value V is when min(C) <= V <= max(c) And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value)) - def colContainsValuesEqualToLiterals(colName: Seq[String], list: Seq[Literal]) = - list.map { lit => colContainsValuesEqualToLiteral(colName, lit) }.reduce(Or) + def colContainsOnlyValuesEqualToLiteral(colName: String, value: Literal) = + // Only case when column C contains _only_ value V is when min(C) = V AND max(c) = V + 
And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value)) - filterExpr match { + sourceExpr match { // Filter "colA = b" // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup case EqualTo(attribute: AttributeReference, value: Literal) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, value)) + getTargetIndexedColName(attribute, indexSchema) + .map(colName => colContainsValuesEqualToLiteral(colName, value)) + // Filter "b = colA" // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition for index lookup case EqualTo(value: Literal, attribute: AttributeReference) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, value)) + getTargetIndexedColName(attribute, indexSchema) + .map(colName => colContainsValuesEqualToLiteral(colName, value)) + + // Filter "colA != b" + // Translates to "NOT(colA_minValue = b AND colA_maxValue = b)" + // NOTE: This is NOT an inversion of `colA = b` + case Not(EqualTo(attribute: AttributeReference, value: Literal)) => + getTargetIndexedColName(attribute, indexSchema) + .map(colName => Not(colContainsOnlyValuesEqualToLiteral(colName, value))) + + // Filter "b != colA" + // Translates to "NOT(colA_minValue = b AND colA_maxValue = b)" + // NOTE: This is NOT an inversion of `colA = b` + case Not(EqualTo(value: Literal, attribute: AttributeReference)) => + getTargetIndexedColName(attribute, indexSchema) + .map(colName => Not(colContainsOnlyValuesEqualToLiteral(colName, value))) + // Filter "colA = null" // Translates to "colA_num_nulls = null" for index lookup case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) => - val colName = getTargetColNameParts(equalNullSafe.left) - rewriteCondition(colName, EqualTo(numNulls(colName), equalNullSafe.right)) + getTargetIndexedColName(equalNullSafe.left, indexSchema) + .map(colName 
=> EqualTo(numNulls(colName), equalNullSafe.right)) + // Filter "colA < b" // Translates to "colA_minValue < b" for index lookup case LessThan(attribute: AttributeReference, value: Literal) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, LessThan(minValue(colName), value)) - // Filter "b < colA" - // Translates to "b < colA_maxValue" for index lookup - case LessThan(value: Literal, attribute: AttributeReference) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, GreaterThan(maxValue(colName), value)) - // Filter "colA > b" - // Translates to "colA_maxValue > b" for index lookup - case GreaterThan(attribute: AttributeReference, value: Literal) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, GreaterThan(maxValue(colName), value)) + getTargetIndexedColName(attribute, indexSchema) + .map(colName => LessThan(minValue(colName), value)) + // Filter "b > colA" // Translates to "b > colA_minValue" for index lookup case GreaterThan(value: Literal, attribute: AttributeReference) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, LessThan(minValue(colName), value)) + getTargetIndexedColName(attribute, indexSchema) + .map(colName => LessThan(minValue(colName), value)) + + // Filter "b < colA" + // Translates to "b < colA_maxValue" for index lookup + case LessThan(value: Literal, attribute: AttributeReference) => + getTargetIndexedColName(attribute, indexSchema) + .map(colName => GreaterThan(maxValue(colName), value)) + + // Filter "colA > b" + // Translates to "colA_maxValue > b" for index lookup + case GreaterThan(attribute: AttributeReference, value: Literal) => + getTargetIndexedColName(attribute, indexSchema) + .map(colName => GreaterThan(maxValue(colName), value)) + // Filter "colA <= b" // Translates to "colA_minValue <= b" for index lookup case LessThanOrEqual(attribute: AttributeReference, value: Literal) => - val colName = 
getTargetColNameParts(attribute) - rewriteCondition(colName, LessThanOrEqual(minValue(colName), value)) - // Filter "b <= colA" - // Translates to "b <= colA_maxValue" for index lookup - case LessThanOrEqual(value: Literal, attribute: AttributeReference) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value)) - // Filter "colA >= b" - // Translates to "colA_maxValue >= b" for index lookup - case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right)) + getTargetIndexedColName(attribute, indexSchema) + .map(colName => LessThanOrEqual(minValue(colName), value)) + // Filter "b >= colA" // Translates to "b >= colA_minValue" for index lookup case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, LessThanOrEqual(minValue(colName), value)) + getTargetIndexedColName(attribute, indexSchema) + .map(colName => LessThanOrEqual(minValue(colName), value)) + + // Filter "b <= colA" + // Translates to "b <= colA_maxValue" for index lookup + case LessThanOrEqual(value: Literal, attribute: AttributeReference) => + getTargetIndexedColName(attribute, indexSchema) + .map(colName => GreaterThanOrEqual(maxValue(colName), value)) + + // Filter "colA >= b" + // Translates to "colA_maxValue >= b" for index lookup + case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) => + getTargetIndexedColName(attribute, indexSchema) + .map(colName => GreaterThanOrEqual(maxValue(colName), right)) + // Filter "colA is null" // Translates to "colA_num_nulls > 0" for index lookup case IsNull(attribute: AttributeReference) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, GreaterThan(numNulls(colName), Literal(0))) + getTargetIndexedColName(attribute, indexSchema) + 
.map(colName => GreaterThan(numNulls(colName), Literal(0))) + // Filter "colA is not null" // Translates to "colA_num_nulls = 0" for index lookup case IsNotNull(attribute: AttributeReference) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, EqualTo(numNulls(colName), Literal(0))) + getTargetIndexedColName(attribute, indexSchema) + .map(colName => EqualTo(numNulls(colName), Literal(0))) + // Filter "colA in (a, b, ...)" // Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR (colA_minValue <= b AND colA_maxValue >= b)" for index lookup + // NOTE: This is equivalent to "colA = a OR colA = b OR ..." case In(attribute: AttributeReference, list: Seq[Literal]) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, colContainsValuesEqualToLiterals(colName, list)) - // Filter "colA like xxx" + getTargetIndexedColName(attribute, indexSchema) + .map(colName => + list.map { lit => colContainsValuesEqualToLiteral(colName, lit) }.reduce(Or) + ) + + // Filter "colA not in (a, b, ...)" + // Translates to "NOT((colA_minValue = a AND colA_maxValue = a) OR (colA_minValue = b AND colA_maxValue = b))" for index lookup + // NOTE: This is NOT an inversion of `in (a, b, ...)` expr, this is equivalent to "colA != a AND colA != b AND ..." 
+ case Not(In(attribute: AttributeReference, list: Seq[Literal])) => + getTargetIndexedColName(attribute, indexSchema) + .map(colName => + Not( + list.map { lit => colContainsOnlyValuesEqualToLiteral(colName, lit) }.reduce(Or) + ) + ) + + // Filter "colA like 'xxx%'" // Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for index lookup // NOTE: That this operator only matches string prefixes, and this is // essentially equivalent to "colA = b" expression case StartsWith(attribute, v @ Literal(_: UTF8String, _)) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, colContainsValuesEqualToLiteral(colName, v)) - // Filter "colA not in (a, b, ...)" - // Translates to "(colA_minValue > a OR colA_maxValue < a) AND (colA_minValue > b OR colA_maxValue < b)" for index lookup - // NOTE: This is an inversion of `in (a, b, ...)` expr - case Not(In(attribute: AttributeReference, list: Seq[Literal])) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, Not(colContainsValuesEqualToLiterals(colName, list))) - // Filter "colA != b" - // Translates to "colA_minValue > b OR colA_maxValue < b" (which is an inversion of expr for "colA = b") for index lookup - // NOTE: This is an inversion of `colA = b` expr - case Not(EqualTo(attribute: AttributeReference, value: Literal)) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value))) - // Filter "b != colA" - // Translates to "colA_minValue > b OR colA_maxValue < b" (which is an inversion of expr for "colA = b") for index lookup - // NOTE: This is an inversion of `colA != b` expr - case Not(EqualTo(value: Literal, attribute: AttributeReference)) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value))) - // Filter "colA not like xxx" - // Translates to "!(colA_minValue <= xxx AND colA_maxValue >= xxx)" for index lookup - // 
NOTE: This is a inversion of "colA like xxx" assuming that colA is a string-based type + getTargetIndexedColName(attribute, indexSchema) + .map(colName => colContainsValuesEqualToLiteral(colName, v)) + + // Filter "colA not like 'xxx%'" + // Translates to "NOT(colA_minValue like 'xxx%' AND colA_maxValue like 'xxx%')" for index lookup + // NOTE: This is NOT an inversion of "colA like xxx" case Not(StartsWith(attribute, value @ Literal(_: UTF8String, _))) => - val colName = getTargetColNameParts(attribute) - rewriteCondition(colName, Not(colContainsValuesEqualToLiteral(colName, value))) + getTargetIndexedColName(attribute, indexSchema) + .map(colName => + Not(And(StartsWith(minValue(colName), value), StartsWith(maxValue(colName), value))) + ) case or: Or => val resLeft = createZIndexLookupFilter(or.left, indexSchema) val resRight = createZIndexLookupFilter(or.right, indexSchema) - Or(resLeft, resRight) + + Option(Or(resLeft, resRight)) case and: And => val resLeft = createZIndexLookupFilter(and.left, indexSchema) val resRight = createZIndexLookupFilter(and.right, indexSchema) - And(resLeft, resRight) - case expr: Expression => - Literal.TrueLiteral + Option(And(resLeft, resRight)) + + // + // Pushing Logical NOT inside the AND/OR expressions + // NOTE: This is required to make sure we're properly handling negations in + // cases like {@code NOT(colA = 0)}, {@code NOT(colA in (a, b, ...)} + // + + case Not(And(left: Expression, right: Expression)) => + Option(createZIndexLookupFilter(Or(Not(left), Not(right)), indexSchema)) + + case Not(Or(left: Expression, right: Expression)) => + Option(createZIndexLookupFilter(And(Not(left), Not(right)), indexSchema)) + + case _: Expression => None } } - /** - * Extracts name from a resolved expression referring to a nested or non-nested column. 
- */ - def getTargetColNameParts(resolvedTargetCol: Expression): Seq[String] = { + private def checkColIsIndexed(colName: String, indexSchema: StructType): Boolean = { + Set.apply( + getMinColumnNameFor(colName), + getMaxColumnNameFor(colName), + getNumNullsColumnNameFor(colName) + ) + .forall(stat => indexSchema.exists(_.name == stat)) + } + + private def getTargetIndexedColName(resolvedExpr: Expression, indexSchema: StructType): Option[String] = { + val colName = UnresolvedAttribute(getTargetColNameParts(resolvedExpr)).name + + // Verify that the column is indexed + if (checkColIsIndexed(colName, indexSchema)) { + Option.apply(colName) + } else { + None + } + } + + private def getTargetColNameParts(resolvedTargetCol: Expression): Seq[String] = { resolvedTargetCol match { case attr: Attribute => Seq(attr.name) - case Alias(c, _) => getTargetColNameParts(c) - case GetStructField(c, _, Some(name)) => getTargetColNameParts(c) :+ name - case ex: ExtractValue => throw new AnalysisException(s"convert reference to name failed, Updating nested fields is only supported for StructType: ${ex}.") - case other => throw new AnalysisException(s"convert reference to name failed, Found unsupported expression ${other}") } diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json new file mode 100644 index 000000000..5c876126a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json @@ -0,0 +1,8 @@ +{"c1_maxValue":1000,"c1_minValue":3,"c1_num_nulls":0,"c2_maxValue":" 993sdc","c2_minValue":" 
1000sdc","c2_num_nulls":0,"c3_maxValue":999.348,"c3_minValue":5.102,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-27","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"} +{"c1_maxValue":1000,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 996sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":999.779,"c3_minValue":2.992,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/g==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} +{"c1_maxValue":998,"c1_minValue":2,"c1_num_nulls":0,"c2_maxValue":" 998sdc","c2_minValue":" 104sdc","c2_num_nulls":0,"c3_maxValue":997.905,"c3_minValue":0.876,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-02","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"} +{"c1_maxValue":997,"c1_minValue":3,"c1_num_nulls":0,"c2_maxValue":" 9sdc","c2_minValue":" 102sdc","c2_num_nulls":0,"c3_maxValue":990.531,"c3_minValue":2.336,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-27","c6_minValue":"2020-01-02","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} +{"c1_maxValue":994,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 9sdc","c2_minValue":" 
0sdc","c2_num_nulls":0,"c3_maxValue":997.496,"c3_minValue":7.742,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"} +{"c1_maxValue":999,"c1_minValue":1,"c1_num_nulls":0,"c2_maxValue":" 999sdc","c2_minValue":" 100sdc","c2_num_nulls":0,"c3_maxValue":980.676,"c3_minValue":0.120,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-03","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} +{"c1_maxValue":999,"c1_minValue":1,"c1_num_nulls":0,"c2_maxValue":" 99sdc","c2_minValue":" 10sdc","c2_num_nulls":0,"c3_maxValue":993.940,"c3_minValue":4.598,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-03","c6_num_nulls":0,"c7_maxValue":"/g==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"} +{"c1_maxValue":998,"c1_minValue":6,"c1_num_nulls":0,"c2_maxValue":" 99sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":999.282,"c3_minValue":1.217,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json 
b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json new file mode 100644 index 000000000..45cb9aaf8 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json @@ -0,0 +1,4 @@ +{"c1_maxValue":1000,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 996sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":999.779,"c3_minValue":2.992,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/g==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} +{"c1_maxValue":997,"c1_minValue":3,"c1_num_nulls":0,"c2_maxValue":" 9sdc","c2_minValue":" 102sdc","c2_num_nulls":0,"c3_maxValue":990.531,"c3_minValue":2.336,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-27","c6_minValue":"2020-01-02","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} +{"c1_maxValue":999,"c1_minValue":1,"c1_num_nulls":0,"c2_maxValue":" 999sdc","c2_minValue":" 100sdc","c2_num_nulls":0,"c3_maxValue":980.676,"c3_minValue":0.120,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-03","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} +{"c1_maxValue":998,"c1_minValue":6,"c1_num_nulls":0,"c2_maxValue":" 99sdc","c2_minValue":" 
111sdc","c2_num_nulls":0,"c3_maxValue":999.282,"c3_minValue":1.217,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala index 96fb18db3..ad372c462 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala @@ -46,7 +46,6 @@ import org.mockito.Mockito.{spy, times, verify} import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, intercept} import java.time.Instant -import java.util import java.util.{Collections, Date, UUID} import scala.collection.JavaConversions._ @@ -147,7 +146,7 @@ class HoodieSparkSqlWriterSuite { * @param inputList list of Row * @return list of Seq */ - def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] = + def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] = JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq /** diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala new file mode 100644 index 000000000..75da45354 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.index.zorder.ZOrderingIndexHelper +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{Expression, Not} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.hudi.DataSkippingUtils +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, VarcharType} +import org.apache.spark.sql.{Column, SparkSession} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments.arguments +import org.junit.jupiter.params.provider.{Arguments, MethodSource} + +import scala.collection.JavaConverters._ + +// NOTE: Only A, B columns are indexed +case class IndexRow( + file: String, + A_minValue: Long, + A_maxValue: Long, + A_num_nulls: Long, + B_minValue: String = null, + B_maxValue: String = null, + B_num_nulls: Long = -1 +) + +class TestDataSkippingUtils extends HoodieClientTestBase { + + var spark: SparkSession = _ + + @BeforeEach + override def setUp(): Unit = { + 
initSparkContexts() + spark = sqlContext.sparkSession + } + + val indexedCols = Seq("A", "B") + val sourceTableSchema = + StructType( + Seq( + StructField("A", LongType), + StructField("B", StringType), + StructField("C", VarcharType(32)) + ) + ) + + val indexSchema = + ZOrderingIndexHelper.composeIndexSchema( + sourceTableSchema.fields.toSeq + .filter(f => indexedCols.contains(f.name)) + .asJava + ) + + @ParameterizedTest + @MethodSource(Array("testBaseLookupFilterExpressionsSource", "testAdvancedLookupFilterExpressionsSource")) + def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = { + val resolvedExpr: Expression = resolveFilterExpr(sourceExpr, sourceTableSchema) + + val lookupFilter = DataSkippingUtils.createZIndexLookupFilter(resolvedExpr, indexSchema) + + val spark2 = spark + import spark2.implicits._ + + val indexDf = spark.createDataset(input) + + val rows = indexDf.where(new Column(lookupFilter)) + .select("file") + .collect() + .map(_.getString(0)) + .toSeq + + assertEquals(output, rows) + } + + @ParameterizedTest + @MethodSource(Array("testStringsLookupFilterExpressionsSource")) + def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = { + val resolvedExpr = resolveFilterExpr(sourceExpr, sourceTableSchema) + val lookupFilter = DataSkippingUtils.createZIndexLookupFilter(resolvedExpr, indexSchema) + + val spark2 = spark + import spark2.implicits._ + + val indexDf = spark.createDataset(input) + + val rows = indexDf.where(new Column(lookupFilter)) + .select("file") + .collect() + .map(_.getString(0)) + .toSeq + + assertEquals(output, rows) + } + + private def resolveFilterExpr(exprString: String, tableSchema: StructType): Expression = { + val expr = spark.sessionState.sqlParser.parseExpression(exprString) + resolveFilterExpr(expr, tableSchema) + } + + private def resolveFilterExpr(expr: Expression, tableSchema: StructType): Expression = { + val 
schemaFields = tableSchema.fields + val resolvedExpr = spark.sessionState.analyzer.ResolveReferences( + Filter(expr, LocalRelation(schemaFields.head, schemaFields.drop(1): _*)) + ) + .asInstanceOf[Filter].condition + + checkForUnresolvedRefs(resolvedExpr) + } + + def checkForUnresolvedRefs(resolvedExpr: Expression): Expression = + resolvedExpr match { + case UnresolvedAttribute(_) => throw new IllegalStateException("unresolved attribute") + case _ => resolvedExpr.mapChildren(e => checkForUnresolvedRefs(e)) + } +} + +object TestDataSkippingUtils { + def testStringsLookupFilterExpressionsSource(): java.util.stream.Stream[Arguments] = { + java.util.stream.Stream.of( + arguments( + col("B").startsWith("abc").expr, + Seq( + IndexRow("file_1", 0, 0, 0, "aba", "adf", 1), // may contain strings starting w/ "abc" + IndexRow("file_2", 0, 0, 0, "adf", "azy", 0), + IndexRow("file_3", 0, 0, 0, "aaa", "aba", 0) + ), + Seq("file_1")), + arguments( + Not(col("B").startsWith("abc").expr), + Seq( + IndexRow("file_1", 0, 0, 0, "aba", "adf", 1), // may contain strings starting w/ "abc" + IndexRow("file_2", 0, 0, 0, "adf", "azy", 0), + IndexRow("file_3", 0, 0, 0, "aaa", "aba", 0), + IndexRow("file_4", 0, 0, 0, "abc123", "abc345", 0) // all strings start w/ "abc" + ), + Seq("file_1", "file_2", "file_3")) + ) + } + + def testBaseLookupFilterExpressionsSource(): java.util.stream.Stream[Arguments] = { + java.util.stream.Stream.of( + // TODO cases + // A = null + arguments( + "A = 0", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0) + ), + Seq("file_2")), + arguments( + "0 = A", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0) + ), + Seq("file_2")), + arguments( + "A != 0", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", 0, 0, 0) // Contains only 0s + ), + Seq("file_1", "file_2")), + arguments( + "0 != A", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", 0, 0, 0) // 
Contains only 0s + ), + Seq("file_1", "file_2")), + arguments( + "A < 0", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0) + ), + Seq("file_2", "file_3")), + arguments( + "0 > A", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0) + ), + Seq("file_2", "file_3")), + arguments( + "A > 0", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0) + ), + Seq("file_1", "file_2")), + arguments( + "0 < A", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0) + ), + Seq("file_1", "file_2")), + arguments( + "A <= -1", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0) + ), + Seq("file_2", "file_3")), + arguments( + "-1 >= A", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0) + ), + Seq("file_2", "file_3")), + arguments( + "A >= 1", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0) + ), + Seq("file_1", "file_2")), + arguments( + "1 <= A", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0) + ), + Seq("file_1", "file_2")), + arguments( + "A is null", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 1) + ), + Seq("file_2")), + arguments( + "A is not null", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 1) + ), + Seq("file_1")), + arguments( + "A in (0, 1)", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0) + ), + Seq("file_1", "file_2")), + arguments( + "A not in (0, 1)", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0), + IndexRow("file_4", 0, 0, 0), // only contains 0 + IndexRow("file_5", 1, 1, 0) // only contains 1 + ), + Seq("file_1", 
"file_2", "file_3")) + ) + } + + def testAdvancedLookupFilterExpressionsSource(): java.util.stream.Stream[Arguments] = { + java.util.stream.Stream.of( + arguments( + // Filter out all rows that contain either A = 0 OR A = 1 + "A != 0 AND A != 1", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0), + IndexRow("file_4", 0, 0, 0), // only contains 0 + IndexRow("file_5", 1, 1, 0) // only contains 1 + ), + Seq("file_1", "file_2", "file_3")), + arguments( + // This is an equivalent to the above expression + "NOT(A = 0 OR A = 1)", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0), + IndexRow("file_4", 0, 0, 0), // only contains 0 + IndexRow("file_5", 1, 1, 0) // only contains 1 + ), + Seq("file_1", "file_2", "file_3")), + + arguments( + // Filter out all rows that contain A = 0 AND B = 'abc' + "A != 0 OR B != 'abc'", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0), + IndexRow("file_4", 0, 0, 0, "abc", "abc", 0), // only contains A = 0, B = 'abc' + IndexRow("file_5", 0, 0, 0, "abc", "abc", 0) // only contains A = 0, B = 'abc' + ), + Seq("file_1", "file_2", "file_3")), + arguments( + // This is an equivalent to the above expression + "NOT(A = 0 AND B = 'abc')", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0), + IndexRow("file_4", 0, 0, 0, "abc", "abc", 0), // only contains A = 0, B = 'abc' + IndexRow("file_5", 0, 0, 0, "abc", "abc", 0) // only contains A = 0, B = 'abc' + ), + Seq("file_1", "file_2", "file_3")), + + arguments( + // Queries contains expression involving non-indexed column C + "A = 0 AND B = 'abc' AND C = '...'", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0), + IndexRow("file_4", 0, 0, 0, "aaa", "xyz", 0) // might contain A = 0 AND B = 'abc' + ), + Seq("file_4")), + + arguments( + // Queries 
contains expression involving non-indexed column C + "A = 0 OR B = 'abc' OR C = '...'", + Seq( + IndexRow("file_1", 1, 2, 0), + IndexRow("file_2", -1, 1, 0), + IndexRow("file_3", -2, -1, 0), + IndexRow("file_4", 0, 0, 0, "aaa", "xyz", 0) // might contain B = 'abc' + ), + Seq("file_1", "file_2", "file_3", "file_4")) + ) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 7c58cc07e..0c3918b4f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -59,7 +59,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase { DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL ) - @BeforeEach override def setUp() { + @BeforeEach + override def setUp() { setTableName("hoodie_test") initPath() initSparkContexts() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index 1b756b5e2..ad974286a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -19,20 +19,17 @@ package org.apache.hudi import org.apache.avro.generic.GenericRecord - -import java.io.File -import java.nio.file.Paths import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hudi.testutils.DataSourceTestUtils -import org.apache.spark.sql.avro.IncompatibleSchemaException -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SparkSession} -import 
org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue, fail} +import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test import org.junit.jupiter.api.io.TempDir -import java.util +import java.io.File +import java.nio.file.Paths import scala.collection.JavaConverters class TestHoodieSparkUtils { @@ -235,6 +232,6 @@ class TestHoodieSparkUtils { spark.stop() } - def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] = + def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] = JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala deleted file mode 100644 index 2fddefb32..000000000 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.functional - -import org.apache.hadoop.fs.Path -import org.apache.hudi.common.model.HoodieFileFormat -import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} -import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings -import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils} -import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} -import org.apache.hudi.testutils.HoodieClientTestBase -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} -import org.apache.spark.ZCurveOptimizeHelper -import org.apache.spark.sql._ -import org.apache.spark.sql.types._ -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource - -import java.sql.{Date, Timestamp} -import scala.collection.JavaConversions._ -import scala.util.Random - -@Tag("functional") -class TestTableLayoutOptimization extends HoodieClientTestBase { - var spark: SparkSession = _ - - val commonOpts = Map( - "hoodie.insert.shuffle.parallelism" -> "4", - "hoodie.upsert.shuffle.parallelism" -> "4", - "hoodie.bulkinsert.shuffle.parallelism" -> "4", - DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key", - DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition", - DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp", - HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" - ) - - @BeforeEach override def setUp() { - initPath() - initSparkContexts() - spark = sqlContext.sparkSession - initTestDataGenerator() - initFileSystem() - } - - @AfterEach override def tearDown() = { - cleanupSparkContexts() - cleanupTestDataGenerator() - cleanupFileSystem() - } - - @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def 
testOptimizeWithClustering(tableType: String): Unit = { - val targetRecordsCount = 10000 - // Bulk Insert Operation - val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList - val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2)) - - writeDf.write.format("org.apache.hudi") - .options(commonOpts) - .option("hoodie.compact.inline", "false") - .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType) - // option for clustering - .option("hoodie.parquet.small.file.limit", "0") - .option("hoodie.clustering.inline", "true") - .option("hoodie.clustering.inline.max.commits", "1") - .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824") - .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600") - .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString) - .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L)) - .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true") - .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon") - .mode(SaveMode.Overwrite) - .save(basePath) - - val readDf = - spark.read - .format("hudi") - .load(basePath) - - val readDfSkip = - spark.read - .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true") - .format("hudi") - .load(basePath) - - assertEquals(targetRecordsCount, readDf.count()) - assertEquals(targetRecordsCount, readDfSkip.count()) - - readDf.createOrReplaceTempView("hudi_snapshot_raw") - readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping") - - def select(tableName: String) = - spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51") - - assertRowsMatch( - select("hudi_snapshot_raw"), - select("hudi_snapshot_skipping") - ) - } - - 
def assertRowsMatch(one: DataFrame, other: DataFrame) = { - val rows = one.count() - assert(rows == other.count() && one.intersect(other).count() == rows) - } - - @Test - def testCollectMinMaxStatistics(): Unit = { - val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax") - val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat") - val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration) - val complexDataFrame = createComplexDataFrame(spark) - complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString) - val df = spark.read.load(testPath.toString) - try { - // test z-order sort for all primitive type, should not throw exception. - ZCurveOptimizeHelper.createZIndexedDataFrameByMapValue(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1) - ZCurveOptimizeHelper.createZIndexedDataFrameBySample(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1) - // do not support TimeStampType, so if we collect statistics for c4, should throw exception - val colDf = ZCurveOptimizeHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8") - colDf.cache() - assertEquals(colDf.count(), 3) - assertEquals(colDf.take(1)(0).length, 22) - colDf.unpersist() - // try to save statistics - ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1")) - // save again - ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2")) - // test old index table clean - ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3")) - assertEquals(!fs.exists(new Path(statisticPath, "2")), true) - assertEquals(fs.exists(new Path(statisticPath, "3")), true) - // test to save different index, new index on ("c1,c6,c7,c8") should be successfully saved. 
- ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4")) - assertEquals(fs.exists(new Path(statisticPath, "5")), true) - } finally { - if (fs.exists(testPath)) fs.delete(testPath) - if (fs.exists(statisticPath)) fs.delete(statisticPath) - } - } - - // test collect min-max statistic info for DateType in the case of multithreading. - // parquet will give a wrong statistic result for DateType in the case of multithreading. - @Test - def testMultiThreadParquetFooterReadForDateType(): Unit = { - // create parquet file with DateType - val rdd = spark.sparkContext.parallelize(0 to 100, 1) - .map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}"))) - val df = spark.createDataFrame(rdd, new StructType().add("id", DateType)) - val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType") - val conf = spark.sparkContext.hadoopConfiguration - val cols = new java.util.ArrayList[String] - cols.add("id") - try { - df.repartition(3).write.mode("overwrite").save(testPath.toString) - val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x) - - val realResult = new Array[(String, String)](3) - inputFiles.zipWithIndex.foreach { case (f, index) => - val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] - val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() - realResult(index) = (res.getMinValueAsString, res.getMaxValueAsString) - } - - // multi thread read with no lock - val resUseLock = new Array[(String, String)](3) - inputFiles.zipWithIndex.par.foreach { case (f, index) => - val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] - val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() - resUseLock(index) = (res.getMinValueAsString, res.getMaxValueAsString) - } - - // check resUseNoLock, - // We 
can't guarantee that there must be problems in the case of multithreading. - // In order to make ut pass smoothly, we will not check resUseNoLock. - // check resUseLock - // should pass assert - realResult.zip(resUseLock).foreach { case (realValue, testValue) => - assert(realValue == testValue, s" expect realValue: ${realValue} but find ${testValue}") - } - } finally { - if (fs.exists(testPath)) fs.delete(testPath) - } - } - - def createComplexDataFrame(spark: SparkSession): DataFrame = { - val schema = new StructType() - .add("c1", IntegerType) - .add("c2", StringType) - .add("c3", DecimalType(9,3)) - .add("c4", TimestampType) - .add("c5", ShortType) - .add("c6", DateType) - .add("c7", BinaryType) - .add("c8", ByteType) - - val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item => - val c1 = Integer.valueOf(item) - val c2 = s" ${item}sdc" - val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}") - val c4 = new Timestamp(System.currentTimeMillis()) - val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}") - val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}") - val c7 = Array(item).map(_.toByte) - val c8 = java.lang.Byte.valueOf("9") - - RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8) - } - spark.createDataFrame(rdd, schema) - } -} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestZOrderLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestZOrderLayoutOptimization.scala new file mode 100644 index 000000000..657b4f16e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestZOrderLayoutOptimization.scala @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hadoop.fs.{LocatedFileStatus, Path} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} +import org.apache.hudi.index.zorder.ZOrderingIndexHelper +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.typedLit +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Tag, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.sql.{Date, Timestamp} +import scala.collection.JavaConversions._ +import scala.util.Random + +@Tag("functional") +class TestZOrderLayoutOptimization extends HoodieClientTestBase { + var spark: SparkSession = _ + + val sourceTableSchema = + new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + .add("c3", DecimalType(9,3)) + .add("c4", TimestampType) + .add("c5", ShortType) + .add("c6", DateType) + .add("c7", BinaryType) + .add("c8", 
ByteType) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + ) + + @BeforeEach + override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach + override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testZOrderingLayoutClustering(tableType: String): Unit = { + val targetRecordsCount = 10000 + // Bulk Insert Operation + val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList + val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + writeDf.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") + .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType) + // option for clustering + .option("hoodie.parquet.small.file.limit", "0") + .option("hoodie.clustering.inline", "true") + .option("hoodie.clustering.inline.max.commits", "1") + .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824") + .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600") + .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString) + .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L)) + .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true") + 
.option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon") + .mode(SaveMode.Overwrite) + .save(basePath) + + val hudiMetaClient = HoodieTableMetaClient.builder + .setConf(hadoopConf) + .setBasePath(basePath) + .setLoadActiveTimelineOnLoad(true) + .build + + val lastCommit = hudiMetaClient.getActiveTimeline.getAllCommitsTimeline.lastInstant().get() + + assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, lastCommit.getAction) + assertEquals(HoodieInstant.State.COMPLETED, lastCommit.getState) + + val readDf = + spark.read + .format("hudi") + .load(basePath) + + val readDfSkip = + spark.read + .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true") + .format("hudi") + .load(basePath) + + assertEquals(targetRecordsCount, readDf.count()) + assertEquals(targetRecordsCount, readDfSkip.count()) + + readDf.createOrReplaceTempView("hudi_snapshot_raw") + readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping") + + def select(tableName: String) = + spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51") + + assertRowsMatch( + select("hudi_snapshot_raw"), + select("hudi_snapshot_skipping") + ) + } + + @Test + @Disabled + def testZIndexTableComposition(): Unit = { + val inputDf = + // NOTE: Schema here is provided for validation that the input date is in the appropriate format + spark.read + .schema(sourceTableSchema) + .parquet( + getClass.getClassLoader.getResource("index/zorder/input-table").toString + ) + + val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8") + val zorderedColsSchemaFields = inputDf.schema.fields.filter(f => zorderedCols.contains(f.name)).toSeq + + // {@link TimestampType} is not supported, and will throw -- hence skipping "c4" + val newZIndexTableDf = + ZOrderingIndexHelper.buildZIndexTableFor( + inputDf.sparkSession, + inputDf.inputFiles.toSeq, + zorderedColsSchemaFields + ) + + val indexSchema = + 
ZOrderingIndexHelper.composeIndexSchema( + sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq + ) + + // Collect Z-index stats manually (reading individual Parquet files) + val manualZIndexTableDf = + buildZIndexTableManually( + getClass.getClassLoader.getResource("index/zorder/input-table").toString, + zorderedCols, + indexSchema + ) + + // NOTE: Z-index is built against stats collected w/in Parquet footers, which will be + // represented w/ corresponding Parquet schema (INT, INT64, INT96, etc). + // + // When stats are collected manually, produced Z-index table is inherently coerced into the + // schema of the original source Parquet base-file and therefore we have to similarly coerce newly + // built Z-index table (built off Parquet footers) into the canonical index schema (built off the + // original source file schema) + assertEquals(asJson(sort(manualZIndexTableDf)), asJson(sort(newZIndexTableDf))) + + // Match against expected Z-index table + val expectedZIndexTableDf = + spark.read + .schema(indexSchema) + .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString) + + assertEquals(asJson(sort(expectedZIndexTableDf)), asJson(sort(newZIndexTableDf))) + } + + @Test + @Disabled + def testZIndexTableMerge(): Unit = { + val testZIndexPath = new Path(basePath, "zindex") + + val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8") + val indexSchema = + ZOrderingIndexHelper.composeIndexSchema( + sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq + ) + + // + // Bootstrap Z-index table + // + + val firstCommitInstance = "0" + val firstInputDf = + spark.read.parquet( + getClass.getClassLoader.getResource("index/zorder/input-table").toString + ) + + ZOrderingIndexHelper.updateZIndexFor( + firstInputDf.sparkSession, + sourceTableSchema, + firstInputDf.inputFiles.toSeq, + zorderedCols.toSeq, + testZIndexPath.toString, + firstCommitInstance, + Seq() + ) + + // NOTE: We don't need to provide 
schema upon reading from Parquet, since Spark will be able + // to reliably retrieve it + val initialZIndexTable = + spark.read + .parquet(new Path(testZIndexPath, firstCommitInstance).toString) + + val expectedInitialZIndexTableDf = + spark.read + .schema(indexSchema) + .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString) + + assertEquals(asJson(sort(expectedInitialZIndexTableDf)), asJson(sort(initialZIndexTable))) + + val secondCommitInstance = "1" + val secondInputDf = + spark.read + .schema(sourceTableSchema) + .parquet( + getClass.getClassLoader.getResource("index/zorder/another-input-table").toString + ) + + // + // Update Z-index table + // + + ZOrderingIndexHelper.updateZIndexFor( + secondInputDf.sparkSession, + sourceTableSchema, + secondInputDf.inputFiles.toSeq, + zorderedCols.toSeq, + testZIndexPath.toString, + secondCommitInstance, + Seq(firstCommitInstance) + ) + + // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able + // to reliably retrieve it + val mergedZIndexTable = + spark.read + .parquet(new Path(testZIndexPath, secondCommitInstance).toString) + + val expectedMergedZIndexTableDf = + spark.read + .schema(indexSchema) + .json(getClass.getClassLoader.getResource("index/zorder/z-index-table-merged.json").toString) + + assertEquals(asJson(sort(expectedMergedZIndexTableDf)), asJson(sort(mergedZIndexTable))) + } + + @Test + @Disabled + def testZIndexTablesGarbageCollection(): Unit = { + val testZIndexPath = new Path(System.getProperty("java.io.tmpdir"), "zindex") + val fs = testZIndexPath.getFileSystem(spark.sparkContext.hadoopConfiguration) + + val inputDf = + spark.read.parquet( + getClass.getClassLoader.getResource("index/zorder/input-table").toString + ) + + // Try to save statistics + ZOrderingIndexHelper.updateZIndexFor( + inputDf.sparkSession, + sourceTableSchema, + inputDf.inputFiles.toSeq, + Seq("c1","c2","c3","c5","c6","c7","c8"), + testZIndexPath.toString, + "2", + 
Seq("0", "1") + ) + + // Save again + ZOrderingIndexHelper.updateZIndexFor( + inputDf.sparkSession, + sourceTableSchema, + inputDf.inputFiles.toSeq, + Seq("c1","c2","c3","c5","c6","c7","c8"), + testZIndexPath.toString, + "3", + Seq("0", "1", "2") + ) + + // Test old index table being cleaned up + ZOrderingIndexHelper.updateZIndexFor( + inputDf.sparkSession, + sourceTableSchema, + inputDf.inputFiles.toSeq, + Seq("c1","c2","c3","c5","c6","c7","c8"), + testZIndexPath.toString, + "4", + Seq("0", "1", "3") + ) + + assertEquals(!fs.exists(new Path(testZIndexPath, "2")), true) + assertEquals(!fs.exists(new Path(testZIndexPath, "3")), true) + assertEquals(fs.exists(new Path(testZIndexPath, "4")), true) + } + + private def buildZIndexTableManually(tablePath: String, zorderedCols: Seq[String], indexSchema: StructType) = { + val files = { + val it = fs.listFiles(new Path(tablePath), true) + var seq = Seq[LocatedFileStatus]() + while (it.hasNext) { + seq = seq :+ it.next() + } + seq + } + + spark.createDataFrame( + files.flatMap(file => { + val df = spark.read.schema(sourceTableSchema).parquet(file.getPath.toString) + val exprs: Seq[String] = + s"'${typedLit(file.getPath.getName)}' AS file" +: + df.columns + .filter(col => zorderedCols.contains(col)) + .flatMap(col => { + val minColName = s"${col}_minValue" + val maxColName = s"${col}_maxValue" + Seq( + s"min($col) AS $minColName", + s"max($col) AS $maxColName", + s"sum(cast(isnull($col) AS long)) AS ${col}_num_nulls" + ) + }) + + df.selectExpr(exprs: _*) + .collect() + }), + indexSchema + ) + } + + private def asJson(df: DataFrame) = + df.toJSON + .select("value") + .collect() + .toSeq + .map(_.getString(0)) + .mkString("\n") + + private def assertRowsMatch(one: DataFrame, other: DataFrame) = { + val rows = one.count() + assert(rows == other.count() && one.intersect(other).count() == rows) + } + + private def sort(df: DataFrame): DataFrame = { + // Since upon parsing JSON, Spark re-order columns in lexicographical order + // 
of their names, we have to shuffle new Z-index table columns order to match + // Rows are sorted by filename as well to avoid nondeterministic row ordering affecting the comparison + val sortedCols = df.columns.sorted + df.select(sortedCols.head, sortedCols.tail: _*) + .sort("file") + } + + def createComplexDataFrame(spark: SparkSession): DataFrame = { + val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item => + val c1 = Integer.valueOf(item) + val c2 = s" ${item}sdc" + val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}") + val c4 = new Timestamp(System.currentTimeMillis()) + val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}") + val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}") + val c7 = Array(item).map(_.toByte) + val c8 = java.lang.Byte.valueOf("9") + + RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8) + } + spark.createDataFrame(rdd, sourceTableSchema) + } +} diff --git a/style/checkstyle.xml b/style/checkstyle.xml index ff90b7f3c..82325c953 100644 --- a/style/checkstyle.xml +++ b/style/checkstyle.xml @@ -275,7 +275,9 @@ - + + +