[HUDI-1701] Implement HoodieTableSource.explainSource for all kinds of pushing down (#2690)
We should implement the interface HoodieTableSource.explainSource to track the table source signature diff for all kinds of pushing down, such as filter pushing or limit pushing.
This commit is contained in:
@@ -68,6 +68,7 @@ import org.apache.flink.table.sources.StreamTableSource;
|
||||
import org.apache.flink.table.sources.TableSource;
|
||||
import org.apache.flink.table.types.DataType;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
import org.apache.flink.table.utils.TableConnectorUtils;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.mapred.JobConf;
|
||||
@@ -234,6 +235,18 @@ public class HoodieTableSource implements
|
||||
return schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String explainSource() {
|
||||
final String filterString = filters.stream()
|
||||
.map(Expression::asSummaryString)
|
||||
.collect(Collectors.joining(","));
|
||||
return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames())
|
||||
+ (requiredPartitions == null ? "" : ", requiredPartition=" + requiredPartitions)
|
||||
+ (requiredPos == null ? "" : ", requiredPos=" + Arrays.toString(requiredPos))
|
||||
+ (limit == -1 ? "" : ", limit=" + limit)
|
||||
+ (filters.size() == 0 ? "" : ", filters=" + filterString);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataType getProducedDataType() {
|
||||
String[] schemaFieldNames = this.schema.getFieldNames();
|
||||
|
||||
@@ -171,9 +171,16 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
|
||||
execInsertSql(tableEnv, insertInto);
|
||||
|
||||
List<Row> rows = CollectionUtil.iterableToList(
|
||||
List<Row> result1 = CollectionUtil.iterableToList(
|
||||
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
|
||||
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
|
||||
assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
|
||||
// apply filters
|
||||
List<Row> result2 = CollectionUtil.iterableToList(
|
||||
() -> tableEnv.sqlQuery("select * from t1 where uuid > 'id5'").execute().collect());
|
||||
assertRowsEquals(result2, "["
|
||||
+ "id6,Emma,20,1970-01-01T00:00:06,par3, "
|
||||
+ "id7,Bob,44,1970-01-01T00:00:07,par4, "
|
||||
+ "id8,Han,56,1970-01-01T00:00:08,par4]");
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user