1
0

[HUDI-3522] Introduce DropColumnSchemaPostProcessor to support drop columns from schema (#4972)

* [HUDI-3522] Introduce DropColumnSchemaPostProcessor to support drop columns from schema

* Fix case sensitivity
This commit is contained in:
wangxianghu
2022-03-11 09:30:37 +04:00
committed by GitHub
parent 9dc6df5dca
commit 83cff3afee
2 changed files with 113 additions and 0 deletions

View File

@@ -19,7 +19,9 @@
package org.apache.hudi.utilities;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
import org.apache.hudi.utilities.schema.DeleteSupportSchemaPostProcessor;
import org.apache.hudi.utilities.schema.DropColumnSchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor;
import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -29,6 +31,7 @@ import org.apache.hudi.utilities.transform.FlatteningTransformer;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -116,6 +119,28 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
assertNotNull(targetSchema.getField("testString"));
}
@Test
public void testDeleteColumn() {
// remove column ums_id_ from source schema
properties.put(DropColumnSchemaPostProcessor.Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP, "ums_id_");
DropColumnSchemaPostProcessor processor = new DropColumnSchemaPostProcessor(properties, null);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
Schema targetSchema = processor.processSchema(schema);
assertNull(targetSchema.getField("ums_id_"));
assertNotNull(targetSchema.getField("ums_ts_"));
}
@Test
public void testDeleteColumnThrows() {
// remove all columns from source schema
properties.put(DropColumnSchemaPostProcessor.Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP, "ums_id_,ums_ts_");
DropColumnSchemaPostProcessor processor = new DropColumnSchemaPostProcessor(properties, null);
Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
Assertions.assertThrows(HoodieSchemaPostProcessException.class, () -> processor.processSchema(schema));
}
@Test
public void testSparkAvroSchema() throws IOException {
SparkAvroPostProcessor processor = new SparkAvroPostProcessor(properties, null);