[HUDI-3513] Make sure Column Stats does not fail in case it fails to load previous Index Table state (#5015)
This commit is contained in:
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.util.BaseFileUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ParquetUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
@@ -304,21 +305,28 @@ public class ColumnStatsIndexHelper {
|
||||
if (validIndexTables.isEmpty()) {
|
||||
finalColStatsIndexDf = newColStatsIndexDf;
|
||||
} else {
|
||||
// NOTE: That Parquet schema might deviate from the original table schema (for ex,
|
||||
// by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
|
||||
// prior to merging, since merging might fail otherwise due to schemas incompatibility
|
||||
finalColStatsIndexDf =
|
||||
tryMergeMostRecentIndexTableInto(
|
||||
sparkSession,
|
||||
newColStatsIndexDf,
|
||||
// Load current most recent col-stats-index table
|
||||
sparkSession.read().load(
|
||||
new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1)).toString()
|
||||
)
|
||||
);
|
||||
Path latestIndexTablePath = new Path(indexFolderPath, validIndexTables.get(validIndexTables.size() - 1));
|
||||
|
||||
// Clean up all index tables (after creation of the new index)
|
||||
tablesToCleanup.addAll(validIndexTables);
|
||||
Option<Dataset<Row>> existingIndexTableOpt =
|
||||
tryLoadExistingIndexTable(sparkSession, latestIndexTablePath);
|
||||
|
||||
if (!existingIndexTableOpt.isPresent()) {
|
||||
finalColStatsIndexDf = newColStatsIndexDf;
|
||||
} else {
|
||||
// NOTE: That Parquet schema might deviate from the original table schema (for ex,
|
||||
// by upcasting "short" to "integer" types, etc), and hence we need to re-adjust it
|
||||
// prior to merging, since merging might fail otherwise due to schemas incompatibility
|
||||
finalColStatsIndexDf =
|
||||
tryMergeMostRecentIndexTableInto(
|
||||
sparkSession,
|
||||
newColStatsIndexDf,
|
||||
// Load current most recent col-stats-index table
|
||||
existingIndexTableOpt.get()
|
||||
);
|
||||
|
||||
// Clean up all index tables (after creation of the new index)
|
||||
tablesToCleanup.addAll(validIndexTables);
|
||||
}
|
||||
}
|
||||
|
||||
// Persist new col-stats-index table
|
||||
@@ -349,6 +357,17 @@ public class ColumnStatsIndexHelper {
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private static Option<Dataset<Row>> tryLoadExistingIndexTable(@Nonnull SparkSession sparkSession, @Nonnull Path indexTablePath) {
|
||||
try {
|
||||
Dataset<Row> indexTableDataset = sparkSession.read().load(indexTablePath.toUri().toString());
|
||||
return Option.of(indexTableDataset);
|
||||
} catch (Exception e) {
|
||||
LOG.error(String.format("Failed to load existing Column Stats index table from (%s)", indexTablePath), e);
|
||||
return Option.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private static Dataset<Row> tryMergeMostRecentIndexTableInto(
|
||||
@Nonnull SparkSession sparkSession,
|
||||
|
||||
Reference in New Issue
Block a user