[HUDI-741] Added checks to validate Hoodie's schema evolution.
HUDI specific validation of schema evolution should ensure that a newer schema can be used for the dataset by checking that the data written using the old schema can be read using the new schema. Code changes: 1. Added a new config in HoodieWriteConfig to enable schema validation check (disabled by default) 2. Moved code that reads schema from base/log files into hudi-common from hudi-hive-sync 3. Added writerSchema to the extraMetadata of compaction commits in MOR table. This is same as that for commits on COW table. Testing changes: 4. Extended TestHoodieClientBase to add insertBatch API which allows inserting a new batch of unique records into a HUDI table 5. Added a unit test to verify schema evolution for both COW and MOR tables. 6. Added unit tests for schema compatibility checks.
This commit is contained in:
@@ -97,6 +97,14 @@ public class HoodieAvroUtils {
|
||||
|| HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName);
|
||||
}
|
||||
|
||||
public static Schema createHoodieWriteSchema(Schema originalSchema) {
|
||||
return HoodieAvroUtils.addMetadataFields(originalSchema);
|
||||
}
|
||||
|
||||
public static Schema createHoodieWriteSchema(String originalSchema) {
|
||||
return createHoodieWriteSchema(new Schema.Parser().parse(originalSchema));
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the Hoodie metadata fields to the given schema.
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,360 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.common.table;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.Schema.Field;
|
||||
import org.apache.avro.SchemaCompatibility;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
|
||||
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.InvalidTableException;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
import org.apache.parquet.format.converter.ParquetMetadataConverter;
|
||||
import org.apache.parquet.hadoop.ParquetFileReader;
|
||||
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
|
||||
/**
|
||||
* Helper class to read schema from data files and log files and to convert it between different formats.
|
||||
*/
|
||||
public class TableSchemaResolver {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
|
||||
private HoodieTableMetaClient metaClient;
|
||||
|
||||
/**
 * Creates a resolver bound to the given table's meta client, which supplies the
 * timeline, file system, and Hadoop configuration used by all schema lookups.
 *
 * @param metaClient meta client of the Hoodie table whose schema is being resolved
 */
public TableSchemaResolver(HoodieTableMetaClient metaClient) {
  this.metaClient = metaClient;
}
|
||||
|
||||
/**
 * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
 * commit. We will assume that the schema has not changed within a single atomic write.
 *
 * @return Parquet schema for this table
 * @throws Exception if no completed commit exists, or the data file for the last commit cannot be located or read
 */
public MessageType getDataSchema() throws Exception {
  HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();

  try {
    switch (metaClient.getTableType()) {
      case COPY_ON_WRITE:
        // If this is COW, get the last commit and read the schema from a file written in the
        // last commit
        HoodieInstant lastCommit =
            activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
            .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
        // Any file from the commit works: the schema is assumed constant within one atomic write
        String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
            .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit "
                + lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :"
                + commitMetadata));
        return readSchemaFromBaseFile(new Path(filePath));
      case MERGE_ON_READ:
        // If this is MOR, depending on whether the latest commit is a delta commit or
        // compaction commit
        // Get a datafile written and get the schema from that file
        Option<HoodieInstant> lastCompactionCommit =
            metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
        LOG.info("Found the last compaction commit as " + lastCompactionCommit);

        Option<HoodieInstant> lastDeltaCommit;
        if (lastCompactionCommit.isPresent()) {
          // Only delta commits made after the last compaction matter; earlier ones were
          // already folded into the compacted base files
          lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
              .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant();
        } else {
          lastDeltaCommit =
              metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
        }
        LOG.info("Found the last delta commit " + lastDeltaCommit);

        if (lastDeltaCommit.isPresent()) {
          HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
          // read from the log file wrote
          commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
              HoodieCommitMetadata.class);
          // Prefer a log file from the delta commit; fall back to a base file if the delta
          // commit wrote no log files (e.g. small-file handling wrote parquet directly)
          Pair<String, HoodieFileFormat> filePathWithFormat =
              commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
                  .filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
                  .map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
                    // No Log files in Delta-Commit. Check if there are any parquet files
                    return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
                        .filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
                        .findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() ->
                            new IllegalArgumentException("Could not find any data file written for commit "
                                + lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath()
                                + ", CommitMetadata :" + commitMetadata));
                  });
          switch (filePathWithFormat.getRight()) {
            case HOODIE_LOG:
              return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft()));
            case PARQUET:
              return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft()));
            default:
              throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight()
                  + " for file " + filePathWithFormat.getLeft());
          }
        } else {
          // No delta commits after the last compaction; the compaction's base files carry
          // the latest schema
          return readSchemaFromLastCompaction(lastCompactionCommit);
        }
      default:
        LOG.error("Unknown table type " + metaClient.getTableType());
        throw new InvalidTableException(metaClient.getBasePath());
    }
  } catch (IOException e) {
    throw new HoodieException("Failed to read data schema", e);
  }
}
|
||||
|
||||
/**
|
||||
* Gets the schema for a hoodie table in Avro format.
|
||||
*
|
||||
* @return Avro schema for this table
|
||||
* @throws Exception
|
||||
*/
|
||||
public Schema getTableSchema() throws Exception {
|
||||
return convertParquetSchemaToAvro(getDataSchema());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
|
||||
*
|
||||
* @return Avro schema for this table
|
||||
* @throws Exception
|
||||
*/
|
||||
public Schema getTableSchemaFromCommitMetadata() throws Exception {
|
||||
try {
|
||||
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get();
|
||||
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
|
||||
String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
|
||||
return new Schema.Parser().parse(existingSchemaStr);
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Failed to read schema from commit metadata", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a parquet scheme to the avro format.
|
||||
*
|
||||
* @param parquetSchema The parquet schema to convert
|
||||
* @return The converted avro schema
|
||||
*/
|
||||
public Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
|
||||
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf());
|
||||
return avroSchemaConverter.convert(parquetSchema);
|
||||
}
|
||||
|
||||
/**
 * HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by
 * checking if the data written using the old schema can be read using the new schema.
 *
 * HUDI requires a Schema to be specified in HoodieWriteConfig and is used by the HoodieWriteClient to
 * create the records. The schema is also saved in the data files (parquet format) and log files (avro format).
 * Since a schema is required each time new data is ingested into a HUDI dataset, schema can be evolved over time.
 *
 * New Schema is compatible only if:
 * A1. There is no change in schema
 * A2. A field has been added and it has a default value specified
 *
 * New Schema is incompatible if:
 * B1. A field has been deleted
 * B2. A field has been renamed (treated as delete + add)
 * B3. A field's type has changed to be incompatible with the older type
 *
 * Issue with org.apache.avro.SchemaCompatibility:
 * org.apache.avro.SchemaCompatibility checks schema compatibility between a writer schema (which originally wrote
 * the AVRO record) and a readerSchema (with which we are reading the record). It ONLY guarantees that each
 * field in the reader record can be populated from the writer record. Hence, if the reader schema is missing a
 * field, it is still compatible with the writer schema.
 *
 * In other words, org.apache.avro.SchemaCompatibility was written to guarantee that we can read the data written
 * earlier. It does not guarantee schema evolution for HUDI (B1 above).
 *
 * Implementation: This function implements specific HUDI specific checks (listed below) and defers the remaining
 * checks to the org.apache.avro.SchemaCompatibility code.
 *
 * Checks:
 * C1. If there is no change in schema: success
 * C2. If a field has been deleted in new schema: failure
 * C3. If a field has been added in new schema: it should have default value specified
 * C4. If a field has been renamed(treated as delete + add): failure
 * C5. If a field type has changed: failure
 *
 * @param oldSchema Older schema to check.
 * @param newSchema Newer schema to check.
 * @return True if the schema validation is successful
 */
public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
  if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) {
    // record names must match:
    if (!SchemaCompatibility.schemaNameEquals(oldSchema, newSchema)) {
      return false;
    }

    // Check that each field in the oldSchema can populate the newSchema
    for (final Field oldSchemaField : oldSchema.getFields()) {
      final Field newSchemaField = SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField);
      if (newSchemaField == null) {
        // C4 or C2: oldSchema field does not correspond to any field in the newSchema
        return false;
      } else {
        // Recurse to validate the field schemas themselves (records nest)
        if (!isSchemaCompatible(oldSchemaField.schema(), newSchemaField.schema())) {
          // C5: The fields do not have a compatible type
          return false;
        }
      }
    }

    // Check that new fields added in newSchema have default values as they will not be
    // present in oldSchema and hence cannot be populated on reading records from existing data.
    for (final Field newSchemaField : newSchema.getFields()) {
      final Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
      if (oldSchemaField == null) {
        if (newSchemaField.defaultValue() == null) {
          // C3: newly added field in newSchema does not have a default value
          return false;
        }
      }
    }

    // All fields in the newSchema record can be populated from the oldSchema record
    return true;
  } else {
    // Use the checks implemented by org.apache.avro.SchemaCompatibility for non-record
    // types (unions, arrays, primitives, etc.)
    org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult =
        org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, newSchema);
    return compatResult.getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
  }
}
|
||||
|
||||
public static boolean isSchemaCompatible(String oldSchema, String newSchema) {
|
||||
return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the parquet schema from a parquet File.
|
||||
*/
|
||||
public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOException {
|
||||
LOG.info("Reading schema from " + parquetFilePath);
|
||||
|
||||
FileSystem fs = metaClient.getRawFs();
|
||||
if (!fs.exists(parquetFilePath)) {
|
||||
throw new IllegalArgumentException(
|
||||
"Failed to read schema from data file " + parquetFilePath + ". File does not exist.");
|
||||
}
|
||||
ParquetMetadata fileFooter =
|
||||
ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, ParquetMetadataConverter.NO_FILTER);
|
||||
return fileFooter.getFileMetaData().getSchema();
|
||||
}
|
||||
|
||||
/**
|
||||
* Read schema from a data file from the last compaction commit done.
|
||||
* @throws Exception
|
||||
*/
|
||||
public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
|
||||
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
|
||||
|
||||
HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new Exception(
|
||||
"Could not read schema from last compaction, no compaction commits found on path " + metaClient));
|
||||
|
||||
// Read from the compacted file wrote
|
||||
HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata
|
||||
.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class);
|
||||
String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
|
||||
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
|
||||
+ lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
|
||||
return readSchemaFromBaseFile(new Path(filePath));
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the schema from the log file on path.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public MessageType readSchemaFromLogFile(Path path) throws IOException {
|
||||
FileSystem fs = metaClient.getRawFs();
|
||||
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
|
||||
HoodieAvroDataBlock lastBlock = null;
|
||||
while (reader.hasNext()) {
|
||||
HoodieLogBlock block = reader.next();
|
||||
if (block instanceof HoodieAvroDataBlock) {
|
||||
lastBlock = (HoodieAvroDataBlock) block;
|
||||
}
|
||||
}
|
||||
reader.close();
|
||||
if (lastBlock != null) {
|
||||
return new AvroSchemaConverter().convert(lastBlock.getSchema());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the schema from the log file on path.
|
||||
* @throws Exception
|
||||
*/
|
||||
public MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path)
|
||||
throws Exception {
|
||||
MessageType messageType = readSchemaFromLogFile(path);
|
||||
// Fall back to read the schema from last compaction
|
||||
if (messageType == null) {
|
||||
LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
|
||||
return readSchemaFromLastCompaction(lastCompactionCommitOpt);
|
||||
}
|
||||
return messageType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the schema from the log file on path.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
|
||||
Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null);
|
||||
HoodieAvroDataBlock lastBlock = null;
|
||||
while (reader.hasNext()) {
|
||||
HoodieLogBlock block = reader.next();
|
||||
if (block instanceof HoodieAvroDataBlock) {
|
||||
lastBlock = (HoodieAvroDataBlock) block;
|
||||
}
|
||||
}
|
||||
reader.close();
|
||||
if (lastBlock != null) {
|
||||
return new AvroSchemaConverter().convert(lastBlock.getSchema());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user