diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 0607601ca..e1c1b947c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -21,8 +21,10 @@ package org.apache.hudi.common.table;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -41,6 +43,9 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.io.storage.HoodieHFileReader;
+
+import org.apache.hudi.io.storage.HoodieOrcReader;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -78,29 +83,26 @@ public class TableSchemaResolver {
     try {
       switch (metaClient.getTableType()) {
         case COPY_ON_WRITE:
-          // For COW table, the file has data written must be in parquet format currently.
+          // For a COW table, the data files written must currently be in parquet or orc format.
           if (instantAndCommitMetadata.isPresent()) {
             HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
             String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
-            return readSchemaFromBaseFile(new Path(filePath));
+            return readSchemaFromBaseFile(filePath);
           } else {
             throw new IllegalArgumentException("Could not find any data file written for commit, "
                 + "so could not get schema for table " + metaClient.getBasePath());
           }
         case MERGE_ON_READ:
-          // For MOR table, the file has data written may be a parquet file or .log file.
+          // For a MOR table, the data file written may be a parquet file, a .log file, an orc file or an hfile.
           // Determine the file format based on the file name, and then extract schema from it.
           if (instantAndCommitMetadata.isPresent()) {
             HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
             String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
-            if (filePath.contains(HoodieLogFile.DELTA_EXTENSION)) {
+            if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
               // this is a log file
               return readSchemaFromLogFile(new Path(filePath));
-            } else if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
-              // this is a parquet file
-              return readSchemaFromBaseFile(new Path(filePath));
             } else {
-              throw new IllegalArgumentException("Unknown file format :" + filePath);
+              return readSchemaFromBaseFile(filePath);
             }
           } else {
             throw new IllegalArgumentException("Could not find any data file written for commit, "
@@ -115,6 +117,21 @@ public class TableSchemaResolver {
     }
   }
 
+  private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
+    if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
+      // this is a parquet file
+      return readSchemaFromParquetBaseFile(new Path(filePath));
+    } else if (filePath.contains(HoodieFileFormat.HFILE.getFileExtension())) {
+      // this is an HFile
+      return readSchemaFromHFileBaseFile(new Path(filePath));
+    } else if (filePath.contains(HoodieFileFormat.ORC.getFileExtension())) {
+      // this is an ORC file
+      return readSchemaFromORCBaseFile(new Path(filePath));
+    } else {
+      throw new IllegalArgumentException("Unknown base file format :" + filePath);
+    }
+  }
+
   public Schema getTableAvroSchemaFromDataFile() {
     return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
   }
@@ -409,19 +426,41 @@ public class TableSchemaResolver {
   /**
    * Read the parquet schema from a parquet File.
    */
-  public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOException {
+  public MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException {
     LOG.info("Reading schema from " + parquetFilePath);
 
     FileSystem fs = metaClient.getRawFs();
-    if (!fs.exists(parquetFilePath)) {
-      throw new IllegalArgumentException(
-          "Failed to read schema from data file " + parquetFilePath + ". File does not exist.");
-    }
     ParquetMetadata fileFooter =
         ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, ParquetMetadataConverter.NO_FILTER);
     return fileFooter.getFileMetaData().getSchema();
   }
 
+  /**
+   * Read the parquet schema from an HFile.
+   */
+  public MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException {
+    LOG.info("Reading schema from " + hFilePath);
+
+    FileSystem fs = metaClient.getRawFs();
+    CacheConfig cacheConfig = new CacheConfig(fs.getConf());
+    HoodieHFileReader<IndexedRecord> hFileReader = new HoodieHFileReader<>(fs.getConf(), hFilePath, cacheConfig);
+
+    return convertAvroSchemaToParquet(hFileReader.getSchema());
+  }
+
+  /**
+   * Read the parquet schema from an ORC file.
+   */
+  public MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws IOException {
+    LOG.info("Reading schema from " + orcFilePath);
+
+    FileSystem fs = metaClient.getRawFs();
+    HoodieOrcReader<IndexedRecord> orcReader = new HoodieOrcReader<>(fs.getConf(), orcFilePath);
+
+    return convertAvroSchemaToParquet(orcReader.getSchema());
+  }
+
   /**
    * Read schema from a data file from the last compaction commit done.
    * @throws Exception
@@ -438,7 +477,7 @@ public class TableSchemaResolver {
     String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
         .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
             + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
-    return readSchemaFromBaseFile(new Path(filePath));
+    return readSchemaFromBaseFile(filePath);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index e418043fe..26fa6928a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -19,6 +19,7 @@ package org.apache.hudi.common.util;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -222,8 +223,15 @@ public class OrcUtils extends BaseFileUtils {
   public Schema readAvroSchema(Configuration conf, Path orcFilePath) {
     try {
       Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf));
-      TypeDescription orcSchema = reader.getSchema();
-      return AvroOrcUtils.createAvroSchema(orcSchema);
+      if (reader.hasMetadataValue("orc.avro.schema")) {
+        ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema");
+        byte[] bytes = new byte[metadataValue.remaining()];
+        metadataValue.get(bytes);
+        return new Schema.Parser().parse(new String(bytes));
+      } else {
+        TypeDescription orcSchema = reader.getSchema();
+        return AvroOrcUtils.createAvroSchema(orcSchema);
+      }
     } catch (IOException io) {
       throw new HoodieIOException("Unable to get Avro schema for ORC file:" + orcFilePath, io);
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
new file mode 100644
index 000000000..85e1925bc
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.spark.SparkContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConverters
+
+/**
+ * Test suite for TableSchemaResolver with SparkSqlWriter.
+ */
+@Tag("functional")
+class TestTableSchemaResolverWithSparkSQL {
+  var spark: SparkSession = _
+  var sqlContext: SQLContext = _
+  var sc: SparkContext = _
+  var tempPath: java.nio.file.Path = _
+  var tempBootStrapPath: java.nio.file.Path = _
+  var hoodieFooTableName = "hoodie_foo_tbl"
+  var tempBasePath: String = _
+  var commonTableModifier: Map[String, String] = Map()
+
+  case class StringLongTest(uuid: String, ts: Long)
+
+  /**
+   * Setup method running before each test.
+   */
+  @BeforeEach
+  def setUp(): Unit = {
+    initSparkContext()
+    tempPath = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+    tempBootStrapPath = java.nio.file.Files.createTempDirectory("hoodie_test_bootstrap")
+    tempBasePath = tempPath.toAbsolutePath.toString
+    commonTableModifier = getCommonParams(tempPath, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+  }
+
+  /**
+   * Tear down method running after each test.
+   */
+  @AfterEach
+  def tearDown(): Unit = {
+    cleanupSparkContexts()
+    FileUtils.deleteDirectory(tempPath.toFile)
+    FileUtils.deleteDirectory(tempBootStrapPath.toFile)
+  }
+
+  /**
+   * Utility method for initializing the spark context.
+   */
+  def initSparkContext(): Unit = {
+    spark = SparkSession.builder()
+      .appName(hoodieFooTableName)
+      .master("local[2]")
+      .withExtensions(new HoodieSparkSessionExtension)
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    sc = spark.sparkContext
+    sc.setLogLevel("ERROR")
+    sqlContext = spark.sqlContext
+  }
+
+  /**
+   * Utility method for cleaning up spark resources.
+   */
+  def cleanupSparkContexts(): Unit = {
+    if (sqlContext != null) {
+      sqlContext.clearCache()
+      sqlContext = null
+    }
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    if (spark != null) {
+      spark.close()
+    }
+  }
+
+  /**
+   * Utility method for creating common params for writer.
+   *
+   * @param path               Path for hoodie table
+   * @param hoodieFooTableName Name of hoodie table
+   * @param tableType          Type of table
+   * @return Map of common params
+   */
+  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String): Map[String, String] = {
+    Map("path" -> path.toAbsolutePath.toString,
+      HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+  }
+
+  /**
+   * Utility method for converting a java.util.List of Row to a Scala Seq.
+   *
+   * @param inputList list of Row
+   * @return Seq of Row
+   */
+  def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
+    JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+
+  @Test
+  def testTableSchemaResolverInMetadataTable(): Unit = {
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    // create a new table
+    val tableName = hoodieFooTableName
+    val fooTableModifier = Map("path" -> tempPath.toAbsolutePath.toString,
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      "hoodie.avro.schema" -> schema.toString(),
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator",
+      "hoodie.metadata.compact.max.delta.commits" -> "2",
+      HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key -> "true"
+    )
+
+    // generate the inserts
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val records = DataSourceTestUtils.generateRandomRows(10)
+    val recordsSeq = convertRowListToSeq(records)
+    val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1)
+
+    // do update
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df1)
+
+    val metadataTablePath = tempPath.toAbsolutePath.toString + "/.hoodie/metadata"
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(metadataTablePath)
+      .setConf(spark.sessionState.newHadoopConf())
+      .build()
+
+    // Delete the latest metadata table deltacommit, then
+    // get the schema from the metadata table's hfile-format base file.
+    val latestInstant = metaClient.getActiveTimeline.getCommitsTimeline.getReverseOrderedInstants.findFirst()
+    val path = new Path(metadataTablePath + "/.hoodie", latestInstant.get().getFileName)
+    val fs = path.getFileSystem(new Configuration())
+    fs.delete(path, false)
+    schemaValuationBasedOnDataFile(metaClient, HoodieMetadataRecord.getClassSchema.toString())
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array("COPY_ON_WRITE,parquet", "COPY_ON_WRITE,orc", "COPY_ON_WRITE,hfile",
+    "MERGE_ON_READ,parquet", "MERGE_ON_READ,orc", "MERGE_ON_READ,hfile"))
+  def testTableSchemaResolver(tableType: String, baseFileFormat: String): Unit = {
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+
+    // create a new table
+    val tableName = hoodieFooTableName
+    val fooTableModifier = Map("path" -> tempPath.toAbsolutePath.toString,
+      HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat,
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      HoodieWriteConfig.TBL_NAME.key -> tableName,
+      "hoodie.avro.schema" -> schema.toString(),
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> "org.apache.hudi.keygen.SimpleKeyGenerator",
+      HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key -> "true"
+    )
+
+    // generate the inserts
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val records = DataSourceTestUtils.generateRandomRows(10)
+    val recordsSeq = convertRowListToSeq(records)
+    val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1)
+
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(tempPath.toAbsolutePath.toString)
+      .setConf(spark.sessionState.newHadoopConf())
+      .build()
+
+    assertTrue(new TableSchemaResolver(metaClient).isHasOperationField)
+    schemaValuationBasedOnDataFile(metaClient, schema.toString())
+  }
+
+  /**
+   * Validate the schema read from the data file (getTableAvroSchemaFromDataFile) against the expected schema.
+   * @param metaClient   meta client of the table under test
+   * @param schemaString expected Avro schema as a JSON string
+   */
+  def schemaValuationBasedOnDataFile(metaClient: HoodieTableMetaClient, schemaString: String): Unit = {
+    metaClient.reloadActiveTimeline()
+    var tableSchemaResolverParsingException: Exception = null
+    try {
+      val schemaFromData = new TableSchemaResolver(metaClient).getTableAvroSchemaFromDataFile
+      val structFromData = AvroConversionUtils.convertAvroSchemaToStructType(HoodieAvroUtils.removeMetadataFields(schemaFromData))
+      val schemeDesign = new Schema.Parser().parse(schemaString)
+      val structDesign = AvroConversionUtils.convertAvroSchemaToStructType(schemeDesign)
+      assertEquals(structFromData, structDesign)
+    } catch {
+      case e: Exception => tableSchemaResolverParsingException = e
+    }
+    assert(tableSchemaResolverParsingException == null)
+  }
+}