1
0

[HUDI-2548] Flink streaming reader misses the rolling over file handles (#3787)

This commit is contained in:
Danny Chan
2021-10-14 10:36:18 +08:00
committed by GitHub
parent cff384d23f
commit abf3e3fe71
16 changed files with 225 additions and 126 deletions

View File

@@ -371,8 +371,8 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
.key("write.batch.size")
.doubleType()
.defaultValue(64D) // 64MB
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB");
.defaultValue(256D) // 256MB
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 256MB");
public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions
.key("write.log_block.size")

View File

@@ -139,7 +139,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
public void close() {
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
// this.writeClient.close();
}
}
@@ -378,6 +378,8 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
final DataItem item = DataItem.fromHoodieRecord(value);
bucket.records.add(item);
boolean flushBucket = bucket.detector.detect(item);
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
if (flushBucket) {
@@ -398,7 +400,6 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
}
}
bucket.records.add(item);
}
private boolean hasData() {

View File

@@ -211,11 +211,13 @@ public class StreamWriteOperatorCoordinator
// the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one (follows the checkpoint subsuming contract)
final boolean committed = commitInstant(this.instant);
if (committed) {
if (tableState.scheduleCompaction) {
// if async compaction is on, schedule the compaction
if (tableState.scheduleCompaction) {
if (committed || tableState.timeCompactionTriggerStrategy) {
writeClient.scheduleCompaction(Option.empty());
}
}
if (committed) {
// start new instant.
startInstant();
// sync Hive if is enabled
@@ -530,6 +532,7 @@ public class StreamWriteOperatorCoordinator
final String commitAction;
final boolean isOverwrite;
final boolean scheduleCompaction;
final boolean timeCompactionTriggerStrategy;
final boolean syncHive;
final boolean syncMetadata;
@@ -539,6 +542,7 @@ public class StreamWriteOperatorCoordinator
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
this.timeCompactionTriggerStrategy = StreamerUtil.isTimeCompactionTriggerStrategy(conf);
this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
}

View File

@@ -225,7 +225,7 @@ public class WriteProfile {
.orElse(null)))
.filter(Objects::nonNull)
.collect(Collectors.toList());
FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList);
FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList, table.getMetaClient().getTableType());
fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles);
}
}

View File

@@ -21,11 +21,13 @@ package org.apache.hudi.sink.partitioner.profile;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.core.fs.Path;
@@ -37,11 +39,9 @@ import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -83,57 +83,67 @@ public class WriteProfiles {
}
/**
* Returns all the incremental write file path statuses with the given commits metadata.
* Returns all the incremental write file statuses with the given commits metadata.
*
* @param basePath Table base path
* @param hadoopConf The hadoop conf
* @param metadataList The commits metadata
* @return the file statuses array
* @param tableType The table type
* @return the file status array
*/
public static FileStatus[] getWritePathsOfInstants(
Path basePath,
Configuration hadoopConf,
List<HoodieCommitMetadata> metadataList) {
List<HoodieCommitMetadata> metadataList,
HoodieTableType tableType) {
FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
.flatMap(Collection::stream).toArray(FileStatus[]::new);
Map<String, FileStatus> uniqueIdToFileStatus = new HashMap<>();
metadataList.forEach(metadata ->
uniqueIdToFileStatus.putAll(getFilesToReadOfInstant(basePath, metadata, fs, tableType)));
return uniqueIdToFileStatus.values().toArray(new FileStatus[0]);
}
/**
* Returns the commit file paths with given metadata.
* Returns the commit file status info with given metadata.
*
* @param basePath Table base path
* @param metadata The metadata
* @param fs The filesystem
* @return the commit file status list
* @param basePath Table base path
* @param metadata The metadata
* @param fs The filesystem
* @param tableType The table type
* @return the commit file status info grouping by specific ID
*/
private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
.map(org.apache.hadoop.fs.Path::new)
private static Map<String, FileStatus> getFilesToReadOfInstant(
Path basePath,
HoodieCommitMetadata metadata,
FileSystem fs,
HoodieTableType tableType) {
return getFilesToRead(metadata, basePath.toString(), tableType).entrySet().stream()
// filter out the file paths that do not exist; some files may be cleaned by
// the cleaner.
.filter(path -> {
.filter(entry -> {
try {
return fs.exists(path);
return fs.exists(entry.getValue().getPath());
} catch (IOException e) {
LOG.error("Checking exists of path: {} error", path);
throw new HoodieException(e);
}
}).map(path -> {
try {
return fs.getFileStatus(path);
} catch (FileNotFoundException fe) {
LOG.warn("File {} was deleted by the cleaner, ignore", path);
return null;
} catch (IOException e) {
LOG.error("Get write status of path: {} error", path);
LOG.error("Checking exists of path: {} error", entry.getValue().getPath());
throw new HoodieException(e);
}
})
// filter out crushed files
.filter(Objects::nonNull)
.filter(StreamerUtil::isValidFile)
.collect(Collectors.toList());
.filter(entry -> StreamerUtil.isValidFile(entry.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
/**
 * Returns the file statuses to read for the given instant metadata, keyed by a
 * table-type-specific unique ID.
 *
 * <p>NOTE(review): COW instants are keyed by file ID while MOR instants are keyed by
 * full path — presumably because MOR log files roll over and the full path is the
 * only unique handle; confirm against the commit metadata contract.
 *
 * @param metadata  The commit metadata of one instant
 * @param basePath  Table base path
 * @param tableType The table type
 * @return mapping of unique ID to file status
 */
private static Map<String, FileStatus> getFilesToRead(
    HoodieCommitMetadata metadata,
    String basePath,
    HoodieTableType tableType) {
  if (tableType == HoodieTableType.COPY_ON_WRITE) {
    return metadata.getFileIdToFileStatus(basePath);
  }
  if (tableType == HoodieTableType.MERGE_ON_READ) {
    return metadata.getFullPathToFileStatus(basePath);
  }
  // Unreachable for the known table types; guards against future enum additions.
  throw new AssertionError();
}
/**
@@ -178,9 +188,8 @@ public class WriteProfiles {
Path basePath,
HoodieInstant instant,
HoodieTimeline timeline) {
byte[] data = timeline.getInstantDetails(instant).get();
try {
return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
return HoodieInputFormatUtils.getCommitMetadata(instant, timeline);
} catch (IOException e) {
LOG.error("Get write metadata for table {} with instant {} and path: {} error",
tableName, instant.getTimestamp(), basePath);

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -159,23 +160,24 @@ public class IncrementalInputSplits implements Serializable {
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn(""
LOG.warn("\n"
+ "--------------------------------------------------------------------------------\n"
+ "---------- caution: the reader has fall behind too much from the writer,\n"
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ "--------------------------------------------------------------------------------");
}
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
? mergeList(activeMetadataList, archivedMetadataList)
// IMPORTANT: the merged metadata list must be in ascending order by instant time
? mergeList(archivedMetadataList, activeMetadataList)
: activeMetadataList;
Set<String> writePartitions = getWritePartitionPaths(metadataList);
Set<String> writePartitions = HoodieInputFormatUtils.getWritePartitionPaths(metadataList);
// apply partition push down
if (this.requiredPartitions != null) {
writePartitions = writePartitions.stream()
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());
}
FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList);
FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
if (fileStatuses.length == 0) {
LOG.warn("No files found for reading in user provided path.");
return Result.EMPTY;
@@ -275,19 +277,6 @@ public class IncrementalInputSplits implements Serializable {
return instantStream.collect(Collectors.toList());
}
/**
* Returns all the incremental write partition paths as a set with the given commits metadata.
*
* @param metadataList The commits metadata
* @return the partition path set
*/
private Set<String> getWritePartitionPaths(List<HoodieCommitMetadata> metadataList) {
return metadataList.stream()
.map(HoodieCommitMetadata::getWritePartitionPaths)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}
private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
List<T> merged = new ArrayList<>(list1);
merged.addAll(list2);

View File

@@ -201,7 +201,7 @@ public class StreamReadMonitoringFunction
}
// update the issued instant time
this.issuedInstant = result.getEndInstant();
LOG.info(""
LOG.info("\n"
+ "------------------------------------------------------------\n"
+ "---------- consumed to instant: {}\n"
+ "------------------------------------------------------------",

View File

@@ -184,8 +184,8 @@ public class FlinkStreamerConfig extends Configuration {
public Double writeTaskMaxSize = 1024D;
@Parameter(names = {"--write-batch-size"},
description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB")
public Double writeBatchSize = 64D;
description = "Batch buffer size in MB to flush data into the underneath filesystem, default 256MB")
public Double writeBatchSize = 256D;
@Parameter(names = {"--write-log-block-size"}, description = "Max log block size in MB for log file, default 128MB")
public Integer writeLogBlockSize = 128;

View File

@@ -286,7 +286,7 @@ public class StreamerUtil {
}
/**
* Returns whether needs to schedule the compaction plan.
* Returns whether the compaction plan needs to be scheduled.
*
* @param conf The flink configuration.
*/
@@ -297,6 +297,16 @@ public class StreamerUtil {
&& conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
}
/**
 * Returns whether the configured compaction trigger strategy is time based,
 * i.e. either {@code TIME_ELAPSED} or {@code NUM_OR_TIME}.
 *
 * @param conf The flink configuration.
 */
public static boolean isTimeCompactionTriggerStrategy(Configuration conf) {
  String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY);
  // Case-insensitive match against the two strategies that involve elapsed time.
  boolean isTimeElapsed = FlinkOptions.TIME_ELAPSED.equalsIgnoreCase(strategy);
  boolean isNumOrTime = FlinkOptions.NUM_OR_TIME.equalsIgnoreCase(strategy);
  return isTimeElapsed || isNumOrTime;
}
/**
* Creates the meta client for reader.
*