diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 990441139..62509e429 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -535,7 +535,7 @@ public abstract class HoodieTable implements Seri
     try {
       TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
       writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
+      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields());
       isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index c3a9d963e..d56b7d92d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -171,6 +171,18 @@ public class HoodieAvroUtils {
     return mergedSchema;
   }
 
+  public static Schema removeMetadataFields(Schema schema) {
+    List<Schema.Field> filteredFields = schema.getFields()
+        .stream()
+        .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS.contains(field.name()))
+        // Field objects cannot be reused once attached to a schema, so copy them.
+        .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
+        .collect(Collectors.toList());
+    Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
+    filteredSchema.setFields(filteredFields);
+    return filteredSchema;
+  }
+
   public static String addMetadataColumnTypes(String hiveColumnTypes) {
     return "string,string,string,string,string," + hiveColumnTypes;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 0bab862a6..129f85fa8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -25,6 +25,7 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.InvalidTableException;
@@ -66,7 +68,7 @@ public class TableSchemaResolver {
    * @return Parquet schema for this table
    * @throws Exception
    */
-  public MessageType getDataSchema() throws Exception {
+  private MessageType getTableParquetSchemaFromDataFile() throws Exception {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
 
     try {
@@ -139,29 +141,66 @@ public class TableSchemaResolver {
     }
   }
 
+  private Schema getTableAvroSchemaFromDataFile() throws Exception {
+    return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
+  }
+
   /**
-   * Gets the schema for a hoodie table in Avro format.
+   * Gets full schema (user + metadata) for a hoodie table in Avro format.
    *
    * @return Avro schema for this table
    * @throws Exception
    */
-  public Schema getTableSchema() throws Exception {
-    return convertParquetSchemaToAvro(getDataSchema());
+  public Schema getTableAvroSchema() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true);
+    return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : getTableAvroSchemaFromDataFile();
+  }
+
+  /**
+   * Gets full schema (user + metadata) for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   * @throws Exception
+   */
+  public MessageType getTableParquetSchema() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true);
+    return schemaFromCommitMetadata.isPresent() ? convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) :
+        getTableParquetSchemaFromDataFile();
+  }
+
+  /**
+   * Gets the user's data schema (without metadata fields) for a hoodie table in Avro format.
+   *
+   * @return Avro user data schema
+   * @throws Exception
+   */
+  public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(false);
+    return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
+        HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
   }
 
   /**
    * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
    *
    * @return Avro schema for this table
-   * @throws Exception
    */
-  public Schema getTableSchemaFromCommitMetadata() throws Exception {
+  private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) {
     try {
       HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
       byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get();
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
       String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
-      return new Schema.Parser().parse(existingSchemaStr);
+
+      if (StringUtils.isNullOrEmpty(existingSchemaStr)) {
+        return Option.empty();
+      }
+
+      Schema schema = new Schema.Parser().parse(existingSchemaStr);
+      if (includeMetadataFields) {
+        schema = HoodieAvroUtils.addMetadataFields(schema);
+      }
+      return Option.of(schema);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema from commit metadata", e);
     }
@@ -178,6 +217,17 @@ public class TableSchemaResolver {
     return avroSchemaConverter.convert(parquetSchema);
   }
 
+  /**
+   * Converts an Avro schema to the Parquet format.
+   *
+   * @param schema The Avro schema to convert
+   * @return The converted Parquet schema
+   */
+  public MessageType convertAvroSchemaToParquet(Schema schema) {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf());
+    return avroSchemaConverter.convert(schema);
+  }
+
   /**
    * HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by
    * checking if the data written using the old schema can be read using the new schema.
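
Editor's note: taken together, the three files above change schema resolution to prefer the schema recorded under `HoodieCommitMetadata.SCHEMA_KEY` in the last completed commit, falling back to a data file only when that entry is absent. A minimal sketch of how a caller exercises the new `TableSchemaResolver` API (the base path is hypothetical, and the `HoodieTableMetaClient` constructor is assumed from the Hudi version this patch targets):

```java
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.parquet.schema.MessageType;

public class SchemaResolutionExample {

  public static void main(String[] args) throws Exception {
    // Hypothetical base path; any existing Hudi table location works here.
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), "/tmp/hudi_trips");
    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);

    // Full schema (user fields plus the _hoodie_* metadata fields): read from the last
    // commit's SCHEMA_KEY entry when present, otherwise derived from a data file.
    Schema fullSchema = resolver.getTableAvroSchema();

    // User-facing schema with the metadata columns stripped; this is what
    // HoodieTable now uses for its writer-compatibility check.
    Schema userSchema = resolver.getTableAvroSchemaWithoutMetadataFields();

    // The same full schema in Parquet form; this is what HoodieHiveClient#getDataSchema returns.
    MessageType parquetSchema = resolver.getTableParquetSchema();

    System.out.println((fullSchema.getFields().size() - userSchema.getFields().size())
        + " metadata fields; parquet field count: " + parquetSchema.getFieldCount());
  }
}
```

Keeping the fallback inside the resolver means callers such as HoodieTable and HoodieHiveClient no longer need to know where the schema came from.
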
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 9f1a04025..f1034e3cd 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -328,14 +328,15 @@ public class HoodieHiveClient {
   }
 
   /**
-   * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
-   * commit. We will assume that the schema has not changed within a single atomic write.
+   * Gets the schema for a hoodie table. Depending on the type of table, try to read the schema from the commit
+   * metadata if present, else fall back to reading it from any file written in the latest commit. We will assume
+   * that the schema has not changed within a single atomic write.
    *
    * @return Parquet schema for this table
    */
   public MessageType getDataSchema() {
     try {
-      return new TableSchemaResolver(metaClient).getDataSchema();
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to read data schema", e);
     }
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 14dfada26..a8837575b 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SchemaTestUtil;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
@@ -52,6 +53,10 @@ public class TestHiveSyncTool {
     return Stream.of(false, true);
   }
 
+  private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
+    return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } });
+  }
+
   @BeforeEach
   public void setUp() throws IOException, InterruptedException {
     TestUtil.setUp();
@@ -146,11 +151,11 @@ public class TestHiveSyncTool {
   }
 
   @ParameterizedTest
-  @MethodSource("useJdbc")
-  public void testBasicSync(boolean useJdbc) throws Exception {
+  @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
+  public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
-    TestUtil.createCOWTable(instantTime, 5);
+    TestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     assertFalse(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName),
@@ -214,7 +219,7 @@ public class TestHiveSyncTool {
   public void testSyncIncremental(boolean useJdbc) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String commitTime1 = "100";
-    TestUtil.createCOWTable(commitTime1, 5);
+    TestUtil.createCOWTable(commitTime1, 5, true);
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     // Lets do the sync
@@ -228,7 +233,7 @@
     // Now lets create more parititions and these are the only ones which needs to be synced
     DateTime dateTime = DateTime.now().plusDays(6);
     String commitTime2 = "101";
-    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
+    TestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
 
     // Lets do the sync
     hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -253,7 +258,7 @@
   public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String commitTime1 = "100";
-    TestUtil.createCOWTable(commitTime1, 5);
+    TestUtil.createCOWTable(commitTime1, 5, true);
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     // Lets do the sync
@@ -265,7 +270,7 @@
     // Now lets create more parititions and these are the only ones which needs to be synced
     DateTime dateTime = DateTime.now().plusDays(6);
     String commitTime2 = "101";
-    TestUtil.addCOWPartitions(1, false, dateTime, commitTime2);
+    TestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
 
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -286,12 +291,13 @@
   }
 
   @ParameterizedTest
-  @MethodSource("useJdbc")
-  public void testSyncMergeOnRead(boolean useJdbc) throws Exception {
+  @MethodSource("useJdbcAndSchemaFromCommitMetadata")
+  public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
     String deltaCommitTime = "101";
-    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
+    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+        useSchemaFromCommitMetadata);
     String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -301,8 +307,19 @@
     tool.syncHoodieTable();
 
     assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes");
-    assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
-        "Hive Schema should match the table schema + partition field");
+
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+          SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+              + HoodieRecord.HOODIE_META_COLUMNS.size(),
+          "Hive Schema should match the table schema + partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+          SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+          "Hive Schema should match the table schema + partition field");
+    }
+
     assertEquals(5, hiveClient.scanTablePartitions(roTableName).size(),
         "Table partitions should match the number of partitions we wrote");
     assertEquals(deltaCommitTime, hiveClient.getLastCommitTimeSynced(roTableName).get(),
@@ -313,15 +330,25 @@
 
     String commitTime2 = "102";
     String deltaCommitTime2 = "103";
-    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
-    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+    TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
+    TestUtil.addMORPartitions(1, true, false,
+        useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
 
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     tool.syncHoodieTable();
 
     hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
-    assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
-        "Hive Schema should match the evolved table schema + partition field");
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+          SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+              + HoodieRecord.HOODIE_META_COLUMNS.size(),
+          "Hive Schema should match the evolved table schema + partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+          SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+          "Hive Schema should match the evolved table schema + partition field");
+    }
     // Sync should add the one partition
     assertEquals(6, hiveClient.scanTablePartitions(roTableName).size(), "The 2 partitions we wrote should be added to hive");
@@ -330,13 +357,13 @@
   }
 
   @ParameterizedTest
-  @MethodSource("useJdbc")
-  public void testSyncMergeOnReadRT(boolean useJdbc) throws Exception {
+  @MethodSource("useJdbcAndSchemaFromCommitMetadata")
+  public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
     String deltaCommitTime = "101";
     String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
-    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
+    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata);
 
     HoodieHiveClient hiveClientRT =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -352,8 +379,18 @@
         "Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
             + " should exist after sync completes");
 
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
-        "Hive Schema should match the table schema + partition field");
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+          SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+              + HoodieRecord.HOODIE_META_COLUMNS.size(),
+          "Hive Schema should match the table schema + partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+          SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+          "Hive Schema should match the table schema + partition field");
+    }
+
     assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
         "Table partitions should match the number of partitions we wrote");
     assertEquals(deltaCommitTime, hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get(),
@@ -364,15 +401,24 @@
 
     String commitTime2 = "102";
     String deltaCommitTime2 = "103";
-    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
-    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+    TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
+    TestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
 
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     tool.syncHoodieTable();
 
     hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
-        "Hive Schema should match the evolved table schema + partition field");
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+          SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+              + HoodieRecord.HOODIE_META_COLUMNS.size(),
+          "Hive Schema should match the evolved table schema + partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+          SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+          "Hive Schema should match the evolved table schema + partition field");
+    }
     // Sync should add the one partition
     assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "The 2 partitions we wrote should be added to hive");
@@ -385,7 +431,7 @@
   public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
-    TestUtil.createCOWTable(instantTime, 5);
+    TestUtil.createCOWTable(instantTime, 5, true);
 
     HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(TestUtil.hiveSyncConfig);
     hiveSyncConfig.partitionValueExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();
@@ -416,7 +462,7 @@
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String commitTime = "100";
     String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
-    TestUtil.createMORTable(commitTime, "", 5, false);
+    TestUtil.createMORTable(commitTime, "", 5, false, true);
 
     HoodieHiveClient hiveClientRT =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -431,7 +477,9 @@
         + " should exist after sync completes");
 
     // Schema being read from compacted base files
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
+    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+        SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+            + HoodieRecord.HOODIE_META_COLUMNS.size(),
         "Hive Schema should match the table schema + partition field");
     assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
         "Table partitions should match the number of partitions we wrote");
@@ -440,14 +488,16 @@
 
     String commitTime2 = "102";
     String deltaCommitTime2 = "103";
-    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+    TestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2, deltaCommitTime2);
 
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     tool.syncHoodieTable();
     hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     // Schema being read from the log files
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
+    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+        SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+            + HoodieRecord.HOODIE_META_COLUMNS.size(),
         "Hive Schema should match the evolved table schema + partition field");
     // Sync should add the one partition
     assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "The 1 partition we wrote should be added to hive");
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
index 86c78fb78..960a01001 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
@@ -155,7 +155,7 @@ public class TestUtil {
     }
   }
 
-  static void createCOWTable(String instantTime, int numberOfPartitions)
+  static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
       throws IOException, URISyntaxException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -164,13 +164,15 @@ public class TestUtil {
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
     DateTime dateTime = DateTime.now();
-    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, instantTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
+        useSchemaFromCommitMetadata, dateTime, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
     createCommitFile(commitMetadata, instantTime);
   }
 
   static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
-      boolean createDeltaCommit) throws IOException, URISyntaxException, InterruptedException {
+      boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
+      throws IOException, URISyntaxException, InterruptedException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
     HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
@@ -179,46 +181,54 @@ public class TestUtil {
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
     DateTime dateTime = DateTime.now();
-    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
+        useSchemaFromCommitMetadata, dateTime, commitTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
     createdTablesSet
         .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
     HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
+    addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
+        useSchemaFromCommitMetadata);
     createCompactionCommitFile(compactionMetadata, commitTime);
     if (createDeltaCommit) {
       // Write a delta commit
-      HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
+      HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true,
+          useSchemaFromCommitMetadata);
       createDeltaCommitFile(deltaMetadata, deltaCommitTime);
     }
   }
 
-  static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,
-      String instantTime) throws IOException, URISyntaxException {
+  static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata =
-        createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, instantTime);
+        createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
     createCommitFile(commitMetadata, instantTime);
   }
 
   static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
-      DateTime startFrom, String instantTime, String deltaCommitTime)
+      boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
       throws IOException, URISyntaxException, InterruptedException {
-    HoodieCommitMetadata commitMetadata =
-        createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, instantTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
+        useSchemaFromCommitMetadata, startFrom, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
     HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
+    addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
+        useSchemaFromCommitMetadata);
     createCompactionCommitFile(compactionMetadata, instantTime);
-    HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple);
+    HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple,
+        useSchemaFromCommitMetadata);
     createDeltaCommitFile(deltaMetadata, deltaCommitTime);
   }
 
   private static HoodieCommitMetadata createLogFiles(Map<String, List<HoodieWriteStat>> partitionWriteStats,
-      boolean isLogSchemaSimple) throws InterruptedException, IOException, URISyntaxException {
+      boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
+      throws InterruptedException, IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
     for (Entry<String, List<HoodieWriteStat>> wEntry : partitionWriteStats.entrySet()) {
       String partitionPath = wEntry.getKey();
@@ -232,11 +242,12 @@ public class TestUtil {
         commitMetadata.addWriteStat(partitionPath, writeStat);
       }
     }
+    addSchemaToCommitMetadata(commitMetadata, isLogSchemaSimple, useSchemaFromCommitMetadata);
     return commitMetadata;
   }
 
   private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
-      DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
+      boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
     startFrom = startFrom.withTimeAtStartOfDay();
 
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -249,6 +260,7 @@ public class TestUtil {
       startFrom = startFrom.minusDays(1);
       writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
     }
+    addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
     return commitMetadata;
   }
 
@@ -271,7 +283,7 @@ public class TestUtil {
   @SuppressWarnings({"unchecked", "deprecation"})
   private static void generateParquetData(Path filePath, boolean isParquetSchemaSimple)
       throws IOException, URISyntaxException {
-    Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
+    Schema schema = getTestDataSchema(isParquetSchemaSimple);
     org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1, BloomFilterTypeCode.SIMPLE.name());
@@ -294,7 +306,7 @@ public class TestUtil {
 
   private static HoodieLogFile generateLogData(Path parquetFilePath, boolean isLogSchemaSimple)
       throws IOException, InterruptedException, URISyntaxException {
-    Schema schema = (isLogSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
+    Schema schema = getTestDataSchema(isLogSchemaSimple);
     HoodieBaseFile dataFile = new HoodieBaseFile(fileSystem.getFileStatus(parquetFilePath));
     // Write a log file for this parquet file
     Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
@@ -311,6 +323,25 @@ public class TestUtil {
     return logWriter.getLogFile();
   }
 
+  private static Schema getTestDataSchema(boolean isSimpleSchema) throws IOException {
+    return isSimpleSchema ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema();
+  }
+
+  private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, boolean isSimpleSchema,
+      boolean useSchemaFromCommitMetadata) throws IOException {
+    if (useSchemaFromCommitMetadata) {
+      Schema dataSchema = getTestDataSchema(isSimpleSchema);
+      commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, dataSchema.toString());
+    }
+  }
+
+  private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, String schema,
+      boolean useSchemaFromCommitMetadata) {
+    if (useSchemaFromCommitMetadata) {
+      commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
+    }
+  }
+
   private static void checkResult(boolean result) {
     if (!result) {
       throw new JUnitException("Could not initialize");
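
Editor's note: the TestUtil changes thread a `useSchemaFromCommitMetadata` flag through every fixture so each scenario runs both with and without a schema entry in the commit metadata. A minimal sketch of the round trip those fixtures rely on, using an illustrative two-field record rather than SchemaTestUtil's schemas:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.common.model.HoodieCommitMetadata;

public class CommitMetadataSchemaExample {

  public static void main(String[] args) throws Exception {
    // Illustrative schema; the real fixtures use SchemaTestUtil's simple/evolved schemas.
    Schema dataSchema = SchemaBuilder.record("TestRecord").fields()
        .requiredString("_row_key")
        .requiredLong("timestamp")
        .endRecord();

    // What TestUtil#addSchemaToCommitMetadata does when useSchemaFromCommitMetadata is true.
    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, dataSchema.toString());

    // What TableSchemaResolver#getTableSchemaFromCommitMetadata reads back; when the flag
    // is false the entry is absent and the resolver returns Option.empty() instead of
    // failing to parse a null string.
    String schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
    Schema roundTripped = new Schema.Parser().parse(schemaStr);
    System.out.println(roundTripped.equals(dataSchema)); // prints: true
  }
}
```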