1
0

[HUDI-2371] Improve the Flink streaming reader (#3552)

- Support reading empty table
- Fix filtering by partition path
- Support reading from earliest commit
This commit is contained in:
Danny Chan
2021-08-28 20:16:54 +08:00
committed by GitHub
parent 69cbcc9516
commit 57668d02a0
9 changed files with 348 additions and 74 deletions

View File

@@ -196,6 +196,7 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(60)// default 1 minute
.withDescription("Check interval for streaming read of SECOND, default 1 minute");
public static final String START_COMMIT_EARLIEST = "earliest";
public static final ConfigOption<String> READ_STREAMING_START_COMMIT = ConfigOptions
.key("read.streaming.start-commit")
.stringType()

View File

@@ -48,6 +48,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -105,20 +107,23 @@ public class StreamReadMonitoringFunction
private transient org.apache.hadoop.conf.Configuration hadoopConf;
private final HoodieTableMetaClient metaClient;
private HoodieTableMetaClient metaClient;
private final long maxCompactionMemoryInBytes;
// for partition pruning
private final Set<String> requiredPartitionPaths;
/**
 * Creates the streaming read monitoring function.
 *
 * <p>The meta client is no longer passed in: it is created lazily (see
 * {@code getOrCreateMetaClient}) so that the source can start against a
 * not-yet-existing table path.
 *
 * @param conf                      The flink configuration for the read job
 * @param path                      The table base path
 * @param maxCompactionMemoryInBytes Max memory budget for the compacted file merging
 * @param requiredPartitionPaths    Relative partition paths to keep; empty means no pruning
 */
public StreamReadMonitoringFunction(
    Configuration conf,
    Path path,
    long maxCompactionMemoryInBytes,
    Set<String> requiredPartitionPaths) {
  this.conf = conf;
  this.path = path;
  this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
  this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
  this.requiredPartitionPaths = requiredPartitionPaths;
}
@Override
@@ -180,8 +185,26 @@ public class StreamReadMonitoringFunction
}
}
@Nullable
private HoodieTableMetaClient getOrCreateMetaClient() {
  // Lazily create the meta client once the table shows up on the filesystem;
  // returns null as long as the table path does not exist yet.
  if (this.metaClient == null && StreamerUtil.tableExists(this.path.toString(), hadoopConf)) {
    this.metaClient = StreamerUtil.createMetaClient(this.path.toString(), hadoopConf);
  }
  return this.metaClient;
}
@VisibleForTesting
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
HoodieTableMetaClient metaClient = getOrCreateMetaClient();
if (metaClient == null) {
// table does not exist
return;
}
metaClient.reloadActiveTimeline();
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
if (commitTimeline.empty()) {
@@ -200,8 +223,9 @@ public class StreamReadMonitoringFunction
} else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
// first time consume and has a start commit
final String specifiedStart = this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT);
instantRange = InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(),
InstantRange.RangeType.CLOSE_CLOSE);
instantRange = specifiedStart.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
? null
: InstantRange.getInstance(specifiedStart, instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
} else {
// first time consume and no start commit, consumes the latest incremental data set.
HoodieInstant latestCommitInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
@@ -222,6 +246,11 @@ public class StreamReadMonitoringFunction
List<HoodieCommitMetadata> metadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
Set<String> writePartitions = getWritePartitionPaths(metadataList);
// apply partition push down
if (this.requiredPartitionPaths.size() > 0) {
writePartitions = writePartitions.stream()
.filter(this.requiredPartitionPaths::contains).collect(Collectors.toSet());
}
FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList);
if (fileStatuses.length == 0) {
LOG.warn("No files found for reading in user provided path.");
@@ -310,7 +339,8 @@ public class StreamReadMonitoringFunction
return commitTimeline.getInstants()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
.collect(Collectors.toList());
} else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
} else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
&& !this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)) {
String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
return commitTimeline.getInstants()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -85,6 +86,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -151,9 +153,8 @@ public class HoodieTableSource implements
: requiredPos;
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
this.filters = filters == null ? Collections.emptyList() : filters;
final String basePath = this.conf.getString(FlinkOptions.PATH);
this.hadoopConf = StreamerUtil.getHadoopConf();
this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(this.hadoopConf));
}
@@ -173,11 +174,8 @@ public class HoodieTableSource implements
(TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
conf, FilePathUtils.toFlinkPath(path), metaClient, maxCompactionMemoryInBytes);
conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
InputFormat<RowData, ?> inputFormat = getInputFormat(true);
if (!(inputFormat instanceof MergeOnReadInputFormat)) {
throw new HoodieException("No successful commits under path " + path);
}
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, "streaming_source")
.uid("uid_streaming_source_" + conf.getString(FlinkOptions.TABLE_NAME))
@@ -218,7 +216,8 @@ public class HoodieTableSource implements
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
  this.filters = new ArrayList<>(filters);
  // Refuse all the filters for now: the source does not consume any of them
  // itself, so every filter is reported back as remaining for the planner.
  return Result.of(Collections.emptyList(), new ArrayList<>(filters));
}
@Override
@@ -266,7 +265,18 @@ public class HoodieTableSource implements
return requiredPartitions;
}
/**
 * Returns the relative partition paths for the pushed-down partitions,
 * or an empty set when no partitions were pushed down.
 */
private Set<String> getRequiredPartitionPaths() {
  if (this.requiredPartitions == null) {
    // No partition push-down happened.
    return Collections.emptySet();
  }
  final boolean hiveStylePartitioning = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
  return FilePathUtils.toRelativePartitionPaths(this.partitionKeys, this.requiredPartitions, hiveStylePartitioning);
}
private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
if (paths.length == 0) {
return Collections.emptyList();
}
FileStatus[] fileStatuses = Arrays.stream(paths)
.flatMap(path ->
Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1, hadoopConf)))
@@ -304,6 +314,10 @@ public class HoodieTableSource implements
@VisibleForTesting
public InputFormat<RowData, ?> getInputFormat(boolean isStreaming) {
  // Dispatch to the dedicated builder for each execution mode.
  if (isStreaming) {
    return getStreamInputFormat();
  }
  return getBatchInputFormat();
}
private InputFormat<RowData, ?> getBatchInputFormat() {
// When this table has no partition, just return an empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
return new CollectionInputFormat<>(Collections.emptyList(), null);
@@ -314,13 +328,7 @@ public class HoodieTableSource implements
return new CollectionInputFormat<>(Collections.emptyList(), null);
}
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
final Schema tableAvroSchema;
try {
tableAvroSchema = schemaUtil.getTableAvroSchema();
} catch (Exception e) {
throw new HoodieException("Get table avro schema error", e);
}
final Schema tableAvroSchema = getTableAvroSchema();
final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
@@ -331,17 +339,11 @@ public class HoodieTableSource implements
final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
switch (tableType) {
case MERGE_ON_READ:
final List<MergeOnReadInputSplit> inputSplits;
if (!isStreaming) {
inputSplits = buildFileIndex(paths);
if (inputSplits.size() == 0) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
return new CollectionInputFormat<>(Collections.emptyList(), null);
}
} else {
// streaming reader would build the splits automatically.
inputSplits = Collections.emptyList();
final List<MergeOnReadInputSplit> inputSplits = buildFileIndex(paths);
if (inputSplits.size() == 0) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
return new CollectionInputFormat<>(Collections.emptyList(), null);
}
final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
rowType,
@@ -359,28 +361,9 @@ public class HoodieTableSource implements
.fieldTypes(rowDataType.getChildren())
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
.limit(this.limit)
.emitDelete(isStreaming)
.emitDelete(false)
.build();
case COPY_ON_WRITE:
if (isStreaming) {
final MergeOnReadTableState hoodieTableState2 = new MergeOnReadTableState(
rowType,
requiredRowType,
tableAvroSchema.toString(),
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
Collections.emptyList(),
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
return MergeOnReadInputFormat.builder()
.config(this.conf)
.paths(FilePathUtils.toFlinkPaths(paths))
.tableState(hoodieTableState2)
// use the explicit fields data type because the AvroSchemaConverter
// is not very stable.
.fieldTypes(rowDataType.getChildren())
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
.limit(this.limit)
.build();
}
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),
this.schema.getColumnNames().toArray(new String[0]),
@@ -416,6 +399,86 @@ public class HoodieTableSource implements
}
}
private InputFormat<RowData, ?> getStreamInputFormat() {
  // if table does not exist, use schema from the DDL
  final Schema avroSchema = this.metaClient == null ? inferSchemaFromDdl() : getTableAvroSchema();
  final DataType producedDataType = AvroSchemaConverter.convertToDataType(avroSchema);
  final RowType fullRowType = (RowType) producedDataType.getLogicalType();
  final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();

  final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
  // Streaming paths are discovered by the monitoring function, so the
  // format itself is configured with no paths.
  final org.apache.flink.core.fs.Path[] emptyPaths = new org.apache.flink.core.fs.Path[0];
  if (!FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
    throw new HoodieException(String.format(
        "Invalid query type : '%s', options ['%s'] are supported now", queryType,
        FlinkOptions.QUERY_TYPE_SNAPSHOT));
  }

  final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
  // Both table types read through the merge-on-read format and share the
  // same table state; only the delete-emitting behavior differs.
  final MergeOnReadTableState tableState = new MergeOnReadTableState(
      fullRowType,
      requiredRowType,
      avroSchema.toString(),
      AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
      Collections.emptyList(),
      conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
  switch (tableType) {
    case MERGE_ON_READ:
      return MergeOnReadInputFormat.builder()
          .config(this.conf)
          .paths(emptyPaths)
          .tableState(tableState)
          // use the explicit fields data type because the AvroSchemaConverter
          // is not very stable.
          .fieldTypes(producedDataType.getChildren())
          .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
          .limit(this.limit)
          .emitDelete(true)
          .build();
    case COPY_ON_WRITE:
      return MergeOnReadInputFormat.builder()
          .config(this.conf)
          .paths(emptyPaths)
          .tableState(tableState)
          // use the explicit fields data type because the AvroSchemaConverter
          // is not very stable.
          .fieldTypes(producedDataType.getChildren())
          .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
          .limit(this.limit)
          .build();
    default:
      throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
  }
}
/**
 * Derives the table Avro schema from the DDL-declared row type,
 * with the Hoodie metadata fields appended.
 */
private Schema inferSchemaFromDdl() {
  return HoodieAvroUtils.addMetadataFields(
      AvroSchemaConverter.convertToSchema(this.schema.toSourceRowDataType().getLogicalType()),
      conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
}
/**
 * Returns the table Avro schema resolved from the meta client, falling back
 * to the schema inferred from the DDL when resolution fails (e.g. the table
 * exists but no data has been written yet).
 */
@VisibleForTesting
public Schema getTableAvroSchema() {
  try {
    return new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
        .getTableAvroSchema();
  } catch (Throwable e) {
    // table exists but has no written data
    LOG.warn("Get table avro schema error, use schema from the DDL instead", e);
    return inferSchemaFromDdl();
  }
}
/** Returns the meta client; may be null when the table does not exist yet. */
@VisibleForTesting
public HoodieTableMetaClient getMetaClient() {
  return metaClient;
}
@VisibleForTesting
public Configuration getConf() {
return this.conf;

View File

@@ -36,6 +36,7 @@ import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -82,11 +83,13 @@ public class FilePathUtils {
* @param partitionKVs The partition key value mapping
* @param hivePartition Whether the partition path is with Hive style,
* e.g. {partition key} = {partition value}
* @param sepSuffix Whether to append the file separator as suffix
* @return an escaped, valid partition name
*/
public static String generatePartitionPath(
LinkedHashMap<String, String> partitionKVs,
boolean hivePartition) {
boolean hivePartition,
boolean sepSuffix) {
if (partitionKVs.isEmpty()) {
return "";
}
@@ -103,7 +106,9 @@ public class FilePathUtils {
suffixBuf.append(escapePathName(e.getValue()));
i++;
}
suffixBuf.append(File.separator);
if (sepSuffix) {
suffixBuf.append(File.separator);
}
return suffixBuf.toString();
}
@@ -371,11 +376,30 @@ public class FilePathUtils {
boolean hivePartition) {
return partitionPaths.stream()
.map(m -> validateAndReorderPartitions(m, partitionKeys))
.map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition))
.map(kvs -> FilePathUtils.generatePartitionPath(kvs, hivePartition, true))
.map(n -> new Path(path, n))
.toArray(Path[]::new);
}
/**
 * Transforms the given partition key value mapping to relative partition paths
 * (no trailing file separator).
 *
 * @param partitionKeys  The partition key list
 * @param partitionPaths The partition key value mapping
 * @param hivePartition  Whether the partition path is in Hive style
 *
 * @see #getReadPaths
 */
public static Set<String> toRelativePartitionPaths(
    List<String> partitionKeys,
    List<Map<String, String>> partitionPaths,
    boolean hivePartition) {
  return partitionPaths.stream()
      // Reorder each mapping by the declared partition keys, then render it
      // as a relative path without the separator suffix.
      .map(partitionKVs -> FilePathUtils.generatePartitionPath(
          validateAndReorderPartitions(partitionKVs, partitionKeys), hivePartition, false))
      .collect(Collectors.toSet());
}
/**
* Transforms the array of Hadoop paths to Flink paths.
*/

View File

@@ -229,9 +229,7 @@ public class StreamerUtil {
public static void initTableIfNotExists(Configuration conf) throws IOException {
final String basePath = conf.getString(FlinkOptions.PATH);
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
// Hadoop FileSystem
FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
if (!tableExists(basePath, hadoopConf)) {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
.setTableName(conf.getString(FlinkOptions.TABLE_NAME))
@@ -251,6 +249,19 @@ public class StreamerUtil {
// some of the filesystems release the handles in #close method.
}
/**
 * Returns whether the hoodie table exists under given path {@code basePath},
 * i.e. whether the {@code .hoodie} meta folder is present.
 */
public static boolean tableExists(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
  final Path metaFolder = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
  try {
    // Hadoop FileSystem
    return FSUtils.getFs(basePath, hadoopConf).exists(metaFolder);
  } catch (IOException e) {
    throw new HoodieException("Error while checking whether table exists under path:" + basePath, e);
  }
}
/**
* Generates the bucket ID using format {partition path}_{fileID}.
*/
@@ -282,11 +293,37 @@ public class StreamerUtil {
&& conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
}
/**
 * Creates the meta client for reader.
 *
 * <p>The streaming pipeline process is long running, so empty table path is allowed,
 * the reader would then check and refresh the meta client.
 *
 * @return the meta client, or null for a streaming read over a not-yet-created table
 *
 * @see org.apache.hudi.source.StreamReadMonitoringFunction
 */
public static HoodieTableMetaClient metaClientForReader(
    Configuration conf,
    org.apache.hadoop.conf.Configuration hadoopConf) {
  final String basePath = conf.getString(FlinkOptions.PATH);
  final boolean streaming = conf.getBoolean(FlinkOptions.READ_AS_STREAMING);
  // Only a streaming read tolerates a missing table: the monitoring
  // function re-checks and creates the client once the table appears.
  if (streaming && !tableExists(basePath, hadoopConf)) {
    return null;
  }
  return createMetaClient(basePath, hadoopConf);
}
/**
 * Creates the meta client for the given base path and Hadoop configuration.
 */
public static HoodieTableMetaClient createMetaClient(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
  return HoodieTableMetaClient.builder()
      .setBasePath(basePath)
      .setConf(hadoopConf)
      .build();
}
/**
 * Creates the meta client using the default Flink client Hadoop configuration.
 */
public static HoodieTableMetaClient createMetaClient(String basePath) {
  // Delegate to the two-arg overload so the builder wiring lives in one place.
  return createMetaClient(basePath, FlinkClientUtil.getHadoopConf());
}
/**