From 5d59bf67aeecafb65a2231d808b538878eca49fb Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 11 Mar 2022 14:39:22 -0800 Subject: [PATCH] [HUDI-3513] Make sure Column Stats does not fail in case it fails to load previous Index Table state (#5015) --- .../columnstats/ColumnStatsIndexHelper.java | 47 +++++++++++++------ 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java index 521bdb20c..4fdb6a6be 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; @@ -304,21 +305,28 @@ public class ColumnStatsIndexHelper { if (validIndexTables.isEmpty()) { finalColStatsIndexDf = newColStatsIndexDf; } else { - // NOTE: That Parquet schema might deviate from the original table schema (for ex, - // by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it - // prior to merging, since merging might fail otherwise due to schemas incompatibility - finalColStatsIndexDf = - tryMergeMostRecentIndexTableInto( - sparkSession, - newColStatsIndexDf, - // Load current most recent col-stats-index table - sparkSession.read().load( - new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString() - ) - ); + Path latestIndexTablePath = new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1)); - // Clean up all index tables (after creation of the new index) - tablesToCleanup.addAll(validIndexTables); + Option> existingIndexTableOpt = + tryLoadExistingIndexTable(sparkSession, latestIndexTablePath); + + if (!existingIndexTableOpt.isPresent()) { + finalColStatsIndexDf = newColStatsIndexDf; + } else { + // NOTE: That Parquet schema might deviate from the original table schema (for ex, + // by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it + // prior to merging, since merging might fail otherwise due to schemas incompatibility + finalColStatsIndexDf = + tryMergeMostRecentIndexTableInto( + sparkSession, + newColStatsIndexDf, + // Load current most recent col-stats-index table + existingIndexTableOpt.get() + ); + + // Clean up all index tables (after creation of the new index) + tablesToCleanup.addAll(validIndexTables); + } } // Persist new col-stats-index table @@ -349,6 +357,17 @@ public class ColumnStatsIndexHelper { } } + @Nonnull + private static Option> tryLoadExistingIndexTable(@Nonnull SparkSession sparkSession, @Nonnull Path indexTablePath) { + try { + Dataset indexTableDataset = sparkSession.read().load(indexTablePath.toUri().toString()); + return Option.of(indexTableDataset); + } catch (Exception e) { + LOG.error(String.format("Failed to load existing Column Stats index table from (%s)", indexTablePath), e); + return Option.empty(); + } + } + @Nonnull private static Dataset tryMergeMostRecentIndexTableInto( @Nonnull SparkSession sparkSession,