
[MINOR] fix get builtin function issue from Hudi catalog (#4917)

Authored by stayrascal on 2022-03-02 11:16:19 +08:00, committed by GitHub
parent 3fdc9332e5
commit 3cfb52c413
5 changed files with 88 additions and 15 deletions


@@ -376,7 +376,7 @@ public class HoodieCatalog extends AbstractCatalog {
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
throws PartitionNotExistException, CatalogException {
- return null;
+ throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
}
@Override
@@ -409,7 +409,7 @@ public class HoodieCatalog extends AbstractCatalog {
@Override
public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
- return null;
+ throw new FunctionNotExistException(getName(), functionPath);
}
@Override

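Both overrides previously returned null. The Flink Catalog contract is to signal a missing partition or function by throwing PartitionNotExistException or FunctionNotExistException, so that lookup code can treat "not in this catalog" as a normal outcome and fall back, for example to built-in functions such as TO_DATE, which the new test below exercises; a null return instead breaks the lookup, which appears to be the builtin-function issue named in the commit title. A minimal caller-side sketch of that contract (a hypothetical helper, not Hudi or Flink source):

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;

final class FunctionLookupSketch {
  // Looks up a function in the given catalog. "Not found" must arrive as
  // FunctionNotExistException, never as a null return, so the caller can
  // continue with other resolution steps such as Flink's built-in functions.
  static CatalogFunction lookup(Catalog catalog, ObjectPath path) {
    try {
      return catalog.getFunction(path);
    } catch (FunctionNotExistException notFound) {
      return null; // caller falls back to built-in functions
    }
  }
}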

@@ -64,6 +64,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.utils.TestConfigurations.catalog;
import static org.apache.hudi.utils.TestConfigurations.sql;
import static org.apache.hudi.utils.TestData.assertRowsEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -1192,6 +1193,47 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
assertRowsEquals(result, expected);
}
@ParameterizedTest
@ValueSource(strings = {"insert", "upsert", "bulk_insert"})
void testBuiltinFunctionWithCatalog(String operation) {
TableEnvironment tableEnv = streamTableEnv;
String hudiCatalogDDL = catalog("hudi_" + operation)
.catalogPath(tempFile.getAbsolutePath())
.end();
tableEnv.executeSql(hudiCatalogDDL);
tableEnv.executeSql("use catalog " + ("hudi_" + operation));
String dbName = "hudi";
tableEnv.executeSql("create database " + dbName);
tableEnv.executeSql("use " + dbName);
String hoodieTableDDL = sql("t1")
.field("f_int int")
.field("f_date DATE")
.pkField("f_int")
.partitionField("f_int")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/" + dbName + "/" + operation)
.option(FlinkOptions.OPERATION, operation)
.end();
tableEnv.executeSql(hoodieTableDDL);
String insertSql = "insert into t1 values (1, TO_DATE('2022-02-02')), (2, DATE '2022-02-02')";
execInsertSql(tableEnv, insertSql);
List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
final String expected = "["
+ "+I[1, 2022-02-02], "
+ "+I[2, 2022-02-02]]";
assertRowsEquals(result, expected);
List<Row> partitionResult = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1 where f_int = 1").execute().collect());
assertRowsEquals(partitionResult, "[+I[1, 2022-02-02]]");
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------


@@ -64,12 +64,12 @@ public class TestConfigurations {
.map(RowType.RowField::asSummaryString).collect(Collectors.toList());
public static final DataType ROW_DATA_TYPE_WIDER = DataTypes.ROW(
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("salary", DataTypes.DOUBLE()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("salary", DataTypes.DOUBLE()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
.notNull();
public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType();
@@ -112,6 +112,15 @@ public class TestConfigurations {
return builder.toString();
}
public static String getCreateHudiCatalogDDL(final String catalogName, final String catalogPath) {
StringBuilder builder = new StringBuilder();
builder.append("create catalog ").append(catalogName).append(" with (\n");
builder.append(" 'type' = 'hudi',\n"
+ " 'catalog.path' = '").append(catalogPath).append("'");
builder.append("\n)");
return builder.toString();
}
public static String getFileSourceDDL(String tableName) {
return getFileSourceDDL(tableName, "test_source.data");
}
@@ -222,6 +231,10 @@ public class TestConfigurations {
return new Sql(tableName);
}
public static Catalog catalog(String catalogName) {
return new Catalog(catalogName);
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -285,4 +298,22 @@ public class TestConfigurations {
this.withPartition, this.pkField, this.partitionField);
}
}
public static class Catalog {
private final String catalogName;
private String catalogPath = ".";
public Catalog(String catalogName) {
this.catalogName = catalogName;
}
public Catalog catalogPath(String catalogPath) {
this.catalogPath = catalogPath;
return this;
}
public String end() {
return TestConfigurations.getCreateHudiCatalogDDL(catalogName, catalogPath);
}
}
}
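The new catalog(...) entry point and Catalog builder mirror the existing sql(...)/Sql helpers and delegate to getCreateHudiCatalogDDL. A usage sketch of the DDL they emit and how the new test consumes it (the helper class and warehouse path are illustrative, not part of this commit):

import org.apache.flink.table.api.TableEnvironment;
import org.apache.hudi.utils.TestConfigurations;

final class HudiCatalogDdlExample {
  // Registers a Hudi catalog via the builder and switches to it, as the new
  // testBuiltinFunctionWithCatalog test does.
  static void registerHudiCatalog(TableEnvironment tableEnv, String warehousePath) {
    String ddl = TestConfigurations.catalog("hudi_catalog")
        .catalogPath(warehousePath)
        .end();
    // ddl is roughly:
    //   create catalog hudi_catalog with (
    //     'type' = 'hudi',
    //     'catalog.path' = '<warehousePath>'
    //   )
    tableEnv.executeSql(ddl);
    tableEnv.executeSql("use catalog hudi_catalog");
  }
}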


@@ -53,7 +53,7 @@ public class ContinuousFileSourceFactory implements DynamicTableSourceFactory {
Configuration conf = (Configuration) helper.getOptions();
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
new ValidationException("Option [path] should be not empty.")));
- return new ContinuousFileSource(context.getCatalogTable().getSchema(), path, conf);
+ return new ContinuousFileSource(context.getCatalogTable().getResolvedSchema(), path, conf);
}
@Override


@@ -18,15 +18,15 @@
package org.apache.hudi.utils.source;
+ import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
- import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
- import org.apache.flink.table.api.TableSchema;
+ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -59,12 +59,12 @@ import static org.apache.hudi.utils.factory.ContinuousFileSourceFactory.CHECKPOI
*/
public class ContinuousFileSource implements ScanTableSource {
- private final TableSchema tableSchema;
+ private final ResolvedSchema tableSchema;
private final Path path;
private final Configuration conf;
public ContinuousFileSource(
- TableSchema tableSchema,
+ ResolvedSchema tableSchema,
Path path,
Configuration conf) {
this.tableSchema = tableSchema;
@@ -83,7 +83,7 @@ public class ContinuousFileSource implements ScanTableSource {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
- final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
+ final RowType rowType = (RowType) tableSchema.toSourceRowDataType().getLogicalType();
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
@@ -178,7 +178,7 @@ public class ContinuousFileSource implements ScanTableSource {
}
@Override
- public void notifyCheckpointComplete(long l) throws Exception {
+ public void notifyCheckpointComplete(long l) {
this.currentCP.incrementAndGet();
}
}
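The remaining hunks migrate the test source from TableSchema to ResolvedSchema, the schema class that supersedes it in Flink's newer table API, and drop a redundant throws clause. A small sketch of building a ResolvedSchema directly and deriving the physical RowType the same way the updated produceDataStream does (the helper class is illustrative, not part of this commit; the two fields mirror the uuid/ts columns shown above):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.logical.RowType;

final class ResolvedSchemaSketch {
  // Constructs a ResolvedSchema in code and derives the RowType used to build
  // the JSON deserialization schema in the updated test source.
  static RowType rowType() {
    ResolvedSchema schema = ResolvedSchema.of(
        Column.physical("uuid", DataTypes.VARCHAR(20)),
        Column.physical("ts", DataTypes.TIMESTAMP(3)));
    return (RowType) schema.toSourceRowDataType().getLogicalType();
  }
}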