[HUDI-2019] Set up the file system view storage config for singleton embedded server write config every time (#3102)
Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
@@ -49,6 +49,8 @@ public class EmbeddedTimelineServerHelper {
|
|||||||
if (config.isEmbeddedTimelineServerReuseEnabled()) {
|
if (config.isEmbeddedTimelineServerReuseEnabled()) {
|
||||||
if (!TIMELINE_SERVER.isPresent() || !TIMELINE_SERVER.get().canReuseFor(config.getBasePath())) {
|
if (!TIMELINE_SERVER.isPresent() || !TIMELINE_SERVER.get().canReuseFor(config.getBasePath())) {
|
||||||
TIMELINE_SERVER = Option.of(startTimelineService(context, config));
|
TIMELINE_SERVER = Option.of(startTimelineService(context, config));
|
||||||
|
} else {
|
||||||
|
updateWriteConfigWithTimelineServer(TIMELINE_SERVER.get(), config);
|
||||||
}
|
}
|
||||||
return TIMELINE_SERVER;
|
return TIMELINE_SERVER;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,10 +24,13 @@ import org.apache.hudi.common.model.HoodieKey;
|
|||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
|
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
|
||||||
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
|
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
|
||||||
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
import org.apache.hudi.utils.TestConfigurations;
|
import org.apache.hudi.utils.TestConfigurations;
|
||||||
import org.apache.hudi.utils.TestData;
|
import org.apache.hudi.utils.TestData;
|
||||||
|
|
||||||
@@ -50,9 +53,11 @@ import java.util.stream.Collectors;
|
|||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
@@ -692,6 +697,19 @@ public class TestWriteCopyOnWrite {
|
|||||||
}, "Timeout(500ms) while waiting for instant");
|
}, "Timeout(500ms) while waiting for instant");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReuseEmbeddedServer() {
|
||||||
|
HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
|
||||||
|
FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig();
|
||||||
|
|
||||||
|
assertSame(viewStorageConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
|
||||||
|
|
||||||
|
// get another write client
|
||||||
|
writeClient = StreamerUtil.createWriteClient(conf, null);
|
||||||
|
assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
|
||||||
|
assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort());
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Utilities
|
// Utilities
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user