diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index 0d67da7c0..db1ab164f 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; @@ -36,7 +37,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieMemoryConfig; -import org.apache.hudi.hive.util.SchemaUtil; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.avro.Schema; @@ -90,7 +90,7 @@ public class HoodieLogFileCommand implements CommandMarker { for (String logFilePath : logFilePaths) { FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath)); Schema writerSchema = new AvroSchemaConverter() - .convert(Objects.requireNonNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath)))); + .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePath)))); Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema); // read the avro blocks @@ -179,7 +179,7 @@ public class HoodieLogFileCommand implements CommandMarker { AvroSchemaConverter converter = new AvroSchemaConverter(); // get schema from last log file Schema readerSchema = - 
converter.convert(Objects.requireNonNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))))); + converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))))); List allRecords = new ArrayList<>(); @@ -202,7 +202,7 @@ public class HoodieLogFileCommand implements CommandMarker { } else { for (String logFile : logFilePaths) { Schema writerSchema = new AvroSchemaConverter() - .convert(Objects.requireNonNull(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile)))); + .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new Path(logFile)))); HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema); // read the avro blocks diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 1fd34a317..c7de8dfbc 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -19,6 +19,8 @@ package org.apache.hudi.client; import com.codahale.metrics.Timer; +import org.apache.avro.Schema; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; @@ -36,6 +38,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import 
org.apache.hudi.common.table.timeline.HoodieInstant.State; @@ -50,9 +53,11 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieCompactionException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.exception.HoodieRestoreException; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieSavepointException; +import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.HoodieCommitArchiveLog; @@ -166,6 +171,7 @@ public class HoodieWriteClient extends AbstractHo */ public JavaRDD upsert(JavaRDD> records, final String instantTime) { HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT); + validateSchema(table, true); setOperationType(WriteOperationType.UPSERT); HoodieWriteMetadata result = table.upsert(jsc,instantTime, records); if (result.getIndexLookupDuration().isPresent()) { @@ -185,6 +191,7 @@ public class HoodieWriteClient extends AbstractHo */ public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, final String instantTime) { HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED); + validateSchema(table, true); setOperationType(WriteOperationType.UPSERT_PREPPED); HoodieWriteMetadata result = table.upsertPrepped(jsc,instantTime, preppedRecords); return postWrite(result, instantTime, table); @@ -202,6 +209,7 @@ public class HoodieWriteClient extends AbstractHo */ public JavaRDD insert(JavaRDD> records, final String instantTime) { HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT); + validateSchema(table, false); setOperationType(WriteOperationType.INSERT); HoodieWriteMetadata result = table.insert(jsc,instantTime, records); return postWrite(result, instantTime, table); @@ -220,6 +228,7 @@ 
public class HoodieWriteClient extends AbstractHo */ public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords, final String instantTime) { HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_PREPPED); + validateSchema(table, false); setOperationType(WriteOperationType.INSERT_PREPPED); HoodieWriteMetadata result = table.insertPrepped(jsc,instantTime, preppedRecords); return postWrite(result, instantTime, table); @@ -882,6 +891,8 @@ public class HoodieWriteClient extends AbstractHo metadata.addWriteStat(stat.getPartitionPath(), stat); } + metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema()); + // Finalize write finalizeWrite(table, compactionCommitTime, updateStatusMap); @@ -919,4 +930,55 @@ public class HoodieWriteClient extends AbstractHo }); return compactionInstantTimeOpt; } -} + + /** + * Ensure that the current writerSchema is compatible with the latest schema of this dataset. + * + * When inserting/updating data, we read records using the last used schema and convert them to the + * GenericRecords with writerSchema. Hence, we need to ensure that this conversion can take place without errors. 
+ * + * @param hoodieTable The Hoodie Table + * @param isUpsert If this is a check during upserts + * @throws HoodieUpsertException If schema check fails during upserts + * @throws HoodieInsertException If schema check fails during inserts + */ + private void validateSchema(HoodieTable hoodieTable, final boolean isUpsert) + throws HoodieUpsertException, HoodieInsertException { + + if (!getConfig().getAvroSchemaValidate()) { + // Check not required + return; + } + + boolean isValid = false; + String errorMsg = "WriterSchema is not compatible with the schema present in the Table"; + Throwable internalError = null; + Schema tableSchema = null; + Schema writerSchema = null; + try { + TableSchemaResolver schemaUtil = new TableSchemaResolver(hoodieTable.getMetaClient()); + writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema()); + tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata()); + isValid = schemaUtil.isSchemaCompatible(tableSchema, writerSchema); + } catch (Exception e) { + // Two error cases are possible: + // 1. There was no schema as no data has been inserted yet (first time only) + // 2. 
Failure in reading the schema + isValid = hoodieTable.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0; + errorMsg = "Failed to read latest schema on path " + basePath; + internalError = e; + } + + if (!isValid) { + LOG.error(errorMsg); + LOG.warn("WriterSchema: " + writerSchema); + LOG.warn("Table latest schema: " + tableSchema); + if (isUpsert) { + throw new HoodieUpsertException(errorMsg, internalError); + } else { + throw new HoodieInsertException(errorMsg, internalError); + } + } + } + +} \ No newline at end of file diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 3db17c74d..5ac87da2e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -52,6 +52,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version"; private static final String BASE_PATH_PROP = "hoodie.base.path"; private static final String AVRO_SCHEMA = "hoodie.avro.schema"; + private static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate"; + private static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false"; private static final String DEFAULT_PARALLELISM = "1500"; private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism"; private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"; @@ -131,6 +133,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { props.setProperty(AVRO_SCHEMA, schemaStr); } + public boolean getAvroSchemaValidate() { + return Boolean.parseBoolean(props.getProperty(AVRO_SCHEMA_VALIDATE)); + } + public String getTableName() { return props.getProperty(TABLE_NAME); } @@ -577,6 +583,11 @@ public class HoodieWriteConfig extends 
DefaultHoodieConfig { return this; } + public Builder withAvroSchemaValidate(boolean enable) { + props.setProperty(AVRO_SCHEMA_VALIDATE, String.valueOf(enable)); + return this; + } + public Builder forTable(String tableName) { props.setProperty(TABLE_NAME, tableName); return this; @@ -721,6 +732,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS)); setDefaultOnCondition(props, !props.containsKey(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP), FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP, DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED); + setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE); // Make sure the props is propagated setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build()); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 980d731d0..cc91e8961 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -191,8 +191,6 @@ public class HoodieAppendHandle extends HoodieWri return Option.empty(); } - // TODO (NA) - Perform a writerSchema check of current input record with the last writerSchema on log file - // to make sure we don't append records with older (shorter) writerSchema than already appended public void doAppend() { while (recordItr.hasNext()) { HoodieRecord record = recordItr.next(); diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 0b1506d51..08bb06e3c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -63,7 +63,7 @@ public abstract class HoodieWriteHandle extends H 
this.partitionPath = partitionPath; this.fileId = fileId; this.originalSchema = new Schema.Parser().parse(config.getSchema()); - this.writerSchema = createHoodieWriteSchema(originalSchema); + this.writerSchema = HoodieAvroUtils.createHoodieWriteSchema(originalSchema); this.timer = new HoodieTimer().startTimer(); this.writeStatus = (WriteStatus) ReflectionUtils.loadClass(config.getWriteStatusClassName(), !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction()); @@ -78,10 +78,6 @@ public abstract class HoodieWriteHandle extends H return FSUtils.makeWriteToken(getPartitionId(), getStageId(), getAttemptId()); } - public static Schema createHoodieWriteSchema(Schema originalSchema) { - return HoodieAvroUtils.addMetadataFields(originalSchema); - } - public Path makeNewPath(String partitionPath) { Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath); try { diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java index ee76ed362..401573527 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java @@ -341,6 +341,34 @@ public class TestHoodieClientBase extends HoodieClientTestHarness { recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1); } + /** + * Helper to insert another batch of records and do regular assertions on the state after successful completion. 
+ * + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param writeFn Write Function to be used for insertion + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @param expTotalCommits Expected number of commits (including this commit) + * @return RDD of write-status + * @throws Exception in case of error + */ + JavaRDD insertBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, + String initCommitTime, int numRecordsInThisCommit, + Function3, HoodieWriteClient, JavaRDD, String> writeFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { + final Function2, String, Integer> recordGenFunction = + generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts); + + return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit, + recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits); + } + /** * Helper to upsert batch of records and do regular assertions on the state after successful completion. 
* diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 7382b7e3f..089947435 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -199,15 +199,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { String recordKey = UUID.randomUUID().toString(); HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01"); HoodieRecord recordOne = - new HoodieRecord(keyOne, HoodieTestDataGenerator.generateRandomValue(keyOne, newCommitTime)); + new HoodieRecord(keyOne, dataGen.generateRandomValue(keyOne, newCommitTime)); HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01"); HoodieRecord recordTwo = - new HoodieRecord(keyTwo, HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime)); + new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime)); // Same key and partition as keyTwo HoodieRecord recordThree = - new HoodieRecord(keyTwo, HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime)); + new HoodieRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime)); JavaRDD> records = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1); diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java new file mode 100644 index 000000000..7b75d5a35 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.HoodieClientTestUtils; +import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.TestRawTripPayload; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.index.HoodieIndex.IndexType; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.HoodieTestDataGenerator.FARE_NESTED_SCHEMA; +import static org.apache.hudi.common.HoodieTestDataGenerator.MAP_TYPE_SCHEMA; +import static org.apache.hudi.common.HoodieTestDataGenerator.TIP_NESTED_SCHEMA; +import static 
org.apache.hudi.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.common.HoodieTestDataGenerator.TRIP_SCHEMA_PREFIX; +import static org.apache.hudi.common.HoodieTestDataGenerator.TRIP_SCHEMA_SUFFIX; +import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestTableSchemaEvolution extends TestHoodieClientBase { + private final String initCommitTime = "000"; + private HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE; + private HoodieTestDataGenerator dataGenEvolved = new HoodieTestDataGenerator(); + private HoodieTestDataGenerator dataGenDevolved = new HoodieTestDataGenerator(); + + public static final String EXTRA_FIELD_SCHEMA = + "{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},"; + + // TRIP_EXAMPLE_SCHEMA with a new_field added + public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX; + + // TRIP_EXAMPLE_SCHEMA with tip field removed + public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + + TRIP_SCHEMA_SUFFIX; + + @Before + public void setUp() throws Exception { + initResources(); + } + + @After + public void tearDown() { + cleanupSparkContexts(); + } + + @Test + public void testSchemaCompatibilityBasic() throws Exception { + assertTrue("Same schema is compatible", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA)); + + String reorderedSchema = TRIP_SCHEMA_PREFIX + TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA + + MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX; + assertTrue("Reordered fields are compatible", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, 
reorderedSchema)); + assertTrue("Reordered fields are compatible", + TableSchemaResolver.isSchemaCompatible(reorderedSchema, TRIP_EXAMPLE_SCHEMA)); + + String renamedSchema = TRIP_EXAMPLE_SCHEMA.replace("tip_history", "tip_future"); + assertFalse("Renamed fields are not compatible", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema)); + + assertFalse("Deleted single field is not compatible", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_DEVOLVED)); + String deletedMultipleFieldSchema = TRIP_SCHEMA_PREFIX + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; + assertFalse("Deleted multiple fields are not compatible", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, deletedMultipleFieldSchema)); + + String renamedRecordSchema = TRIP_EXAMPLE_SCHEMA.replace("triprec", "triprec_renamed"); + assertFalse("Renamed record name is not compatible", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedRecordSchema)); + + String swappedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA.replace("city_to_state", "fare") + + FARE_NESTED_SCHEMA.replace("fare", "city_to_state") + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; + assertFalse("Swapped fields are not compatible", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema)); + + String typeChangeSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + + TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX; + assertFalse("Field type change is not compatible", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchema)); + + assertTrue("Added field with default is compatible (Evolved Schema)", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED)); + + String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field") + + 
TRIP_SCHEMA_SUFFIX; + assertTrue("Multiple added fields with defauls are compatible", + TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema)); + } + + @Test + public void testMORTable() throws Exception { + tableType = HoodieTableType.MERGE_ON_READ; + initMetaClient(); + + // Create the table + HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), + HoodieTableType.MERGE_ON_READ, metaClient.getTableConfig().getTableName(), + metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1); + + HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA); + HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false); + + // Initial inserts with TRIP_EXAMPLE_SCHEMA + int numRecords = 10; + insertFirstBatch(hoodieWriteConfig, client, "001", initCommitTime, + numRecords, HoodieWriteClient::insert, false, false, numRecords); + checkLatestDeltaCommit("001"); + + // Compact once so we can incrementally read later + assertTrue(client.scheduleCompactionAtInstant("002", Option.empty())); + client.compact("002"); + + // Updates with same schema is allowed + final int numUpdateRecords = 5; + updateBatch(hoodieWriteConfig, client, "003", "002", Option.empty(), + initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false, false, 0, 0, 0); + checkLatestDeltaCommit("003"); + checkReadRecords("000", numRecords); + + // Delete with same schema is allowed + final int numDeleteRecords = 2; + numRecords -= numDeleteRecords; + deleteBatch(hoodieWriteConfig, client, "004", "003", initCommitTime, numDeleteRecords, + HoodieWriteClient::delete, false, false, 0, 0); + checkLatestDeltaCommit("004"); + checkReadRecords("000", numRecords); + + // Insert with evolved schema is not allowed + HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED); + client = getHoodieWriteClient(hoodieDevolvedWriteConfig, false); + final List 
failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED); + try { + // We cannot use insertBatch directly here because we want to insert records + // with a devolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA. + writeBatch(client, "005", "004", Option.empty(), "003", numRecords, + (String s, Integer a) -> failedRecords, HoodieWriteClient::insert, false, 0, 0, 0); + fail("Insert with devolved scheme should fail"); + } catch (HoodieInsertException ex) { + // no new commit + checkLatestDeltaCommit("004"); + checkReadRecords("000", numRecords); + client.rollback("005"); + } + + // Update with devolved schema is also not allowed + try { + updateBatch(hoodieDevolvedWriteConfig, client, "005", "004", Option.empty(), + initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false, false, 0, 0, 0); + fail("Update with devolved scheme should fail"); + } catch (HoodieUpsertException ex) { + // no new commit + checkLatestDeltaCommit("004"); + checkReadRecords("000", numRecords); + client.rollback("005"); + } + + // Insert with an evolved schema is allowed + HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED); + client = getHoodieWriteClient(hoodieEvolvedWriteConfig, false); + + // We cannot use insertBatch directly here because we want to insert records + // with an evolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA. 
+ final List evolvedRecords = generateInsertsWithSchema("005", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED); + writeBatch(client, "005", "004", Option.empty(), initCommitTime, numRecords, + (String s, Integer a) -> evolvedRecords, HoodieWriteClient::insert, false, 0, 0, 0); + + // new commit + checkLatestDeltaCommit("005"); + checkReadRecords("000", 2 * numRecords); + + // Updates with evolved schema is allowed + final List updateRecords = generateUpdatesWithSchema("006", numUpdateRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED); + writeBatch(client, "006", "005", Option.empty(), initCommitTime, + numUpdateRecords, (String s, Integer a) -> updateRecords, HoodieWriteClient::upsert, false, 0, 0, 0); + // new commit + checkLatestDeltaCommit("006"); + checkReadRecords("000", 2 * numRecords); + + // Now even the original schema cannot be used for updates as it is devolved in relation to the + // current schema of the dataset. + client = getHoodieWriteClient(hoodieWriteConfig, false); + try { + updateBatch(hoodieWriteConfig, client, "007", "006", Option.empty(), + initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false, false, 0, 0, 0); + fail("Update with original scheme should fail"); + } catch (HoodieUpsertException ex) { + // no new commit + checkLatestDeltaCommit("006"); + checkReadRecords("000", 2 * numRecords); + client.rollback("007"); + } + + // Now even the original schema cannot be used for inserts as it is devolved in relation to the + // current schema of the dataset. + try { + // We are not using insertBatch directly here because insertion of these + // records will fail and we dont want to keep these records within HoodieTestDataGenerator as we + // will be testing updates later. 
+ failedRecords.clear(); + failedRecords.addAll(dataGen.generateInserts("007", numRecords)); + writeBatch(client, "007", "006", Option.empty(), initCommitTime, numRecords, + (String s, Integer a) -> failedRecords, HoodieWriteClient::insert, true, numRecords, numRecords, 1); + fail("Insert with original scheme should fail"); + } catch (HoodieInsertException ex) { + // no new commit + checkLatestDeltaCommit("006"); + checkReadRecords("000", 2 * numRecords); + client.rollback("007"); + + // Remove the inserts from the in-memory state of HoodieTestDataGenerator + // as these records were never inserted in the dataset. This is required so + // that future calls to updateBatch or deleteBatch do not generate updates + // or deletes for records which do not even exist. + for (HoodieRecord record : failedRecords) { + assertTrue(dataGen.deleteExistingKeyIfPresent(record.getKey())); + } + } + + // Rollback to the original schema + client.restoreToInstant("004"); + checkLatestDeltaCommit("004"); + + // Updates with original schema are now allowed + client = getHoodieWriteClient(hoodieWriteConfig, false); + updateBatch(hoodieWriteConfig, client, "008", "004", Option.empty(), + initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false, false, 0, 0, 0); + // new commit + checkLatestDeltaCommit("008"); + checkReadRecords("000", 2 * numRecords); + + // Insert with original schema is allowed now + insertBatch(hoodieWriteConfig, client, "009", "008", numRecords, HoodieWriteClient::insert, + false, false, 0, 0, 0); + checkLatestDeltaCommit("009"); + checkReadRecords("000", 3 * numRecords); + } + + @Test + public void testCopyOnWriteTable() throws Exception { + // Create the table + HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), + HoodieTableType.COPY_ON_WRITE, metaClient.getTableConfig().getTableName(), + metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1); + + HoodieWriteConfig hoodieWriteConfig = 
getWriteConfig(TRIP_EXAMPLE_SCHEMA); + HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false); + + // Initial inserts with TRIP_EXAMPLE_SCHEMA + int numRecords = 10; + insertFirstBatch(hoodieWriteConfig, client, "001", initCommitTime, + numRecords, HoodieWriteClient::insert, false, true, numRecords); + checkReadRecords("000", numRecords); + + // Updates with same schema is allowed + final int numUpdateRecords = 5; + updateBatch(hoodieWriteConfig, client, "002", "001", Option.empty(), + initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false, true, + numUpdateRecords, numRecords, 2); + checkReadRecords("000", numRecords); + + // Delete with same schema is allowed + final int numDeleteRecords = 2; + numRecords -= numDeleteRecords; + deleteBatch(hoodieWriteConfig, client, "003", "002", initCommitTime, numDeleteRecords, + HoodieWriteClient::delete, false, true, 0, numRecords); + checkReadRecords("000", numRecords); + + // Insert with devolved schema is not allowed + HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED); + client = getHoodieWriteClient(hoodieDevolvedWriteConfig, false); + final List failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED); + try { + // We cannot use insertBatch directly here because we want to insert records + // with a devolved schema. 
+ writeBatch(client, "004", "003", Option.empty(), "003", numRecords, + (String s, Integer a) -> failedRecords, HoodieWriteClient::insert, true, numRecords, numRecords, 1); + fail("Insert with devolved scheme should fail"); + } catch (HoodieInsertException ex) { + // no new commit + HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003")); + client.rollback("004"); + } + + // Update with devolved schema is not allowed + try { + updateBatch(hoodieDevolvedWriteConfig, client, "004", "003", Option.empty(), + initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false, true, + numUpdateRecords, 2 * numRecords, 5); + fail("Update with devolved scheme should fail"); + } catch (HoodieUpsertException ex) { + // no new commit + HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003")); + client.rollback("004"); + } + + // Insert with evolved scheme is allowed + HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED); + client = getHoodieWriteClient(hoodieEvolvedWriteConfig, false); + final List evolvedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED); + // We cannot use insertBatch directly here because we want to insert records + // with a evolved schema. 
+ writeBatch(client, "004", "003", Option.empty(), initCommitTime, numRecords, + (String s, Integer a) -> evolvedRecords, HoodieWriteClient::insert, true, numRecords, 2 * numRecords, 4); + // new commit + HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("004")); + checkReadRecords("000", 2 * numRecords); + + // Updates with evolved schema is allowed + final List updateRecords = generateUpdatesWithSchema("005", numUpdateRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED); + writeBatch(client, "005", "004", Option.empty(), initCommitTime, + numUpdateRecords, (String s, Integer a) -> updateRecords, HoodieWriteClient::upsert, true, numUpdateRecords, 2 * numRecords, 5); + checkReadRecords("000", 2 * numRecords); + + // Now even the original schema cannot be used for updates as it is devolved + // in relation to the current schema of the dataset. + client = getHoodieWriteClient(hoodieWriteConfig, false); + try { + updateBatch(hoodieWriteConfig, client, "006", "005", Option.empty(), + initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false, true, + numUpdateRecords, numRecords, 2); + fail("Update with original scheme should fail"); + } catch (HoodieUpsertException ex) { + // no new commit + curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("005")); + client.rollback("006"); + } + + // Now even the original schema cannot be used for inserts as it is devolved + // in relation to the current schema of the dataset. + try { + // We are not using insertBatch directly here because insertion of these + // records will fail and we dont want to keep these records within + // HoodieTestDataGenerator. 
+ failedRecords.clear(); + failedRecords.addAll(dataGen.generateInserts("006", numRecords)); + writeBatch(client, "006", "005", Option.empty(), initCommitTime, numRecords, + (String s, Integer a) -> failedRecords, HoodieWriteClient::insert, true, numRecords, numRecords, 1); + fail("Insert with original scheme should fail"); + } catch (HoodieInsertException ex) { + // no new commit + curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("005")); + client.rollback("006"); + + // Remove the inserts from the in-memory state of HoodieTestDataGenerator + // as these records were never inserted in the dataset. This is required so + // that future calls to updateBatch or deleteBatch do not generate updates + // or deletes for records which do not even exist. + for (HoodieRecord record : failedRecords) { + assertTrue(dataGen.deleteExistingKeyIfPresent(record.getKey())); + } + } + + // Revert to the older commit and ensure that the original schema can now + // be used for inserts and updates. 
+ client.restoreToInstant("003"); + curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003")); + checkReadRecords("000", numRecords); + + // Insert with original schema is allowed now + insertBatch(hoodieWriteConfig, client, "007", "003", numRecords, HoodieWriteClient::insert, + false, true, numRecords, 2 * numRecords, 1); + checkReadRecords("000", 2 * numRecords); + + // Update with original schema is allowed now + updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(), + initCommitTime, numUpdateRecords, HoodieWriteClient::upsert, false, true, + numUpdateRecords, 2 * numRecords, 5); + checkReadRecords("000", 2 * numRecords); + } + + private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException { + if (tableType == HoodieTableType.COPY_ON_WRITE) { + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline(); + assertEquals(numExpectedRecords, HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, instantTime).count()); + } else { + // TODO: This code fails to read records under the following conditions: + // 1. No parquet files yet (i.e. no compaction done yet) + // 2. Log file but no base file with the same FileID + /* + FileStatus[] allFiles = HoodieTestUtils.listAllDataAndLogFilesInPath(metaClient.getFs(), basePath); + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitsTimeline(); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, allFiles); + List dataFiles = fsView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + + Configuration conf = new Configuration(); + String absTableName = "hoodie." 
+ metaClient.getTableConfig().getTableName(); + conf.set(absTableName + ".consume.mode", "INCREMENTAL"); + conf.set(absTableName + ".consume.start.timestamp", instantTime); + conf.set(absTableName + ".consume.max.commits", "-1"); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath, conf); + assertEquals(recordsRead.size(), numExpectedRecords); + */ + } + } + + private void checkLatestDeltaCommit(String instantTime) { + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + assertTrue(timeline.lastInstant().get().getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); + assertTrue(timeline.lastInstant().get().getTimestamp().equals(instantTime)); + } + + private List generateInsertsWithSchema(String commitTime, int numRecords, String schemaStr) { + HoodieTestDataGenerator gen = schemaStr.equals(TRIP_EXAMPLE_SCHEMA_EVOLVED) ? dataGenEvolved : dataGenDevolved; + List records = gen.generateInserts(commitTime, numRecords); + return convertToSchema(records, schemaStr); + } + + private List generateUpdatesWithSchema(String commitTime, int numRecords, String schemaStr) { + HoodieTestDataGenerator gen = schemaStr.equals(TRIP_EXAMPLE_SCHEMA_EVOLVED) ? 
dataGenEvolved : dataGenDevolved; + List records = gen.generateUniqueUpdates(commitTime, numRecords); + return convertToSchema(records, schemaStr); + } + + private List convertToSchema(List records, String schemaStr) { + Schema newSchema = new Schema.Parser().parse(schemaStr); + return records.stream().map(r -> { + HoodieKey key = r.getKey(); + GenericRecord payload; + try { + payload = (GenericRecord)r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get(); + GenericRecord newPayload = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(payload, newSchema); + return new HoodieRecord(key, new TestRawTripPayload(newPayload.toString(), key.getRecordKey(), key.getPartitionPath(), schemaStr)); + } catch (IOException e) { + throw new RuntimeException("Conversion to new schema failed"); + } + }).collect(Collectors.toList()); + } + + private HoodieWriteConfig getWriteConfig(String schema) { + return getConfigBuilder(schema) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build()) + .withAvroSchemaValidate(true) + .build(); + } + + protected HoodieTableType getTableType() { + return tableType; + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index 8e9036a1f..65b567c77 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -662,6 +662,23 @@ public class HoodieTestDataGenerator { return result.stream(); } + public boolean deleteExistingKeyIfPresent(HoodieKey key) { + Map existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); + Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA); + for (Map.Entry entry: existingKeys.entrySet()) { + if (entry.getValue().key.equals(key)) { + int index = entry.getKey(); + existingKeys.put(index, existingKeys.get(numExistingKeys 
- 1)); + existingKeys.remove(numExistingKeys - 1); + numExistingKeys--; + numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys); + return true; + } + } + + return false; + } + public String[] getPartitionPaths() { return partitionPaths; } @@ -679,4 +696,4 @@ public class HoodieTestDataGenerator { public void close() { existingKeysBySchema.clear(); } -} +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index af44a3164..20b49e2d5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -97,6 +97,14 @@ public class HoodieAvroUtils { || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName); } + public static Schema createHoodieWriteSchema(Schema originalSchema) { + return HoodieAvroUtils.addMetadataFields(originalSchema); + } + + public static Schema createHoodieWriteSchema(String originalSchema) { + return createHoodieWriteSchema(new Schema.Parser().parse(originalSchema)); + } + /** * Adds the Hoodie metadata fields to the given schema. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java new file mode 100644 index 000000000..0bab862a6 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table; + +import java.io.IOException; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.SchemaCompatibility; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; +import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.InvalidTableException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +/** + * Helper class to read schema from data files and log files and to convert it between different formats. 
+ */ +public class TableSchemaResolver { + + private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class); + private HoodieTableMetaClient metaClient; + + public TableSchemaResolver(HoodieTableMetaClient metaClient) { + this.metaClient = metaClient; + } + + /** + * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest + * commit. We will assume that the schema has not changed within a single atomic write. + * + * @return Parquet schema for this table + * @throws Exception + */ + public MessageType getDataSchema() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + try { + switch (metaClient.getTableType()) { + case COPY_ON_WRITE: + // If this is COW, get the last commit and read the schema from a file written in the + // last commit + HoodieInstant lastCommit = + activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath())); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class); + String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() + .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit " + + lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :" + + commitMetadata)); + return readSchemaFromBaseFile(new Path(filePath)); + case MERGE_ON_READ: + // If this is MOR, depending on whether the latest commit is a delta commit or + // compaction commit + // Get a datafile written and get the schema from that file + Option lastCompactionCommit = + metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); + LOG.info("Found the last compaction commit as " + lastCompactionCommit); + + Option lastDeltaCommit; + 
if (lastCompactionCommit.isPresent()) { + lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() + .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant(); + } else { + lastDeltaCommit = + metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant(); + } + LOG.info("Found the last delta commit " + lastDeltaCommit); + + if (lastDeltaCommit.isPresent()) { + HoodieInstant lastDeltaInstant = lastDeltaCommit.get(); + // read from the log file wrote + commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(), + HoodieCommitMetadata.class); + Pair filePathWithFormat = + commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream() + .filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny() + .map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> { + // No Log files in Delta-Commit. Check if there are any parquet files + return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream() + .filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension()))) + .findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() -> + new IllegalArgumentException("Could not find any data file written for commit " + + lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath() + + ", CommitMetadata :" + commitMetadata)); + }); + switch (filePathWithFormat.getRight()) { + case HOODIE_LOG: + return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft())); + case PARQUET: + return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft())); + default: + throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight() + + " for file " + filePathWithFormat.getLeft()); + } + } else { + return readSchemaFromLastCompaction(lastCompactionCommit); + } + 
default: + LOG.error("Unknown table type " + metaClient.getTableType()); + throw new InvalidTableException(metaClient.getBasePath()); + } + } catch (IOException e) { + throw new HoodieException("Failed to read data schema", e); + } + } + + /** + * Gets the schema for a hoodie table in Avro format. + * + * @return Avro schema for this table + * @throws Exception + */ + public Schema getTableSchema() throws Exception { + return convertParquetSchemaToAvro(getDataSchema()); + } + + /** + * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit. + * + * @return Avro schema for this table + * @throws Exception + */ + public Schema getTableSchemaFromCommitMetadata() throws Exception { + try { + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get(); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); + return new Schema.Parser().parse(existingSchemaStr); + } catch (Exception e) { + throw new HoodieException("Failed to read schema from commit metadata", e); + } + } + + /** + * Convert a parquet scheme to the avro format. + * + * @param parquetSchema The parquet schema to convert + * @return The converted avro schema + */ + public Schema convertParquetSchemaToAvro(MessageType parquetSchema) { + AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf()); + return avroSchemaConverter.convert(parquetSchema); + } + + /** + * HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by + * checking if the data written using the old schema can be read using the new schema. + * + * HUDI requires a Schema to be specified in HoodieWriteConfig and is used by the HoodieWriteClient to + * create the records. 
The schema is also saved in the data files (parquet format) and log files (avro format). + * Since a schema is required each time new data is ingested into a HUDI dataset, schema can be evolved over time. + * + * New Schema is compatible only if: + * A1. There is no change in schema + * A2. A field has been added and it has a default value specified + * + * New Schema is incompatible if: + * B1. A field has been deleted + * B2. A field has been renamed (treated as delete + add) + * B3. A field's type has changed to be incompatible with the older type + * + * Issue with org.apache.avro.SchemaCompatibility: + * org.apache.avro.SchemaCompatibility checks schema compatibility between a writer schema (which originally wrote + * the AVRO record) and a readerSchema (with which we are reading the record). It ONLY guarantees that each + * field in the reader record can be populated from the writer record. Hence, if the reader schema is missing a + * field, it is still compatible with the writer schema. + * + * In other words, org.apache.avro.SchemaCompatibility was written to guarantee that we can read the data written + * earlier. It does not guarantee schema evolution for HUDI (B1 above). + * + * Implementation: This function implements HUDI-specific checks (listed below) and defers the remaining + * checks to the org.apache.avro.SchemaCompatibility code. + * + * Checks: + * C1. If there is no change in schema: success + * C2. If a field has been deleted in new schema: failure + * C3. If a field has been added in new schema: it should have default value specified + * C4. If a field has been renamed(treated as delete + add): failure + * C5. If a field type has changed: failure + * + * @param oldSchema Older schema to check. + * @param newSchema Newer schema to check. 
+ * @return True if the schema validation is successful + */ + public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) { + if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) { + // record names must match: + if (!SchemaCompatibility.schemaNameEquals(oldSchema, newSchema)) { + return false; + } + + // Check that each field in the oldSchema can populated the newSchema + for (final Field oldSchemaField : oldSchema.getFields()) { + final Field newSchemaField = SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField); + if (newSchemaField == null) { + // C4 or C2: newSchema does not correspond to any field in the oldSchema + return false; + } else { + if (!isSchemaCompatible(oldSchemaField.schema(), newSchemaField.schema())) { + // C5: The fields do not have a compatible type + return false; + } + } + } + + // Check that new fields added in newSchema have default values as they will not be + // present in oldSchema and hence cannot be populated on reading records from existing data. 
+ for (final Field newSchemaField : newSchema.getFields()) { + final Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField); + if (oldSchemaField == null) { + if (newSchemaField.defaultValue() == null) { + // C3: newly added field in newSchema does not have a default value + return false; + } + } + } + + // All fields in the newSchema record can be populated from the oldSchema record + return true; + } else { + // Use the checks implemented by + org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult = + org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, newSchema); + return compatResult.getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; + } + } + + public static boolean isSchemaCompatible(String oldSchema, String newSchema) { + return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema)); + } + + /** + * Read the parquet schema from a parquet File. + */ + public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOException { + LOG.info("Reading schema from " + parquetFilePath); + + FileSystem fs = metaClient.getRawFs(); + if (!fs.exists(parquetFilePath)) { + throw new IllegalArgumentException( + "Failed to read schema from data file " + parquetFilePath + ". File does not exist."); + } + ParquetMetadata fileFooter = + ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, ParquetMetadataConverter.NO_FILTER); + return fileFooter.getFileMetaData().getSchema(); + } + + /** + * Read schema from a data file from the last compaction commit done. 
+ * @throws Exception + */ + public MessageType readSchemaFromLastCompaction(Option lastCompactionCommitOpt) throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new Exception( + "Could not read schema from last compaction, no compaction commits found on path " + metaClient)); + + // Read from the compacted file wrote + HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata + .fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class); + String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() + .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " + + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath())); + return readSchemaFromBaseFile(new Path(filePath)); + } + + /** + * Read the schema from the log file on path. + * + * @return + */ + public MessageType readSchemaFromLogFile(Path path) throws IOException { + FileSystem fs = metaClient.getRawFs(); + Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); + HoodieAvroDataBlock lastBlock = null; + while (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + if (block instanceof HoodieAvroDataBlock) { + lastBlock = (HoodieAvroDataBlock) block; + } + } + reader.close(); + if (lastBlock != null) { + return new AvroSchemaConverter().convert(lastBlock.getSchema()); + } + return null; + } + + /** + * Read the schema from the log file on path. 
+ * @throws Exception + */ + public MessageType readSchemaFromLogFile(Option lastCompactionCommitOpt, Path path) + throws Exception { + MessageType messageType = readSchemaFromLogFile(path); + // Fall back to read the schema from last compaction + if (messageType == null) { + LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt); + return readSchemaFromLastCompaction(lastCompactionCommitOpt); + } + return messageType; + } + + /** + * Read the schema from the log file on path. + * + * @return + */ + public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException { + Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); + HoodieAvroDataBlock lastBlock = null; + while (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + if (block instanceof HoodieAvroDataBlock) { + lastBlock = (HoodieAvroDataBlock) block; + } + } + reader.close(); + if (lastBlock != null) { + return new AvroSchemaConverter().convert(lastBlock.getSchema()); + } + return null; + } +} diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index e6572c93a..273635cd0 100644 --- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -25,7 +25,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent; import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType; -import org.apache.hudi.hive.util.SchemaUtil; +import org.apache.hudi.hive.util.HiveSchemaUtil; import com.beust.jcommander.JCommander; import org.apache.hadoop.conf.Configuration; @@ -158,7 +158,7 @@ public class HiveSyncTool { } else { // Check if the table schema has evolved Map tableSchema = 
hoodieHiveClient.getTableSchema(tableName); - SchemaDifference schemaDiff = SchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields); + SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields); if (!schemaDiff.isEmpty()) { LOG.info("Schema difference found for " + tableName); hoodieHiveClient.updateTableDefinition(tableName, schema); diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 55a496885..9f1a04025 100644 --- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -21,18 +21,14 @@ package org.apache.hudi.hive; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.StorageSchemes; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.exception.InvalidTableException; -import org.apache.hudi.hive.util.SchemaUtil; +import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -49,9 +45,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.jdbc.HiveDriver; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import 
org.apache.parquet.format.converter.ParquetMetadataConverter; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import org.apache.thrift.TException; @@ -74,7 +67,7 @@ public class HoodieHiveClient { private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync"; // Make sure we have the hive JDBC driver in classpath private static String driverName = HiveDriver.class.getName(); - private static final String HIVE_ESCAPE_CHARACTER = SchemaUtil.HIVE_ESCAPE_CHARACTER; + private static final String HIVE_ESCAPE_CHARACTER = HiveSchemaUtil.HIVE_ESCAPE_CHARACTER; static { try { @@ -250,7 +243,7 @@ public class HoodieHiveClient { void updateTableDefinition(String tableName, MessageType newSchema) { try { - String newSchemaStr = SchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields); + String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, syncConfig.partitionFields); // Cascade clause should not be present for non-partitioned tables String cascadeClause = syncConfig.partitionFields.size() > 0 ? 
" cascade" : ""; StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER) @@ -268,7 +261,7 @@ public class HoodieHiveClient { void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) { try { String createSQLQuery = - SchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass); + HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass); LOG.info("Creating table with " + createSQLQuery); updateHiveSQL(createSQLQuery); } catch (IOException e) { @@ -340,122 +333,14 @@ public class HoodieHiveClient { * * @return Parquet schema for this table */ - @SuppressWarnings("WeakerAccess") public MessageType getDataSchema() { try { - switch (tableType) { - case COPY_ON_WRITE: - // If this is COW, get the last commit and read the schema from a file written in the - // last commit - HoodieInstant lastCommit = - activeTimeline.lastInstant().orElseThrow(() -> new InvalidTableException(syncConfig.basePath)); - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class); - String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() - .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit " - + lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :" - + commitMetadata)); - return readSchemaFromBaseFile(new Path(filePath)); - case MERGE_ON_READ: - // If this is MOR, depending on whether the latest commit is a delta commit or - // compaction commit - // Get a datafile written and get the schema from that file - Option lastCompactionCommit = - metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); - 
LOG.info("Found the last compaction commit as " + lastCompactionCommit); - - Option lastDeltaCommit; - if (lastCompactionCommit.isPresent()) { - lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() - .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant(); - } else { - lastDeltaCommit = - metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant(); - } - LOG.info("Found the last delta commit " + lastDeltaCommit); - - if (lastDeltaCommit.isPresent()) { - HoodieInstant lastDeltaInstant = lastDeltaCommit.get(); - // read from the log file wrote - commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(), - HoodieCommitMetadata.class); - Pair filePathWithFormat = - commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream() - .filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny() - .map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> { - // No Log files in Delta-Commit. 
Check if there are any parquet files - return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream() - .filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension()))) - .findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() -> - new IllegalArgumentException("Could not find any data file written for commit " - + lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath() - + ", CommitMetadata :" + commitMetadata)); - }); - switch (filePathWithFormat.getRight()) { - case HOODIE_LOG: - return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft())); - case PARQUET: - return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft())); - default: - throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight() - + " for file " + filePathWithFormat.getLeft()); - } - } else { - return readSchemaFromLastCompaction(lastCompactionCommit); - } - default: - LOG.error("Unknown table type " + tableType); - throw new InvalidTableException(syncConfig.basePath); - } - } catch (IOException e) { + return new TableSchemaResolver(metaClient).getDataSchema(); + } catch (Exception e) { throw new HoodieHiveSyncException("Failed to read data schema", e); } } - /** - * Read schema from a data file from the last compaction commit done. 
- */ - private MessageType readSchemaFromLastCompaction(Option lastCompactionCommitOpt) throws IOException { - HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new HoodieHiveSyncException( - "Could not read schema from last compaction, no compaction commits found on path " + syncConfig.basePath)); - - // Read from the compacted file wrote - HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata - .fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class); - String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() - .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " - + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath())); - return readSchemaFromBaseFile(new Path(filePath)); - } - - /** - * Read the schema from the log file on path. - */ - private MessageType readSchemaFromLogFile(Option lastCompactionCommitOpt, Path path) - throws IOException { - MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path); - // Fall back to read the schema from last compaction - if (messageType == null) { - LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt); - return readSchemaFromLastCompaction(lastCompactionCommitOpt); - } - return messageType; - } - - /** - * Read the parquet schema from a parquet File. - */ - private MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOException { - LOG.info("Reading schema from " + parquetFilePath); - if (!fs.exists(parquetFilePath)) { - throw new IllegalArgumentException( - "Failed to read schema from data file " + parquetFilePath + ". 
File does not exist."); - } - ParquetMetadata fileFooter = - ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, ParquetMetadataConverter.NO_FILTER); - return fileFooter.getFileMetaData().getSchema(); - } - /** * @return true if the configured table exists */ diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java similarity index 93% rename from hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java rename to hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java index 499f2a732..7fd64bd8b 100644 --- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java +++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java @@ -18,20 +18,12 @@ package org.apache.hudi.hive.util; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; import org.apache.hudi.hive.SchemaDifference; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; @@ -50,9 +42,9 @@ import java.util.Set; /** * Schema Utilities. 
*/ -public class SchemaUtil { +public class HiveSchemaUtil { - private static final Logger LOG = LogManager.getLogger(SchemaUtil.class); + private static final Logger LOG = LogManager.getLogger(HiveSchemaUtil.class); public static final String HIVE_ESCAPE_CHARACTER = "`"; /** @@ -424,25 +416,4 @@ public class SchemaUtil { // Dont do that return "String"; } - - /** - * Read the schema from the log file on path. - * - * @return - */ - public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException { - Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); - HoodieAvroDataBlock lastBlock = null; - while (reader.hasNext()) { - HoodieLogBlock block = reader.next(); - if (block instanceof HoodieAvroDataBlock) { - lastBlock = (HoodieAvroDataBlock) block; - } - } - reader.close(); - if (lastBlock != null) { - return new AvroSchemaConverter().convert(lastBlock.getSchema()); - } - return null; - } } diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index d92c9d96a..14dfada26 100644 --- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SchemaTestUtil; import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent; import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType; -import org.apache.hudi.hive.util.SchemaUtil; +import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.parquet.schema.MessageType; @@ -79,7 +79,7 @@ public class TestHiveSyncTool { .optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list") .named("ArrayOfInts"); - String schemaString = SchemaUtil.generateSchemaString(schema); + String 
schemaString = HiveSchemaUtil.generateSchemaString(schema); assertEquals("`int_list` ARRAY< int>", schemaString); // A array of arrays @@ -87,14 +87,14 @@ public class TestHiveSyncTool { .as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element") .named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts"); - schemaString = SchemaUtil.generateSchemaString(schema); + schemaString = HiveSchemaUtil.generateSchemaString(schema); assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString); // A list of integers schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32) .named("element").named("int_list").named("ArrayOfInts"); - schemaString = SchemaUtil.generateSchemaString(schema); + schemaString = HiveSchemaUtil.generateSchemaString(schema); assertEquals("`int_list` ARRAY< int>", schemaString); // A list of structs with two fields @@ -102,7 +102,7 @@ public class TestHiveSyncTool { .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32) .named("num").named("element").named("tuple_list").named("ArrayOfTuples"); - schemaString = SchemaUtil.generateSchemaString(schema); + schemaString = HiveSchemaUtil.generateSchemaString(schema); assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString); // A list of structs with a single field @@ -112,7 +112,7 @@ public class TestHiveSyncTool { .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list") .named("ArrayOfOneTuples"); - schemaString = SchemaUtil.generateSchemaString(schema); + schemaString = HiveSchemaUtil.generateSchemaString(schema); assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString); // A list of structs with a single field @@ -122,7 +122,7 @@ public class TestHiveSyncTool { 
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple") .named("one_tuple_list").named("ArrayOfOneTuples2"); - schemaString = SchemaUtil.generateSchemaString(schema); + schemaString = HiveSchemaUtil.generateSchemaString(schema); assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString); // A list of structs with a single field @@ -132,7 +132,7 @@ public class TestHiveSyncTool { .required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list") .named("ArrayOfOneTuples3"); - schemaString = SchemaUtil.generateSchemaString(schema); + schemaString = HiveSchemaUtil.generateSchemaString(schema); assertEquals("`one_tuple_list` ARRAY< binary>", schemaString); // A list of maps @@ -141,7 +141,7 @@ public class TestHiveSyncTool { .as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value") .named("key_value").named("array").named("map_list").named("ArrayOfMaps"); - schemaString = SchemaUtil.generateSchemaString(schema); + schemaString = HiveSchemaUtil.generateSchemaString(schema); assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString); }