1
0

[HUDI-4098] Support HMS for flink HudiCatalog (#6082)

* [HUDI-4098] Support HMS for flink HudiCatalog
This commit is contained in:
Bo Cui
2022-07-18 11:46:23 +08:00
committed by GitHub
parent 3964c476e0
commit 9282611bae
19 changed files with 2098 additions and 11 deletions

View File

@@ -23,6 +23,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -261,6 +262,11 @@ public class TableSchemaResolver {
}
}
/**
 * Converts the given Avro schema into the equivalent Parquet {@code MessageType}.
 *
 * @param schema     the Avro schema to convert
 * @param hadoopConf hadoop configuration used to configure the converter
 * @return the Parquet message type for the given Avro schema
 */
public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
  return new AvroSchemaConverter(hadoopConf).convert(schema);
}
private Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf());
return avroSchemaConverter.convert(parquetSchema);

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.exception;
/**
 * Exception thrown for Hoodie Catalog errors.
 *
 * <p>Unchecked so that catalog operations can surface failures without
 * forcing every caller to declare them.
 */
public class HoodieCatalogException extends RuntimeException {
  // RuntimeException is Serializable; pin the serial form explicitly
  private static final long serialVersionUID = 1L;

  /** Creates an exception with neither message nor cause. */
  public HoodieCatalogException() {
    super();
  }

  /**
   * Creates an exception with a detail message.
   *
   * @param message the detail message
   */
  public HoodieCatalogException(String message) {
    super(message);
  }

  /**
   * Creates an exception with a detail message and a cause.
   *
   * @param message the detail message
   * @param t the underlying cause
   */
  public HoodieCatalogException(String message, Throwable t) {
    super(message, t);
  }

  /**
   * Creates an exception wrapping only a cause.
   *
   * @param t the underlying cause
   */
  public HoodieCatalogException(Throwable t) {
    super(t);
  }
}

View File

@@ -269,7 +269,42 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>javax.transaction-api</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
</exclusion>
<exclusion>
<groupId>javax.transaction</groupId>
<artifactId>javax.transaction-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Test dependencies -->
<!-- Junit 5 dependencies -->

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.table.format.FilePathUtils;
import org.apache.flink.configuration.Configuration;
import java.util.Locale;
import java.util.Map;
/**
* Tool helping to resolve the flink options {@link FlinkOptions}.
@@ -66,6 +67,14 @@ public class OptionsResolver {
.equals(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
}
/**
 * Returns whether the given table options describe a MERGE_ON_READ table.
 */
public static boolean isMorTable(Map<String, String> options) {
  final String tableType = options.getOrDefault(
      FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE.defaultValue());
  return tableType.equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
}
/**
* Returns whether it is a COPY_ON_WRITE table.
*/

View File

@@ -29,6 +29,8 @@ import java.util.Map;
* Hoodie catalog options.
*/
public class CatalogOptions {
public static final String HIVE_SITE_FILE = "hive-site.xml";
public static final String DEFAULT_DB = "default";
public static final ConfigOption<String> CATALOG_PATH =
ConfigOptions.key("catalog.path")
@@ -42,6 +44,22 @@ public class CatalogOptions {
.stringType()
.defaultValue("default");
// Directory containing hive-site.xml; used to build the Hive metastore
// client configuration when the catalog runs in 'hms' mode. No default:
// when absent, hive-site.xml is looked up on the classpath instead.
public static final ConfigOption<String> HIVE_CONF_DIR = ConfigOptions
.key("hive.conf.dir")
.stringType()
.noDefaultValue();
// Catalog mode: 'dfs' (default) keeps table metadata on the filesystem
// only; 'hms' backs the catalog with the Hive metastore.
public static final ConfigOption<String> MODE = ConfigOptions
.key("mode")
.stringType()
.defaultValue("dfs");
// Whether tables created through the catalog are external tables;
// defaults to false (managed).
public static final ConfigOption<Boolean> TABLE_EXTERNAL = ConfigOptions
.key("table.external")
.booleanType()
.defaultValue(false)
.withDescription("Whether the table is external, default false");
/**
* Returns all the common table options that can be shared.
*

View File

@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.hudi.common.util.StringUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Utilities for converting between Hive field schemas and Flink table schemas.
 */
public class HiveSchemaUtils {

  private HiveSchemaUtils() {
    // static utility holder, not meant to be instantiated
  }

  /**
   * Get field names from field schemas.
   *
   * @param fieldSchemas hive field schemas
   * @return the field names in the original order
   */
  public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
    List<String> names = new ArrayList<>(fieldSchemas.size());
    for (FieldSchema fs : fieldSchemas) {
      names.add(fs.getName());
    }
    return names;
  }

  /**
   * Converts the schema of the given Hive table (regular columns plus partition keys)
   * into a Flink schema, restoring the primary key constraint persisted in the
   * table parameters (if any).
   *
   * @param hiveTable the hive table
   * @return the equivalent Flink schema
   */
  public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
    List<FieldSchema> allCols = new ArrayList<>(hiveTable.getSd().getCols());
    allCols.addAll(hiveTable.getPartitionKeys());

    String pkConstraintName = hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
    // fix: use the typed Collections.emptyList() instead of the raw EMPTY_LIST constant
    List<String> primaryColNames = StringUtils.isNullOrEmpty(pkConstraintName)
        ? Collections.emptyList()
        : StringUtils.split(hiveTable.getParameters().get(TableOptionProperties.PK_COLUMNS), ",");

    String[] colNames = new String[allCols.size()];
    DataType[] colTypes = new DataType[allCols.size()];

    for (int i = 0; i < allCols.size(); i++) {
      FieldSchema fs = allCols.get(i);
      colNames[i] = fs.getName();
      colTypes[i] =
          toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
      // primary key columns must be non-nullable on the Flink side
      if (primaryColNames.contains(colNames[i])) {
        colTypes[i] = colTypes[i].notNull();
      }
    }

    org.apache.flink.table.api.Schema.Builder builder =
        org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes);
    if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
      builder.primaryKeyNamed(pkConstraintName, primaryColNames);
    }
    return builder.build();
  }

  /**
   * Convert Hive data type to a Flink data type.
   *
   * @param hiveType a Hive data type
   * @return the corresponding Flink data type
   * @throws UnsupportedOperationException if the Hive category has no Flink counterpart
   */
  public static DataType toFlinkType(TypeInfo hiveType) {
    checkNotNull(hiveType, "hiveType cannot be null");
    switch (hiveType.getCategory()) {
      case PRIMITIVE:
        return toFlinkPrimitiveType((PrimitiveTypeInfo) hiveType);
      case LIST:
        ListTypeInfo listTypeInfo = (ListTypeInfo) hiveType;
        return DataTypes.ARRAY(toFlinkType(listTypeInfo.getListElementTypeInfo()));
      case MAP:
        MapTypeInfo mapTypeInfo = (MapTypeInfo) hiveType;
        return DataTypes.MAP(
            toFlinkType(mapTypeInfo.getMapKeyTypeInfo()),
            toFlinkType(mapTypeInfo.getMapValueTypeInfo()));
      case STRUCT:
        StructTypeInfo structTypeInfo = (StructTypeInfo) hiveType;
        // convert each struct member recursively, preserving field order
        List<String> names = structTypeInfo.getAllStructFieldNames();
        List<TypeInfo> typeInfos = structTypeInfo.getAllStructFieldTypeInfos();
        DataTypes.Field[] fields = new DataTypes.Field[names.size()];
        for (int i = 0; i < fields.length; i++) {
          fields[i] = DataTypes.FIELD(names.get(i), toFlinkType(typeInfos.get(i)));
        }
        return DataTypes.ROW(fields);
      default:
        throw new UnsupportedOperationException(
            String.format("Flink doesn't support Hive data type %s yet.", hiveType));
    }
  }

  /**
   * Converts a Hive primitive type to the matching Flink data type.
   *
   * @param hiveType a Hive primitive type info
   * @return the corresponding Flink data type
   * @throws UnsupportedOperationException if the primitive category is unsupported
   */
  private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
    checkNotNull(hiveType, "hiveType cannot be null");
    switch (hiveType.getPrimitiveCategory()) {
      case CHAR:
        return DataTypes.CHAR(((CharTypeInfo) hiveType).getLength());
      case VARCHAR:
        return DataTypes.VARCHAR(((VarcharTypeInfo) hiveType).getLength());
      case STRING:
        return DataTypes.STRING();
      case BOOLEAN:
        return DataTypes.BOOLEAN();
      case BYTE:
        return DataTypes.TINYINT();
      case SHORT:
        return DataTypes.SMALLINT();
      case INT:
        return DataTypes.INT();
      case LONG:
        return DataTypes.BIGINT();
      case FLOAT:
        return DataTypes.FLOAT();
      case DOUBLE:
        return DataTypes.DOUBLE();
      case DATE:
        return DataTypes.DATE();
      case TIMESTAMP:
        // mapped with nanosecond precision (9)
        return DataTypes.TIMESTAMP(9);
      case BINARY:
        return DataTypes.BYTES();
      case DECIMAL:
        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) hiveType;
        return DataTypes.DECIMAL(
            decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
      default:
        throw new UnsupportedOperationException(
            String.format(
                "Flink doesn't support Hive primitive type %s yet", hiveType));
    }
  }

  /**
   * Create Hive columns from Flink TableSchema.
   *
   * @param schema the Flink table schema
   * @return hive field schemas, one per Flink column, with null comments
   */
  public static List<FieldSchema> createHiveColumns(TableSchema schema) {
    String[] fieldNames = schema.getFieldNames();
    DataType[] fieldTypes = schema.getFieldDataTypes();

    List<FieldSchema> columns = new ArrayList<>(fieldNames.length);
    for (int i = 0; i < fieldNames.length; i++) {
      columns.add(
          new FieldSchema(
              fieldNames[i],
              toHiveTypeInfo(fieldTypes[i], true).getTypeName(),
              null));
    }
    return columns;
  }

  /**
   * Convert Flink DataType to Hive TypeInfo. For types with a precision parameter, e.g.
   * timestamp, the supported precisions in Hive and Flink can be different. Therefore the
   * conversion will fail for those types if the precision is not supported by Hive and
   * checkPrecision is true.
   *
   * @param dataType a Flink DataType
   * @param checkPrecision whether to fail the conversion if the precision of the DataType is not
   *     supported by Hive
   * @return the corresponding Hive data type
   */
  public static TypeInfo toHiveTypeInfo(DataType dataType, boolean checkPrecision) {
    checkNotNull(dataType, "type cannot be null");
    LogicalType logicalType = dataType.getLogicalType();
    return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType, checkPrecision));
  }
}

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.table.catalog;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
@@ -28,10 +30,11 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
/**
* A catalog factory impl that creates {@link HoodieCatalog}.
@@ -51,22 +54,35 @@ public class HoodieCatalogFactory implements CatalogFactory {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validate();
return new HoodieCatalog(
context.getName(),
(Configuration) helper.getOptions());
String mode = helper.getOptions().get(CatalogOptions.MODE);
switch (mode.toLowerCase(Locale.ROOT)) {
case "hms":
return new HoodieHiveCatalog(
context.getName(),
helper.getOptions().get(CatalogOptions.DEFAULT_DATABASE),
helper.getOptions().get(CatalogOptions.HIVE_CONF_DIR));
case "dfs":
return new HoodieCatalog(
context.getName(),
(Configuration) helper.getOptions());
default:
throw new HoodieCatalogException(String.format("Invalid catalog mode: %s, supported modes: [hms, dfs].", mode));
}
}
@Override
// NOTE(review): this rendered diff interleaves removed and added lines
// without +/- markers. The first three statements look like the old body
// (catalog.path and default-database mandatory) and the trailing
// 'return Collections.emptySet();' like the new one; as written the method
// would not compile (unreachable second return). Verify against the real
// source before relying on this text.
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(CATALOG_PATH);
options.add(DEFAULT_DATABASE);
return options;
return Collections.emptySet();
}
@Override
// NOTE(review): removed and added diff lines are interleaved here without
// markers. 'return Collections.emptySet();' appears to be the old body; the
// remaining statements (exposing default-database, property-version,
// hive.conf.dir, mode and catalog.path as optional) the new one. As written
// the code after the first return is unreachable. Verify against the real
// source before relying on this text.
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(CatalogOptions.DEFAULT_DATABASE);
options.add(PROPERTY_VERSION);
options.add(CatalogOptions.HIVE_CONF_DIR);
options.add(CatalogOptions.MODE);
options.add(CATALOG_PATH);
return options;
}
}

View File

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
import static org.apache.hudi.table.catalog.CatalogOptions.HIVE_SITE_FILE;
/**
 * Utilities for Hoodie Catalog.
 */
public class HoodieCatalogUtil {
  private static final Logger LOG = LoggerFactory.getLogger(HoodieCatalogUtil.class);

  private HoodieCatalogUtil() {
    // static utility holder, not meant to be instantiated
  }

  /**
   * Returns a new {@code HiveConf}.
   *
   * <p>When {@code hiveConfDir} is given, {@code hive-site.xml} is loaded from that
   * directory (a relative path is treated as a local file); otherwise the file is
   * looked up on the context classloader's classpath.
   *
   * @param hiveConfDir Hive conf directory path, may be null
   * @return A HiveConf instance
   * @throws CatalogException if hive-site.xml cannot be read from the given directory
   */
  public static HiveConf createHiveConf(@Nullable String hiveConfDir) {
    // create HiveConf from hadoop configuration with hadoop conf directory configured.
    Configuration hadoopConf = HadoopConfigurations.getHadoopConf(new org.apache.flink.configuration.Configuration());

    // ignore all the static conf file URLs that HiveConf may have set
    HiveConf.setHiveSiteLocation(null);
    HiveConf.setLoadMetastoreConfig(false);
    HiveConf.setLoadHiveServer2Config(false);
    HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);

    LOG.info("Setting hive conf dir as {}", hiveConfDir);

    if (hiveConfDir != null) {
      Path hiveSite = new Path(hiveConfDir, HIVE_SITE_FILE);
      if (!hiveSite.toUri().isAbsolute()) {
        // treat relative URI as local file to be compatible with previous behavior
        hiveSite = new Path(new File(hiveSite.toString()).toURI());
      }
      try (InputStream inputStream = hiveSite.getFileSystem(hadoopConf).open(hiveSite)) {
        hiveConf.addResource(inputStream, hiveSite.toString());
        // trigger a read from the conf so that the input stream is read
        isEmbeddedMetastore(hiveConf);
      } catch (IOException e) {
        throw new CatalogException(
            "Failed to load hive-site.xml from specified path:" + hiveSite, e);
      }
    } else {
      // user doesn't provide hive conf dir, we try to find it in classpath
      URL hiveSite =
          Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE);
      if (hiveSite != null) {
        LOG.info("Found {} in classpath: {}", HIVE_SITE_FILE, hiveSite);
        hiveConf.addResource(hiveSite);
      }
    }
    return hiveConf;
  }

  /**
   * Check whether the hive.metastore.uris is empty, i.e. whether the
   * metastore would run embedded rather than as a remote service.
   */
  public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
    return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
  }
}

View File

@@ -0,0 +1,899 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase;
import org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner;
import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveDatabase;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabase.ALTER_DATABASE_OP;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_NAME;
import static org.apache.flink.sql.parser.hive.ddl.SqlAlterHiveDatabaseOwner.DATABASE_OWNER_TYPE;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
import static org.apache.hudi.configuration.FlinkOptions.PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DB;
import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_COLUMNS;
import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
/**
* A catalog implementation for Hoodie based on MetaStore.
*/
public class HoodieHiveCatalog extends AbstractCatalog {
private static final Logger LOG = LoggerFactory.getLogger(HoodieHiveCatalog.class);
private final HiveConf hiveConf;
private IMetaStoreClient client;
/**
 * Creates a Hive-metastore-backed Hoodie catalog.
 *
 * @param catalogName     name of the catalog
 * @param defaultDatabase default database; null falls back to 'default'
 * @param hiveConfDir     directory containing hive-site.xml, may be null
 */
public HoodieHiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir) {
// allowEmbedded = false: an embedded (empty hive.metastore.uris) metastore is rejected
this(catalogName, defaultDatabase, HoodieCatalogUtil.createHiveConf(hiveConfDir), false);
}
/**
 * Creates a Hive-metastore-backed Hoodie catalog with an explicit HiveConf.
 *
 * @param catalogName     name of the catalog
 * @param defaultDatabase default database; null falls back to DEFAULT_DB ('default')
 * @param hiveConf        the hive configuration to use for the metastore client
 * @param allowEmbedded   whether an embedded metastore (empty hive.metastore.uris)
 *                        is acceptable; when false such a config is rejected
 */
public HoodieHiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, boolean allowEmbedded) {
super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
this.hiveConf = hiveConf;
if (!allowEmbedded) {
checkArgument(
!HoodieCatalogUtil.isEmbeddedMetastore(this.hiveConf),
"Embedded metastore is not allowed. Make sure you have set a valid value for "
+ HiveConf.ConfVars.METASTOREURIS);
}
LOG.info("Created Hoodie Catalog '{}' in hms mode", catalogName);
}
/**
 * Connects to the Hive metastore (idempotent) and makes sure the catalog's
 * default database exists, creating it when missing.
 *
 * @throws CatalogException / HoodieCatalogException if the metastore client
 *     cannot be created or the default database cannot be created
 */
@Override
public void open() throws CatalogException {
  if (this.client == null) {
    try {
      this.client = Hive.get(hiveConf).getMSC();
    } catch (Exception e) {
      throw new HoodieCatalogException("Failed to create hive metastore client", e);
    }
    LOG.info("Connected to Hive metastore");
  }
  if (!databaseExists(getDefaultDatabase())) {
    LOG.info("{} does not exist, will be created.", getDefaultDatabase());
    // fix: typed Collections.emptyMap() instead of the raw EMPTY_MAP constant
    CatalogDatabase database = new CatalogDatabaseImpl(Collections.emptyMap(), "default database");
    try {
      createDatabase(getDefaultDatabase(), database, true);
    } catch (DatabaseAlreadyExistException e) {
      // fix: previously the message was just getName(); give a descriptive
      // message and keep the cause
      throw new HoodieCatalogException(
          String.format("Failed to create default database %s in catalog %s", getDefaultDatabase(), getName()), e);
    }
  }
}
/**
 * Closes the metastore client connection if one is open; safe to call repeatedly.
 */
@Override
public void close() throws CatalogException {
  if (client == null) {
    return;
  }
  client.close();
  client = null;
  LOG.info("Disconnect to hive metastore");
}
/** Returns the hive configuration this catalog was created with. */
public HiveConf getHiveConf() {
return hiveConf;
}
// ------ databases ------
/**
 * Lists all database names known to the Hive metastore.
 */
@Override
public List<String> listDatabases() throws CatalogException {
try {
return client.getAllDatabases();
} catch (TException e) {
// wrap thrift failures, keeping the cause
throw new HoodieCatalogException(
String.format("Failed to list all databases in %s", getName()), e);
}
}
/**
 * Fetches the raw Hive {@link Database} object from the metastore.
 *
 * @throws DatabaseNotExistException if the metastore has no such database
 */
private Database getHiveDatabase(String databaseName) throws DatabaseNotExistException {
try {
return client.getDatabase(databaseName);
} catch (NoSuchObjectException e) {
throw new DatabaseNotExistException(getName(), databaseName);
} catch (TException e) {
throw new HoodieCatalogException(
String.format("Failed to get database %s from %s", databaseName, getName()), e);
}
}
/**
 * Returns the database as a Flink {@link CatalogDatabase}, exposing the
 * storage location under the well-known DATABASE_LOCATION_URI property key.
 */
@Override
public CatalogDatabase getDatabase(String databaseName)
throws DatabaseNotExistException, CatalogException {
Database hiveDatabase = getHiveDatabase(databaseName);
// copy so the pseudo-property below does not leak into the hive object
Map<String, String> properties = new HashMap<>(hiveDatabase.getParameters());
properties.put(SqlCreateHiveDatabase.DATABASE_LOCATION_URI, hiveDatabase.getLocationUri());
return new CatalogDatabaseImpl(properties, hiveDatabase.getDescription());
}
/**
 * Returns whether the database exists in the metastore; a missing database
 * is reported as false rather than an exception.
 */
@Override
public boolean databaseExists(String databaseName) throws CatalogException {
try {
return client.getDatabase(databaseName) != null;
} catch (NoSuchObjectException e) {
// not found is a normal answer here, not an error
return false;
} catch (TException e) {
throw new HoodieCatalogException(
String.format(
"Failed to determine whether database %s exists or not", databaseName),
e);
}
}
/**
 * Creates a database in the Hive metastore.
 *
 * <p>The DATABASE_LOCATION_URI pseudo-property, when present, is stripped
 * from the properties map and used as the database's storage location.
 *
 * @param databaseName   name of the database, must be non-blank
 * @param database       the database definition, must be non-null
 * @param ignoreIfExists when true, an already-existing database is not an error
 * @throws DatabaseAlreadyExistException if it exists and ignoreIfExists is false
 */
@Override
public void createDatabase(
    String databaseName, CatalogDatabase database, boolean ignoreIfExists)
    throws DatabaseAlreadyExistException, CatalogException {
  // fix: message was "Database name can not null or empty"
  checkArgument(
      !isNullOrWhitespaceOnly(databaseName), "Database name cannot be null or empty");
  checkNotNull(database, "database cannot be null");

  Map<String, String> properties = database.getProperties();
  String dbLocationUri = properties.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);

  Database hiveDatabase =
      new Database(databaseName, database.getComment(), dbLocationUri, properties);
  try {
    client.createDatabase(hiveDatabase);
  } catch (AlreadyExistsException e) {
    if (!ignoreIfExists) {
      throw new DatabaseAlreadyExistException(getName(), hiveDatabase.getName());
    }
  } catch (TException e) {
    throw new HoodieCatalogException(
        String.format("Failed to create database %s", hiveDatabase.getName()), e);
  }
}
/**
 * Drops a database from the metastore.
 *
 * @param name              database to drop
 * @param ignoreIfNotExists when true, a missing database is not an error
 * @param cascade           when true, drop even when the database is non-empty
 * @throws DatabaseNotExistException if missing and ignoreIfNotExists is false
 * @throws DatabaseNotEmptyException if non-empty and cascade is false
 */
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
try {
// second arg presumably deleteData=true (drop the data too) -- verify against IMetaStoreClient
client.dropDatabase(name, true, ignoreIfNotExists, cascade);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), name);
}
} catch (InvalidOperationException e) {
// the metastore signals a non-empty database this way when cascade is false
throw new DatabaseNotEmptyException(getName(), name);
} catch (TException e) {
throw new HoodieCatalogException(String.format("Failed to drop database %s", name), e);
}
}
/**
 * Alters an existing database, delegating the actual mutation to the static
 * {@code alterDatabase(Database, CatalogDatabase)} helper.
 *
 * @param databaseName      database to alter, must be non-blank
 * @param newDatabase       the new definition, must be non-null
 * @param ignoreIfNotExists when true, a missing database is silently ignored
 */
@Override
public void alterDatabase(
String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
checkArgument(
!isNullOrWhitespaceOnly(databaseName), "Database name cannot be null or empty");
checkNotNull(newDatabase, "New database cannot be null");
// client.alterDatabase doesn't throw any exception if there is no existing database
Database hiveDB;
try {
hiveDB = getHiveDatabase(databaseName);
} catch (DatabaseNotExistException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), databaseName);
}
return;
}
try {
client.alterDatabase(databaseName, alterDatabase(hiveDB, newDatabase));
} catch (TException e) {
throw new HoodieCatalogException(
String.format("Failed to alter database %s", databaseName), e);
}
}
/**
 * Applies the requested alteration to the hive database object.
 *
 * <p>The operation kind is taken from the ALTER_DATABASE_OP property of the
 * new definition (defaulting to CHANGE_PROPS) and may change the properties,
 * the location, or the owner.
 *
 * @param hiveDB      the existing hive database (mutated in place and returned)
 * @param newDatabase the new catalog definition carrying the op and its args
 * @return the mutated hive database
 */
private static Database alterDatabase(Database hiveDB, CatalogDatabase newDatabase) {
Map<String, String> newParams = newDatabase.getProperties();
String opStr = newParams.remove(ALTER_DATABASE_OP);
if (opStr == null) {
// by default is to alter db properties
opStr = SqlAlterHiveDatabase.AlterHiveDatabaseOp.CHANGE_PROPS.name();
}
// the location is carried as a pseudo-property; strip it before applying props
String newLocation = newParams.remove(SqlCreateHiveDatabase.DATABASE_LOCATION_URI);
SqlAlterHiveDatabase.AlterHiveDatabaseOp op =
SqlAlterHiveDatabase.AlterHiveDatabaseOp.valueOf(opStr);
switch (op) {
case CHANGE_PROPS:
hiveDB.setParameters(newParams);
break;
case CHANGE_LOCATION:
hiveDB.setLocationUri(newLocation);
break;
case CHANGE_OWNER:
String ownerName = newParams.remove(DATABASE_OWNER_NAME);
String ownerType = newParams.remove(DATABASE_OWNER_TYPE);
hiveDB.setOwnerName(ownerName);
// NOTE(review): a missing owner type would NPE in this switch rather than
// produce the CatalogException below -- verify upstream always sets it
switch (ownerType) {
case SqlAlterHiveDatabaseOwner.ROLE_OWNER:
hiveDB.setOwnerType(PrincipalType.ROLE);
break;
case SqlAlterHiveDatabaseOwner.USER_OWNER:
hiveDB.setOwnerType(PrincipalType.USER);
break;
default:
throw new CatalogException("Unsupported database owner type: " + ownerType);
}
break;
default:
throw new CatalogException("Unsupported alter database op:" + opStr);
}
// is_generic is deprecated, remove it
if (hiveDB.getParameters() != null) {
hiveDB.getParameters().remove(CatalogPropertiesUtil.IS_GENERIC);
}
return hiveDB;
}
// ------ tables ------
/**
 * Validates that the hive table was written by hudi -- either via Spark's
 * source-provider parameter or Flink's connector parameter -- and returns it
 * unchanged; throws otherwise. (Despite the boolean-sounding name, this is a
 * validator, not a predicate.)
 */
private Table isHoodieTable(Table hiveTable) {
if (!hiveTable.getParameters().getOrDefault(SPARK_SOURCE_PROVIDER, "").equalsIgnoreCase("hudi")
&& !isFlinkHoodieTable(hiveTable)) {
throw new HoodieCatalogException(String.format("the %s is not hoodie table", hiveTable.getTableName()));
}
return hiveTable;
}
/** Returns whether the table carries Flink's 'connector' = 'hudi' parameter. */
private boolean isFlinkHoodieTable(Table hiveTable) {
return hiveTable.getParameters().getOrDefault(CONNECTOR.key(), "").equalsIgnoreCase("hudi");
}
/**
 * Fetches the raw hive table from the metastore and validates that it is a
 * hudi table.
 *
 * @param tablePath database + table name
 * @return the hive table
 * @throws TableNotExistException if the metastore has no such table
 */
@VisibleForTesting
public Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
  try {
    Table hiveTable = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
    // validator: throws when the table was not written by hudi
    return isHoodieTable(hiveTable);
  } catch (NoSuchObjectException e) {
    throw new TableNotExistException(getName(), tablePath);
  } catch (TException e) {
    // fix: preserve the underlying TException as the cause (it was dropped)
    throw new HoodieCatalogException(
        String.format("Failed to get table %s from Hive metastore", tablePath.getObjectName()), e);
  }
}
/**
 * Upgrades a Spark-written hudi table in place so Flink can read it: copies
 * the translated Spark table properties into the table parameters, records
 * the table path, probes the filesystem once to detect hive-style
 * partitioning, and persists the result back via alter_table. A table that
 * is already Flink-flavored is returned untouched.
 */
private Table translateSparkTable2Flink(ObjectPath tablePath, Table hiveTable) {
if (!isFlinkHoodieTable(hiveTable)) {
try {
Map<String, String> parameters = hiveTable.getParameters();
parameters.putAll(TableOptionProperties.translateSparkTableProperties2Flink(hiveTable));
String path = hiveTable.getSd().getLocation();
parameters.put(PATH.key(), path);
if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
// infer hive-style partitioning by inspecting top-level dirs under the
// table path, skipping the .hoodie metadata dir and the 'default' dir
Path hoodieTablePath = new Path(path);
boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
.map(fileStatus -> fileStatus.getPath().getName())
.filter(f -> !f.equals(".hoodie") && !f.equals("default"))
.anyMatch(FilePathUtils::isHiveStylePartitioning);
parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), String.valueOf(hiveStyle));
}
// persist the translated parameters back to the metastore
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
} catch (Exception e) {
throw new HoodieCatalogException("Failed to update table schema", e);
}
}
return hiveTable;
}
/**
 * Returns the table as a Flink {@link CatalogTable}.
 *
 * <p>The Flink schema is built from the latest hudi table schema on the
 * filesystem when one exists (re-applying any persisted primary key
 * constraint); otherwise it falls back to converting the hive table schema.
 */
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
checkNotNull(tablePath, "Table path cannot be null");
// fetch, validate as hudi, and upgrade spark-written tables for Flink
Table hiveTable = getHiveTable(tablePath);
hiveTable = translateSparkTable2Flink(tablePath, hiveTable);
String path = hiveTable.getSd().getLocation();
Map<String, String> parameters = hiveTable.getParameters();
Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
org.apache.flink.table.api.Schema schema;
if (latestTableSchema != null) {
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
.fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
builder.primaryKeyNamed(pkConstraintName, StringUtils.split(parameters.get(PK_COLUMNS), ","));
}
schema = builder.build();
} else {
LOG.warn("{} does not have any hoodie schema, and use hive table schema to infer the table schema", tablePath);
schema = HiveSchemaUtils.convertTableSchema(hiveTable);
}
return CatalogTable.of(schema, parameters.get(COMMENT),
HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys()), parameters);
}
/**
 * Creates a hoodie table: registers the Hive metastore table first, then initializes the
 * hoodie meta client (the .hoodie folder) under the table base path.
 *
 * <p>Only tables declared with connector 'hudi' are accepted; views are rejected.
 */
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
    throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
  checkNotNull(tablePath, "Table path cannot be null");
  checkNotNull(table, "Table cannot be null");

  if (!databaseExists(tablePath.getDatabaseName())) {
    throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
  }
  if (!table.getOptions().getOrDefault(CONNECTOR.key(), "").equalsIgnoreCase("hudi")) {
    throw new HoodieCatalogException(String.format("The %s is not hoodie table", tablePath.getObjectName()));
  }
  if (table instanceof CatalogView) {
    throw new HoodieCatalogException("CREATE VIEW is not supported.");
  }

  try {
    // MOR tables are registered with the realtime input format
    boolean isMorTable = OptionsResolver.isMorTable(table.getOptions());
    Table hiveTable = instantiateHiveTable(tablePath, table, inferTablePath(tablePath, table), isMorTable);
    //create hive table
    client.createTable(hiveTable);
    //init hoodie metaClient
    initTableIfNotExists(tablePath, (CatalogTable) table);
  } catch (AlreadyExistsException e) {
    if (!ignoreIfExists) {
      throw new TableAlreadyExistException(getName(), tablePath, e);
    }
  } catch (Exception e) {
    throw new HoodieCatalogException(
        String.format("Failed to create table %s", tablePath.getFullName()), e);
  }
}
/**
 * Initializes the hoodie table (the .hoodie metadata folder) under the table base path
 * if it does not exist yet, deriving the hoodie write config from the catalog table.
 *
 * @throws HoodieCatalogException when the declared primary key disagrees with the
 *                                configured record key, or when the table init fails
 */
private void initTableIfNotExists(ObjectPath tablePath, CatalogTable catalogTable) {
  Configuration flinkConf = Configuration.fromMap(applyOptionsHook(catalogTable.getOptions()));
  final String avroSchema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType()).toString();
  flinkConf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);

  // stores two copies of options:
  // - partition keys
  // - primary keys
  // because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
  // when calling #getTable.
  if (catalogTable.getUnresolvedSchema().getPrimaryKey().isPresent()) {
    final String pkColumns = String.join(",", catalogTable.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
    String recordKey = flinkConf.get(FlinkOptions.RECORD_KEY_FIELD);
    if (!Objects.equals(pkColumns, recordKey)) {
      // keep the message consistent with the check in #instantiateHiveTable
      throw new HoodieCatalogException(
          String.format("Primary key [%s] and record key [%s] should be the same.", pkColumns, recordKey));
    }
  }

  if (catalogTable.isPartitioned()) {
    final String partitions = String.join(",", catalogTable.getPartitionKeys());
    flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
  }

  if (!flinkConf.getOptional(PATH).isPresent()) {
    flinkConf.setString(PATH, inferTablePath(tablePath, catalogTable));
  }

  flinkConf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
  try {
    StreamerUtil.initTableIfNotExists(flinkConf);
  } catch (IOException e) {
    throw new HoodieCatalogException("Initialize table exception.", e);
  }
}
/**
 * Returns the table base path: the explicit 'path' option when configured, otherwise
 * '&lt;database location&gt;/&lt;table name&gt;' under the Hive database location.
 */
private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
  String location = table.getOptions().getOrDefault(PATH.key(), "");
  if (StringUtils.isNullOrEmpty(location)) {
    try {
      Path dbLocation = new Path(client.getDatabase(tablePath.getDatabaseName()).getLocationUri());
      location = new Path(dbLocation, tablePath.getObjectName()).toString();
    } catch (TException e) {
      throw new HoodieCatalogException(String.format("Failed to infer hoodie table path for table %s", tablePath), e);
    }
  }
  return location;
}
/**
 * Returns a copy of the given options with the hoodie defaults (record key, precombine
 * field, table type) filled in for keys the user did not configure explicitly.
 */
private Map<String, String> applyOptionsHook(Map<String, String> options) {
  Map<String, String> enriched = new HashMap<>(options);
  fillDefaultIfAbsent(options, enriched, FlinkOptions.RECORD_KEY_FIELD);
  fillDefaultIfAbsent(options, enriched, FlinkOptions.PRECOMBINE_FIELD);
  fillDefaultIfAbsent(options, enriched, FlinkOptions.TABLE_TYPE);
  return enriched;
}

/** Copies the option default into {@code target} when {@code source} has no entry for the key. */
private static void fillDefaultIfAbsent(
    Map<String, String> source, Map<String, String> target, ConfigOption<String> option) {
  if (!source.containsKey(option.key())) {
    target.put(option.key(), option.defaultValue());
  }
}
/**
 * Builds the Hive metastore {@link Table} representation for the given catalog table.
 *
 * @param tablePath              database/table name of the table
 * @param table                  the Flink catalog table definition
 * @param location               base path of the hoodie table
 * @param useRealTimeInputFormat whether to register the realtime input format (MOR tables)
 * @return the populated Hive table object, not yet persisted to the metastore
 * @throws IOException if the current user cannot be resolved for table ownership
 */
private Table instantiateHiveTable(ObjectPath tablePath, CatalogBaseTable table, String location, boolean useRealTimeInputFormat) throws IOException {
  // let Hive set default parameters for us, e.g. serialization.format
  Table hiveTable =
      org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(
          tablePath.getDatabaseName(), tablePath.getObjectName());
  hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
  hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));

  Map<String, String> properties = applyOptionsHook(table.getOptions());

  if (Boolean.parseBoolean(table.getOptions().get(CatalogOptions.TABLE_EXTERNAL.key()))) {
    hiveTable.setTableType(TableType.EXTERNAL_TABLE.toString());
    properties.put("EXTERNAL", "TRUE");
  }

  // Table comment
  if (table.getComment() != null) {
    properties.put(COMMENT, table.getComment());
  }

  // set pk: the declared primary key must agree with the configured hoodie record key
  if (table.getUnresolvedSchema().getPrimaryKey().isPresent()) {
    String pkColumns = String.join(",", table.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
    String recordKey = properties.getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.defaultValue());
    if (!Objects.equals(pkColumns, recordKey)) {
      // fixed message typo: "the the same" -> "the same"
      throw new HoodieCatalogException(
          String.format("Primary key [%s] and record key [%s] should be the same.",
              pkColumns,
              recordKey));
    }
    properties.put(PK_CONSTRAINT_NAME, table.getUnresolvedSchema().getPrimaryKey().get().getConstraintName());
    properties.put(PK_COLUMNS, pkColumns);
  }

  if (!properties.containsKey(FlinkOptions.PATH.key())) {
    properties.put(FlinkOptions.PATH.key(), location);
  }

  // set sd
  StorageDescriptor sd = new StorageDescriptor();
  List<FieldSchema> allColumns = HiveSchemaUtils.createHiveColumns(table.getSchema());

  // Table columns and partition keys: partition columns are declared last in the schema
  // and are registered as Hive partition keys rather than regular columns
  if (table instanceof CatalogTable) {
    CatalogTable catalogTable = (CatalogTable) table;
    if (catalogTable.isPartitioned()) {
      int partitionKeySize = catalogTable.getPartitionKeys().size();
      List<FieldSchema> regularColumns =
          allColumns.subList(0, allColumns.size() - partitionKeySize);
      List<FieldSchema> partitionColumns =
          allColumns.subList(
              allColumns.size() - partitionKeySize, allColumns.size());
      sd.setCols(regularColumns);
      hiveTable.setPartitionKeys(partitionColumns);
    } else {
      sd.setCols(allColumns);
      hiveTable.setPartitionKeys(new ArrayList<>());
    }
  } else {
    sd.setCols(allColumns);
  }

  HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
  //ignore uber input Format
  String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
  String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
  String serDeClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
  sd.setInputFormat(inputFormatClassName);
  sd.setOutputFormat(outputFormatClassName);

  Map<String, String> serdeProperties = new HashMap<>();
  serdeProperties.put("path", location);
  serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
  serdeProperties.put("serialization.format", "1");
  serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark((CatalogTable) table, hiveConf, properties));

  sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
  sd.setLocation(location);
  hiveTable.setSd(sd);

  hiveTable.setParameters(properties);
  return hiveTable;
}
/**
 * Lists all tables of the given database from the Hive metastore.
 *
 * <p>NOTE(review): this returns every metastore table, not only hoodie tables — confirm
 * whether non-hoodie tables should be filtered out.
 */
@Override
public List<String> listTables(String databaseName)
    throws DatabaseNotExistException, CatalogException {
  checkArgument(
      !isNullOrWhitespaceOnly(databaseName), "Database name cannot be null or empty");

  try {
    return client.getAllTables(databaseName);
  } catch (UnknownDBException e) {
    throw new DatabaseNotExistException(getName(), databaseName);
  } catch (TException e) {
    throw new HoodieCatalogException(
        String.format("Failed to list tables in database %s", databaseName), e);
  }
}
/** Views are not supported by the hoodie HMS catalog; always throws. */
@Override
public List<String> listViews(String databaseName)
    throws DatabaseNotExistException, CatalogException {
  throw new HoodieCatalogException("Hoodie catalog does not support to listViews");
}
/**
 * Returns whether the table exists in the Hive metastore; a missing database is reported
 * as "table does not exist" rather than an error.
 */
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
  checkNotNull(tablePath, "Table path cannot be null");

  try {
    return client.tableExists(tablePath.getDatabaseName(), tablePath.getObjectName());
  } catch (UnknownDBException e) {
    return false;
  } catch (TException e) {
    throw new CatalogException(
        String.format(
            "Failed to check whether table %s exists or not.",
            tablePath.getFullName()),
        e);
  }
}
/**
 * Drops the table from the Hive metastore, deleting the associated data as well.
 */
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
    throws TableNotExistException, CatalogException {
  checkNotNull(tablePath, "Table path cannot be null");

  try {
    client.dropTable(
        tablePath.getDatabaseName(),
        tablePath.getObjectName(),
        // Indicate whether associated data should be deleted.
        // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can
        // be changed later if necessary
        true,
        ignoreIfNotExists);
  } catch (NoSuchObjectException e) {
    if (!ignoreIfNotExists) {
      throw new TableNotExistException(getName(), tablePath);
    }
  } catch (TException e) {
    throw new HoodieCatalogException(
        String.format("Failed to drop table %s", tablePath.getFullName()), e);
  }
}
/**
 * Renames the table: re-initializes the hoodie table config under the same base path with
 * the new table name, then renames the Hive metastore table.
 *
 * @throws TableNotExistException    when the source table does not exist and
 *                                   {@code ignoreIfNotExists} is false
 * @throws TableAlreadyExistException when a table with the new name already exists
 */
@Override
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
    throws TableNotExistException, TableAlreadyExistException, CatalogException {
  checkNotNull(tablePath, "Table path cannot be null");
  checkArgument(
      !isNullOrWhitespaceOnly(newTableName), "New table name cannot be null or empty");

  try {
    // alter_table() doesn't throw a clear exception when target table doesn't exist.
    // Thus, check the table existence explicitly
    if (tableExists(tablePath)) {
      ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
      // alter_table() doesn't throw a clear exception when new table already exists.
      // Thus, check the table existence explicitly
      if (tableExists(newPath)) {
        throw new TableAlreadyExistException(getName(), newPath);
      } else {
        Table hiveTable = getHiveTable(tablePath);
        //update hoodie
        StorageDescriptor sd = hiveTable.getSd();
        String location = sd.getLocation();
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(location).setConf(hiveConf).build();
        //Init table with new name
        HoodieTableMetaClient.withPropertyBuilder().fromProperties(metaClient.getTableConfig().getProps())
            .setTableName(newTableName)
            .initTable(hiveConf, location);

        hiveTable.setTableName(newTableName);
        client.alter_table(
            tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
      }
    } else if (!ignoreIfNotExists) {
      throw new TableNotExistException(getName(), tablePath);
    }
  } catch (TableNotExistException | TableAlreadyExistException e) {
    // propagate the declared catalog exceptions instead of wrapping them:
    // previously the generic catch below swallowed them into HoodieCatalogException
    throw e;
  } catch (Exception e) {
    throw new HoodieCatalogException(
        String.format("Failed to rename table %s", tablePath.getFullName()), e);
  }
}
/**
 * Returns whether the given config option resolves to the same value (case-insensitive)
 * in both option maps, falling back to the option default when a map has no entry.
 */
private boolean sameOptions(Map<String, String> existingOptions, Map<String, String> newOptions, ConfigOption<?> option) {
  // use the wildcard-typed option instead of the raw type to keep the compiler checks
  return existingOptions.getOrDefault(option.key(), String.valueOf(option.defaultValue()))
      .equalsIgnoreCase(newOptions.getOrDefault(option.key(), String.valueOf(option.defaultValue())));
}
/**
 * Alters the table by rebuilding the Hive table definition from the new catalog table.
 *
 * <p>Restrictions: the new definition must keep connector 'hudi'; views are rejected;
 * changing the table type or the index type is not allowed.
 */
@Override
public void alterTable(
    ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
    throws TableNotExistException, CatalogException {
  checkNotNull(tablePath, "Table path cannot be null");
  checkNotNull(newCatalogTable, "New catalog table cannot be null");

  if (!newCatalogTable.getOptions().getOrDefault(CONNECTOR.key(), "").equalsIgnoreCase("hudi")) {
    throw new HoodieCatalogException(String.format("The %s is not hoodie table", tablePath.getObjectName()));
  }
  if (newCatalogTable instanceof CatalogView) {
    throw new HoodieCatalogException("Hoodie catalog does not support to ALTER VIEW");
  }

  try {
    Table hiveTable = getHiveTable(tablePath);
    if (!sameOptions(hiveTable.getParameters(), newCatalogTable.getOptions(), FlinkOptions.TABLE_TYPE)
        || !sameOptions(hiveTable.getParameters(), newCatalogTable.getOptions(), FlinkOptions.INDEX_TYPE)) {
      throw new HoodieCatalogException("Hoodie catalog does not support to alter table type and index type");
    }
  } catch (TableNotExistException e) {
    if (!ignoreIfNotExists) {
      throw e;
    }
    // table is absent and the caller asked to ignore that
    return;
  }

  try {
    boolean isMorTable = OptionsResolver.isMorTable(newCatalogTable.getOptions());
    Table hiveTable = instantiateHiveTable(tablePath, newCatalogTable, inferTablePath(tablePath, newCatalogTable), isMorTable);
    //alter hive table
    client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
  } catch (Exception e) {
    LOG.error("Failed to alter table {}", tablePath.getObjectName(), e);
    throw new HoodieCatalogException(String.format("Failed to alter table %s", tablePath.getObjectName()), e);
  }
}
// ------ partitions ------
// Explicit partition DDL is not supported by this catalog implementation; every
// partition-level operation is rejected.

@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
    throws TableNotExistException, TableNotPartitionedException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public List<CatalogPartitionSpec> listPartitions(
    ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
    throws TableNotExistException, TableNotPartitionedException,
    PartitionSpecInvalidException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public List<CatalogPartitionSpec> listPartitionsByFilter(
    ObjectPath tablePath, List<Expression> expressions)
    throws TableNotExistException, TableNotPartitionedException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
    throws PartitionNotExistException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
    throws CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public void createPartition(
    ObjectPath tablePath,
    CatalogPartitionSpec partitionSpec,
    CatalogPartition partition,
    boolean ignoreIfExists)
    throws TableNotExistException, TableNotPartitionedException,
    PartitionSpecInvalidException, PartitionAlreadyExistsException,
    CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public void dropPartition(
    ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
    throws PartitionNotExistException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public void alterPartition(
    ObjectPath tablePath,
    CatalogPartitionSpec partitionSpec,
    CatalogPartition newPartition,
    boolean ignoreIfNotExists)
    throws PartitionNotExistException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}
// ------ functions ------
// Catalog functions are not supported: listing reports an empty catalog, lookups behave
// as "not found", and any mutation is rejected.

@Override
public List<String> listFunctions(String databaseName)
    throws DatabaseNotExistException, CatalogException {
  return Collections.emptyList();
}

@Override
public CatalogFunction getFunction(ObjectPath functionPath)
    throws FunctionNotExistException, CatalogException {
  throw new FunctionNotExistException(getName(), functionPath);
}

@Override
public boolean functionExists(ObjectPath functionPath) throws CatalogException {
  return false;
}

@Override
public void createFunction(
    ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
    throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public void alterFunction(
    ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
    throws FunctionNotExistException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
    throws FunctionNotExistException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}
// ------ statistics ------
// Statistics are not tracked by this catalog: all getters report UNKNOWN and all
// mutations are rejected.

@Override
public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
    throws TableNotExistException, CatalogException {
  return CatalogTableStatistics.UNKNOWN;
}

@Override
public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
    throws TableNotExistException, CatalogException {
  return CatalogColumnStatistics.UNKNOWN;
}

@Override
public CatalogTableStatistics getPartitionStatistics(
    ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
    throws PartitionNotExistException, CatalogException {
  return CatalogTableStatistics.UNKNOWN;
}

@Override
public CatalogColumnStatistics getPartitionColumnStatistics(
    ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
    throws PartitionNotExistException, CatalogException {
  return CatalogColumnStatistics.UNKNOWN;
}

@Override
public void alterTableStatistics(
    ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
    throws TableNotExistException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public void alterTableColumnStatistics(
    ObjectPath tablePath,
    CatalogColumnStatistics columnStatistics,
    boolean ignoreIfNotExists)
    throws TableNotExistException, CatalogException, TablePartitionedException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public void alterPartitionStatistics(
    ObjectPath tablePath,
    CatalogPartitionSpec partitionSpec,
    CatalogTableStatistics partitionStatistics,
    boolean ignoreIfNotExists)
    throws PartitionNotExistException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}

@Override
public void alterPartitionColumnStatistics(
    ObjectPath tablePath,
    CatalogPartitionSpec partitionSpec,
    CatalogColumnStatistics columnStatistics,
    boolean ignoreIfNotExists)
    throws PartitionNotExistException, CatalogException {
  throw new HoodieCatalogException("Not supported.");
}
}

View File

@@ -19,13 +19,22 @@
package org.apache.hudi.table.catalog;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.avro.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +48,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
/**
@@ -47,6 +57,12 @@ import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER
public class TableOptionProperties {
private static final Logger LOG = LoggerFactory.getLogger(TableOptionProperties.class);
public static final String SPARK_SOURCE_PROVIDER = "spark.sql.sources.provider";
public static final String SPARK_VERSION = "spark.version";
public static final String DEFAULT_SPARK_VERSION = "spark2.4.4";
static final Map<String, String> VALUE_MAPPING = new HashMap<>();
static final Map<String, String> KEY_MAPPING = new HashMap<>();
private static final String FILE_NAME = "table_option.properties";
public static final String PK_CONSTRAINT_NAME = "pk.constraint.name";
@@ -56,6 +72,25 @@ public class TableOptionProperties {
public static final List<String> NON_OPTION_KEYS = Arrays.asList(PK_CONSTRAINT_NAME, PK_COLUMNS, COMMENT, PARTITION_COLUMNS);
static {
  // bi-directional value mapping between the Spark 'type' shorthand (mor/cow) and the
  // hoodie table type enum name
  VALUE_MAPPING.put("mor", HoodieTableType.MERGE_ON_READ.name());
  VALUE_MAPPING.put("cow", HoodieTableType.COPY_ON_WRITE.name());
  VALUE_MAPPING.put(HoodieTableType.MERGE_ON_READ.name(), "mor");
  VALUE_MAPPING.put(HoodieTableType.COPY_ON_WRITE.name(), "cow");

  // bi-directional key mapping between Spark catalog property names and Flink option keys
  KEY_MAPPING.put("type", FlinkOptions.TABLE_TYPE.key());
  KEY_MAPPING.put("primaryKey", FlinkOptions.RECORD_KEY_FIELD.key());
  KEY_MAPPING.put("preCombineField", FlinkOptions.PRECOMBINE_FIELD.key());
  KEY_MAPPING.put("payloadClass", FlinkOptions.PAYLOAD_CLASS_NAME.key());
  KEY_MAPPING.put(SPARK_SOURCE_PROVIDER, CONNECTOR.key());
  KEY_MAPPING.put(FlinkOptions.KEYGEN_CLASS_NAME.key(), FlinkOptions.KEYGEN_CLASS_NAME.key());
  KEY_MAPPING.put(FlinkOptions.TABLE_TYPE.key(), "type");
  KEY_MAPPING.put(FlinkOptions.RECORD_KEY_FIELD.key(), "primaryKey");
  KEY_MAPPING.put(FlinkOptions.PRECOMBINE_FIELD.key(), "preCombineField");
  KEY_MAPPING.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(), "payloadClass");
}
/**
* Initialize the {@link #FILE_NAME} meta file.
*/
@@ -128,4 +163,33 @@ public class TableOptionProperties {
NON_OPTION_KEYS.forEach(copied::remove);
return copied;
}
/**
 * Derives the Spark-compatible table properties (schema, partition columns, ...) for the
 * given Flink catalog table and projects the merged properties into the Spark key space.
 *
 * <p>NOTE: mutates the passed-in {@code properties} map (via putAll) before projecting.
 * The returned map only keeps entries whose key has a Spark mapping and whose mapped key
 * is not already configured on the catalog table.
 */
public static Map<String, String> translateFlinkTableProperties2Spark(CatalogTable catalogTable, Configuration hadoopConf, Map<String, String> properties) {
  Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
  MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
  String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
  // 4000: presumably the max length of one schema-part property value -- TODO confirm
  // against the metastore property-size limit
  Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
      catalogTable.getPartitionKeys(),
      sparkVersion,
      4000,
      messageType);
  properties.putAll(sparkTableProperties);
  return properties.entrySet().stream()
      .filter(e -> KEY_MAPPING.containsKey(e.getKey()) && !catalogTable.getOptions().containsKey(KEY_MAPPING.get(e.getKey())))
      .collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()),
          e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
}
/**
 * Maps Spark catalog properties to their Flink option equivalents.
 *
 * <p>Returns the options unchanged when the Flink 'connector' key is already present
 * (the table was created by Flink in the first place).
 *
 * <p>NOTE(review): {@code VALUE_MAPPING.get(...)} returns null for an unrecognized 'type'
 * value and {@code Collectors.toMap} then throws a NullPointerException — confirm only
 * mor/cow values can occur here.
 */
public static Map<String, String> translateSparkTableProperties2Flink(Map<String, String> options) {
  if (options.containsKey(CONNECTOR.key())) {
    return options;
  }
  return options.entrySet().stream().filter(e -> KEY_MAPPING.containsKey(e.getKey()))
      .collect(Collectors.toMap(e -> KEY_MAPPING.get(e.getKey()),
          e -> e.getKey().equalsIgnoreCase("type") ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
}
/** Convenience overload translating the parameters of a Hive metastore table. */
public static Map<String, String> translateSparkTableProperties2Flink(Table hiveTable) {
  return translateSparkTableProperties2Flink(hiveTable.getParameters());
}
}

View File

@@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.NullType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Visitor that converts a Flink {@link LogicalType} into the corresponding Hive
 * {@link TypeInfo} when persisting a hoodie table schema into the Hive metastore.
 *
 * <p>When {@code checkPrecision} is false, CHAR/VARCHAR lengths outside the Hive limits
 * are silently promoted to STRING instead of failing.
 */
public class TypeInfoLogicalTypeVisitor extends LogicalTypeDefaultVisitor<TypeInfo> {
  // the root type being converted; referenced in defaultMethod's error message
  private final LogicalType type;
  // whether to check type precision
  private final boolean checkPrecision;

  TypeInfoLogicalTypeVisitor(DataType dataType, boolean checkPrecision) {
    this(dataType.getLogicalType(), checkPrecision);
  }

  TypeInfoLogicalTypeVisitor(LogicalType type, boolean checkPrecision) {
    this.type = type;
    this.checkPrecision = checkPrecision;
  }

  @Override
  public TypeInfo visit(CharType charType) {
    // Flink and Hive have different length limit for CHAR. Promote it to STRING if it
    // exceeds the limits of
    // Hive and we're told not to check precision. This can be useful when calling Hive UDF
    // to process data.
    if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH || charType.getLength() < 1) {
      if (checkPrecision) {
        throw new HoodieCatalogException(
            String.format(
                "HiveCatalog doesn't support char type with length of '%d'. "
                    + "The supported length is [%d, %d]",
                charType.getLength(), 1, HiveChar.MAX_CHAR_LENGTH));
      } else {
        return TypeInfoFactory.stringTypeInfo;
      }
    }
    return TypeInfoFactory.getCharTypeInfo(charType.getLength());
  }

  @Override
  public TypeInfo visit(VarCharType varCharType) {
    // Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE)
    // We don't have more information in LogicalTypeRoot to distinguish StringType and a
    // VARCHAR(Integer.MAX_VALUE) instance
    // Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType
    if (varCharType.getLength() == Integer.MAX_VALUE) {
      return TypeInfoFactory.stringTypeInfo;
    }
    // Flink and Hive have different length limit for VARCHAR. Promote it to STRING if it
    // exceeds the limits of
    // Hive and we're told not to check precision. This can be useful when calling Hive UDF
    // to process data.
    if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH
        || varCharType.getLength() < 1) {
      if (checkPrecision) {
        throw new HoodieCatalogException(
            String.format(
                "HiveCatalog doesn't support varchar type with length of '%d'. "
                    + "The supported length is [%d, %d]",
                varCharType.getLength(), 1, HiveVarchar.MAX_VARCHAR_LENGTH));
      } else {
        return TypeInfoFactory.stringTypeInfo;
      }
    }
    return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength());
  }

  @Override
  public TypeInfo visit(BooleanType booleanType) {
    return TypeInfoFactory.booleanTypeInfo;
  }

  @Override
  public TypeInfo visit(VarBinaryType varBinaryType) {
    // Flink's BytesType is defined as VARBINARY(Integer.MAX_VALUE)
    // We don't have more information in LogicalTypeRoot to distinguish BytesType and a
    // VARBINARY(Integer.MAX_VALUE) instance
    // Thus always treat VARBINARY(Integer.MAX_VALUE) as BytesType
    if (varBinaryType.getLength() == VarBinaryType.MAX_LENGTH) {
      return TypeInfoFactory.binaryTypeInfo;
    }
    // fixed-length binaries are not representable in Hive: fall through to the error path
    return defaultMethod(varBinaryType);
  }

  @Override
  public TypeInfo visit(DecimalType decimalType) {
    // Flink and Hive share the same precision and scale range
    // Flink already validates the type so we don't need to validate again here
    return TypeInfoFactory.getDecimalTypeInfo(
        decimalType.getPrecision(), decimalType.getScale());
  }

  @Override
  public TypeInfo visit(TinyIntType tinyIntType) {
    return TypeInfoFactory.byteTypeInfo;
  }

  @Override
  public TypeInfo visit(SmallIntType smallIntType) {
    return TypeInfoFactory.shortTypeInfo;
  }

  @Override
  public TypeInfo visit(IntType intType) {
    return TypeInfoFactory.intTypeInfo;
  }

  @Override
  public TypeInfo visit(BigIntType bigIntType) {
    return TypeInfoFactory.longTypeInfo;
  }

  @Override
  public TypeInfo visit(FloatType floatType) {
    return TypeInfoFactory.floatTypeInfo;
  }

  @Override
  public TypeInfo visit(DoubleType doubleType) {
    return TypeInfoFactory.doubleTypeInfo;
  }

  @Override
  public TypeInfo visit(DateType dateType) {
    return TypeInfoFactory.dateTypeInfo;
  }

  @Override
  public TypeInfo visit(TimestampType timestampType) {
    // precision 9 (nanoseconds) is rejected when checking precision -- presumably because
    // the hoodie storage format keeps at most microsecond precision; confirm
    if (checkPrecision && timestampType.getPrecision() == 9) {
      throw new HoodieCatalogException(
          "HoodieCatalog currently does not support timestamp of precision 9");
    }
    return TypeInfoFactory.timestampTypeInfo;
  }

  @Override
  public TypeInfo visit(ArrayType arrayType) {
    LogicalType elementType = arrayType.getElementType();
    TypeInfo elementTypeInfo = elementType.accept(this);
    if (null != elementTypeInfo) {
      return TypeInfoFactory.getListTypeInfo(elementTypeInfo);
    } else {
      return defaultMethod(arrayType);
    }
  }

  @Override
  public TypeInfo visit(MapType mapType) {
    LogicalType keyType = mapType.getKeyType();
    LogicalType valueType = mapType.getValueType();
    TypeInfo keyTypeInfo = keyType.accept(this);
    TypeInfo valueTypeInfo = valueType.accept(this);
    if (null == keyTypeInfo || null == valueTypeInfo) {
      return defaultMethod(mapType);
    } else {
      return TypeInfoFactory.getMapTypeInfo(keyTypeInfo, valueTypeInfo);
    }
  }

  @Override
  public TypeInfo visit(RowType rowType) {
    List<String> names = rowType.getFieldNames();
    List<TypeInfo> typeInfos = new ArrayList<>(names.size());
    for (String name : names) {
      TypeInfo typeInfo = rowType.getTypeAt(rowType.getFieldIndex(name)).accept(this);
      if (null != typeInfo) {
        typeInfos.add(typeInfo);
      } else {
        return defaultMethod(rowType);
      }
    }
    return TypeInfoFactory.getStructTypeInfo(names, typeInfos);
  }

  @Override
  public TypeInfo visit(NullType nullType) {
    return TypeInfoFactory.voidTypeInfo;
  }

  @Override
  protected TypeInfo defaultMethod(LogicalType logicalType) {
    // reports the full root type (field 'type'), not the nested type that failed,
    // so the message shows the complete schema context
    throw new UnsupportedOperationException(
        String.format(
            "Flink doesn't support converting type %s to Hive type yet.",
            type.toString()));
  }
}

View File

@@ -442,4 +442,8 @@ public class FilePathUtils {
}
return conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS).split(",");
}
/**
 * Returns true if the given partition folder name matches HIVE_PARTITION_NAME_PATTERN
 * (presumably the hive-style {@code key=value} form — confirm against the pattern).
 */
public static boolean isHiveStylePartitioning(String path) {
  return HIVE_PARTITION_NAME_PATTERN.matcher(path).matches();
}
}

View File

@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -515,6 +516,20 @@ public class StreamerUtil {
return schemaUtil.getTableAvroSchema(includeMetadataFields);
}
public static Schema getLatestTableSchema(String path, org.apache.hadoop.conf.Configuration hadoopConf) {
if (StringUtils.isNullOrEmpty(path) || !StreamerUtil.tableExists(path, hadoopConf)) {
return null;
}
try {
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hadoopConf);
return getTableAvroSchema(metaClient, false);
} catch (Exception e) {
LOG.warn("Error while resolving the latest table schema", e);
}
return null;
}
public static boolean fileExists(FileSystem fs, Path path) {
try {
return fs.exists(path);

View File

@@ -23,6 +23,8 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -1321,6 +1323,46 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
  @Test
  void testBuiltinFunctionWithHMSCatalog() {
    TableEnvironment tableEnv = batchTableEnv;
    // Register a Hive-metastore backed hudi catalog and make it current.
    HoodieHiveCatalog hoodieCatalog = HoodieCatalogTestUtils.createHiveCatalog("hudi_catalog");
    tableEnv.registerCatalog("hudi_catalog", hoodieCatalog);
    tableEnv.executeSql("use catalog hudi_catalog");
    String dbName = "hudi";
    tableEnv.executeSql("create database " + dbName);
    tableEnv.executeSql("use " + dbName);
    String hoodieTableDDL = sql("t1")
        .field("f_int int")
        .field("f_date DATE")
        .field("f_par string")
        .pkField("f_int")
        .partitionField("f_par")
        .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/" + dbName + "/" + "t1")
        .option(FlinkOptions.RECORD_KEY_FIELD, "f_int")
        .option(FlinkOptions.PRECOMBINE_FIELD, "f_date")
        .end();
    tableEnv.executeSql(hoodieTableDDL);
    // Exercise builtin functions (TO_DATE and a DATE literal) through the HMS catalog;
    // both rows should land with the same date value.
    String insertSql = "insert into t1 values (1, TO_DATE('2022-02-02'), '1'), (2, DATE '2022-02-02', '1')";
    execInsertSql(tableEnv, insertSql);
    List<Row> result = CollectionUtil.iterableToList(
        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
    final String expected = "["
        + "+I[1, 2022-02-02, 1], "
        + "+I[2, 2022-02-02, 1]]";
    assertRowsEquals(result, expected);
    // A predicate on the record key should narrow the result to a single row.
    List<Row> partitionResult = CollectionUtil.iterableToList(
        () -> tableEnv.sqlQuery("select * from t1 where f_int = 1").execute().collect());
    assertRowsEquals(partitionResult, "[+I[1, 2022-02-02, 1]]");
  }
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.hadoop.hive.conf.HiveConf;
import java.io.IOException;
/** Test utils for Hoodie catalog. */
public class HoodieCatalogTestUtils {
  private static final String HIVE_WAREHOUSE_URI_FORMAT =
      "jdbc:derby:;databaseName=%s;create=true";

  private static final String TEST_CATALOG_NAME = "test_catalog";

  private static final org.junit.rules.TemporaryFolder TEMPORARY_FOLDER = new org.junit.rules.TemporaryFolder();

  /** Create a HiveCatalog with an embedded Hive Metastore. */
  public static HoodieHiveCatalog createHiveCatalog() {
    return createHiveCatalog(TEST_CATALOG_NAME);
  }

  /** Create a named HiveCatalog with an embedded Hive Metastore. */
  public static HoodieHiveCatalog createHiveCatalog(String name) {
    // allowEmbedded = true: tests run against the in-process metastore
    // configured by createHiveConf().
    return new HoodieHiveCatalog(name, null, createHiveConf(), true);
  }

  /** Builds a HiveConf pointing at a throwaway Derby-backed metastore. */
  public static HiveConf createHiveConf() {
    ClassLoader classLoader = HoodieCatalogTestUtils.class.getClassLoader();
    try {
      TEMPORARY_FOLDER.create();
      // Derby database location for the embedded metastore, under a temp folder.
      String derbyDbLocation = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
      String metastoreConnectUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, derbyDbLocation);
      HiveConf.setHiveSiteLocation(classLoader.getResource(CatalogOptions.HIVE_SITE_FILE));
      HiveConf conf = new HiveConf();
      conf.setVar(
          HiveConf.ConfVars.METASTOREWAREHOUSE,
          TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
      conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, metastoreConnectUri);
      return conf;
    } catch (IOException e) {
      throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
    }
  }
}

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Test cases for {@link HoodieCatalogFactory}.
 */
public class TestHoodieCatalogFactory {
  private static final URL CONF_DIR =
      Thread.currentThread().getContextClassLoader().getResource("test-catalog-factory-conf");

  @TempDir
  File tempFile;

  @Test
  public void testCreateHMSCatalog() {
    final String catalogName = "mycatalog";
    final HoodieHiveCatalog expectedCatalog = HoodieCatalogTestUtils.createHiveCatalog(catalogName);

    final Map<String, String> options = new HashMap<>();
    options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HoodieCatalogFactory.IDENTIFIER);
    options.put(CatalogOptions.HIVE_CONF_DIR.key(), CONF_DIR.getPath());
    options.put(CatalogOptions.MODE.key(), "hms");

    final Catalog actualCatalog =
        FactoryUtil.createCatalog(
            catalogName, options, null, Thread.currentThread().getContextClassLoader());
    // JUnit's assertEquals takes the expected value first; the original call had
    // the arguments swapped, which inverts the failure message.
    assertEquals(
        "dummy-hms",
        ((HoodieHiveCatalog) actualCatalog)
            .getHiveConf()
            .getVar(HiveConf.ConfVars.METASTOREURIS));
    checkEquals(expectedCatalog, (HoodieHiveCatalog) actualCatalog);
  }

  @Test
  public void testCreateDFSCatalog() {
    final String catalogName = "mycatalog";
    Map<String, String> catalogOptions = new HashMap<>();
    catalogOptions.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
    catalogOptions.put(DEFAULT_DATABASE.key(), "test_db");
    HoodieCatalog expectedCatalog = new HoodieCatalog(catalogName, Configuration.fromMap(catalogOptions));

    final Map<String, String> options = new HashMap<>();
    options.put(CommonCatalogOptions.CATALOG_TYPE.key(), HoodieCatalogFactory.IDENTIFIER);
    options.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
    options.put(DEFAULT_DATABASE.key(), "test_db");
    options.put(CatalogOptions.MODE.key(), "dfs");

    final Catalog actualCatalog =
        FactoryUtil.createCatalog(
            catalogName, options, null, Thread.currentThread().getContextClassLoader());
    checkEquals(expectedCatalog, (AbstractCatalog) actualCatalog);
  }

  /** Compares a few selected catalog properties; expected catalog comes first. */
  private static void checkEquals(AbstractCatalog expected, AbstractCatalog actual) {
    // Only assert a few selected properties for now
    assertEquals(expected.getName(), actual.getName());
    assertEquals(expected.getDefaultDatabase(), actual.getDefaultDatabase());
  }
}

View File

@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hudi.util.StreamerUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Test cases for {@link HoodieHiveCatalog}.
 */
public class TestHoodieHiveCatalog {
  TableSchema schema =
      TableSchema.builder()
          .field("uuid", DataTypes.INT().notNull())
          .field("name", DataTypes.STRING())
          .field("age", DataTypes.INT())
          .field("ts", DataTypes.BIGINT())
          .field("par1", DataTypes.STRING())
          .primaryKey("uuid")
          .build();
  List<String> partitions = Collections.singletonList("par1");
  private static HoodieHiveCatalog hoodieCatalog;
  private final ObjectPath tablePath = new ObjectPath("default", "test");

  @BeforeAll
  public static void createCatalog() {
    hoodieCatalog = HoodieCatalogTestUtils.createHiveCatalog();
    hoodieCatalog.open();
  }

  @AfterEach
  public void dropTable() throws TableNotExistException {
    // ignoreIfNotExists = true: some tests never create the table successfully.
    hoodieCatalog.dropTable(tablePath, true);
  }

  @AfterAll
  public static void closeCatalog() {
    if (hoodieCatalog != null) {
      hoodieCatalog.close();
    }
  }

  @ParameterizedTest
  @EnumSource(value = HoodieTableType.class)
  public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Exception {
    Map<String, String> originOptions = new HashMap<>();
    originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
    originOptions.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());
    CatalogTable table =
        new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
    hoodieCatalog.createTable(tablePath, table, false);

    CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
    // JUnit's assertEquals takes the expected value first; the original calls
    // had the arguments swapped throughout, which inverts failure messages.
    assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));
    assertEquals(tableType.toString(), table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()));
    assertEquals("uuid", table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
    assertEquals("ts", table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()));
    assertEquals(Collections.singletonList("uuid"), table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
    assertEquals(Collections.singletonList("par1"), ((CatalogTable) table1).getPartitionKeys());
  }

  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  public void testCreateExternalTable(boolean isExternal) throws TableAlreadyExistException, DatabaseNotExistException, TableNotExistException, IOException {
    Map<String, String> originOptions = new HashMap<>();
    originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
    originOptions.put(CatalogOptions.TABLE_EXTERNAL.key(), String.valueOf(isExternal));
    CatalogTable table =
        new CatalogTableImpl(schema, originOptions, "hudi table");
    hoodieCatalog.createTable(tablePath, table, false);

    Table table1 = hoodieCatalog.getHiveTable(tablePath);
    if (isExternal) {
      assertTrue(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
      assertEquals("EXTERNAL_TABLE", table1.getTableType());
    } else {
      assertFalse(Boolean.parseBoolean(table1.getParameters().get(CatalogOptions.TABLE_EXTERNAL.key())));
      assertEquals("MANAGED_TABLE", table1.getTableType());
    }

    // Dropping an external table must keep the data files; dropping a managed
    // table must remove them.
    hoodieCatalog.dropTable(tablePath, false);
    Path path = new Path(table1.getParameters().get(FlinkOptions.PATH.key()));
    boolean exists = StreamerUtil.fileExists(FileSystem.getLocal(new Configuration()), path);
    assertEquals(isExternal, exists);
  }

  @Test
  public void testCreateNonHoodieTable() throws TableAlreadyExistException, DatabaseNotExistException {
    CatalogTable table =
        new CatalogTableImpl(schema, Collections.emptyMap(), "hudi table");
    // The original try/catch passed silently when no exception was thrown;
    // assertThrows makes a missing exception an actual test failure.
    HoodieCatalogException e = org.junit.jupiter.api.Assertions.assertThrows(
        HoodieCatalogException.class, () -> hoodieCatalog.createTable(tablePath, table, false));
    assertEquals(String.format("The %s is not hoodie table", tablePath.getObjectName()), e.getMessage());
  }

  @Test
  public void testAlterTable() throws Exception {
    Map<String, String> originOptions = new HashMap<>();
    originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
    CatalogTable originTable =
        new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
    hoodieCatalog.createTable(tablePath, originTable, false);

    Table hiveTable = hoodieCatalog.getHiveTable(tablePath);
    Map<String, String> newOptions = hiveTable.getParameters();
    newOptions.put("k", "v");
    CatalogTable newTable = new CatalogTableImpl(schema, partitions, newOptions, "alter hudi table");
    hoodieCatalog.alterTable(tablePath, newTable, false);

    hiveTable = hoodieCatalog.getHiveTable(tablePath);
    assertEquals("hudi", hiveTable.getParameters().get(CONNECTOR.key()));
    assertEquals("v", hiveTable.getParameters().get("k"));
  }

  @Test
  public void testRenameTable() throws Exception {
    Map<String, String> originOptions = new HashMap<>();
    originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
    CatalogTable originTable =
        new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
    hoodieCatalog.createTable(tablePath, originTable, false);

    hoodieCatalog.renameTable(tablePath, "test1", false);
    assertEquals("test1", hoodieCatalog.getHiveTable(new ObjectPath("default", "test1")).getTableName());
    // Rename back so the @AfterEach cleanup finds the original table path.
    hoodieCatalog.renameTable(new ObjectPath("default", "test1"), "test", false);
  }
}

View File

@@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>datanucleus.schema.autoCreateTables</name>
<value>true</value>
</property>
</configuration>

View File

@@ -0,0 +1,26 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<!-- production code doesn't allow embedded mode, set a dummy value here which is OK as long as the created HiveCatalog is not used -->
<property>
<name>hive.metastore.uris</name>
<value>dummy-hms</value>
</property>
</configuration>