[minor] Refactor write profile to always generate fs view (#4198)
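In short: `WriteProfile` previously cached a `HoodieFlinkTable` in its `table` field, so file system views derived from that cached table (for example, the slice view used by `DeltaWriteProfile`) could lag behind newly committed instants. After this patch the profile holds only a `HoodieTableMetaClient` and recreates the table on demand, so every view is generated fresh against the latest timeline. A condensed view of the new pattern, lifted from the hunks below:

    // Condensed from the WriteProfile hunks below: hold only the meta
    // client and build a fresh table (hence a fresh fs view) per access.
    protected HoodieTable<?, ?, ?, ?> getTable() {
      return HoodieFlinkTable.create(config, context);
    }

    protected SyncableFileSystemView getFileSystemView() {
      // A newly created table reads the latest timeline, so the view
      // reflects recently committed instants.
      return (SyncableFileSystemView) getTable().getBaseFileOnlyView();
    }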
DeltaWriteProfile.java
@@ -50,7 +50,7 @@ public class DeltaWriteProfile extends WriteProfile {
     List<SmallFile> smallFileLocations = new ArrayList<>();
 
     // Init here since this class (and member variables) might not have been initialized
-    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
 
     // Find out all eligible small file slices
     if (!commitTimeline.empty()) {
@@ -90,7 +90,7 @@ public class DeltaWriteProfile extends WriteProfile {
   }
 
   protected SyncableFileSystemView getFileSystemView() {
-    return (SyncableFileSystemView) this.table.getSliceView();
+    return (SyncableFileSystemView) getTable().getSliceView();
   }
 
   private long getTotalFileSize(FileSlice fileSlice) {
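Why the delta profile overrides `getFileSystemView()` to return the slice view: for merge-on-read tables a small-file candidate is a whole file slice (base file plus its log files), not a base file alone, so sizing must run against slices. A hedged sketch of how such a slice view is typically consumed; the helper below is illustrative, not part of the patch, and the exact lookup method can vary across Hudi versions:

    // Illustrative helper (assumed names): collect the latest file slices
    // committed on or before maxInstantTime for one partition, excluding
    // slices that are part of a pending compaction.
    private List<FileSlice> latestSlices(String partitionPath, String maxInstantTime) {
      return getFileSystemView()
          .getLatestFileSlicesBeforeOrOn(partitionPath, maxInstantTime, false)
          .collect(Collectors.toList());
    }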
WriteProfile.java
@@ -32,6 +32,7 @@ import org.apache.hudi.sink.partitioner.BucketAssigner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SmallFile;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
@@ -67,9 +68,9 @@ public class WriteProfile {
   private final Path basePath;
 
   /**
-   * The hoodie table.
+   * The meta client.
    */
-  protected final HoodieTable<?, ?, ?, ?> table;
+  protected final HoodieTableMetaClient metaClient;
 
   /**
    * The average record size.
@@ -97,12 +98,18 @@ public class WriteProfile {
    */
   private final Map<String, HoodieCommitMetadata> metadataCache;
 
+  /**
+   * The engine context.
+   */
+  private final HoodieFlinkEngineContext context;
+
   public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
     this.config = config;
+    this.context = context;
     this.basePath = new Path(config.getBasePath());
     this.smallFilesMap = new HashMap<>();
     this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
-    this.table = HoodieFlinkTable.create(config, context);
+    this.metaClient = StreamerUtil.createMetaClient(config.getBasePath(), context.getHadoopConf().get());
     this.metadataCache = new HashMap<>();
     // profile the record statistics on construction
     recordProfile();
@@ -117,7 +124,11 @@ public class WriteProfile {
   }
 
   public HoodieTableMetaClient getMetaClient() {
-    return this.table.getMetaClient();
+    return this.metaClient;
   }
 
+  protected HoodieTable<?, ?, ?, ?> getTable() {
+    return HoodieFlinkTable.create(config, context);
+  }
+
   /**
@@ -127,7 +138,7 @@ public class WriteProfile {
   private long averageBytesPerRecord() {
     long avgSize = config.getCopyOnWriteRecordSizeEstimate();
     long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
-    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
     if (!commitTimeline.empty()) {
       // Go over the reverse ordered commits to get a more recent estimate of average record size.
       Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
@@ -175,7 +186,7 @@ public class WriteProfile {
     // smallFiles only for partitionPath
     List<SmallFile> smallFileLocations = new ArrayList<>();
 
-    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
 
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
@@ -198,7 +209,7 @@ public class WriteProfile {
   }
 
   protected SyncableFileSystemView getFileSystemView() {
-    return (SyncableFileSystemView) HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) table.getContext()).getBaseFileOnlyView();
+    return (SyncableFileSystemView) getTable().getBaseFileOnlyView();
   }
 
   /**
@@ -231,9 +242,9 @@ public class WriteProfile {
       // already reloaded
       return;
     }
-    this.table.getMetaClient().reloadActiveTimeline();
+    this.metaClient.reloadActiveTimeline();
     recordProfile();
-    cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants());
+    cleanMetadataCache(this.metaClient.getCommitsTimeline().filterCompletedInstants().getInstants());
     this.smallFilesMap.clear();
     this.reloadedCheckpointId = checkpointId;
   }
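For context, a rough sketch of how the refactored profile is wired up and refreshed. The builder calls and `reload` are existing Hudi APIs shown in or implied by this patch, but the surrounding wiring (paths, checkpoint id) is illustrative:

    // Illustrative wiring, not from the patch: the profile now creates
    // only a meta client internally instead of a full HoodieFlinkTable.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/t1") // assumed base path
        .build();
    WriteProfile profile = new WriteProfile(writeConfig, HoodieFlinkEngineContext.DEFAULT);

    // On each completed Flink checkpoint, reload the timeline so the
    // small-file profile and metadata cache track the newest commits.
    long checkpointId = 1L; // assumed value from the checkpoint callback
    profile.reload(checkpointId);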