1
0

[HUDI-4416] Default database path for hoodie hive catalog (#6136)

This commit is contained in:
Danny Chan
2022-07-19 15:38:47 +08:00
committed by GitHub
parent 382d19e85b
commit 6c3578069e
9 changed files with 123 additions and 132 deletions

View File

@@ -269,18 +269,6 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>javax.transaction-api</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>${hive.groupid}</groupId> <groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId> <artifactId>hive-metastore</artifactId>
@@ -421,5 +409,18 @@
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- Hive dependencies -->
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>javax.transaction-api</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@@ -19,11 +19,13 @@
package org.apache.hudi.table.catalog; package org.apache.hudi.table.catalog;
import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
@@ -37,7 +39,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -60,9 +61,8 @@ public class HiveSchemaUtils {
allCols.addAll(hiveTable.getPartitionKeys()); allCols.addAll(hiveTable.getPartitionKeys());
String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME); String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
List<String> primaryColNames = StringUtils.isNullOrEmpty(pkConstraintName) String pkColumnStr = hiveTable.getParameters().getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
? Collections.EMPTY_LIST List<String> pkColumns = StringUtils.split(pkColumnStr,",");
: StringUtils.split(hiveTable.getParameters().get(TableOptionProperties.PK_COLUMNS),",");
String[] colNames = new String[allCols.size()]; String[] colNames = new String[allCols.size()];
DataType[] colTypes = new DataType[allCols.size()]; DataType[] colTypes = new DataType[allCols.size()];
@@ -73,14 +73,16 @@ public class HiveSchemaUtils {
colNames[i] = fs.getName(); colNames[i] = fs.getName();
colTypes[i] = colTypes[i] =
toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType())); toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
if (primaryColNames.contains(colNames[i])) { if (pkColumns.contains(colNames[i])) {
colTypes[i] = colTypes[i].notNull(); colTypes[i] = colTypes[i].notNull();
} }
} }
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes); org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes);
if (!StringUtils.isNullOrEmpty(pkConstraintName)) { if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
builder.primaryKeyNamed(pkConstraintName, primaryColNames); builder.primaryKeyNamed(pkConstraintName, pkColumns);
} else {
builder.primaryKey(pkColumns);
} }
return builder.build(); return builder.build();
@@ -152,7 +154,8 @@ public class HiveSchemaUtils {
case DATE: case DATE:
return DataTypes.DATE(); return DataTypes.DATE();
case TIMESTAMP: case TIMESTAMP:
return DataTypes.TIMESTAMP(9); // see org.apache.hudi.hive.util.HiveSchemaUtil#convertField for details.
return DataTypes.TIMESTAMP(6);
case BINARY: case BINARY:
return DataTypes.BYTES(); return DataTypes.BYTES();
case DECIMAL: case DECIMAL:
@@ -168,8 +171,10 @@ public class HiveSchemaUtils {
/** Create Hive columns from Flink TableSchema. */ /** Create Hive columns from Flink TableSchema. */
public static List<FieldSchema> createHiveColumns(TableSchema schema) { public static List<FieldSchema> createHiveColumns(TableSchema schema) {
String[] fieldNames = schema.getFieldNames(); final DataType dataType = schema.toPersistedRowDataType();
DataType[] fieldTypes = schema.getFieldDataTypes(); final RowType rowType = (RowType) dataType.getLogicalType();
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
final DataType[] fieldTypes = dataType.getChildren().toArray(new DataType[0]);
List<FieldSchema> columns = new ArrayList<>(fieldNames.length); List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
@@ -177,7 +182,7 @@ public class HiveSchemaUtils {
columns.add( columns.add(
new FieldSchema( new FieldSchema(
fieldNames[i], fieldNames[i],
toHiveTypeInfo(fieldTypes[i], true).getTypeName(), toHiveTypeInfo(fieldTypes[i]).getTypeName(),
null)); null));
} }
@@ -191,13 +196,12 @@ public class HiveSchemaUtils {
* checkPrecision is true. * checkPrecision is true.
* *
* @param dataType a Flink DataType * @param dataType a Flink DataType
* @param checkPrecision whether to fail the conversion if the precision of the DataType is not *
* supported by Hive
* @return the corresponding Hive data type * @return the corresponding Hive data type
*/ */
public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) { public static TypeInfo toHiveTypeInfo(DataType dataType) {
checkNotNull(dataType, "type cannot be null"); checkNotNull(dataType, "type cannot be null");
LogicalType logicalType = dataType.getLogicalType(); LogicalType logicalType = dataType.getLogicalType();
return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType, checkPrecision)); return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
} }
} }

View File

@@ -34,7 +34,6 @@ import java.util.Locale;
import java.util.Set; import java.util.Set;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
/** /**
* A catalog factory impl that creates {@link HoodieCatalog}. * A catalog factory impl that creates {@link HoodieCatalog}.
@@ -59,6 +58,7 @@ public class HoodieCatalogFactory implements CatalogFactory {
case "hms": case "hms":
return new HoodieHiveCatalog( return new HoodieHiveCatalog(
context.getName(), context.getName(),
helper.getOptions().get(CatalogOptions.CATALOG_PATH),
helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE), helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR)); helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
case "dfs": case "dfs":
@@ -82,7 +82,7 @@ public class HoodieCatalogFactory implements CatalogFactory {
options.add(PROPERTY_VERSION); options.add(PROPERTY_VERSION);
options.add(CatalogOptions.HIVE_CONF_DIR); options.add(CatalogOptions.HIVE_CONF_DIR);
options.add(CatalogOptions.MODE); options.add(CatalogOptions.MODE);
options.add(CATALOG_PATH); options.add(CatalogOptions.CATALOG_PATH);
return options; return options;
} }
} }

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.table.catalog; package org.apache.hudi.table.catalog;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -69,6 +68,7 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -92,7 +92,6 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP; import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME; import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
@@ -104,7 +103,6 @@ import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
import static org.apache.hudi.configuration.FlinkOptions.PATH; import static org.apache.hudi.configuration.FlinkOptions.PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB; import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT; import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_COLUMNS;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME; import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER; import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
@@ -117,12 +115,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
private final HiveConf hiveConf; private final HiveConf hiveConf;
private IMetaStoreClient client; private IMetaStoreClient client;
public HoodieHiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) { // optional catalog base path: used for db/table path inference.
this(catalogName, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false); private final String catalogPath;
public HoodieHiveCatalog(String catalogName, String catalogPath, String defaultDatabase, String hiveConfDir) {
this(catalogName, catalogPath, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
} }
public HoodieHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, boolean allowEmbedded) { public HoodieHiveCatalog(
String catalogName,
String catalogPath,
String defaultDatabase,
HiveConf hiveConf,
boolean allowEmbedded) {
super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase); super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
// fallback to hive.metastore.warehouse.dir if catalog path is not specified
this.catalogPath = catalogPath == null ? hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE) : catalogPath;
this.hiveConf = hiveConf; this.hiveConf = hiveConf;
if (!allowEmbedded) { if (!allowEmbedded) {
checkArgument( checkArgument(
@@ -145,7 +153,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
} }
if (!databaseExists(getDefaultDatabase())) { if (!databaseExists(getDefaultDatabase())) {
LOG.info("{} does not exist, will be created.", getDefaultDatabase()); LOG.info("{} does not exist, will be created.", getDefaultDatabase());
CatalogDatabase database = new CatalogDatabaseImpl(Collections.EMPTY_MAP, "default database"); CatalogDatabase database = new CatalogDatabaseImpl(Collections.emptyMap(), "default database");
try { try {
createDatabase(getDefaultDatabase(), database, true); createDatabase(getDefaultDatabase(), database, true);
} catch (DatabaseAlreadyExistException e) { } catch (DatabaseAlreadyExistException e) {
@@ -227,6 +235,10 @@ public class HoodieHiveCatalog extends AbstractCatalog {
Map<String, String> properties = database.getProperties(); Map<String, String> properties = database.getProperties();
String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI); String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
if (dbLocationUri == null && this.catalogPath != null) {
// infer default location uri
dbLocationUri = new Path(this.catalogPath, databaseName).toString();
}
Database hiveDatabase = Database hiveDatabase =
new Database(databaseName, database.getComment(), dbLocationUri, properties); new Database(databaseName, database.getComment(), dbLocationUri, properties);
@@ -381,8 +393,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
@Override @Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null"); checkNotNull(tablePath, "Table path cannot be null");
Table hiveTable = getHiveTable(tablePath); Table hiveTable = translateSparkTable2Flink(tablePath, getHiveTable(tablePath));
hiveTable = translateSparkTable2Flink(tablePath, hiveTable);
String path = hiveTable.getSd().getLocation(); String path = hiveTable.getSd().getLocation();
Map<String, String> parameters = hiveTable.getParameters(); Map<String, String> parameters = hiveTable.getParameters();
Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf); Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
@@ -391,16 +402,21 @@ public class HoodieHiveCatalog extends AbstractCatalog {
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder() org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
.fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema)); .fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME); String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
if (!StringUtils.isNullOrEmpty(pkConstraintName)) { if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
builder.primaryKeyNamed(pkConstraintName, StringUtils.split(parameters.get(PK_COLUMNS), ",")); // pkColumns expect not to be null
builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ","));
} else if (pkColumns != null) {
builder.primaryKey(StringUtils.split(pkColumns, ","));
} }
schema = builder.build(); schema = builder.build();
} else { } else {
LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath); LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath);
schema = HiveSchemaUtils.convertTableSchema(hiveTable); schema = HiveSchemaUtils.convertTableSchema(hiveTable);
} }
Map<String, String> options = supplementOptions(tablePath, parameters);
return CatalogTable.of(schema, parameters.get(COMMENT), return CatalogTable.of(schema, parameters.get(COMMENT),
HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), parameters); HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), options);
} }
@Override @Override
@@ -439,8 +455,8 @@ public class HoodieHiveCatalog extends AbstractCatalog {
} }
private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) { private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
Configuration flinkConf = Configuration.fromMap(applyOptionsHook(catalogTable.getOptions())); Configuration flinkConf = Configuration.fromMap(catalogTable.getOptions());
final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()).toString(); final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPersistedRowDataType().getLogicalType()).toString();
flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema); flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
// stores two copies of options: // stores two copies of options:
@@ -449,15 +465,13 @@ public class HoodieHiveCatalog extends AbstractCatalog {
// because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it // because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
// when calling #getTable. // when calling #getTable.
if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) { if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()
&& !flinkConf.contains(FlinkOptions.RECORD_KEY_FIELD)) {
final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames()); final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
String recordKey = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD); flinkConf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
if (!Objects.equals(pkColumns, recordKey)) {
throw new HoodieCatalogException(String.format("%s and %s are the different", pkColumns, recordKey));
}
} }
if (catalogTable.isPartitioned()) { if (catalogTable.isPartitioned() && !flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
final String partitions = String.join(",", catalogTable.getPartitionKeys()); final String partitions = String.join(",", catalogTable.getPartitionKeys());
flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions); flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
} }
@@ -468,7 +482,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName()); flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
try { try {
StreamerUtil.initTableIfNotExists(flinkConf); StreamerUtil.initTableIfNotExists(flinkConf, hiveConf);
} catch (IOException e) { } catch (IOException e) {
throw new HoodieCatalogException("Initialize table exception.", e); throw new HoodieCatalogException("Initialize table exception.", e);
} }
@@ -487,20 +501,6 @@ public class HoodieHiveCatalog extends AbstractCatalog {
return location; return location;
} }
private Map<String, String> applyOptionsHook(Map<String, String> options) {
Map<String, String> properties = new HashMap<>(options);
if (!options.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
}
if (!options.containsKey(FlinkOptions.PRECOMBINE_FIELD.key())) {
properties.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.PRECOMBINE_FIELD.defaultValue());
}
if (!options.containsKey(FlinkOptions.TABLE_TYPE.key())) {
properties.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue());
}
return properties;
}
private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException { private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException {
// let Hive set default parameters for us, e.g. serialization.format // let Hive set default parameters for us, e.g. serialization.format
Table hiveTable = Table hiveTable =
@@ -510,7 +510,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName()); hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
Map<String, String> properties = applyOptionsHook(table.getOptions()); Map<String, String> properties = new HashMap<>(table.getOptions());
if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) { if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString()); hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
@@ -523,17 +523,11 @@ public class HoodieHiveCatalog extends AbstractCatalog {
} }
//set pk //set pk
if (table.getUnresolvedSchema().getPrimaryKey().isPresent()) { if (table.getUnresolvedSchema().getPrimaryKey().isPresent()
&& !properties.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames()); String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
String recordKey = properties.getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
if (!Objects.equals(pkColumns, recordKey)) {
throw new HoodieCatalogException(
String.format("Primary key [%s] and record key [%s] should be the the same.",
pkColumns,
recordKey));
}
properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName()); properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName());
properties.put(PK_COLUMNS, pkColumns); properties.put(FlinkOptions.RECORD_KEY_FIELD.key(), pkColumns);
} }
if (!properties.containsKey(FlinkOptions.PATH.key())) { if (!properties.containsKey(FlinkOptions.PATH.key())) {
@@ -896,4 +890,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
throws PartitionNotExistException, CatalogException { throws PartitionNotExistException, CatalogException {
throw new HoodieCatalogException("Not supported."); throw new HoodieCatalogException("Not supported.");
} }
private Map<String, String> supplementOptions(
ObjectPath tablePath,
Map<String, String> options) {
if (HoodieCatalogUtil.isEmbeddedMetastore(hiveConf)) {
return options;
} else {
Map<String, String> newOptions = new HashMap<>(options);
// set up hive sync options
newOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
newOptions.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
newOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
newOptions.putIfAbsent(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());
return newOptions;
}
}
} }

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.table.catalog; package org.apache.hudi.table.catalog;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BigIntType;
@@ -40,8 +38,6 @@ import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor; import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -53,64 +49,25 @@ import java.util.List;
*/ */
public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeInfo> { public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeInfo> {
private final LogicalType type; private final LogicalType type;
// whether to check type precision
private final boolean checkPrecision;
TypeInfoLogicalTypeVisitor(DataType dataType, boolean checkPrecision) { TypeInfoLogicalTypeVisitor(DataType dataType) {
this(dataType.getLogicalType(), checkPrecision); this(dataType.getLogicalType());
} }
TypeInfoLogicalTypeVisitor(LogicalType type, boolean checkPrecision) { TypeInfoLogicalTypeVisitor(LogicalType type) {
this.type = type; this.type = type;
this.checkPrecision = checkPrecision;
} }
@Override @Override
public TypeInfo visit(CharType charType) { public TypeInfo visit(CharType charType) {
// Flink and Hive have different length limit for CHAR. Promote it to STRING if it // hoodie only supports avro compatible data type
// exceeds the limits of return TypeInfoFactory.stringTypeInfo;
// Hive and we're told not to check precision. This can be useful when calling Hive UDF
// to process data.
if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH || charType.getLength() < 1) {
if (checkPrecision) {
throw new HoodieCatalogException(
String.format(
"HiveCatalog doesn't support char type with length of '%d'. "
+ "The supported length is [%d, %d]",
charType.getLength(), 1, HiveChar.MAX_CHAR_LENGTH));
} else {
return TypeInfoFactory.stringTypeInfo;
}
}
return TypeInfoFactory.getCharTypeInfo(charType.getLength());
} }
@Override @Override
public TypeInfo visit(VarCharType varCharType) { public TypeInfo visit(VarCharType varCharType) {
// Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE) // hoodie only supports avro compatible data type
// We don't have more information in LogicalTypeRoot to distinguish StringType and a return TypeInfoFactory.stringTypeInfo;
// VARCHAR(Integer.MAX_VALUE) instance
// Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
if (varCharType.getLength() == Integer.MAX_VALUE) {
return TypeInfoFactory.stringTypeInfo;
}
// Flink and Hive have different length limit for VARCHAR. Promote it to STRING if it
// exceeds the limits of
// Hive and we're told not to check precision. This can be useful when calling Hive UDF
// to process data.
if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH
|| varCharType.getLength() < 1) {
if (checkPrecision) {
throw new HoodieCatalogException(
String.format(
"HiveCatalog doesn't support varchar type with length of '%d'. "
+ "The supported length is [%d, %d]",
varCharType.getLength(), 1, HiveVarchar.MAX_VARCHAR_LENGTH));
} else {
return TypeInfoFactory.stringTypeInfo;
}
}
return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
} }
@Override @Override
@@ -140,12 +97,14 @@ public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeIn
@Override @Override
public TypeInfo visit(TinyIntType tinyIntType) { public TypeInfo visit(TinyIntType tinyIntType) {
return TypeInfoFactory.byteTypeInfo; // hoodie only supports avro compatible data type
return TypeInfoFactory.intTypeInfo;
} }
@Override @Override
public TypeInfo visit(SmallIntType smallIntType) { public TypeInfo visit(SmallIntType smallIntType) {
return TypeInfoFactory.shortTypeInfo; // hoodie only supports avro compatible data type
return TypeInfoFactory.intTypeInfo;
} }
@Override @Override
@@ -175,11 +134,14 @@ public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeIn
@Override @Override
public TypeInfo visit(TimestampType timestampType) { public TypeInfo visit(TimestampType timestampType) {
if (checkPrecision && timestampType.getPrecision() == 9) { int precision = timestampType.getPrecision();
throw new HoodieCatalogException( // see org.apache.hudi.hive.util.HiveSchemaUtil#convertField for details.
"HoodieCatalog currently does not support timestamp of precision 9"); // default supports timestamp
if (precision == 6) {
return TypeInfoFactory.timestampTypeInfo;
} else {
return TypeInfoFactory.longTypeInfo;
} }
return TypeInfoFactory.timestampTypeInfo;
} }
@Override @Override

View File

@@ -273,8 +273,19 @@ public class StreamerUtil {
* @throws IOException if errors happens when writing metadata * @throws IOException if errors happens when writing metadata
*/ */
public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException { public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) throws IOException {
return initTableIfNotExists(conf, HadoopConfigurations.getHadoopConf(conf));
}
/**
* Initialize the table if it does not exist.
*
* @param conf the configuration
* @throws IOException if errors happens when writing metadata
*/
public static HoodieTableMetaClient initTableIfNotExists(
Configuration conf,
org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
final String basePath = conf.getString(FlinkOptions.PATH); final String basePath = conf.getString(FlinkOptions.PATH);
final org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
if (!tableExists(basePath, hadoopConf)) { if (!tableExists(basePath, hadoopConf)) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder() HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA)) .setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
@@ -529,7 +540,7 @@ public class StreamerUtil {
} }
return null; return null;
} }
public static boolean fileExists(FileSystem fs, Path path) { public static boolean fileExists(FileSystem fs, Path path) {
try { try {
return fs.exists(path); return fs.exists(path);

View File

@@ -1342,7 +1342,6 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
.field("f_par string") .field("f_par string")
.pkField("f_int") .pkField("f_int")
.partitionField("f_par") .partitionField("f_par")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/" + dbName + "/" + "t1")
.option(FlinkOptions.RECORD_KEY_FIELD, "f_int") .option(FlinkOptions.RECORD_KEY_FIELD, "f_int")
.option(FlinkOptions.PRECOMBINE_FIELD, "f_date") .option(FlinkOptions.PRECOMBINE_FIELD, "f_date")
.end(); .end();

View File

@@ -41,6 +41,7 @@ public class HoodieCatalogTestUtils {
return new HoodieHiveCatalog( return new HoodieHiveCatalog(
name, name,
null, null,
null,
createHiveConf(), createHiveConf(),
true); true);
} }

View File

@@ -54,6 +54,7 @@ import java.util.Map;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
@@ -106,7 +107,7 @@ public class TestHoodieHiveCatalog {
assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi"); assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi");
assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), tableType.toString()); assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), tableType.toString());
assertEquals(table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()), "uuid"); assertEquals(table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()), "uuid");
assertEquals(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "ts"); assertNull(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "preCombine key is not declared");
assertEquals(table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames(), Collections.singletonList("uuid")); assertEquals(table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames(), Collections.singletonList("uuid"));
assertEquals(((CatalogTable)table1).getPartitionKeys(), Collections.singletonList("par1")); assertEquals(((CatalogTable)table1).getPartitionKeys(), Collections.singletonList("par1"));
} }