diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java index 558b5ff62..3112a9185 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java @@ -49,6 +49,8 @@ public class EmbeddedTimelineServerHelper { if (config.isEmbeddedTimelineServerReuseEnabled()) { if (!TIMELINE_SERVER.isPresent() || !TIMELINE_SERVER.get().canReuseFor(config.getBasePath())) { TIMELINE_SERVER = Option.of(startTimelineService(context, config)); + } else { + updateWriteConfigWithTimelineServer(TIMELINE_SERVER.get(), config); } return TIMELINE_SERVER; } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 5e68b7656..d150d5346 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -24,10 +24,13 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; +import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -50,9 +53,11 @@ import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -692,6 +697,19 @@ public class TestWriteCopyOnWrite { }, "Timeout(500ms) while waiting for instant"); } + @Test + public void testReuseEmbeddedServer() { + HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null); + FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig(); + + assertSame(viewStorageConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST); + + // get another write client + writeClient = StreamerUtil.createWriteClient(conf, null); + assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST); + assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort()); + } + // ------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------