1
0

[HUDI-1781] Fix Flink streaming reader throws ClassCastException (#2900)

This commit is contained in:
dijie
2021-05-01 19:13:15 +08:00
committed by GitHub
parent ea14d687da
commit c5220b96e9
2 changed files with 28 additions and 1 deletions

View File

@@ -174,7 +174,11 @@ public class HoodieTableSource implements
 if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
 StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
 conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes);
-OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) getInputFormat(true));
+InputFormat<RowData, ?> inputFormat = getInputFormat(true);
+if (!(inputFormat instanceof MergeOnReadInputFormat)) {
+throw new HoodieException("No successful commits under path " + path);
+}
+OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
 SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
 .setParallelism(1)
 .uid("uid_streaming_source")

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -53,6 +54,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import static org.apache.hudi.utils.TestData.assertRowsEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -390,6 +392,27 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par5]");
}
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testStreamReadEmptyTablePath(ExecMode execMode) throws Exception {
  // Set up an empty hoodie table on disk: the path gets a valid .hoodie
  // metadata directory but no completed commits.
  Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
  StreamerUtil.initTableIfNotExists(conf);

  // Register a streaming MERGE_ON_READ source table over the empty path.
  Map<String, String> options = new HashMap<>();
  options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
  options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
  options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
  String createHoodieTable = TestConfigurations.getCreateHoodieTableDDL("t1", options);
  streamTableEnv.executeSql(createHoodieTable);

  // The streaming query must fail fast with HoodieException instead of the
  // ClassCastException reported in HUDI-1781.
  //
  // NOTE: the 3-arg assertThrows overload takes an assertion-FAILURE message
  // as its third argument — it does not verify the thrown exception's message.
  // Capture the exception and assert on its message explicitly instead.
  HoodieException thrown = assertThrows(HoodieException.class,
      () -> execSelectSql(streamTableEnv, "select * from t1", 10));
  // Match the stable prefix only: the path in the message is rendered from a
  // Flink Path and may not be byte-identical to File#getAbsolutePath.
  assertTrue(thrown.getMessage().contains("No successful commits under path"));
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------