1
0

[HUDI-2093] Fix empty avro schema path caused by duplicate parameters (#3177)

* [HUDI-2093] Fix empty avro schema path caused by duplicate parameters

* rename schema option key

* fix doc

* rename var name
This commit is contained in:
wangxianghu
2021-07-06 15:14:30 +08:00
committed by GitHub
parent 60e0254e67
commit f2621da32f
8 changed files with 33 additions and 39 deletions

View File

@@ -106,17 +106,17 @@ public class FlinkOptions {
.defaultValue(4)
.withDescription("Parallelism of tasks that do actual read, default is 4");
public static final ConfigOption<String> READ_AVRO_SCHEMA_PATH = ConfigOptions
.key("read.avro-schema.path")
public static final ConfigOption<String> SOURCE_AVRO_SCHEMA_PATH = ConfigOptions
.key("source.avro-schema.path")
.stringType()
.noDefaultValue()
.withDescription("Avro schema file path, the parsed schema is used for deserialization");
.withDescription("Source avro schema file path, the parsed schema is used for deserialization");
public static final ConfigOption<String> READ_AVRO_SCHEMA = ConfigOptions
.key("read.avro-schema")
public static final ConfigOption<String> SOURCE_AVRO_SCHEMA = ConfigOptions
.key("source.avro-schema")
.stringType()
.noDefaultValue()
.withDescription("Avro schema string, the parsed schema is used for deserialization");
.withDescription("Source avro schema string, the parsed schema is used for deserialization");
public static final String QUERY_TYPE_SNAPSHOT = "snapshot";
public static final String QUERY_TYPE_READ_OPTIMIZED = "read_optimized";

View File

@@ -64,10 +64,10 @@ public class FilebasedSchemaProvider extends SchemaProvider {
}
public FilebasedSchemaProvider(Configuration conf) {
final String readSchemaPath = conf.getString(FlinkOptions.READ_AVRO_SCHEMA_PATH);
final FileSystem fs = FSUtils.getFs(readSchemaPath, StreamerUtil.getHadoopConf());
final String sourceSchemaPath = conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH);
final FileSystem fs = FSUtils.getFs(sourceSchemaPath, StreamerUtil.getHadoopConf());
try {
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(readSchemaPath)));
this.sourceSchema = new Schema.Parser().parse(fs.open(new Path(sourceSchemaPath)));
} catch (IOException ioe) {
throw new HoodieIOException("Error reading schema", ioe);
}

View File

@@ -63,11 +63,6 @@ public class FlinkStreamerConfig extends Configuration {
required = true)
public String targetBasePath;
@Parameter(names = {"--read-schema-path"},
description = "Avro schema file path, the parsed schema is used for deserializing.",
required = true)
public String readSchemaFilePath;
@Parameter(names = {"--target-table"}, description = "Name of the target table in Hive.", required = true)
public String targetTableName;
@@ -150,11 +145,11 @@ public class FlinkStreamerConfig extends Configuration {
description = "Whether to load partitions in state if partition path matching default *")
public String indexPartitionRegex = ".*";
@Parameter(names = {"--avro-schema-path"}, description = "Avro schema file path, the parsed schema is used for deserialization")
public String avroSchemaPath = "";
@Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization")
public String sourceAvroSchemaPath = "";
@Parameter(names = {"--avro-schema"}, description = "Avro schema string, the parsed schema is used for deserialization")
public String avroSchema = "";
@Parameter(names = {"--source-avro-schema"}, description = "Source avro schema string, the parsed schema is used for deserialization")
public String sourceAvroSchema = "";
@Parameter(names = {"--utc-timezone"}, description = "Use UTC timezone or local timezone to the conversion between epoch"
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
@@ -292,7 +287,6 @@ public class FlinkStreamerConfig extends Configuration {
org.apache.flink.configuration.Configuration conf = fromMap(propsMap);
conf.setString(FlinkOptions.PATH, config.targetBasePath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.readSchemaFilePath);
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
@@ -316,8 +310,8 @@ public class FlinkStreamerConfig extends Configuration {
conf.setDouble(FlinkOptions.INDEX_STATE_TTL, config.indexStateTtl);
conf.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, config.indexGlobalEnabled);
conf.setString(FlinkOptions.INDEX_PARTITION_REGEX, config.indexPartitionRegex);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.avroSchemaPath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, config.avroSchema);
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, config.sourceAvroSchemaPath);
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, config.sourceAvroSchema);
conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone);
conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);

View File

@@ -203,17 +203,17 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
/**
* Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
* if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and
* {@link FlinkOptions#READ_AVRO_SCHEMA} are not specified.
* if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
* {@link FlinkOptions#SOURCE_AVRO_SCHEMA} are not specified.
*
* @param conf The configuration
* @param rowType The specified table row type
*/
private static void inferAvroSchema(Configuration conf, LogicalType rowType) {
if (!conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent()
&& !conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
if (!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()
&& !conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
String inferredSchema = AvroSchemaConverter.convertToSchema(rowType).toString();
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, inferredSchema);
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
}
}
}

View File

@@ -72,7 +72,7 @@ public class CompactionUtil {
public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA, tableAvroSchema.toString());
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
}
/**

View File

@@ -90,15 +90,15 @@ public class StreamerUtil {
}
public static Schema getSourceSchema(org.apache.flink.configuration.Configuration conf) {
if (conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA_PATH).isPresent()) {
if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()) {
return new FilebasedSchemaProvider(conf).getSourceSchema();
} else if (conf.getOptional(FlinkOptions.READ_AVRO_SCHEMA).isPresent()) {
final String schemaStr = conf.get(FlinkOptions.READ_AVRO_SCHEMA);
} else if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
final String schemaStr = conf.get(FlinkOptions.SOURCE_AVRO_SCHEMA);
return new Schema.Parser().parse(schemaStr);
} else {
final String errorMsg = String.format("Either option '%s' or '%s' "
+ "should be specified for avro schema deserialization",
FlinkOptions.READ_AVRO_SCHEMA_PATH.key(), FlinkOptions.READ_AVRO_SCHEMA.key());
FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), FlinkOptions.SOURCE_AVRO_SCHEMA.key());
throw new HoodieException(errorMsg);
}
}

View File

@@ -120,14 +120,14 @@ public class TestHoodieTableFactory {
final HoodieTableSource tableSource1 =
(HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf));
final Configuration conf1 = tableSource1.getConf();
assertThat(conf1.get(FlinkOptions.READ_AVRO_SCHEMA), is(INFERRED_SCHEMA));
assertThat(conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(INFERRED_SCHEMA));
// set up the explicit schema using the file path
this.conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
HoodieTableSource tableSource2 =
(HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(MockContext.getInstance(this.conf));
Configuration conf2 = tableSource2.getConf();
assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null");
assertNull(conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect schema string as null");
}
@Test
@@ -207,14 +207,14 @@ public class TestHoodieTableFactory {
final HoodieTableSink tableSink1 =
(HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
final Configuration conf1 = tableSink1.getConf();
assertThat(conf1.get(FlinkOptions.READ_AVRO_SCHEMA), is(INFERRED_SCHEMA));
assertThat(conf1.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(INFERRED_SCHEMA));
// set up the explicit schema using the file path
this.conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
this.conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH, AVRO_SCHEMA_FILE_PATH);
HoodieTableSink tableSink2 =
(HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
Configuration conf2 = tableSink2.getConf();
assertNull(conf2.get(FlinkOptions.READ_AVRO_SCHEMA), "expect schema string as null");
assertNull(conf2.get(FlinkOptions.SOURCE_AVRO_SCHEMA), "expect schema string as null");
}
@Test

View File

@@ -144,7 +144,7 @@ public class TestConfigurations {
public static Configuration getDefaultConf(String tablePath) {
Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, tablePath);
conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH,
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH,
Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_read_schema.avsc")).toString());
conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable");
@@ -155,7 +155,7 @@ public class TestConfigurations {
public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
FlinkStreamerConfig streamerConf = new FlinkStreamerConfig();
streamerConf.targetBasePath = tablePath;
streamerConf.readSchemaFilePath = Objects.requireNonNull(Thread.currentThread()
streamerConf.sourceAvroSchemaPath = Objects.requireNonNull(Thread.currentThread()
.getContextClassLoader().getResource("test_read_schema.avsc")).toString();
streamerConf.targetTableName = "TestHoodieTable";
streamerConf.partitionPathField = "partition";