1
0

[HUDI-2877] Support flink catalog to help user use flink table conveniently (#4153)

* [HUDI-2877] Support flink catalog to help user use flink table conveniently

* Fix comment

* fix comment2
This commit is contained in:
Ron
2021-12-05 10:14:29 +08:00
committed by GitHub
parent 36b69d8033
commit a8fb69656f
7 changed files with 1027 additions and 0 deletions

View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import java.util.Map;
/**
 * Configuration options recognized by the Hoodie catalog.
 */
public class CatalogOptions {

  /** Base path of the catalog; it has no default value. */
  public static final ConfigOption<String> CATALOG_PATH =
      ConfigOptions.key("catalog.path")
          .stringType()
          .noDefaultValue()
          .withDescription("Catalog base DFS path, used for inferring the sink table path. "
              + "The default strategy for a table path is: ${catalog.path}/${db_name}/${table_name}");

  /** Name of the default database of the catalog, {@code "default"} unless configured. */
  public static final ConfigOption<String> DEFAULT_DATABASE =
      ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
          .stringType()
          .defaultValue("default");

  /**
   * Extracts the options that can be shared by the tables of the catalog: every given
   * catalog option except {@link #CATALOG_PATH} and {@link #DEFAULT_DATABASE}.
   *
   * <p>The given configuration is copied first and is not modified.
   *
   * @param catalogOptions The catalog options
   * @return the remaining options as a key/value map
   */
  public static Map<String, String> tableCommonOptions(Configuration catalogOptions) {
    final Configuration tableOptions = new Configuration(catalogOptions);
    // the two options below only make sense on the catalog level
    tableOptions.removeConfig(CATALOG_PATH);
    tableOptions.removeConfig(DEFAULT_DATABASE);
    return tableOptions.toMap();
  }
}

View File

@@ -0,0 +1,508 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
/**
 * Catalog that can set up common options for underneath table.
 *
 * <p>The catalog is backed by a file system directory: the sub-directories of the catalog
 * path are listed as databases and the sub-directories of a database directory are listed
 * as tables, so a table lives under {@code ${catalog.path}/${db_name}/${table_name}}.
 * Views, functions, partitions and statistics are not managed by this catalog.
 */
public class HoodieCatalog extends AbstractCatalog {
  private static final Logger LOG = LoggerFactory.getLogger(HoodieCatalog.class);

  private final org.apache.hadoop.conf.Configuration hadoopConf;
  private final String catalogPathStr;
  // catalog options (minus catalog path and default database) that are applied to created tables
  private final Map<String, String> tableCommonOptions;

  // both are initialized in #open
  private Path catalogPath;
  private FileSystem fs;

  /**
   * Creates the catalog.
   *
   * @param name    The catalog name
   * @param options The catalog options, the default database and the catalog path are read from it
   */
  public HoodieCatalog(String name, Configuration options) {
    super(name, options.get(DEFAULT_DATABASE));
    this.catalogPathStr = options.get(CATALOG_PATH);
    this.hadoopConf = StreamerUtil.getHadoopConf();
    this.tableCommonOptions = CatalogOptions.tableCommonOptions(options);
  }

  /**
   * Resolves the file system of the catalog path and verifies that the path exists.
   *
   * @throws CatalogException if the catalog path does not exist or can not be checked
   */
  @Override
  public void open() throws CatalogException {
    fs = FSUtils.getFs(catalogPathStr, hadoopConf);
    catalogPath = new Path(catalogPathStr);
    try {
      if (!fs.exists(catalogPath)) {
        throw new CatalogException(String.format("Catalog %s path %s does not exist.", getName(), catalogPathStr));
      }
    } catch (IOException e) {
      throw new CatalogException(String.format("Checking catalog path %s exists exception.", catalogPathStr), e);
    }
  }

  /**
   * Closes the file system obtained in {@link #open()}. This is a no-op when the catalog
   * has never been opened successfully.
   *
   * @throws CatalogException if closing the file system fails
   */
  @Override
  public void close() throws CatalogException {
    if (fs == null) {
      // #open was never called or failed before the file system was resolved
      return;
    }
    try {
      fs.close();
    } catch (IOException e) {
      throw new CatalogException("Closing FileSystem exception.", e);
    }
  }

  // ------ databases ------

  /**
   * Lists the names of the directories directly under the catalog path.
   */
  @Override
  public List<String> listDatabases() throws CatalogException {
    try {
      FileStatus[] fileStatuses = fs.listStatus(catalogPath);
      return Arrays.stream(fileStatuses)
          .filter(FileStatus::isDirectory)
          .map(fileStatus -> fileStatus.getPath().getName())
          .collect(Collectors.toList());
    } catch (IOException e) {
      throw new CatalogException("Listing database exception.", e);
    }
  }

  /**
   * Returns an empty database descriptor (no properties, no comment) for an existing database.
   *
   * @throws DatabaseNotExistException if the database does not exist
   */
  @Override
  public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
    if (databaseExists(databaseName)) {
      return new CatalogDatabaseImpl(Collections.emptyMap(), null);
    } else {
      throw new DatabaseNotExistException(getName(), databaseName);
    }
  }

  /**
   * Checks whether a directory with the given name exists under the catalog path.
   *
   * @throws IllegalArgumentException if the database name is null or whitespace only
   */
  @Override
  public boolean databaseExists(String databaseName) throws CatalogException {
    checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
    return listDatabases().contains(databaseName);
  }

  /**
   * Creates the database directory under the catalog path.
   *
   * @throws DatabaseAlreadyExistException if the database exists and {@code ignoreIfExists} is false
   * @throws CatalogException              if the database carries properties or the directory
   *                                       can not be created
   */
  @Override
  public void createDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean ignoreIfExists)
      throws DatabaseAlreadyExistException, CatalogException {
    if (databaseExists(databaseName)) {
      if (ignoreIfExists) {
        return;
      } else {
        throw new DatabaseAlreadyExistException(getName(), databaseName);
      }
    }

    if (!CollectionUtil.isNullOrEmpty(catalogDatabase.getProperties())) {
      throw new CatalogException("Hudi catalog doesn't support to create database with options.");
    }

    Path dbPath = new Path(catalogPath, databaseName);
    try {
      // do not silently ignore a failed directory creation
      if (!fs.mkdirs(dbPath)) {
        throw new CatalogException(
            String.format("Creating database %s failed: can not create directory %s.", databaseName, dbPath));
      }
    } catch (IOException e) {
      throw new CatalogException(String.format("Creating database %s exception.", databaseName), e);
    }
  }

  /**
   * Recursively deletes the database directory.
   *
   * @throws DatabaseNotExistException if the database does not exist and {@code ignoreIfNotExists} is false
   * @throws DatabaseNotEmptyException if the database contains tables and {@code cascade} is false
   * @throws IllegalArgumentException  if the database is the default database
   */
  @Override
  public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
      throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
    if (!databaseExists(databaseName)) {
      if (ignoreIfNotExists) {
        return;
      } else {
        throw new DatabaseNotExistException(getName(), databaseName);
      }
    }

    List<String> tables = listTables(databaseName);
    if (!tables.isEmpty() && !cascade) {
      throw new DatabaseNotEmptyException(getName(), databaseName);
    }

    if (databaseName.equals(getDefaultDatabase())) {
      throw new IllegalArgumentException(
          "Hudi catalog doesn't support to drop the default database.");
    }

    Path dbPath = new Path(catalogPath, databaseName);
    try {
      fs.delete(dbPath, true);
    } catch (IOException e) {
      throw new CatalogException(String.format("Dropping database %s exception.", databaseName), e);
    }
  }

  @Override
  public void alterDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean ignoreIfNotExists)
      throws DatabaseNotExistException, CatalogException {
    throw new UnsupportedOperationException("Altering database is not implemented.");
  }

  // ------ tables ------

  /**
   * Lists the names of the directories directly under the database directory.
   *
   * @throws DatabaseNotExistException if the database does not exist
   */
  @Override
  public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
    if (!databaseExists(databaseName)) {
      throw new DatabaseNotExistException(getName(), databaseName);
    }

    Path dbPath = new Path(catalogPath, databaseName);
    try {
      return Arrays.stream(fs.listStatus(dbPath))
          .filter(FileStatus::isDirectory)
          .map(fileStatus -> fileStatus.getPath().getName())
          .collect(Collectors.toList());
    } catch (IOException e) {
      throw new CatalogException(String.format("Listing table in database %s exception.", dbPath), e);
    }
  }

  /**
   * Builds the catalog table from the latest table avro schema and the persisted
   * table option properties (primary key, partition columns, comment and table options).
   *
   * @throws TableNotExistException if the table does not exist or its schema can not be resolved
   */
  @Override
  public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
    if (!tableExists(tablePath)) {
      throw new TableNotExistException(getName(), tablePath);
    }

    final String path = inferTablePath(catalogPathStr, tablePath);
    Map<String, String> options = TableOptionProperties.loadFromProperties(path, hadoopConf);
    final Schema latestSchema = getLatestTableSchema(path);
    if (latestSchema != null) {
      org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
          .fromRowDataType(AvroSchemaConverter.convertToDataType(latestSchema));
      final String pkConstraintName = TableOptionProperties.getPkConstraintName(options);
      if (pkConstraintName != null) {
        builder.primaryKeyNamed(pkConstraintName, TableOptionProperties.getPkColumns(options));
      }
      final org.apache.flink.table.api.Schema schema = builder.build();
      return CatalogTable.of(
          schema,
          TableOptionProperties.getComment(options),
          TableOptionProperties.getPartitionColumns(options),
          TableOptionProperties.getTableOptions(options));
    } else {
      throw new TableNotExistException(getName(), tablePath);
    }
  }

  /**
   * Initializes the Hudi table under the inferred table path and persists the table options.
   *
   * @throws DatabaseNotExistException     if the database of the table does not exist
   * @throws TableAlreadyExistException    if the table exists and {@code ignoreIfExists} is false
   * @throws UnsupportedOperationException if the given table is a view
   * @throws CatalogException              if the primary key is missing or the table can not be initialized
   */
  @Override
  public void createTable(ObjectPath tablePath, CatalogBaseTable catalogTable, boolean ignoreIfExists)
      throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
    if (!databaseExists(tablePath.getDatabaseName())) {
      throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
    }
    if (tableExists(tablePath)) {
      if (ignoreIfExists) {
        return;
      } else {
        throw new TableAlreadyExistException(getName(), tablePath);
      }
    }

    if (catalogTable instanceof CatalogView) {
      throw new UnsupportedOperationException(
          "Hudi catalog doesn't support to CREATE VIEW.");
    }

    ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) catalogTable;
    final String tablePathStr = inferTablePath(catalogPathStr, tablePath);
    Map<String, String> options = applyOptionsHook(tablePathStr, catalogTable.getOptions());
    Configuration conf = Configuration.fromMap(options);
    conf.setString(FlinkOptions.PATH, tablePathStr);
    ResolvedSchema resolvedSchema = resolvedTable.getResolvedSchema();
    if (!resolvedSchema.getPrimaryKey().isPresent()) {
      throw new CatalogException("Primary key definition is missing");
    }
    final String avroSchema = AvroSchemaConverter.convertToSchema(resolvedSchema.toPhysicalRowDataType().getLogicalType()).toString();
    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);

    // stores two copies of options:
    // - partition keys
    // - primary keys
    // because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
    // when calling #getTable.

    final String pkColumns = String.join(",", resolvedSchema.getPrimaryKey().get().getColumns());
    conf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
    options.put(TableOptionProperties.PK_CONSTRAINT_NAME, resolvedSchema.getPrimaryKey().get().getName());
    options.put(TableOptionProperties.PK_COLUMNS, pkColumns);

    if (resolvedTable.isPartitioned()) {
      final String partitions = String.join(",", resolvedTable.getPartitionKeys());
      conf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
      options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
    }
    conf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
    try {
      StreamerUtil.initTableIfNotExists(conf);
      // prepare the non-table-options properties
      final String comment = resolvedTable.getComment();
      if (comment != null) {
        // a null value can not be persisted into the properties file
        options.put(TableOptionProperties.COMMENT, comment);
      }
      TableOptionProperties.createProperties(tablePathStr, hadoopConf, options);
    } catch (IOException e) {
      throw new CatalogException(String.format("Initialize table path %s exception.", tablePathStr), e);
    }
  }

  @Override
  public boolean tableExists(ObjectPath tablePath) throws CatalogException {
    return StreamerUtil.tableExists(inferTablePath(catalogPathStr, tablePath), hadoopConf);
  }

  /**
   * Recursively deletes the table directory.
   *
   * @throws TableNotExistException if the table does not exist and {@code ignoreIfNotExists} is false
   */
  @Override
  public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
      throws TableNotExistException, CatalogException {
    if (!tableExists(tablePath)) {
      if (ignoreIfNotExists) {
        return;
      } else {
        throw new TableNotExistException(getName(), tablePath);
      }
    }

    Path path = new Path(inferTablePath(catalogPathStr, tablePath));
    try {
      this.fs.delete(path, true);
    } catch (IOException e) {
      throw new CatalogException(String.format("Dropping table %s exception.", tablePath), e);
    }
  }

  @Override
  public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
      throws TableNotExistException, TableAlreadyExistException, CatalogException {
    throw new UnsupportedOperationException("renameTable is not implemented.");
  }

  @Override
  public void alterTable(ObjectPath tablePath, CatalogBaseTable catalogBaseTable, boolean ignoreIfNotExists)
      throws TableNotExistException, CatalogException {
    throw new UnsupportedOperationException("alterTable is not implemented.");
  }

  // ------ views ------

  @Override
  public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
    return Collections.emptyList();
  }

  // ------ partitions ------

  @Override
  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
      throws TableNotExistException, TableNotPartitionedException, CatalogException {
    return Collections.emptyList();
  }

  @Override
  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
      throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
    return Collections.emptyList();
  }

  @Override
  public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
      throws TableNotExistException, TableNotPartitionedException, CatalogException {
    return Collections.emptyList();
  }

  /**
   * Always fails because the catalog does not manage partitions, which is
   * consistent with {@link #partitionExists} always returning false.
   *
   * @throws PartitionNotExistException always
   */
  @Override
  public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
      throws PartitionNotExistException, CatalogException {
    // signal the missing partition with the declared exception instead of returning null
    throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
  }

  @Override
  public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec) throws CatalogException {
    return false;
  }

  @Override
  public void createPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean ignoreIfExists)
      throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
    throw new UnsupportedOperationException("createPartition is not implemented.");
  }

  @Override
  public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfNotExists)
      throws PartitionNotExistException, CatalogException {
    throw new UnsupportedOperationException("dropPartition is not implemented.");
  }

  @Override
  public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean ignoreIfNotExists)
      throws PartitionNotExistException, CatalogException {
    throw new UnsupportedOperationException("alterPartition is not implemented.");
  }

  // ------ functions ------

  @Override
  public List<String> listFunctions(String databaseName) throws DatabaseNotExistException, CatalogException {
    return Collections.emptyList();
  }

  /**
   * Always fails because the catalog does not manage functions, which is
   * consistent with {@link #functionExists} always returning false.
   *
   * @throws FunctionNotExistException always
   */
  @Override
  public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
    // signal the missing function with the declared exception instead of returning null
    throw new FunctionNotExistException(getName(), functionPath);
  }

  @Override
  public boolean functionExists(ObjectPath functionPath) throws CatalogException {
    return false;
  }

  @Override
  public void createFunction(ObjectPath functionPath, CatalogFunction catalogFunction, boolean ignoreIfExists)
      throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
    throw new UnsupportedOperationException("createFunction is not implemented.");
  }

  @Override
  public void alterFunction(ObjectPath functionPath, CatalogFunction catalogFunction, boolean ignoreIfNotExists)
      throws FunctionNotExistException, CatalogException {
    throw new UnsupportedOperationException("alterFunction is not implemented.");
  }

  @Override
  public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
      throws FunctionNotExistException, CatalogException {
    throw new UnsupportedOperationException("dropFunction is not implemented.");
  }

  // ------ statistics ------

  @Override
  public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
      throws TableNotExistException, CatalogException {
    return CatalogTableStatistics.UNKNOWN;
  }

  @Override
  public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
      throws TableNotExistException, CatalogException {
    return CatalogColumnStatistics.UNKNOWN;
  }

  @Override
  public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
      throws PartitionNotExistException, CatalogException {
    return CatalogTableStatistics.UNKNOWN;
  }

  @Override
  public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
      throws PartitionNotExistException, CatalogException {
    return CatalogColumnStatistics.UNKNOWN;
  }

  @Override
  public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics catalogTableStatistics, boolean ignoreIfNotExists)
      throws TableNotExistException, CatalogException {
    throw new UnsupportedOperationException("alterTableStatistics is not implemented.");
  }

  @Override
  public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics catalogColumnStatistics, boolean ignoreIfNotExists)
      throws TableNotExistException, CatalogException, TablePartitionedException {
    throw new UnsupportedOperationException("alterTableColumnStatistics is not implemented.");
  }

  @Override
  public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, CatalogTableStatistics catalogTableStatistics, boolean ignoreIfNotExists)
      throws PartitionNotExistException, CatalogException {
    throw new UnsupportedOperationException("alterPartitionStatistics is not implemented.");
  }

  @Override
  public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, CatalogColumnStatistics catalogColumnStatistics, boolean ignoreIfNotExists)
      throws PartitionNotExistException, CatalogException {
    throw new UnsupportedOperationException("alterPartitionColumnStatistics is not implemented.");
  }

  // ------ utilities ------

  /**
   * Resolves the latest avro schema of the table under the given path.
   *
   * @return the schema, or null if the path is null, the table does not exist
   *         or the schema can not be resolved (the error is logged)
   */
  private @Nullable Schema getLatestTableSchema(String path) {
    if (path != null && StreamerUtil.tableExists(path, hadoopConf)) {
      try {
        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hadoopConf);
        return new TableSchemaResolver(metaClient).getTableAvroSchema(false); // change log mode is not supported now
      } catch (Throwable throwable) {
        LOG.warn("Error while resolving the latest table schema.", throwable);
        // ignored
      }
    }
    return null;
  }

  /**
   * Returns a copy of the given table options with the connector forced to 'hudi',
   * the table path filled in if absent and the catalog common options applied as defaults.
   */
  private Map<String, String> applyOptionsHook(String tablePath, Map<String, String> options) {
    Map<String, String> newOptions = new HashMap<>(options);
    newOptions.put("connector", "hudi");
    newOptions.computeIfAbsent(FlinkOptions.PATH.key(), k -> tablePath);
    tableCommonOptions.forEach(newOptions::putIfAbsent);
    return newOptions;
  }

  /**
   * Infers the table path with pattern: ${catalog.path}/${db_name}/${table_name}.
   */
  private String inferTablePath(String catalogPath, ObjectPath tablePath) {
    return String.format("%s/%s/%s", catalogPath, tablePath.getDatabaseName(), tablePath.getObjectName());
  }
}

View File

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
/**
 * A catalog factory impl that creates {@link HoodieCatalog}.
 *
 * <p>The factory is identified by {@link #IDENTIFIER}. The catalog path is the only
 * mandatory option; the default database falls back to its default value when not set.
 */
public class HoodieCatalogFactory implements CatalogFactory {
  private static final Logger LOG = LoggerFactory.getLogger(HoodieCatalogFactory.class);

  public static final String IDENTIFIER = "hudi";

  @Override
  public String factoryIdentifier() {
    return IDENTIFIER;
  }

  /**
   * Validates the catalog options and creates the {@link HoodieCatalog}.
   *
   * @param context The factory context that provides the catalog name and options
   */
  @Override
  public Catalog createCatalog(Context context) {
    final FactoryUtil.CatalogFactoryHelper helper =
        FactoryUtil.createCatalogFactoryHelper(this, context);
    helper.validate();

    return new HoodieCatalog(
        context.getName(),
        (Configuration) helper.getOptions());
  }

  /**
   * Returns the options that must be configured explicitly: only the catalog path,
   * because it is the only option without a default value.
   */
  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(CATALOG_PATH);
    return options;
  }

  /**
   * Returns the options that may be omitted: the default database, which has a default value.
   */
  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    return Collections.<ConfigOption<?>>singleton(DEFAULT_DATABASE);
  }
}

View File

@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
/**
 * Helper class to read/write flink table options as a map.
 *
 * <p>The options are persisted as a java properties file named {@code table_option.properties}
 * under the auxiliary folder of the table base path. Besides the table options, the file
 * also keeps the entries listed in {@link #NON_OPTION_KEYS}.
 */
public class TableOptionProperties {
  private static final Logger LOG = LoggerFactory.getLogger(TableOptionProperties.class);

  private static final String FILE_NAME = "table_option.properties";

  public static final String PK_CONSTRAINT_NAME = "pk.constraint.name";
  public static final String PK_COLUMNS = "pk.columns";
  public static final String COMMENT = "comment";
  public static final String PARTITION_COLUMNS = "partition.columns";

  /** Keys kept in the properties file that are not table options. */
  public static final List<String> NON_OPTION_KEYS = Arrays.asList(PK_CONSTRAINT_NAME, PK_COLUMNS, COMMENT, PARTITION_COLUMNS);

  /**
   * Initialize the {@link #FILE_NAME} meta file.
   *
   * <p>Entries with a null key or a null value are skipped because they can not be
   * represented in a properties file.
   *
   * @param basePath   The table base path
   * @param hadoopConf The hadoop configuration used to resolve the file system
   * @param options    The options to persist
   * @throws IOException if the file can not be written
   */
  public static void createProperties(String basePath,
                                      Configuration hadoopConf,
                                      Map<String, String> options) throws IOException {
    Path propertiesFilePath = getPropertiesFilePath(basePath);
    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
    Properties properties = new Properties();
    // Properties#putAll throws NullPointerException on null keys/values, copy selectively instead
    options.forEach((key, value) -> {
      if (key != null && value != null) {
        properties.setProperty(key, value);
      }
    });
    try (FSDataOutputStream outputStream = fs.create(propertiesFilePath)) {
      properties.store(outputStream,
          "Table option properties saved on " + new Date(System.currentTimeMillis()));
    }
    LOG.info("Create file {} success.", propertiesFilePath);
  }

  /**
   * Read table options map from the given table base path.
   *
   * @param basePath   The table base path
   * @param hadoopConf The hadoop configuration used to resolve the file system
   * @return all the entries of the properties file as a mutable map
   * @throws HoodieIOException if the file can not be read
   */
  public static Map<String, String> loadFromProperties(String basePath, Configuration hadoopConf) {
    Path propertiesFilePath = getPropertiesFilePath(basePath);
    Map<String, String> options = new HashMap<>();
    Properties props = new Properties();

    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
    try (FSDataInputStream inputStream = fs.open(propertiesFilePath)) {
      props.load(inputStream);
      for (final String name : props.stringPropertyNames()) {
        options.put(name, props.getProperty(name));
      }
    } catch (IOException e) {
      throw new HoodieIOException(String.format("Could not load table option properties from %s", propertiesFilePath), e);
    }
    LOG.info("Loading table option properties from {} success.", propertiesFilePath);
    return options;
  }

  /**
   * Returns the path of the properties file under the auxiliary folder of the table base path.
   */
  private static Path getPropertiesFilePath(String basePath) {
    String auxPath = basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
    return new Path(auxPath, FILE_NAME);
  }

  /**
   * Returns the primary key constraint name, or null if it is not present.
   */
  public static String getPkConstraintName(Map<String, String> options) {
    return options.get(PK_CONSTRAINT_NAME);
  }

  /**
   * Returns the primary key columns, or an empty list if they are not present.
   */
  public static List<String> getPkColumns(Map<String, String> options) {
    return splitColumns(options, PK_COLUMNS);
  }

  /**
   * Returns the partition columns, or an empty list if they are not present.
   */
  public static List<String> getPartitionColumns(Map<String, String> options) {
    return splitColumns(options, PARTITION_COLUMNS);
  }

  /**
   * Returns the table comment, or null if it is not present.
   */
  public static String getComment(Map<String, String> options) {
    return options.get(COMMENT);
  }

  /**
   * Returns a copy of the given options without the {@link #NON_OPTION_KEYS} entries.
   */
  public static Map<String, String> getTableOptions(Map<String, String> options) {
    Map<String, String> copied = new HashMap<>(options);
    NON_OPTION_KEYS.forEach(copied::remove);
    return copied;
  }

  /**
   * Splits the comma separated column list stored under the given key.
   *
   * @return the column names, or an empty list if the value is missing or empty
   */
  private static List<String> splitColumns(Map<String, String> options, String key) {
    final String columns = options.get(key);
    if (columns == null || columns.isEmpty()) {
      // avoid producing a list with one empty column name for an empty value
      return Collections.emptyList();
    }
    return Arrays.stream(columns.split(",")).collect(Collectors.toList());
  }
}

View File

@@ -257,6 +257,7 @@ public class StreamerUtil {
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
if (!tableExists(basePath, hadoopConf)) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA))
.setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
.setTableName(conf.getString(FlinkOptions.TABLE_NAME))
.setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null))

View File

@@ -15,3 +15,4 @@
# limitations under the License.
org.apache.hudi.table.HoodieTableFactory
org.apache.hudi.table.catalog.HoodieCatalogFactory

View File

@@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.catalog;
import org.apache.hudi.configuration.FlinkOptions;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Test cases for {@link HoodieCatalog}.
 *
 * <p>Every test runs against a catalog rooted at a JUnit-managed temporary directory in which the
 * default database directory ({@code test_db}) has been created up front. The catalog is opened
 * before and closed after each test.
 */
public class TestHoodieCatalog {

  /** Name of the database directory created before each test and used as the catalog default. */
  private static final String TEST_DEFAULT_DATABASE = "test_db";

  /** Name of a database that is never created by any test. */
  private static final String NONE_EXIST_DATABASE = "none_exist_database";

  /** Physical columns declared on the table that the tests create. */
  private static final List<Column> CREATE_COLUMNS = Arrays.asList(
      Column.physical("uuid", DataTypes.VARCHAR(20)),
      Column.physical("name", DataTypes.VARCHAR(20)),
      Column.physical("age", DataTypes.INT()),
      Column.physical("tss", DataTypes.TIMESTAMP(3)),
      Column.physical("partition", DataTypes.VARCHAR(10))
  );

  /** Primary key constraint on the {@code uuid} column. */
  private static final UniqueConstraint CONSTRAINTS =
      UniqueConstraint.primaryKey("uuid", Arrays.asList("uuid"));

  /** Schema handed to the catalog when creating the table. */
  private static final ResolvedSchema CREATE_TABLE_SCHEMA =
      new ResolvedSchema(
          CREATE_COLUMNS,
          Collections.emptyList(),
          CONSTRAINTS);

  /**
   * Columns expected when the table is read back: identical to {@link #CREATE_COLUMNS} except
   * that every VARCHAR column is replaced by a STRING column of the same name.
   */
  private static final List<Column> EXPECTED_TABLE_COLUMNS =
      CREATE_COLUMNS.stream()
          .map(
              col -> {
                // Flink char/varchar is transformed to string in Avro.
                if (col.getDataType()
                    .getLogicalType()
                    .getTypeRoot()
                    .equals(LogicalTypeRoot.VARCHAR)) {
                  return Column.physical(col.getName(), DataTypes.STRING());
                } else {
                  return col;
                }
              })
          .collect(Collectors.toList());

  /** Schema expected from {@link HoodieCatalog#getTable(ObjectPath)}. */
  private static final ResolvedSchema EXPECTED_TABLE_SCHEMA =
      new ResolvedSchema(EXPECTED_TABLE_COLUMNS, Collections.emptyList(), CONSTRAINTS);

  /** Table options supplied at creation time; they are expected to be returned unchanged. */
  private static final Map<String, String> EXPECTED_OPTIONS = new HashMap<>();

  static {
    EXPECTED_OPTIONS.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
    EXPECTED_OPTIONS.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "false");
    EXPECTED_OPTIONS.put(FlinkOptions.PRE_COMBINE.key(), "true");
  }

  /**
   * Table definition passed to {@code createTable}: the creation schema, comment {@code "test"},
   * partitioned by the {@code partition} column, with {@link #EXPECTED_OPTIONS}.
   */
  private static final ResolvedCatalogTable EXPECTED_CATALOG_TABLE = new ResolvedCatalogTable(
      CatalogTable.of(
          Schema.newBuilder().fromResolvedSchema(CREATE_TABLE_SCHEMA).build(),
          "test",
          Arrays.asList("partition"),
          EXPECTED_OPTIONS),
      CREATE_TABLE_SCHEMA
  );

  private TableEnvironment streamTableEnv;
  private HoodieCatalog catalog;

  /** Catalog base directory, managed by JUnit. */
  @TempDir
  File tempFile;

  /**
   * Creates the table environment, the default database directory and an opened catalog rooted
   * at {@link #tempFile}.
   */
  @BeforeEach
  void beforeEach() {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
    streamTableEnv = TableEnvironmentImpl.create(settings);
    streamTableEnv.getConfig().getConfiguration()
        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

    // Fail fast with a clear message if the database directory cannot be set up, rather than
    // letting the catalog report a less obvious error later on.
    File testDb = new File(tempFile, TEST_DEFAULT_DATABASE);
    assertTrue(testDb.mkdir() || testDb.isDirectory(),
        "Failed to create the default database directory: " + testDb);

    Map<String, String> catalogOptions = new HashMap<>();
    catalogOptions.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
    catalogOptions.put(DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE);
    catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions));
    catalog.open();
  }

  /**
   * Closes the catalog opened in {@link #beforeEach()} so that nothing it holds is leaked from
   * one test to the next.
   */
  @AfterEach
  void afterEach() {
    // The catalog is null when beforeEach() failed before constructing it.
    if (catalog != null) {
      catalog.close();
    }
  }

  /** Listing databases returns the default database and omits a database that was never created. */
  @Test
  public void testListDatabases() {
    List<String> actual = catalog.listDatabases();
    assertTrue(actual.contains(TEST_DEFAULT_DATABASE));
    assertFalse(actual.contains(NONE_EXIST_DATABASE));
  }

  /** The default database exists; a database that was never created does not. */
  @Test
  public void testDatabaseExists() {
    assertTrue(catalog.databaseExists(TEST_DEFAULT_DATABASE));
    assertFalse(catalog.databaseExists(NONE_EXIST_DATABASE));
  }

  /**
   * A database can be created, fetched and dropped; dropping a missing database without
   * {@code ignoreIfNotExists} throws {@link DatabaseNotExistException}.
   */
  @Test
  public void testCreateAndDropDatabase() throws Exception {
    CatalogDatabase expected = new CatalogDatabaseImpl(Collections.emptyMap(), null);
    catalog.createDatabase("db1", expected, true);

    CatalogDatabase actual = catalog.getDatabase("db1");
    assertTrue(catalog.listDatabases().contains("db1"));
    assertEquals(expected.getProperties(), actual.getProperties());

    // drop an existing database
    catalog.dropDatabase("db1", true);
    assertFalse(catalog.listDatabases().contains("db1"));

    // drop a non-existent database
    assertThrows(DatabaseNotExistException.class,
        () -> catalog.dropDatabase(NONE_EXIST_DATABASE, false));
  }

  /** Creating a database with non-empty properties is rejected with {@link CatalogException}. */
  @Test
  public void testCreateDatabaseWithOptions() {
    Map<String, String> options = new HashMap<>();
    options.put("k1", "v1");
    options.put("k2", "v2");

    assertThrows(
        CatalogException.class,
        () -> catalog.createDatabase("db1", new CatalogDatabaseImpl(options, null), true));
  }

  /**
   * A created table is reported as existing; creating it again without {@code ignoreIfExists}
   * throws {@link TableAlreadyExistException}.
   */
  @Test
  public void testCreateTable() throws Exception {
    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
    // test create table
    catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);

    // test table exists
    assertTrue(catalog.tableExists(tablePath));

    // test create an existing table
    assertThrows(TableAlreadyExistException.class,
        () -> catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, false));
  }

  /**
   * Listing tables returns every created table; listing tables of a missing database throws
   * {@link DatabaseNotExistException}.
   */
  @Test
  public void testListTable() throws Exception {
    ObjectPath tablePath1 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
    ObjectPath tablePath2 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2");

    // create tables
    catalog.createTable(tablePath1, EXPECTED_CATALOG_TABLE, true);
    catalog.createTable(tablePath2, EXPECTED_CATALOG_TABLE, true);

    // test list tables
    List<String> tables = catalog.listTables(TEST_DEFAULT_DATABASE);
    assertTrue(tables.contains(tablePath1.getObjectName()));
    assertTrue(tables.contains(tablePath2.getObjectName()));

    // test list tables of a non-existent database
    assertThrows(DatabaseNotExistException.class,
        () -> catalog.listTables(NONE_EXIST_DATABASE));
  }

  /**
   * A table read back from the catalog carries the expected schema, options (including the
   * {@code connector} and the table path under the catalog directory), comment and partition
   * keys.
   */
  @Test
  public void testGetTable() throws Exception {
    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
    // create table
    catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);

    // The creation options are expected back as-is, plus the connector and the table path.
    Map<String, String> expectedOptions = new HashMap<>(EXPECTED_OPTIONS);
    expectedOptions.put("connector", "hudi");
    expectedOptions.put(
        FlinkOptions.PATH.key(),
        String.format("%s/%s/%s", tempFile.getAbsolutePath(), tablePath.getDatabaseName(), tablePath.getObjectName()));

    // test get table
    CatalogBaseTable actualTable = catalog.getTable(tablePath);

    // validate schema
    Schema actualSchema = actualTable.getUnresolvedSchema();
    Schema expectedSchema = Schema.newBuilder().fromResolvedSchema(EXPECTED_TABLE_SCHEMA).build();
    assertEquals(expectedSchema, actualSchema);

    // validate options
    Map<String, String> actualOptions = actualTable.getOptions();
    assertEquals(expectedOptions, actualOptions);

    // validate comment
    assertEquals(EXPECTED_CATALOG_TABLE.getComment(), actualTable.getComment());

    // validate partition keys
    assertEquals(EXPECTED_CATALOG_TABLE.getPartitionKeys(), ((CatalogTable) actualTable).getPartitionKeys());
  }

  /**
   * A dropped table no longer exists; dropping a missing table without
   * {@code ignoreIfNotExists} throws {@link TableNotExistException}.
   */
  @Test
  public void dropTable() throws Exception {
    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
    // create table
    catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);

    // test drop table
    catalog.dropTable(tablePath, true);
    assertFalse(catalog.tableExists(tablePath));

    // drop a non-existent table
    assertThrows(TableNotExistException.class,
        () -> catalog.dropTable(new ObjectPath(TEST_DEFAULT_DATABASE, "non_exist"), false));
  }
}