From 4f7ea8c79a9d13accf72d094296993c588d87beb Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Mon, 6 Jun 2022 10:14:26 -0700
Subject: [PATCH] [HUDI-4176] Fixing `TableSchemaResolver` to avoid repeated `HoodieCommitMetadata` parsing (#5733)

As has been outlined in HUDI-4176, we've hit a roadblock while testing Hudi on a large dataset (~1TB) with pretty fat commits, where Hudi's commit metadata could reach into the 100s of MBs. Given the size of some of our commit metadata instances, Spark's parsing and resolving phase (when spark.sql(...) is involved, but before the returned Dataset is dereferenced) starts to dominate some of our queries' execution time.

- Rebased onto new APIs to avoid excessive Hadoop Path allocations
- Eliminated hasOperationField completely to avoid repetitive computations
- Cleaned up duplication in HoodieActiveTimeline
- Added caching for common instances of HoodieCommitMetadata
- Made tableStructSchema lazy
---
 .../TestHoodieClientOnCopyOnWriteStorage.java | 4 +-
 .../hudi/testutils/HoodieClientTestUtils.java | 2 +-
 .../org/apache/hudi/avro/AvroSchemaUtils.java | 11 +
 .../org/apache/hudi/avro/HoodieAvroUtils.java | 12 -
 .../common/model/HoodieCommitMetadata.java | 2 +-
 .../common/table/TableSchemaResolver.java | 475 +++++++++---------
 .../table/timeline/HoodieActiveTimeline.java | 74 +--
 .../main/java/org/apache/hudi/util/Lazy.java | 2 +
 .../common/table/TestTableSchemaResolver.java | 16 +-
 .../scala/org/apache/hudi/DefaultSource.scala | 13 +-
 .../org/apache/hudi/HoodieBaseRelation.scala | 19 +-
 .../org/apache/hudi/IncrementalRelation.scala | 10 +-
 .../TestTableSchemaResolverWithSparkSQL.scala | 2 +-
 .../spark/sql/hudi/TestInsertTable.scala | 2 +-
 14 files changed, 318 insertions(+), 326 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index bf3063c5d..cebf3145b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -2094,13 +2094,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(commitTimeline.getInstantDetails(commitInstant).get(), HoodieCommitMetadata.class); String basePath = table.getMetaClient().getBasePath(); - Collection commitPathNames = commitMetadata.getFileIdAndFullPaths(basePath).values(); + Collection commitPathNames = commitMetadata.getFileIdAndFullPaths(new Path(basePath)).values(); // Read from commit file try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime))) { String everything = FileIOUtils.readAsUTFString(inputStream); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class); - HashMap paths = metadata.getFileIdAndFullPaths(basePath); + HashMap paths = metadata.getFileIdAndFullPaths(new Path(basePath)); // Compare values in both to make sure they are equal.
for (String pathName : paths.values()) { assertTrue(commitPathNames.contains(pathName)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 75d2d1422..3387dd24b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -109,7 +109,7 @@ public class HoodieClientTestUtils { for (HoodieInstant commit : commitsToReturn) { HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class); - fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(basePath)); + fileIdToFullPath.putAll(metadata.getFileIdAndFullPaths(new Path(basePath))); } return fileIdToFullPath; } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index dd14dca67..d45cfe035 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -109,4 +109,15 @@ public class AvroSchemaUtils { return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType)); } + /** + * Returns true in case when schema contains the field w/ provided name + */ + public static boolean containsFieldInSchema(Schema schema, String fieldName) { + try { + Schema.Field field = schema.getField(fieldName); + return field != null; + } catch (Exception e) { + return false; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index e2b586964..660660402 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -166,18 +166,6 @@ public class HoodieAvroUtils { return reader.read(null, jsonDecoder); } - /** - * True if the schema contains this name of field - */ - public static boolean containsFieldInSchema(Schema schema, String fieldName) { - try { - Field field = schema.getField(fieldName); - return field != null; - } catch (Exception e) { - return false; - } - } - public static boolean isMetadataField(String fieldName) { return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName) || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index f5077dea8..41d83813f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -125,7 +125,7 @@ public class HoodieCommitMetadata implements Serializable { return this.operationType; } - public HashMap getFileIdAndFullPaths(String basePath) { + public HashMap getFileIdAndFullPaths(Path basePath) { HashMap fullPaths = new HashMap<>(); for (Map.Entry entry : getFileIdAndRelativePaths().entrySet()) { String fullPath = entry.getValue() != null diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 
b76f71161..4f9992688 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -43,6 +43,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIncompatibleSchemaException; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.internal.schema.InternalSchema; @@ -50,6 +51,7 @@ import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager; import org.apache.hudi.internal.schema.utils.SerDeHelper; import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.io.storage.HoodieOrcReader; +import org.apache.hudi.util.Lazy; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroSchemaConverter; @@ -58,100 +60,56 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; +import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema; +import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema; import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; /** * Helper class to read schema from data files and log files and to convert it between different formats. - * - * TODO(HUDI-3626) cleanup */ +@ThreadSafe public class TableSchemaResolver { private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class); + private final HoodieTableMetaClient metaClient; - private final boolean hasOperationField; + + /** + * Signals whether the suite of meta-fields should include an additional field designating the + * operation a particular record was added by. Note that determining whether this meta-field + * should be appended to the schema requires reading out the actual schema of some data file, + * since that is ultimately the source of truth for whether this field has to be represented in + * the schema + */ + private final Lazy hasOperationField; + + /** + * NOTE: {@link HoodieCommitMetadata} could be of non-trivial size for large tables (in the 100s of MBs) + * and therefore we want to limit the amount of throw-away work performed while fetching + * commits' metadata + * + * Please check out the corresponding methods to fetch commonly used instances of {@link HoodieCommitMetadata}: + * {@link #getLatestCommitMetadataWithValidSchema()}, + * {@link #getLatestCommitMetadataWithValidData()}, + * {@link #getCachedCommitMetadata(HoodieInstant)} + */ + private final Lazy> commitMetadataCache; + + private volatile HoodieInstant latestCommitWithValidSchema = null; + private volatile HoodieInstant latestCommitWithValidData = null; public TableSchemaResolver(HoodieTableMetaClient metaClient) { this.metaClient = metaClient; - this.hasOperationField = hasOperationField(); - } - - /** - * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest - * commit. We will assume that the schema has not changed within a single atomic write.
- * - * @return Parquet schema for this table - */ - private MessageType getTableParquetSchemaFromDataFile() { - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - Option> instantAndCommitMetadata = - activeTimeline.getLastCommitMetadataWithValidData(); - try { - switch (metaClient.getTableType()) { - case COPY_ON_WRITE: - // For COW table, the file has data written must be in parquet or orc format currently. - if (instantAndCommitMetadata.isPresent()) { - HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); - Iterator filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator(); - return fetchSchemaFromFiles(filePaths); - } else { - throw new IllegalArgumentException("Could not find any data file written for commit, " - + "so could not get schema for table " + metaClient.getBasePath()); - } - case MERGE_ON_READ: - // For MOR table, the file has data written may be a parquet file, .log file, orc file or hfile. - // Determine the file format based on the file name, and then extract schema from it. - if (instantAndCommitMetadata.isPresent()) { - HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); - Iterator filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator(); - return fetchSchemaFromFiles(filePaths); - } else { - throw new IllegalArgumentException("Could not find any data file written for commit, " - + "so could not get schema for table " + metaClient.getBasePath()); - } - default: - LOG.error("Unknown table type " + metaClient.getTableType()); - throw new InvalidTableException(metaClient.getBasePath()); - } - } catch (IOException e) { - throw new HoodieException("Failed to read data schema", e); - } - } - - private MessageType fetchSchemaFromFiles(Iterator filePaths) throws IOException { - MessageType type = null; - while (filePaths.hasNext() && type == null) { - String filePath = filePaths.next(); - if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) { - // this is a log file - type = readSchemaFromLogFile(new Path(filePath)); - } else { - type = readSchemaFromBaseFile(filePath); - } - } - return type; - } - - private MessageType readSchemaFromBaseFile(String filePath) throws IOException { - if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) { - // this is a parquet file - return readSchemaFromParquetBaseFile(new Path(filePath)); - } else if (filePath.contains(HoodieFileFormat.HFILE.getFileExtension())) { - // this is a HFile - return readSchemaFromHFileBaseFile(new Path(filePath)); - } else if (filePath.contains(HoodieFileFormat.ORC.getFileExtension())) { - // this is a ORC file - return readSchemaFromORCBaseFile(new Path(filePath)); - } else { - throw new IllegalArgumentException("Unknown base file format :" + filePath); - } + this.commitMetadataCache = Lazy.lazily(() -> new ConcurrentHashMap<>(2)); + this.hasOperationField = Lazy.lazily(this::hasOperationField); } public Schema getTableAvroSchemaFromDataFile() { @@ -176,86 +134,25 @@ public class TableSchemaResolver { * @throws Exception */ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception { - Schema schema; - Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(includeMetadataFields); - if (schemaFromCommitMetadata.isPresent()) { - schema = schemaFromCommitMetadata.get(); - } else { - Option schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema(); - if 
(schemaFromTableConfig.isPresent()) { - if (includeMetadataFields) { - schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField); - } else { - schema = schemaFromTableConfig.get(); - } - } else { - if (includeMetadataFields) { - schema = getTableAvroSchemaFromDataFile(); - } else { - schema = HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile()); - } - } - } - - Option partitionFieldsOpt = metaClient.getTableConfig().getPartitionFields(); - if (metaClient.getTableConfig().shouldDropPartitionColumns()) { - schema = recreateSchemaWhenDropPartitionColumns(partitionFieldsOpt, schema); - } - return schema; + return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()); } - public static Schema recreateSchemaWhenDropPartitionColumns(Option partitionFieldsOpt, Schema originSchema) { - // when hoodie.datasource.write.drop.partition.columns is true, partition columns can't be persisted in data files. - // And there are no partition schema if the schema is parsed from data files. - // Here we create partition Fields for this case, and use StringType as the data type. - Schema schema = originSchema; - if (partitionFieldsOpt.isPresent() && partitionFieldsOpt.get().length != 0) { - List partitionFields = Arrays.asList(partitionFieldsOpt.get()); - - final Schema schema0 = originSchema; - boolean hasPartitionColNotInSchema = partitionFields.stream().anyMatch( - pt -> !HoodieAvroUtils.containsFieldInSchema(schema0, pt) - ); - boolean hasPartitionColInSchema = partitionFields.stream().anyMatch( - pt -> HoodieAvroUtils.containsFieldInSchema(schema0, pt) - ); - if (hasPartitionColNotInSchema && hasPartitionColInSchema) { - throw new HoodieIncompatibleSchemaException( - "Not support: Partial partition fields are still in the schema " - + "when enable hoodie.datasource.write.drop.partition.columns"); - } - - if (hasPartitionColNotInSchema) { - // when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns - // are not in originSchema. So we create and add them. - List newFields = new ArrayList<>(); - for (String partitionField: partitionFields) { - newFields.add(new Schema.Field( - partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)); - } - schema = appendFieldsToSchema(schema, newFields); - } - } - return schema; + /** + * Fetches the table's schema in Avro format as of the given instant + * + * @param instant as of which the table's schema will be fetched + */ + public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception { + return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)); } /** * Gets full schema (user + metadata) for a hoodie table in Parquet format.
* * @return Parquet schema for the table - * @throws Exception */ public MessageType getTableParquetSchema() throws Exception { - Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true); - if (schemaFromCommitMetadata.isPresent()) { - return convertAvroSchemaToParquet(schemaFromCommitMetadata.get()); - } - Option schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema(); - if (schemaFromTableConfig.isPresent()) { - Schema schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField); - return convertAvroSchemaToParquet(schema); - } - return getTableParquetSchemaFromDataFile(); + return convertAvroSchemaToParquet(getTableAvroSchema(true)); } /** @@ -271,41 +168,43 @@ public class TableSchemaResolver { return getTableAvroSchema(false); } - /** - * Gets users data schema for a hoodie table in Avro format of the instant. - * - * @param instant will get the instant data schema - * @return Avro user data schema - * @throws Exception - * @deprecated use {@link #getTableSchemaFromCommitMetadata} instead - */ - @Deprecated - public Schema getTableAvroSchemaWithoutMetadataFields(HoodieInstant instant) throws Exception { - Option schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(instant, false); - if (schemaFromCommitMetadata.isPresent()) { - return schemaFromCommitMetadata.get(); + private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option instantOpt) { + Schema schema = + (instantOpt.isPresent() + ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) + : getTableSchemaFromLatestCommitMetadata(includeMetadataFields)) + .or(() -> + metaClient.getTableConfig().getTableCreateSchema() + .map(tableSchema -> + includeMetadataFields + ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get()) + : tableSchema) + ) + .orElseGet(() -> { + Schema schemaFromDataFile = getTableAvroSchemaFromDataFile(); + return includeMetadataFields + ? schemaFromDataFile + : HoodieAvroUtils.removeMetadataFields(schemaFromDataFile); + }); + + // TODO partition columns have to be appended in all read-paths + if (metaClient.getTableConfig().shouldDropPartitionColumns()) { + return metaClient.getTableConfig().getPartitionFields() + .map(partitionFields -> appendPartitionColumns(schema, partitionFields)) + .orElse(schema); } - Option schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema(); - if (schemaFromTableConfig.isPresent()) { - return schemaFromTableConfig.get(); - } - return HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile()); + + return schema; } - /** - * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit with valid schema. 
- * - * @return Avro schema for this table - */ - private Option getTableSchemaFromCommitMetadata(boolean includeMetadataFields) { - Option> instantAndCommitMetadata = - metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(); + private Option getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) { + Option> instantAndCommitMetadata = getLatestCommitMetadataWithValidSchema(); if (instantAndCommitMetadata.isPresent()) { HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); Schema schema = new Schema.Parser().parse(schemaStr); if (includeMetadataFields) { - schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField); + schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField.get()); } return Option.of(schema); } else { @@ -313,17 +212,9 @@ } } - - /** - * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the instant. - * - * @return Avro schema for this table - */ private Option getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) { try { - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - byte[] data = timeline.getInstantDetails(instant).get(); - HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + HoodieCommitMetadata metadata = getCachedCommitMetadata(instant); String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); if (StringUtils.isNullOrEmpty(existingSchemaStr)) { @@ -332,7 +223,7 @@ Schema schema = new Schema.Parser().parse(existingSchemaStr); if (includeMetadataFields) { - schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField); + schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField.get()); } return Option.of(schema); } catch (Exception e) { @@ -341,23 +232,41 @@ } /** - * Convert a parquet scheme to the avro format. - * - * @param parquetSchema The parquet schema to convert - * @return The converted avro schema + * Fetches the schema for a table from one of the table's data files */ - public Schema convertParquetSchemaToAvro(MessageType parquetSchema) { + private MessageType getTableParquetSchemaFromDataFile() { + Option> instantAndCommitMetadata = getLatestCommitMetadataWithValidData(); + try { + switch (metaClient.getTableType()) { + case COPY_ON_WRITE: + case MERGE_ON_READ: + // For COW table, data could be written in either Parquet or Orc format currently; + // For MOR table, data could be written in either Parquet, Orc, Hfile or Delta-log format currently; + // + // Determine the file format based on the file name, and then extract schema from it.
+ if (instantAndCommitMetadata.isPresent()) { + HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); + Iterator filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().iterator(); + return fetchSchemaFromFiles(filePaths); + } else { + throw new IllegalArgumentException("Could not find any data file written for commit, " + + "so could not get schema for table " + metaClient.getBasePath()); + } + default: + LOG.error("Unknown table type " + metaClient.getTableType()); + throw new InvalidTableException(metaClient.getBasePath()); + } + } catch (IOException e) { + throw new HoodieException("Failed to read data schema", e); + } + } + + private Schema convertParquetSchemaToAvro(MessageType parquetSchema) { AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf()); return avroSchemaConverter.convert(parquetSchema); } - /** - * Convert a avro scheme to the parquet format. - * - * @param schema The avro schema to convert - * @return The converted parquet schema - */ - public MessageType convertAvroSchemaToParquet(Schema schema) { + private MessageType convertAvroSchemaToParquet(Schema schema) { AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf()); return avroSchemaConverter.convert(schema); } @@ -482,10 +391,7 @@ public class TableSchemaResolver { return latestSchema; } - /** - * Read the parquet schema from a parquet File. - */ - public MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException { + private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException { LOG.info("Reading schema from " + parquetFilePath); FileSystem fs = metaClient.getRawFs(); @@ -494,35 +400,27 @@ public class TableSchemaResolver { return fileFooter.getFileMetaData().getSchema(); } - /** - * Read the parquet schema from a HFile. - */ - public MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException { + private MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException { LOG.info("Reading schema from " + hFilePath); FileSystem fs = metaClient.getRawFs(); CacheConfig cacheConfig = new CacheConfig(fs.getConf()); HoodieHFileReader hFileReader = new HoodieHFileReader<>(fs.getConf(), hFilePath, cacheConfig); - return convertAvroSchemaToParquet(hFileReader.getSchema()); } - - /** - * Read the parquet schema from a ORC file. - */ - public MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws IOException { + private MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws IOException { LOG.info("Reading schema from " + orcFilePath); FileSystem fs = metaClient.getRawFs(); HoodieOrcReader orcReader = new HoodieOrcReader<>(fs.getConf(), orcFilePath); - return convertAvroSchemaToParquet(orcReader.getSchema()); } /** * Read schema from a data file from the last compaction commit done. 
- * @throws Exception + * + * @deprecated please use {@link #getTableAvroSchema(HoodieInstant, boolean)} instead */ public MessageType readSchemaFromLastCompaction(Option lastCompactionCommitOpt) throws Exception { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); @@ -533,18 +431,13 @@ public class TableSchemaResolver { // Read from the compacted file wrote HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata .fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class); - String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() + String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().stream().findAny() .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath())); return readSchemaFromBaseFile(filePath); } - /** - * Read the schema from the log file on path. - * - * @return - */ - public MessageType readSchemaFromLogFile(Path path) throws IOException { + private MessageType readSchemaFromLogFile(Path path) throws IOException { return readSchemaFromLogFile(metaClient.getRawFs(), path); } @@ -566,20 +459,6 @@ public class TableSchemaResolver { } } - public boolean isHasOperationField() { - return hasOperationField; - } - - private boolean hasOperationField() { - try { - Schema tableAvroSchema = getTableAvroSchemaFromDataFile(); - return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null; - } catch (Exception e) { - LOG.info(String.format("Failed to read operation field from avro schema (%s)", e.getMessage())); - return false; - } - } - /** * Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata of the instant. * @@ -587,11 +466,7 @@ public class TableSchemaResolver { */ public Option getTableInternalSchemaFromCommitMetadata() { HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - if (timeline.lastInstant().isPresent()) { - return getTableInternalSchemaFromCommitMetadata(timeline.lastInstant().get()); - } else { - return Option.empty(); - } + return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata); } /** @@ -601,9 +476,7 @@ public class TableSchemaResolver { */ private Option getTableInternalSchemaFromCommitMetadata(HoodieInstant instant) { try { - HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedInstants(); - byte[] data = timeline.getInstantDetails(instant).get(); - HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + HoodieCommitMetadata metadata = getCachedCommitMetadata(instant); String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA); if (latestInternalSchemaStr != null) { return SerDeHelper.fromJson(latestInternalSchemaStr); @@ -626,4 +499,128 @@ public class TableSchemaResolver { String result = manager.getHistorySchemaStr(); return result.isEmpty() ? 
Option.empty() : Option.of(result); } + + /** + * NOTE: This method could only be used in tests + * + * @VisibleForTesting + */ + public boolean hasOperationField() { + try { + Schema tableAvroSchema = getTableAvroSchemaFromDataFile(); + return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null; + } catch (Exception e) { + LOG.info(String.format("Failed to read operation field from avro schema (%s)", e.getMessage())); + return false; + } + } + + private Option> getLatestCommitMetadataWithValidSchema() { + if (latestCommitWithValidSchema == null) { + Option> instantAndCommitMetadata = + metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema(); + if (instantAndCommitMetadata.isPresent()) { + HoodieInstant instant = instantAndCommitMetadata.get().getLeft(); + HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight(); + synchronized (this) { + if (latestCommitWithValidSchema == null) { + latestCommitWithValidSchema = instant; + } + commitMetadataCache.get().putIfAbsent(instant, metadata); + } + } + } + + return Option.ofNullable(latestCommitWithValidSchema) + .map(instant -> Pair.of(instant, commitMetadataCache.get().get(instant))); + } + + private Option> getLatestCommitMetadataWithValidData() { + if (latestCommitWithValidData == null) { + Option> instantAndCommitMetadata = + metaClient.getActiveTimeline().getLastCommitMetadataWithValidData(); + if (instantAndCommitMetadata.isPresent()) { + HoodieInstant instant = instantAndCommitMetadata.get().getLeft(); + HoodieCommitMetadata metadata = instantAndCommitMetadata.get().getRight(); + synchronized (this) { + if (latestCommitWithValidData == null) { + latestCommitWithValidData = instant; + } + commitMetadataCache.get().putIfAbsent(instant, metadata); + } + } + } + + return Option.ofNullable(latestCommitWithValidData) + .map(instant -> Pair.of(instant, commitMetadataCache.get().get(instant))); + } + + private HoodieCommitMetadata getCachedCommitMetadata(HoodieInstant instant) { + return commitMetadataCache.get() + .computeIfAbsent(instant, (missingInstant) -> { + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + byte[] data = timeline.getInstantDetails(missingInstant).get(); + try { + return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to fetch HoodieCommitMetadata for instant (%s)", missingInstant), e); + } + }); + } + + private MessageType fetchSchemaFromFiles(Iterator filePaths) throws IOException { + MessageType type = null; + while (filePaths.hasNext() && type == null) { + String filePath = filePaths.next(); + if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) { + // this is a log file + type = readSchemaFromLogFile(new Path(filePath)); + } else { + type = readSchemaFromBaseFile(filePath); + } + } + return type; + } + + private MessageType readSchemaFromBaseFile(String filePath) throws IOException { + if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) { + return readSchemaFromParquetBaseFile(new Path(filePath)); + } else if (filePath.contains(HoodieFileFormat.HFILE.getFileExtension())) { + return readSchemaFromHFileBaseFile(new Path(filePath)); + } else if (filePath.contains(HoodieFileFormat.ORC.getFileExtension())) { + return readSchemaFromORCBaseFile(new Path(filePath)); + } else { + throw new IllegalArgumentException("Unknown base file format :" + filePath); + } + } + + static Schema 
appendPartitionColumns(Schema dataSchema, String[] partitionFields) { + // In cases when {@link DROP_PARTITION_COLUMNS} config is set true, partition columns + // won't be persisted w/in the data files, and therefore we need to append such columns + // when schema is parsed from data files + // + // Here we append partition columns with {@code StringType} as the data type + if (partitionFields.length == 0) { + return dataSchema; + } + + boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf)); + boolean hasPartitionColInSchema = Arrays.stream(partitionFields).anyMatch(pf -> containsFieldInSchema(dataSchema, pf)); + if (hasPartitionColNotInSchema && hasPartitionColInSchema) { + throw new HoodieIncompatibleSchemaException("Partition columns could not be partially contained w/in the data schema"); + } + + if (hasPartitionColNotInSchema) { + // when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns + // are not in originSchema. So we create and add them. + List newFields = new ArrayList<>(); + for (String partitionField: partitionFields) { + newFields.add(new Schema.Field( + partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE)); + } + return appendFieldsToSchema(dataSchema, newFields); + } + + return dataSchema; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index a62068e65..c069e41ad 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -18,6 +18,10 @@ package org.apache.hudi.common.table.timeline; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -28,11 +32,6 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -45,11 +44,10 @@ import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashSet; -import java.util.List; import java.util.Objects; import java.util.Set; import java.util.function.Function; -import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Represents the Active Timeline for the Hoodie table. 
Instants for the last 12 hours (configurable) is in the @@ -267,43 +265,47 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } /** - * Get the last instant with valid schema, and convert this to HoodieCommitMetadata + * Returns most recent instant having valid schema in its {@link HoodieCommitMetadata} */ public Option> getLastCommitMetadataWithValidSchema() { - List completed = getCommitsTimeline().filterCompletedInstants().getInstants() - .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList()); - for (HoodieInstant instant : completed) { - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - getInstantDetails(instant).get(), HoodieCommitMetadata.class); - if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) { - return Option.of(Pair.of(instant, commitMetadata)); - } - } catch (IOException e) { - LOG.warn("Failed to convert instant to HoodieCommitMetadata: " + instant.toString()); - } - } - return Option.empty(); + return Option.fromJavaOptional( + getCommitMetadataStream() + .filter(instantCommitMetadataPair -> + !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))) + .findFirst() + ); } /** * Get the last instant with valid data, and convert this to HoodieCommitMetadata */ public Option> getLastCommitMetadataWithValidData() { - List completed = getCommitsTimeline().filterCompletedInstants().getInstants() - .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList()); - for (HoodieInstant instant : completed) { - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - getInstantDetails(instant).get(), HoodieCommitMetadata.class); - if (!commitMetadata.getFileIdAndRelativePaths().isEmpty()) { - return Option.of(Pair.of(instant, commitMetadata)); - } - } catch (IOException e) { - LOG.warn("Failed to convert instant to HoodieCommitMetadata: " + instant.toString()); - } - } - return Option.empty(); + return Option.fromJavaOptional( + getCommitMetadataStream() + .filter(instantCommitMetadataPair -> + !instantCommitMetadataPair.getValue().getFileIdAndRelativePaths().isEmpty()) + .findFirst() + ); + } + + /** + * Returns stream of {@link HoodieCommitMetadata} in order reverse to chronological (ie most + * recent metadata being the first element) + */ + private Stream> getCommitMetadataStream() { + // NOTE: Streams are lazy + return getCommitsTimeline().filterCompletedInstants() + .getInstants() + .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()) + .map(instant -> { + try { + HoodieCommitMetadata commitMetadata = + HoodieCommitMetadata.fromBytes(getInstantDetails(instant).get(), HoodieCommitMetadata.class); + return Pair.of(instant, commitMetadata); + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to fetch HoodieCommitMetadata for instant (%s)", instant), e); + } + }); } public Option readCleanerInfoAsBytes(HoodieInstant instant) { diff --git a/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java index 106969b70..1a843430b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java +++ b/hudi-common/src/main/java/org/apache/hudi/util/Lazy.java @@ -18,6 +18,7 @@ package org.apache.hudi.util; +import javax.annotation.concurrent.ThreadSafe; import java.util.function.Supplier; /** @@ -25,6 +26,7 @@ import java.util.function.Supplier; 
* * @param type of the object being held by {@link Lazy} */ +@ThreadSafe public class Lazy { private volatile boolean initialized; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java index e0e57e812..51d5c5212 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java @@ -19,11 +19,8 @@ package org.apache.hudi.common.table; import org.apache.avro.Schema; - import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.util.Option; - import org.apache.hudi.exception.HoodieIncompatibleSchemaException; import org.junit.jupiter.api.Test; @@ -37,24 +34,19 @@ public class TestTableSchemaResolver { public void testRecreateSchemaWhenDropPartitionColumns() { Schema originSchema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - // case1 - Option emptyPartitionFieldsOpt = Option.empty(); - Schema s1 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(emptyPartitionFieldsOpt, originSchema); - assertEquals(originSchema, s1); - // case2 String[] pts1 = new String[0]; - Schema s2 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts1), originSchema); + Schema s2 = TableSchemaResolver.appendPartitionColumns(originSchema, pts1); assertEquals(originSchema, s2); // case3: partition_path is in originSchema String[] pts2 = {"partition_path"}; - Schema s3 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts2), originSchema); + Schema s3 = TableSchemaResolver.appendPartitionColumns(originSchema, pts2); assertEquals(originSchema, s3); // case4: user_partition is not in originSchema String[] pts3 = {"user_partition"}; - Schema s4 = TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts3), originSchema); + Schema s4 = TableSchemaResolver.appendPartitionColumns(originSchema, pts3); assertNotEquals(originSchema, s4); assertTrue(s4.getFields().stream().anyMatch(f -> f.name().equals("user_partition"))); Schema.Field f = s4.getField("user_partition"); @@ -63,7 +55,7 @@ public class TestTableSchemaResolver { // case5: user_partition is in originSchema, but partition_path is in originSchema String[] pts4 = {"user_partition", "partition_path"}; try { - TableSchemaResolver.recreateSchemaWhenDropPartitionColumns(Option.of(pts3), originSchema); + TableSchemaResolver.appendPartitionColumns(originSchema, pts3); } catch (HoodieIncompatibleSchemaException e) { assertTrue(e.getMessage().contains("Partial partition fields are still in the schema")); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index c1229d550..eee5a4881 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -213,27 +213,18 @@ class DefaultSource extends RelationProvider globPaths: Seq[Path], userSchema: Option[StructType], metaClient: HoodieTableMetaClient, - optParams: Map[String, String]) = { + optParams: Map[String, String]): BaseRelation = { val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, 
globPaths) - val enableSchemaOnRead: Boolean = !tryFetchInternalSchema(metaClient).isEmptySchema // NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of // [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/ // vanilla Spark, since some of the Spark optimizations are predicated on the using of [[HadoopFsRelation]]. // // You can check out HUDI-3896 for more details - if (enableSchemaOnRead) { + if (baseRelation.hasSchemaOnRead) { baseRelation } else { baseRelation.toHadoopFsRelation } } - - private def tryFetchInternalSchema(metaClient: HoodieTableMetaClient) = - try { - new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata - .orElse(InternalSchema.getEmptyInternalSchema) - } catch { - case _: Exception => InternalSchema.getEmptyInternalSchema - } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 08f87816d..4a12256c4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -122,9 +122,13 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) .map(HoodieSqlCommonUtils.formatQueryInstant) + /** + * NOTE: Initialization of the following members is coupled on purpose to minimize the amount of I/O + * required to fetch the table's Avro and Internal schemas + */ protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = { - val schemaUtil = new TableSchemaResolver(metaClient) - val avroSchema = Try(schemaUtil.getTableAvroSchema) match { + val schemaResolver = new TableSchemaResolver(metaClient) + val avroSchema = Try(schemaResolver.getTableAvroSchema) match { case Success(schema) => schema case Failure(e) => logWarning("Failed to fetch schema from the table", e) @@ -137,14 +141,14 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, } // try to find internalSchema val internalSchemaFromMeta = try { - schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema) + schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema) } catch { case _: Exception => InternalSchema.getEmptyInternalSchema } (avroSchema, internalSchemaFromMeta) } - protected val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) + protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) protected val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) @@ -196,7 +200,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * meaning that regardless of whether this columns are being requested by the query they will be fetched * regardless so that relation is able to combine records properly (if necessary) * - * @VisibleInTests + * @VisibleForTesting */ val mandatoryFields: Seq[String] @@ -215,6 +219,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def queryTimestamp: Option[String] = specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp)) + /** + * Returns true if the table supports Schema on Read (Schema Evolution) +
*/ + def hasSchemaOnRead: Boolean = !internalSchema.isEmptySchema + override def schema: StructType = tableStructSchema /** diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index d9d5812ad..81b12dbcb 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -58,9 +58,9 @@ class IncrementalRelation(val sqlContext: SQLContext, private val log = LogManager.getLogger(classOf[IncrementalRelation]) val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema - private val basePath = metaClient.getBasePath + private val basePath = metaClient.getBasePathV2 // TODO : Figure out a valid HoodieWriteConfig - private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(), + private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath.toString).build(), new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)), metaClient) private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants() @@ -98,7 +98,7 @@ class IncrementalRelation(val sqlContext: SQLContext, val tableSchema = if (useEndInstantSchema && iSchema.isEmptySchema) { if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else - schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last) + schemaResolver.getTableAvroSchema(commitsToReturn.last, false) } else { schemaResolver.getTableAvroSchemaWithoutMetadataFields() } @@ -202,7 +202,7 @@ class IncrementalRelation(val sqlContext: SQLContext, var doFullTableScan = false if (fallbackToFullTableScan) { - val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration); + val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration); val timer = new HoodieTimer().startTimer(); val allFilesToCheck = filteredMetaBootstrapFullPaths ++ filteredRegularFullPaths @@ -223,7 +223,7 @@ class IncrementalRelation(val sqlContext: SQLContext, val hudiDF = sqlContext.read .format("hudi_v1") .schema(usedSchema) - .load(basePath) + .load(basePath.toString) .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, //Notice the > in place of >= because we are working with optParam instead of first commit > optParam optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key))) .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala index 85e1925bc..3258c7536 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala @@ -210,7 +210,7 @@ class TestTableSchemaResolverWithSparkSQL { .setConf(spark.sessionState.newHadoopConf()) .build() - assertTrue(new TableSchemaResolver(metaClient).isHasOperationField) + assertTrue(new TableSchemaResolver(metaClient).hasOperationField) schemaValuationBasedOnDataFile(metaClient, schema.toString()) } diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 760d1269c..fc9de60c6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -615,7 +615,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase { .setConf(spark.sessionState.newHadoopConf()) .build() - assertResult(true)(new TableSchemaResolver(metaClient).isHasOperationField) + assertResult(true)(new TableSchemaResolver(metaClient).hasOperationField) spark.sql( s"""