[HUDI-2924] Refresh the fs view on successful checkpoints for write profile (#4199)
This commit is contained in:
@@ -60,7 +60,9 @@ public class CleanFunction<T> extends AbstractRichFunction
|
|||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
|
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
|
||||||
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
|
// do not use the remote filesystem view because the async cleaning service's
|
||||||
|
// local timeline is very likely to fall behind the remote one.
|
||||||
|
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false);
|
||||||
this.executor = NonThrownExecutor.builder(LOG).build();
|
this.executor = NonThrownExecutor.builder(LOG).build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -113,7 +113,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
|
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, false);
|
||||||
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
|
||||||
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
|
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
|
||||||
new FlinkTaskContextSupplier(getRuntimeContext()));
|
new FlinkTaskContextSupplier(getRuntimeContext()));
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ public class DeltaWriteProfile extends WriteProfile {
|
|||||||
List<FileSlice> allSmallFileSlices = new ArrayList<>();
|
List<FileSlice> allSmallFileSlices = new ArrayList<>();
|
||||||
// If we can index log files, we can add more inserts to log files for fileIds including those under
|
// If we can index log files, we can add more inserts to log files for fileIds including those under
|
||||||
// pending compaction.
|
// pending compaction.
|
||||||
List<FileSlice> allFileSlices = getFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
|
List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
for (FileSlice fileSlice : allFileSlices) {
|
for (FileSlice fileSlice : allFileSlices) {
|
||||||
if (isSmallFile(fileSlice)) {
|
if (isSmallFile(fileSlice)) {
|
||||||
|
|||||||
@@ -93,6 +93,11 @@ public class WriteProfile {
|
|||||||
*/
|
*/
|
||||||
private long reloadedCheckpointId;
|
private long reloadedCheckpointId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The file system view cache for one checkpoint interval.
|
||||||
|
*/
|
||||||
|
protected SyncableFileSystemView fsView;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Metadata cache to reduce IO of metadata files.
|
* Metadata cache to reduce IO of metadata files.
|
||||||
*/
|
*/
|
||||||
@@ -111,6 +116,7 @@ public class WriteProfile {
|
|||||||
this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
|
this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
|
||||||
this.metaClient = StreamerUtil.createMetaClient(config.getBasePath(), context.getHadoopConf().get());
|
this.metaClient = StreamerUtil.createMetaClient(config.getBasePath(), context.getHadoopConf().get());
|
||||||
this.metadataCache = new HashMap<>();
|
this.metadataCache = new HashMap<>();
|
||||||
|
this.fsView = getFileSystemView();
|
||||||
// profile the record statistics on construction
|
// profile the record statistics on construction
|
||||||
recordProfile();
|
recordProfile();
|
||||||
}
|
}
|
||||||
@@ -190,7 +196,7 @@ public class WriteProfile {
|
|||||||
|
|
||||||
if (!commitTimeline.empty()) { // if we have some commits
|
if (!commitTimeline.empty()) { // if we have some commits
|
||||||
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
|
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
|
||||||
List<HoodieBaseFile> allFiles = getFileSystemView()
|
List<HoodieBaseFile> allFiles = fsView
|
||||||
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
|
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
|
||||||
|
|
||||||
for (HoodieBaseFile file : allFiles) {
|
for (HoodieBaseFile file : allFiles) {
|
||||||
@@ -243,6 +249,7 @@ public class WriteProfile {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.metaClient.reloadActiveTimeline();
|
this.metaClient.reloadActiveTimeline();
|
||||||
|
this.fsView = getFileSystemView();
|
||||||
recordProfile();
|
recordProfile();
|
||||||
cleanMetadataCache(this.metaClient.getCommitsTimeline().filterCompletedInstants().getInstants());
|
cleanMetadataCache(this.metaClient.getCommitsTimeline().filterCompletedInstants().getInstants());
|
||||||
this.smallFilesMap.clear();
|
this.smallFilesMap.clear();
|
||||||
|
|||||||
@@ -367,12 +367,23 @@ public class StreamerUtil {
|
|||||||
*/
|
*/
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
|
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
|
||||||
|
return createWriteClient(conf, runtimeContext, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the Flink write client.
|
||||||
|
*
|
||||||
|
* <p>This is expected to be used by clients. Set flag {@code loadFsViewStorageConfig} to use the
|
||||||
|
* remote filesystem view storage config; otherwise an in-memory filesystem view storage is used.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) {
|
||||||
HoodieFlinkEngineContext context =
|
HoodieFlinkEngineContext context =
|
||||||
new HoodieFlinkEngineContext(
|
new HoodieFlinkEngineContext(
|
||||||
new SerializableConfiguration(getHadoopConf()),
|
new SerializableConfiguration(getHadoopConf()),
|
||||||
new FlinkTaskContextSupplier(runtimeContext));
|
new FlinkTaskContextSupplier(runtimeContext));
|
||||||
|
|
||||||
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
|
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig);
|
||||||
return new HoodieFlinkWriteClient<>(context, writeConfig);
|
return new HoodieFlinkWriteClient<>(context, writeConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user