1
0

[HUDI-2914] Fix remote timeline server config for flink (#4191)

This commit is contained in:
Danny Chan
2021-12-03 08:59:10 +08:00
committed by GitHub
parent 91d2e61433
commit 934fe54cc5
7 changed files with 34 additions and 40 deletions

View File

@@ -121,16 +121,16 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getMaxNumberOfParallelSubtasks(), getRuntimeContext().getMaxNumberOfParallelSubtasks(),
getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getNumberOfParallelSubtasks(),
ignoreSmallFiles(writeConfig), ignoreSmallFiles(),
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
context, context,
writeConfig); writeConfig);
this.payloadCreation = PayloadCreation.instance(this.conf); this.payloadCreation = PayloadCreation.instance(this.conf);
} }
private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) { private boolean ignoreSmallFiles() {
WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts(); return WriteOperationType.isOverwrite(operationType);
} }
@Override @Override

View File

@@ -22,7 +22,6 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.profile.WriteProfile; import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles; import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.SmallFile; import org.apache.hudi.table.action.commit.SmallFile;
@@ -188,6 +187,7 @@ public class BucketAssigner implements AutoCloseable {
smallFileAssignMap.put(partitionPath, assign); smallFileAssignMap.put(partitionPath, assign);
return assign; return assign;
} }
smallFileAssignMap.put(partitionPath, null);
return null; return null;
} }
@@ -211,10 +211,6 @@ public class BucketAssigner implements AutoCloseable {
this.writeProfile.reload(checkpointId); this.writeProfile.reload(checkpointId);
} }
public HoodieTable<?, ?, ?, ?> getTable() {
return this.writeProfile.getTable();
}
private boolean fileIdOfThisTask(String fileId) { private boolean fileIdOfThisTask(String fileId) {
// the file id can shuffle to this task // the file id can shuffle to this task
return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, numTasks) == taskID; return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, numTasks) == taskID;

View File

@@ -25,7 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.AbstractTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.commit.SmallFile; import org.apache.hudi.table.action.commit.SmallFile;
@@ -55,13 +55,11 @@ public class DeltaWriteProfile extends WriteProfile {
// Find out all eligible small file slices // Find out all eligible small file slices
if (!commitTimeline.empty()) { if (!commitTimeline.empty()) {
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// initialize the filesystem view based on the commit metadata // find the smallest file in partition and append to it
initFileSystemView();
// find smallest file in partition and append to it
List<FileSlice> allSmallFileSlices = new ArrayList<>(); List<FileSlice> allSmallFileSlices = new ArrayList<>();
// If we can index log files, we can add more inserts to log files for fileIds including those under // If we can index log files, we can add more inserts to log files for fileIds including those under
// pending compaction. // pending compaction.
List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) List<FileSlice> allFileSlices = getFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
.collect(Collectors.toList()); .collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) { for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(fileSlice)) { if (isSmallFile(fileSlice)) {
@@ -91,8 +89,8 @@ public class DeltaWriteProfile extends WriteProfile {
return smallFileLocations; return smallFileLocations;
} }
protected AbstractTableFileSystemView getFileSystemView() { protected SyncableFileSystemView getFileSystemView() {
return (AbstractTableFileSystemView) this.table.getSliceView(); return (SyncableFileSystemView) this.table.getSliceView();
} }
private long getTotalFileSize(FileSlice fileSlice) { private long getTotalFileSize(FileSlice fileSlice) {

View File

@@ -23,9 +23,10 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.AbstractTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.BucketAssigner; import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieFlinkTable;
@@ -91,11 +92,6 @@ public class WriteProfile {
*/ */
private long reloadedCheckpointId; private long reloadedCheckpointId;
/**
* The file system view cache for one checkpoint interval.
*/
protected AbstractTableFileSystemView fsView;
/** /**
* Metadata cache to reduce IO of metadata files. * Metadata cache to reduce IO of metadata files.
*/ */
@@ -120,8 +116,8 @@ public class WriteProfile {
return recordsPerBucket; return recordsPerBucket;
} }
public HoodieTable<?, ?, ?, ?> getTable() { public HoodieTableMetaClient getMetaClient() {
return table; return this.table.getMetaClient();
} }
/** /**
@@ -183,9 +179,7 @@ public class WriteProfile {
if (!commitTimeline.empty()) { // if we have some commits if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
// initialize the filesystem view based on the commit metadata List<HoodieBaseFile> allFiles = getFileSystemView()
initFileSystemView();
List<HoodieBaseFile> allFiles = fsView
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
for (HoodieBaseFile file : allFiles) { for (HoodieBaseFile file : allFiles) {
@@ -203,15 +197,8 @@ public class WriteProfile {
return smallFileLocations; return smallFileLocations;
} }
@VisibleForTesting protected SyncableFileSystemView getFileSystemView() {
public void initFileSystemView() { return (SyncableFileSystemView) HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) table.getContext()).getBaseFileOnlyView();
if (fsView == null) {
fsView = getFileSystemView();
}
}
protected AbstractTableFileSystemView getFileSystemView() {
return (AbstractTableFileSystemView) this.table.getBaseFileOnlyView();
} }
/** /**
@@ -245,9 +232,7 @@ public class WriteProfile {
return; return;
} }
this.table.getMetaClient().reloadActiveTimeline(); this.table.getMetaClient().reloadActiveTimeline();
this.table.getHoodieView().sync();
recordProfile(); recordProfile();
this.fsView = null;
cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants()); cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants());
this.smallFilesMap.clear(); this.smallFilesMap.clear();
this.reloadedCheckpointId = checkpointId; this.reloadedCheckpointId = checkpointId;

View File

@@ -365,6 +365,7 @@ public class StreamerUtil {
* *
* <p>This expects to be used by client, the driver should start an embedded timeline server. * <p>This expects to be used by client, the driver should start an embedded timeline server.
*/ */
@SuppressWarnings("rawtypes")
public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
HoodieFlinkEngineContext context = HoodieFlinkEngineContext context =
new HoodieFlinkEngineContext( new HoodieFlinkEngineContext(
@@ -382,17 +383,20 @@ public class StreamerUtil {
* *
* <p>The task context supplier is a constant: the write token is always '0-1-0'. * <p>The task context supplier is a constant: the write token is always '0-1-0'.
*/ */
@SuppressWarnings("rawtypes")
public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
// build the write client to start the embedded timeline server
final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
// create the filesystem view storage properties for client // create the filesystem view storage properties for client
FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
// rebuild the view storage config with simplified options. // rebuild the view storage config with simplified options.
FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder() FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
.withStorageType(viewStorageConfig.getStorageType()) .withStorageType(viewStorageConfig.getStorageType())
.withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
.withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build(); .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build();
ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt); ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); return writeClient;
} }
/** /**

View File

@@ -401,7 +401,7 @@ public class TestBucketAssigner {
} }
private static String getLastCompleteInstant(WriteProfile profile) { private static String getLastCompleteInstant(WriteProfile profile) {
return StreamerUtil.getLastCompletedInstant(profile.getTable().getMetaClient()); return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
} }
private void assertBucketEquals( private void assertBucketEquals(

View File

@@ -19,9 +19,12 @@
package org.apache.hudi.utils; package org.apache.hudi.utils;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.util.ViewStorageProperties;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -98,5 +101,13 @@ public class TestStreamerUtil {
long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower); long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
assertThat(diff, is(75L)); assertThat(diff, is(75L));
} }
@Test
void testDumpRemoteViewStorageConfig() throws IOException {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
StreamerUtil.createWriteClient(conf);
FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST));
}
} }