
[HUDI-1710] Read optimized query type for Flink batch reader (#2702)

Read optimized query returns the records from:

* COW table: the latest parquet files
* MOR table: records from the parquet (base) files of the latest completed compaction (see the usage sketch below)
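As a rough usage illustration (not part of this commit), the sketch below registers a MERGE_ON_READ Hudi table with the read-optimized query type and scans it with the Flink batch reader. The string option keys 'table.type' and 'hoodie.datasource.query.type' are assumed to be the keys behind FlinkOptions.TABLE_TYPE and FlinkOptions.QUERY_TYPE, and the table path and schema are placeholders.

// Usage sketch only; option keys, path and schema are assumptions, see the note above.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadOptimizedQuerySketch {
  public static void main(String[] args) {
    // Batch execution: the read-optimized path added here targets the Flink batch reader.
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

    // Register a MOR Hudi table and request read-optimized reads:
    // only parquet base files from the latest completed compaction are scanned,
    // un-compacted log files are skipped.
    tableEnv.executeSql(
        "CREATE TABLE t1 (\n"
            + "  uuid VARCHAR(20),\n"
            + "  name VARCHAR(10),\n"
            + "  age INT,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` VARCHAR(20)\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = '/tmp/hudi/t1',\n"                          // placeholder path
            + "  'table.type' = 'MERGE_ON_READ',\n"                   // assumed key for FlinkOptions.TABLE_TYPE
            + "  'hoodie.datasource.query.type' = 'read_optimized'\n" // assumed key for FlinkOptions.QUERY_TYPE
            + ")");

    // Batch scan that reads only the latest compacted base files.
    tableEnv.executeSql("SELECT * FROM t1").print();
  }
}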
Danny Chan authored on 2021-03-24 09:41:30 +08:00 (committed by GitHub)
parent 0e6909d3e2
commit 03668dbaf1
3 changed files with 86 additions and 49 deletions


@@ -340,54 +340,69 @@ public class HoodieTableSource implements
     final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
     final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
-    if (queryType.equals(FlinkOptions.QUERY_TYPE_SNAPSHOT)) {
-      final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
-      switch (tableType) {
-        case MERGE_ON_READ:
-          final List<MergeOnReadInputSplit> inputSplits;
-          if (!isStreaming) {
-            inputSplits = buildFileIndex(paths);
-            if (inputSplits.size() == 0) {
-              // When there is no input splits, just return an empty source.
-              LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
-              return new CollectionInputFormat<>(Collections.emptyList(), null);
-            }
-          } else {
-            // streaming reader would build the splits automatically.
-            inputSplits = Collections.emptyList();
-          }
-          final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
-              rowType,
-              requiredRowType,
-              tableAvroSchema.toString(),
-              AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
-              inputSplits);
-          return new MergeOnReadInputFormat(
-              this.conf,
-              FilePathUtils.toFlinkPaths(paths),
-              hoodieTableState,
-              rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable.
-              "default",
-              this.limit);
-        case COPY_ON_WRITE:
-          FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
-              FilePathUtils.toFlinkPaths(paths),
-              this.schema.getFieldNames(),
-              this.schema.getFieldDataTypes(),
-              this.requiredPos,
-              "default",
-              this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
-              getParquetConf(this.conf, this.hadoopConf),
-              this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
-          );
-          format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
-          return format;
-        default:
-          throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
-      }
-    } else {
-      throw new HoodieException("Invalid query type : '" + queryType + "'. Only '"
-          + FlinkOptions.QUERY_TYPE_SNAPSHOT + "' is supported now");
+    switch (queryType) {
+      case FlinkOptions.QUERY_TYPE_SNAPSHOT:
+        final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
+        switch (tableType) {
+          case MERGE_ON_READ:
+            final List<MergeOnReadInputSplit> inputSplits;
+            if (!isStreaming) {
+              inputSplits = buildFileIndex(paths);
+              if (inputSplits.size() == 0) {
+                // When there is no input splits, just return an empty source.
+                LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
+                return new CollectionInputFormat<>(Collections.emptyList(), null);
+              }
+            } else {
+              // streaming reader would build the splits automatically.
+              inputSplits = Collections.emptyList();
+            }
+            final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+                rowType,
+                requiredRowType,
+                tableAvroSchema.toString(),
+                AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+                inputSplits);
+            return new MergeOnReadInputFormat(
+                this.conf,
+                FilePathUtils.toFlinkPaths(paths),
+                hoodieTableState,
+                rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable.
+                "default",
+                this.limit);
+          case COPY_ON_WRITE:
+            FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
+                FilePathUtils.toFlinkPaths(paths),
+                this.schema.getFieldNames(),
+                this.schema.getFieldDataTypes(),
+                this.requiredPos,
+                "default",
+                this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
+                getParquetConf(this.conf, this.hadoopConf),
+                this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
+            );
+            format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
+            return format;
+          default:
+            throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
+        }
+      case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED:
+        FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
+            FilePathUtils.toFlinkPaths(paths),
+            this.schema.getFieldNames(),
+            this.schema.getFieldDataTypes(),
+            this.requiredPos,
+            "default",
+            this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
+            getParquetConf(this.conf, this.hadoopConf),
+            this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
+        );
+        format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
+        return format;
+      default:
+        String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s'] are supported now", queryType,
+            FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
+        throw new HoodieException(errMsg);
     }
   }


@@ -151,6 +151,30 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  @Test
+  void testStreamWriteBatchReadOptimized() {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    // read optimized is supported for both MOR and COR table,
+    // test MOR streaming write with compaction then reads as
+    // query type 'read_optimized'.
+    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    List<Row> rows = CollectionUtil.iterableToList(
+        () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndRead(ExecMode execMode) {


@@ -33,7 +33,6 @@ import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -152,7 +151,6 @@ public class ContinuousFileSource implements StreamTableSource<RowData> {
   private void loadDataBuffer() {
     try {
-      new File(this.path.toString()).exists();
       this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
     } catch (IOException e) {
       throw new RuntimeException("Read file " + this.path + " error", e);