diff --git a/hudi-flink-datasource/hudi-flink/pom.xml b/hudi-flink-datasource/hudi-flink/pom.xml index 77e8a77cf..cd584304f 100644 --- a/hudi-flink-datasource/hudi-flink/pom.xml +++ b/hudi-flink-datasource/hudi-flink/pom.xml @@ -269,18 +269,6 @@ - - javax.transaction - jta - 1.1 - test - - - javax.transaction - javax.transaction-api - 1.3 - test - ${hive.groupid} hive-metastore @@ -421,5 +409,18 @@ ${flink.version} test + + + javax.transaction + jta + 1.1 + test + + + javax.transaction + javax.transaction-api + 1.3 + test + diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java index 36503c152..c9590ff4a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java @@ -19,11 +19,13 @@ package org.apache.hudi.table.catalog; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.configuration.FlinkOptions; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; @@ -37,7 +39,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -60,9 +61,8 @@ public class HiveSchemaUtils { allCols.addAll(hiveTable.getPartitionKeys()); String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME); - List primaryColNames = StringUtils.isNullOrEmpty(pkConstraintName) - ? Collections.EMPTY_LIST - : StringUtils.split(hiveTable.getParameters().get(TableOptionProperties.PK_COLUMNS),","); + String pkColumnStr = hiveTable.getParameters().getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue()); + List pkColumns = StringUtils.split(pkColumnStr,","); String[] colNames = new String[allCols.size()]; DataType[] colTypes = new DataType[allCols.size()]; @@ -73,14 +73,16 @@ public class HiveSchemaUtils { colNames[i] = fs.getName(); colTypes[i] = toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType())); - if (primaryColNames.contains(colNames[i])) { + if (pkColumns.contains(colNames[i])) { colTypes[i] = colTypes[i].notNull(); } } org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes); if (!StringUtils.isNullOrEmpty(pkConstraintName)) { - builder.primaryKeyNamed(pkConstraintName, primaryColNames); + builder.primaryKeyNamed(pkConstraintName, pkColumns); + } else { + builder.primaryKey(pkColumns); } return builder.build(); @@ -152,7 +154,8 @@ public class HiveSchemaUtils { case DATE: return DataTypes.DATE(); case TIMESTAMP: - return DataTypes.TIMESTAMP(9); + // see org.apache.hudi.hive.util.HiveSchemaUtil#convertField for details. + return DataTypes.TIMESTAMP(6); case BINARY: return DataTypes.BYTES(); case DECIMAL: @@ -168,8 +171,10 @@ public class HiveSchemaUtils { /** Create Hive columns from Flink TableSchema. */ public static List createHiveColumns(TableSchema schema) { - String[] fieldNames = schema.getFieldNames(); - DataType[] fieldTypes = schema.getFieldDataTypes(); + final DataType dataType = schema.toPersistedRowDataType(); + final RowType rowType = (RowType) dataType.getLogicalType(); + final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]); + final DataType[] fieldTypes = dataType.getChildren().toArray(new DataType[0]); List columns = new ArrayList<>(fieldNames.length); @@ -177,7 +182,7 @@ public class HiveSchemaUtils { columns.add( new FieldSchema( fieldNames[i], - toHiveTypeInfo(fieldTypes[i], true).getTypeName(), + toHiveTypeInfo(fieldTypes[i]).getTypeName(), null)); } @@ -191,13 +196,12 @@ public class HiveSchemaUtils { * checkPrecision is true. * * @param dataType a Flink DataType - * @param checkPrecision whether to fail the conversion if the precision of the DataType is not - * supported by Hive + * * @return the corresponding Hive data type */ - public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) { + public static TypeInfo toHiveTypeInfo(DataType dataType) { checkNotNull(dataType, "type cannot be null"); LogicalType logicalType = dataType.getLogicalType(); - return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType, checkPrecision)); + return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType)); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java index 4a63b7a26..e9fbd95e8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java @@ -34,7 +34,6 @@ import java.util.Locale; import java.util.Set; import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; -import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH; /** * A catalog factory impl that creates {@link HoodieCatalog}. @@ -59,6 +58,7 @@ public class HoodieCatalogFactory implements CatalogFactory { case "hms": return new HoodieHiveCatalog( context.getName(), + helper.getOptions().get(CatalogOptions.CATALOG_PATH), helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE), helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR)); case "dfs": @@ -82,7 +82,7 @@ public class HoodieCatalogFactory implements CatalogFactory { options.add(PROPERTY_VERSION); options.add(CatalogOptions.HIVE_CONF_DIR); options.add(CatalogOptions.MODE); - options.add(CATALOG_PATH); + options.add(CatalogOptions.CATALOG_PATH); return options; } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java index ff80a7004..1e877b133 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java @@ -18,7 +18,6 @@ package org.apache.hudi.table.catalog; -import org.apache.hadoop.hive.metastore.TableType; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -69,6 +68,7 @@ import org.apache.flink.table.expressions.Expression; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -92,7 +92,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP; import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME; @@ -104,7 +103,6 @@ import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; import static org.apache.hudi.configuration.FlinkOptions.PATH; import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB; import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT; -import static org.apache.hudi.table.catalog.TableOptionProperties.PK_COLUMNS; import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME; import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER; @@ -117,12 +115,22 @@ public class HoodieHiveCatalog extends AbstractCatalog { private final HiveConf hiveConf; private IMetaStoreClient client; - public HoodieHiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) { - this(catalogName, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false); + // optional catalog base path: used for db/table path inference. + private final String catalogPath; + + public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) { + this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false); } - public HoodieHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, boolean allowEmbedded) { + public HoodieHiveCatalog( + String catalogName, + String catalogPath, + String defaultDatabase, + HiveConf hiveConf, + boolean allowEmbedded) { super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase); + // fallback to hive.metastore.warehouse.dir if catalog path is not specified + this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath; this.hiveConf = hiveConf; if (!allowEmbedded) { checkArgument( @@ -145,7 +153,7 @@ public class HoodieHiveCatalog extends AbstractCatalog { } if (!databaseExists(getDefaultDatabase())) { LOG.info("{} does not exist, will be created.", getDefaultDatabase()); - CatalogDatabase database = new CatalogDatabaseImpl(Collections.EMPTY_MAP, "default database"); + CatalogDatabase database = new CatalogDatabaseImpl(Collections.emptyMap(), "default database"); try { createDatabase(getDefaultDatabase(), database, true); } catch (DatabaseAlreadyExistException e) { @@ -227,6 +235,10 @@ public class HoodieHiveCatalog extends AbstractCatalog { Map properties = database.getProperties(); String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI); + if (dbLocationUri == null && this.catalogPath != null) { + // infer default location uri + dbLocationUri = new Path(this.catalogPath, databaseName).toString(); + } Database hiveDatabase = new Database(databaseName, database.getComment(), dbLocationUri, properties); @@ -381,8 +393,7 @@ public class HoodieHiveCatalog extends AbstractCatalog { @Override public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { checkNotNull(tablePath, "Table path cannot be null"); - Table hiveTable = getHiveTable(tablePath); - hiveTable = translateSparkTable2Flink(tablePath, hiveTable); + Table hiveTable = translateSparkTable2Flink(tablePath, getHiveTable(tablePath)); String path = hiveTable.getSd().getLocation(); Map parameters = hiveTable.getParameters(); Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf); @@ -391,16 +402,21 @@ public class HoodieHiveCatalog extends AbstractCatalog { org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder() .fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema)); String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME); + String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key()); if (!StringUtils.isNullOrEmpty(pkConstraintName)) { - builder.primaryKeyNamed(pkConstraintName, StringUtils.split(parameters.get(PK_COLUMNS), ",")); + // pkColumns expect not to be null + builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ",")); + } else if (pkColumns != null) { + builder.primaryKey(StringUtils.split(pkColumns, ",")); } schema = builder.build(); } else { LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath); schema = HiveSchemaUtils.convertTableSchema(hiveTable); } + Map options = supplementOptions(tablePath, parameters); return CatalogTable.of(schema, parameters.get(COMMENT), - HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), parameters); + HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), options); } @Override @@ -439,8 +455,8 @@ public class HoodieHiveCatalog extends AbstractCatalog { } private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) { - Configuration flinkConf = Configuration.fromMap(applyOptionsHook(catalogTable.getOptions())); - final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()).toString(); + Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions()); + final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPersistedRowDataType().getLogicalType()).toString(); flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema); // stores two copies of options: @@ -449,15 +465,13 @@ public class HoodieHiveCatalog extends AbstractCatalog { // because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it // when calling #getTable. - if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) { + if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent() + && !flinkConf.contains(FlinkOptions.RECORD_KEY_FIELD)) { final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames()); - String recordKey = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD); - if (!Objects.equals(pkColumns, recordKey)) { - throw new HoodieCatalogException(String.format("%s and %s are the different", pkColumns, recordKey)); - } + flinkConf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns); } - if (catalogTable.isPartitioned()) { + if (catalogTable.isPartitioned() && !flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) { final String partitions = String.join(",", catalogTable.getPartitionKeys()); flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions); } @@ -468,7 +482,7 @@ public class HoodieHiveCatalog extends AbstractCatalog { flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName()); try { - StreamerUtil.initTableIfNotExists(flinkConf); + StreamerUtil.initTableIfNotExists(flinkConf, hiveConf); } catch (IOException e) { throw new HoodieCatalogException("Initialize table exception.", e); } @@ -487,20 +501,6 @@ public class HoodieHiveCatalog extends AbstractCatalog { return location; } - private Map applyOptionsHook(Map options) { - Map properties = new HashMap<>(options); - if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) { - properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue()); - } - if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD.key())) { - properties.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.PRECOMBINE_FIELD.defaultValue()); - } - if (!options.containsKey(FlinkOptions.TABLE_TYPE.key())) { - properties.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue()); - } - return properties; - } - private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException { // let Hive set default parameters for us, e.g. serialization.format Table hiveTable = @@ -510,7 +510,7 @@ public class HoodieHiveCatalog extends AbstractCatalog { hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName()); hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); - Map properties = applyOptionsHook(table.getOptions()); + Map properties = new HashMap<>(table.getOptions()); if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) { hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString()); @@ -523,17 +523,11 @@ public class HoodieHiveCatalog extends AbstractCatalog { } //set pk - if (table.getUnresolvedSchema().getPrimaryKey().isPresent()) { + if (table.getUnresolvedSchema().getPrimaryKey().isPresent() + && !properties.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) { String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames()); - String recordKey = properties.getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue()); - if (!Objects.equals(pkColumns, recordKey)) { - throw new HoodieCatalogException( - String.format("Primary key [%s] and record key [%s] should be the the same.", - pkColumns, - recordKey)); - } properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName()); - properties.put(PK_COLUMNS, pkColumns); + properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), pkColumns); } if (!properties.containsKey(FlinkOptions.PATH.key())) { @@ -896,4 +890,22 @@ public class HoodieHiveCatalog extends AbstractCatalog { throws PartitionNotExistException, CatalogException { throw new HoodieCatalogException("Not supported."); } + + private Map supplementOptions( + ObjectPath tablePath, + Map options) { + if (HoodieCatalogUtil.isEmbeddedMetastore(hiveConf)) { + return options; + } else { + Map newOptions = new HashMap<>(options); + // set up hive sync options + newOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); + newOptions.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS)); + newOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms"); + newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true"); + newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName()); + newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName()); + return newOptions; + } + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java index d6cfe3ed7..e6b15788f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TypeInfoLogicalTypeVisitor.java @@ -18,8 +18,6 @@ package org.apache.hudi.table.catalog; -import org.apache.hudi.exception.HoodieCatalogException; - import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.BigIntType; @@ -40,8 +38,6 @@ import org.apache.flink.table.types.logical.TinyIntType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; -import org.apache.hadoop.hive.common.type.HiveChar; -import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -53,64 +49,25 @@ import java.util.List; */ public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor { private final LogicalType type; - // whether to check type precision - private final boolean checkPrecision; - TypeInfoLogicalTypeVisitor(DataType dataType, boolean checkPrecision) { - this(dataType.getLogicalType(), checkPrecision); + TypeInfoLogicalTypeVisitor(DataType dataType) { + this(dataType.getLogicalType()); } - TypeInfoLogicalTypeVisitor(LogicalType type, boolean checkPrecision) { + TypeInfoLogicalTypeVisitor(LogicalType type) { this.type = type; - this.checkPrecision = checkPrecision; } @Override public TypeInfo visit(CharType charType) { - // Flink and Hive have different length limit for CHAR. Promote it to STRING if it - // exceeds the limits of - // Hive and we're told not to check precision. This can be useful when calling Hive UDF - // to process data. - if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH || charType.getLength() < 1) { - if (checkPrecision) { - throw new HoodieCatalogException( - String.format( - "HiveCatalog doesn't support char type with length of '%d'. " - + "The supported length is [%d, %d]", - charType.getLength(), 1, HiveChar.MAX_CHAR_LENGTH)); - } else { - return TypeInfoFactory.stringTypeInfo; - } - } - return TypeInfoFactory.getCharTypeInfo(charType.getLength()); + // hoodie only supports avro compatible data type + return TypeInfoFactory.stringTypeInfo; } @Override public TypeInfo visit(VarCharType varCharType) { - // Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE) - // We don't have more information in LogicalTypeRoot to distinguish StringType and a - // VARCHAR(Integer.MAX_VALUE) instance - // Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType - if (varCharType.getLength() == Integer.MAX_VALUE) { - return TypeInfoFactory.stringTypeInfo; - } - // Flink and Hive have different length limit for VARCHAR. Promote it to STRING if it - // exceeds the limits of - // Hive and we're told not to check precision. This can be useful when calling Hive UDF - // to process data. - if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH - || varCharType.getLength() < 1) { - if (checkPrecision) { - throw new HoodieCatalogException( - String.format( - "HiveCatalog doesn't support varchar type with length of '%d'. " - + "The supported length is [%d, %d]", - varCharType.getLength(), 1, HiveVarchar.MAX_VARCHAR_LENGTH)); - } else { - return TypeInfoFactory.stringTypeInfo; - } - } - return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength()); + // hoodie only supports avro compatible data type + return TypeInfoFactory.stringTypeInfo; } @Override @@ -140,12 +97,14 @@ public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor