diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java index fddaaba66..331796700 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java @@ -376,7 +376,7 @@ public class HoodieCatalog extends AbstractCatalog { @Override public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec) throws PartitionNotExistException, CatalogException { - return null; + throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec); } @Override @@ -409,7 +409,7 @@ public class HoodieCatalog extends AbstractCatalog { @Override public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException { - return null; + throw new FunctionNotExistException(getName(), functionPath); } @Override diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 43116c2ff..7c9b0bb6a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -64,6 +64,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.utils.TestConfigurations.catalog; import static org.apache.hudi.utils.TestConfigurations.sql; import static org.apache.hudi.utils.TestData.assertRowsEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -1192,6 +1193,47 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(result, expected); } + @ParameterizedTest + @ValueSource(strings = {"insert", "upsert", "bulk_insert"}) + void testBuiltinFunctionWithCatalog(String operation) { + TableEnvironment tableEnv = streamTableEnv; + + String hudiCatalogDDL = catalog("hudi_" + operation) + .catalogPath(tempFile.getAbsolutePath()) + .end(); + + tableEnv.executeSql(hudiCatalogDDL); + tableEnv.executeSql("use catalog " + ("hudi_" + operation)); + + String dbName = "hudi"; + tableEnv.executeSql("create database " + dbName); + tableEnv.executeSql("use " + dbName); + + String hoodieTableDDL = sql("t1") + .field("f_int int") + .field("f_date DATE") + .pkField("f_int") + .partitionField("f_int") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/" + dbName + "/" + operation) + .option(FlinkOptions.OPERATION, operation) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + String insertSql = "insert into t1 values (1, TO_DATE('2022-02-02')), (2, DATE '2022-02-02')"; + execInsertSql(tableEnv, insertSql); + + List result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + final String expected = "[" + + "+I[1, 2022-02-02], " + + "+I[2, 2022-02-02]]"; + assertRowsEquals(result, expected); + + List partitionResult = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1 where f_int = 1").execute().collect()); + assertRowsEquals(partitionResult, "[+I[1, 2022-02-02]]"); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 46cad3e82..d1b6e04a1 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -64,12 +64,12 @@ public class TestConfigurations { .map(RowType.RowField::asSummaryString).collect(Collectors.toList()); public static final DataType ROW_DATA_TYPE_WIDER = DataTypes.ROW( - DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key - DataTypes.FIELD("name", DataTypes.VARCHAR(10)), - DataTypes.FIELD("age", DataTypes.INT()), - DataTypes.FIELD("salary", DataTypes.DOUBLE()), - DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field - DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) + DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key + DataTypes.FIELD("name", DataTypes.VARCHAR(10)), + DataTypes.FIELD("age", DataTypes.INT()), + DataTypes.FIELD("salary", DataTypes.DOUBLE()), + DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field + DataTypes.FIELD("partition", DataTypes.VARCHAR(10))) .notNull(); public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType(); @@ -112,6 +112,15 @@ public class TestConfigurations { return builder.toString(); } + public static String getCreateHudiCatalogDDL(final String catalogName, final String catalogPath) { + StringBuilder builder = new StringBuilder(); + builder.append("create catalog ").append(catalogName).append(" with (\n"); + builder.append(" 'type' = 'hudi',\n" + + " 'catalog.path' = '").append(catalogPath).append("'"); + builder.append("\n)"); + return builder.toString(); + } + public static String getFileSourceDDL(String tableName) { return getFileSourceDDL(tableName, "test_source.data"); } @@ -222,6 +231,10 @@ public class TestConfigurations { return new Sql(tableName); } + public static Catalog catalog(String catalogName) { + return new Catalog(catalogName); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- @@ -285,4 +298,22 @@ public class TestConfigurations { this.withPartition, this.pkField, this.partitionField); } } + + public static class Catalog { + private final String catalogName; + private String catalogPath = "."; + + public Catalog(String catalogName) { + this.catalogName = catalogName; + } + + public Catalog catalogPath(String catalogPath) { + this.catalogPath = catalogPath; + return this; + } + + public String end() { + return TestConfigurations.getCreateHudiCatalogDDL(catalogName, catalogPath); + } + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java index 92d9c5572..31b3ad5c7 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/ContinuousFileSourceFactory.java @@ -53,7 +53,7 @@ public class ContinuousFileSourceFactory implements DynamicTableSourceFactory { Configuration conf = (Configuration) helper.getOptions(); Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> new ValidationException("Option [path] should be not empty."))); - return new ContinuousFileSource(context.getCatalogTable().getSchema(), path, conf); + return new ContinuousFileSource(context.getCatalogTable().getResolvedSchema(), path, conf); } @Override diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java index a44061076..d38aad60c 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java @@ -18,15 +18,15 @@ package org.apache.hudi.utils.source; +import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -59,12 +59,12 @@ import static org.apache.hudi.utils.factory.ContinuousFileSourceFactory.CHECKPOI */ public class ContinuousFileSource implements ScanTableSource { - private final TableSchema tableSchema; + private final ResolvedSchema tableSchema; private final Path path; private final Configuration conf; public ContinuousFileSource( - TableSchema tableSchema, + ResolvedSchema tableSchema, Path path, Configuration conf) { this.tableSchema = tableSchema; @@ -83,7 +83,7 @@ public class ContinuousFileSource implements ScanTableSource { @Override public DataStream produceDataStream(StreamExecutionEnvironment execEnv) { - final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType(); + final RowType rowType = (RowType) tableSchema.toSourceRowDataType().getLogicalType(); JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema( rowType, InternalTypeInfo.of(rowType), @@ -178,7 +178,7 @@ public class ContinuousFileSource implements ScanTableSource { } @Override - public void notifyCheckpointComplete(long l) throws Exception { + public void notifyCheckpointComplete(long l) { this.currentCP.incrementAndGet(); } }