diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index bc53c9223..50420f905 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -174,7 +174,11 @@ public class HoodieTableSource implements
     if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
       StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
           conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes);
-      OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true));
+      InputFormat inputFormat = getInputFormat(true);
+      if (!(inputFormat instanceof MergeOnReadInputFormat)) {
+        throw new HoodieException("No successful commits under path " + path);
+      }
+      OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
       SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, "streaming_source")
           .setParallelism(1)
           .uid("uid_streaming_source")
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 3d413a7fb..69627f23a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -53,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -390,6 +392,27 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
   }
 
+  @ParameterizedTest
+  @EnumSource(value = ExecMode.class)
+  void testStreamReadEmptyTablePath(ExecMode execMode) throws Exception {
+    // create an empty table
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamerUtil.initTableIfNotExists(conf);
+
+    // create a flink source table
+    Map options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    String createHoodieTable = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(createHoodieTable);
+
+    // execute query and assert throws exception
+    assertThrows(HoodieException.class, () -> execSelectSql(streamTableEnv, "select * from t1", 10),
+        "No successful commits under path " + tempFile.getAbsolutePath());
+
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------