diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 26ac9f3ad..13154b217 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -60,7 +60,9 @@ public class CleanFunction extends AbstractRichFunction public void open(Configuration parameters) throws Exception { super.open(parameters); if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) { - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + // do not use the remote filesystem view because the async cleaning service + // local timeline is very probably to fall behind with the remote one. + this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false); this.executor = NonThrownExecutor.builder(LOG).build(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 73fd66855..14cad16df 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -113,7 +113,7 @@ public class BucketAssignFunction> @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); + HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, false); HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(StreamerUtil.getHadoopConf()), new FlinkTaskContextSupplier(getRuntimeContext())); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index e1c890cfc..97b6b2388 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -59,7 +59,7 @@ public class DeltaWriteProfile extends WriteProfile { List allSmallFileSlices = new ArrayList<>(); // If we can index log files, we can add more inserts to log files for fileIds including those under // pending compaction. - List allFileSlices = getFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) + List allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) .collect(Collectors.toList()); for (FileSlice fileSlice : allFileSlices) { if (isSmallFile(fileSlice)) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index d13162f61..f6840e5ef 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -93,6 +93,11 @@ public class WriteProfile { */ private long reloadedCheckpointId; + /** + * The file system view cache for one checkpoint interval. + */ + protected SyncableFileSystemView fsView; + /** * Metadata cache to reduce IO of metadata files. */ @@ -111,6 +116,7 @@ public class WriteProfile { this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize(); this.metaClient = StreamerUtil.createMetaClient(config.getBasePath(), context.getHadoopConf().get()); this.metadataCache = new HashMap<>(); + this.fsView = getFileSystemView(); // profile the record statistics on construction recordProfile(); } @@ -190,7 +196,7 @@ public class WriteProfile { if (!commitTimeline.empty()) { // if we have some commits HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); - List allFiles = getFileSystemView() + List allFiles = fsView .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); for (HoodieBaseFile file : allFiles) { @@ -243,6 +249,7 @@ public class WriteProfile { return; } this.metaClient.reloadActiveTimeline(); + this.fsView = getFileSystemView(); recordProfile(); cleanMetadataCache(this.metaClient.getCommitsTimeline().filterCompletedInstants().getInstants()); this.smallFilesMap.clear(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 1e39c92e3..c417d7d7d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -367,12 +367,23 @@ public class StreamerUtil { */ @SuppressWarnings("rawtypes") public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { + return createWriteClient(conf, runtimeContext, true); + } + + /** + * Creates the Flink write client. + * + *

This expects to be used by client, set flag {@code loadFsViewStorageConfig} to use + * remote filesystem view storage config, or an in-memory filesystem view storage is used. + */ + @SuppressWarnings("rawtypes") + public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) { HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(getHadoopConf()), new FlinkTaskContextSupplier(runtimeContext)); - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true); + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig); return new HoodieFlinkWriteClient<>(context, writeConfig); }