[HUDI-1999] Refresh the base file view cache for WriteProfile (#3067)
Refresh the cached file system view on reload so the write profile discovers new small files.
@@ -55,6 +55,11 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
 
   private boolean isClosed = false;
 
+  /**
+   * Flag saying whether we should replace the old file with new.
+   */
+  private boolean shouldReplace = true;
+
   public FlinkMergeAndReplaceHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                    Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                                    TaskContextSupplier taskContextSupplier, Path basePath) {
@@ -103,11 +108,12 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
   @Override
   protected void makeOldAndNewFilePaths(String partitionPath, String oldFileName, String newFileName) {
     // old and new file name expects to be the same.
-    if (!oldFileName.equals(newFileName)) {
+    if (!FSUtils.getCommitTime(oldFileName).equals(instantTime)) {
       LOG.warn("MERGE and REPLACE handle expect the same name for old and new files,\n"
           + "while got new file: " + newFileName + " with old file: " + oldFileName + ",\n"
           + "this rarely happens when the checkpoint success event was not received yet\n"
           + "but the write task flush with new instant time, which does not break the UPSERT semantics");
+      shouldReplace = false;
     }
     super.makeOldAndNewFilePaths(partitionPath, oldFileName, newFileName);
     try {
@@ -146,6 +152,10 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
   }
 
   public void finalizeWrite() {
+    // Behaves like the normal merge handle if the write instant time changes.
+    if (!shouldReplace) {
+      return;
+    }
     // The file visibility should be kept by the configured ConsistencyGuard instance.
     try {
       fs.delete(oldFilePath, false);
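The key change above is the guard: instead of comparing old and new file names, the handle compares the commit time encoded in the old base file name against its own instant time, and downgrades to plain merge behavior when they differ. A minimal sketch of that comparison, assuming the usual `<fileId>_<writeToken>_<instantTime>.parquet` base file naming; the concrete file name and instants below are made up for illustration:

```java
import org.apache.hudi.common.fs.FSUtils;

public class ReplaceGuardSketch {
  public static void main(String[] args) {
    // Hypothetical base file written under instant 20210618163524.
    String oldFileName = "f1_0-1-0_20210618163524.parquet";
    // The handle's instant time rolled over to a later checkpoint's instant.
    String instantTime = "20210618170000";

    // Same check as the patched makeOldAndNewFilePaths: a mismatch means the
    // old file belongs to an earlier instant, so the handle merges instead of
    // replacing, and finalizeWrite() skips deleting the old file path.
    boolean shouldReplace = FSUtils.getCommitTime(oldFileName).equals(instantTime);
    System.out.println("shouldReplace = " + shouldReplace); // prints false here
  }
}
```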
@@ -206,11 +206,11 @@ public class StreamWriteOperatorCoordinator
           }
           // start new instant.
           startInstant();
+          // sync Hive if is enabled
+          syncHiveIfEnabled();
         }
       }, "commits the instant %s", this.instant
     );
-    // sync Hive if is enabled
-    syncHiveIfEnabled();
   }
 
   private void syncHiveIfEnabled() {
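This hunk moves the Hive sync inside the task that commits the instant, so it runs strictly after startInstant() rather than as a separate call in the enclosing method. A hedged sketch of the resulting ordering, with a plain single-thread executor standing in for the coordinator's executor (the real class uses its own executor abstraction):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CommitOrderingSketch {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(() -> {
      commitInstant();     // persist the commit metadata
      startInstant();      // open the next instant
      syncHiveIfEnabled(); // after the patch: runs here, after the commit
    });
    // Before the patch the sync call sat out here, outside the submitted
    // task, and was not serialized with the commit work.
    executor.shutdown();
  }

  private static void commitInstant() {}

  private static void startInstant() {}

  private static void syncHiveIfEnabled() {}
}
```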
@@ -54,12 +54,13 @@ public class DeltaWriteProfile extends WriteProfile {
     // Find out all eligible small file slices
     if (!commitTimeline.empty()) {
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+      // initialize the filesystem view based on the commit metadata
+      initFSViewIfNecessary(commitTimeline);
       // find smallest file in partition and append to it
       List<FileSlice> allSmallFileSlices = new ArrayList<>();
       // If we can index log files, we can add more inserts to log files for fileIds including those under
       // pending compaction.
-      List<FileSlice> allFileSlices =
-          table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+      List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
          .collect(Collectors.toList());
       for (FileSlice fileSlice : allFileSlices) {
         if (isSmallFile(fileSlice)) {
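With this hunk, DeltaWriteProfile resolves candidate file slices from the fsView cache maintained by its WriteProfile parent instead of re-listing through table.getSliceView() on every lookup. A hedged sketch of the call it now makes; the partition path and instant are illustrative:

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;

public class SliceLookupSketch {
  // fsView is the cached view built by WriteProfile#initFSViewIfNecessary;
  // the trailing boolean asks to include slices under pending compaction.
  static List<FileSlice> latestSlices(HoodieTableFileSystemView fsView) {
    return fsView.getLatestFileSlicesBeforeOrOn("par1", "20210618163524", true)
        .collect(Collectors.toList());
  }
}
```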
@@ -25,12 +25,17 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.BucketAssigner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.core.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +60,11 @@ public class WriteProfile {
    */
   protected final HoodieWriteConfig config;
 
+  /**
+   * Table base path.
+   */
+  private final Path basePath;
+
   /**
    * The hoodie table.
    */
@@ -81,11 +91,23 @@ public class WriteProfile {
    */
   private long reloadedCheckpointId;
 
+  /**
+   * The file system view cache for one checkpoint interval.
+   */
+  protected HoodieTableFileSystemView fsView;
+
+  /**
+   * Hadoop configuration.
+   */
+  private final Configuration hadoopConf;
+
   public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
     this.config = config;
+    this.basePath = new Path(config.getBasePath());
     this.smallFilesMap = new HashMap<>();
     this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
     this.table = HoodieFlinkTable.create(config, context);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
     // profile the record statistics on construction
     recordProfile();
   }
@@ -160,7 +182,9 @@ public class WriteProfile {
 
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
+      // initialize the filesystem view based on the commit metadata
+      initFSViewIfNecessary(commitTimeline);
+      List<HoodieBaseFile> allFiles = fsView
           .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
 
       for (HoodieBaseFile file : allFiles) {
@@ -178,6 +202,16 @@ public class WriteProfile {
     return smallFileLocations;
   }
 
+  protected void initFSViewIfNecessary(HoodieTimeline commitTimeline) {
+    if (fsView == null) {
+      List<HoodieCommitMetadata> metadataList = commitTimeline.getInstants()
+          .map(instant -> WriteProfiles.getCommitMetadata(config.getTableName(), basePath, instant, commitTimeline))
+          .collect(Collectors.toList());
+      FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList);
+      fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles);
+    }
+  }
+
   private void recordProfile() {
     this.avgSize = averageBytesPerRecord();
     if (config.shouldAllowMultiWriteOnSameInstant()) {
@@ -200,6 +234,7 @@ public class WriteProfile {
       return;
     }
     recordProfile();
+    this.fsView = null;
     this.smallFilesMap.clear();
     this.table.getMetaClient().reloadActiveTimeline();
     this.reloadedCheckpointId = checkpointId;
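Taken together, the WriteProfile hunks turn the file system view into a per-checkpoint cache: it is built lazily from commit metadata on the first small-file lookup and dropped on reload, so the next lookup sees freshly committed files. A hedged lifecycle sketch; the config and context are assumed to point at an existing table, and the partition name is illustrative:

```java
import java.util.List;

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.table.action.commit.SmallFile;

public class WriteProfileCacheSketch {
  static void lifecycle(HoodieWriteConfig writeConfig, HoodieFlinkEngineContext context) {
    WriteProfile profile = new WriteProfile(writeConfig, context);

    List<SmallFile> first = profile.getSmallFiles("par1");  // builds fsView lazily
    List<SmallFile> again = profile.getSmallFiles("par1");  // served from the small-file cache

    profile.reload(1);                                      // fsView = null, timeline reloaded
    List<SmallFile> fresh = profile.getSmallFiles("par1");  // rebuilt view discovers new small files
  }
}
```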
@@ -19,15 +19,33 @@
 package org.apache.hudi.sink.partitioner.profile;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 
+import org.apache.flink.core.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Factory for {@link WriteProfile}.
  */
 public class WriteProfiles {
+  private static final Logger LOG = LoggerFactory.getLogger(WriteProfiles.class);
+
   private static final Map<String, WriteProfile> PROFILES = new HashMap<>();
 
   private WriteProfiles() {}
@@ -58,4 +76,80 @@ public class WriteProfiles {
   public static void clean(String path) {
     PROFILES.remove(path);
   }
+
+  /**
+   * Returns all the incremental write file path statuses with the given commits metadata.
+   *
+   * @param basePath     Table base path
+   * @param hadoopConf   The hadoop conf
+   * @param metadataList The commits metadata
+   * @return the file statuses array
+   */
+  public static FileStatus[] getWritePathsOfInstants(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList) {
+    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  /**
+   * Returns the commit file paths with given metadata.
+   *
+   * @param basePath Table base path
+   * @param metadata The metadata
+   * @param fs       The filesystem
+   *
+   * @return the commit file status list
+   */
+  private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
+    return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+        .map(org.apache.hadoop.fs.Path::new)
+        // filter out the file paths that does not exist, some files may be cleaned by
+        // the cleaner.
+        .filter(path -> {
+          try {
+            return fs.exists(path);
+          } catch (IOException e) {
+            LOG.error("Checking exists of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        }).map(path -> {
+          try {
+            return fs.getFileStatus(path);
+          } catch (IOException e) {
+            LOG.error("Get write status of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        })
+        // filter out crushed files
+        .filter(fileStatus -> fileStatus.getLen() > 0)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the commit metadata of the given instant.
+   *
+   * @param tableName The table name
+   * @param basePath  The table base path
+   * @param instant   The hoodie instant
+   * @param timeline  The timeline
+   *
+   * @return the commit metadata
+   */
+  public static HoodieCommitMetadata getCommitMetadata(
+      String tableName,
+      Path basePath,
+      HoodieInstant instant,
+      HoodieTimeline timeline) {
+    byte[] data = timeline.getInstantDetails(instant).get();
+    try {
+      return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
+    } catch (IOException e) {
+      LOG.error("Get write metadata for table {} with instant {} and path: {} error",
+          tableName, instant.getTimestamp(), basePath);
+      throw new HoodieException(e);
+    }
+  }
 }
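These new static helpers centralize logic previously duplicated as private methods in StreamReadMonitoringFunction (removed in the next hunks) and feed WriteProfile#initFSViewIfNecessary. A hedged usage sketch, assuming the timeline comes from an initialized table's meta client:

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;

public class WriteProfilesUsageSketch {
  // tableName is only used for error reporting; timeline is assumed to hold
  // the completed commits of the table rooted at basePath.
  static FileStatus[] incrementalFiles(
      String tableName, Path basePath, Configuration hadoopConf, HoodieTimeline timeline) {
    List<HoodieCommitMetadata> metadataList = timeline.getInstants()
        .map(instant -> WriteProfiles.getCommitMetadata(tableName, basePath, instant, timeline))
        .collect(Collectors.toList());
    return WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList);
  }
}
```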
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.source;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -30,7 +29,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -46,11 +45,9 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -220,10 +217,11 @@ public class StreamReadMonitoringFunction
     // 3. filter the full file paths
     // 4. use the file paths from #step 3 as the back-up of the filesystem view
 
+    String tableName = conf.getString(FlinkOptions.TABLE_NAME);
     List<HoodieCommitMetadata> metadataList = instants.stream()
-        .map(instant -> getCommitMetadata(instant, commitTimeline)).collect(Collectors.toList());
+        .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
     Set<String> writePartitions = getWritePartitionPaths(metadataList);
-    FileStatus[] fileStatuses = getWritePathsOfInstants(metadataList);
+    FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList);
     if (fileStatuses.length == 0) {
       LOG.warn("No files found for reading in user provided path.");
       return;
@@ -334,52 +332,4 @@ public class StreamReadMonitoringFunction
         .flatMap(Collection::stream)
         .collect(Collectors.toSet());
   }
-
-  /**
-   * Returns all the incremental write file path statuses with the given commits metadata.
-   *
-   * @param metadataList The commits metadata
-   * @return the file statuses array
-   */
-  private FileStatus[] getWritePathsOfInstants(List<HoodieCommitMetadata> metadataList) {
-    FileSystem fs = FSUtils.getFs(path.toString(), hadoopConf);
-    return metadataList.stream().map(metadata -> getWritePathsOfInstant(metadata, fs))
-        .flatMap(Collection::stream).toArray(FileStatus[]::new);
-  }
-
-  private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) {
-    return metadata.getFileIdAndFullPaths(path.toString()).values().stream()
-        .map(org.apache.hadoop.fs.Path::new)
-        // filter out the file paths that does not exist, some files may be cleaned by
-        // the cleaner.
-        .filter(path -> {
-          try {
-            return fs.exists(path);
-          } catch (IOException e) {
-            LOG.error("Checking exists of path: {} error", path);
-            throw new HoodieException(e);
-          }
-        }).map(path -> {
-          try {
-            return fs.getFileStatus(path);
-          } catch (IOException e) {
-            LOG.error("Get write status of path: {} error", path);
-            throw new HoodieException(e);
-          }
-        })
-        // filter out crushed files
-        .filter(fileStatus -> fileStatus.getLen() > 0)
-        .collect(Collectors.toList());
-  }
-
-  private HoodieCommitMetadata getCommitMetadata(HoodieInstant instant, HoodieTimeline timeline) {
-    byte[] data = timeline.getInstantDetails(instant).get();
-    try {
-      return HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
-    } catch (IOException e) {
-      LOG.error("Get write metadata for table {} with instant {} and path: {} error",
-          conf.getString(FlinkOptions.TABLE_NAME), instant.getTimestamp(), path);
-      throw new HoodieException(e);
-    }
-  }
 }
@@ -22,6 +22,8 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.profile.WriteProfile;
 import org.apache.hudi.table.action.commit.BucketInfo;
@@ -29,6 +31,7 @@ import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.action.commit.SmallFile;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.BeforeEach;
@@ -45,6 +48,10 @@ import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for {@link BucketAssigner}.
@@ -52,6 +59,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public class TestBucketAssigner {
   private HoodieWriteConfig writeConfig;
   private HoodieFlinkEngineContext context;
+  private Configuration conf;
 
   @TempDir
   File tempFile;
@@ -59,7 +67,7 @@ public class TestBucketAssigner {
   @BeforeEach
   public void before() throws IOException {
     final String basePath = tempFile.getAbsolutePath();
-    final Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf = TestConfigurations.getDefaultConf(basePath);
 
     writeConfig = StreamerUtil.getHoodieClientConfig(conf);
     context = new HoodieFlinkEngineContext(
@@ -291,6 +299,44 @@ public class TestBucketAssigner {
     assertBucketEquals(bucketInfo2, "par1", BucketType.UPDATE, "f1");
   }
 
+  @Test
+  public void testWriteProfileReload() throws Exception {
+    WriteProfile writeProfile = new WriteProfile(writeConfig, context);
+    List<SmallFile> smallFiles1 = writeProfile.getSmallFiles("par1");
+    assertTrue(smallFiles1.isEmpty(), "Should have no small files");
+
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    Option<String> instantOption = getLastCompleteInstant(writeProfile);
+    assertFalse(instantOption.isPresent());
+
+    writeProfile.reload(1);
+    String instant1 = getLastCompleteInstant(writeProfile).orElse(null);
+    assertNotNull(instant1);
+    List<SmallFile> smallFiles2 = writeProfile.getSmallFiles("par1");
+    assertThat("Should have 1 small file", smallFiles2.size(), is(1));
+    assertThat("Small file should have same timestamp as last complete instant",
+        smallFiles2.get(0).location.getInstantTime(), is(instant1));
+
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    List<SmallFile> smallFiles3 = writeProfile.getSmallFiles("par1");
+    assertThat("Should have 1 small file", smallFiles3.size(), is(1));
+    assertThat("Non-reloaded write profile has the same base file view as before",
+        smallFiles3.get(0).location.getInstantTime(), is(instant1));
+
+    writeProfile.reload(2);
+    String instant2 = getLastCompleteInstant(writeProfile).orElse(null);
+    assertNotEquals(instant2, instant1, "Should have new complete instant");
+    List<SmallFile> smallFiles4 = writeProfile.getSmallFiles("par1");
+    assertThat("Should have 1 small file", smallFiles4.size(), is(1));
+    assertThat("Small file should have same timestamp as last complete instant",
+        smallFiles4.get(0).location.getInstantTime(), is(instant2));
+  }
+
+  private static Option<String> getLastCompleteInstant(WriteProfile profile) {
+    return profile.getTable().getMetaClient().getCommitsTimeline()
+        .filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);
+  }
+
   private void assertBucketEquals(
       BucketInfo bucketInfo,
       String partition,