From 32bd8ce088e0f1d82577575ac048e1a44d44e380 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 6 Jul 2021 09:02:38 +0800 Subject: [PATCH] [HUDI-2132] Make coordinator events as POJO for efficient serialization (#3223) --- .../apache/hudi/sink/StreamWriteFunction.java | 14 ++-- .../hudi/sink/event/CommitAckEvent.java | 3 + .../hudi/sink/event/WriteMetadataEvent.java | 83 ++++++++++++------- .../TestStreamWriteOperatorCoordinator.java | 14 ++-- 4 files changed, 72 insertions(+), 42 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 36d60a400..788e928b9 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -327,7 +327,7 @@ public class StreamWriteFunction .taskID(taskID) .writeStatus(Collections.emptyList()) .instantTime("") - .isBootstrap(true) + .bootstrap(true) .build(); this.eventGateway.sendEventToCoordinator(event); LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); @@ -342,7 +342,7 @@ public class StreamWriteFunction .taskID(taskID) .instantTime(currentInstant) .writeStatus(new ArrayList<>(writeStatuses)) - .isBootstrap(true) + .bootstrap(true) .build(); this.writeMetadataState.add(event); writeStatuses.clear(); @@ -617,8 +617,8 @@ public class StreamWriteFunction .taskID(taskID) .instantTime(instant) // the write instant may shift but the event still use the currentInstant. .writeStatus(writeStatus) - .isLastBatch(false) - .isEndInput(false) + .lastBatch(false) + .endInput(false) .build(); this.eventGateway.sendEventToCoordinator(event); @@ -627,7 +627,7 @@ public class StreamWriteFunction } @SuppressWarnings("unchecked, rawtypes") - private void flushRemaining(boolean isEndInput) { + private void flushRemaining(boolean endInput) { this.currentInstant = instantToWrite(hasData()); if (this.currentInstant == null) { // in case there are empty checkpoints that has no input data @@ -659,8 +659,8 @@ public class StreamWriteFunction .taskID(taskID) .instantTime(currentInstant) .writeStatus(writeStatus) - .isLastBatch(true) - .isEndInput(isEndInput) + .lastBatch(true) + .endInput(endInput) .build(); this.eventGateway.sendEventToCoordinator(event); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java index 7c81dee37..93f74af43 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/CommitAckEvent.java @@ -28,6 +28,9 @@ public class CommitAckEvent implements OperatorEvent { private static final CommitAckEvent INSTANCE = new CommitAckEvent(); + // default constructor for efficient serialization + public CommitAckEvent() {} + public static CommitAckEvent getInstance() { return INSTANCE; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java index 662383b50..a63232a45 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/event/WriteMetadataEvent.java @@ -34,9 +34,9 @@ public class WriteMetadataEvent implements OperatorEvent { private static final long serialVersionUID = 1L; private List writeStatuses; - private final int taskID; + private int taskID; private String instantTime; - private boolean isLastBatch; + private boolean lastBatch; /** * Flag saying whether the event comes from the end of input, e.g. the source @@ -44,12 +44,12 @@ public class WriteMetadataEvent implements OperatorEvent { * 1. batch execution mode * 2. bounded stream source such as VALUES */ - private final boolean isEndInput; + private boolean endInput; /** * Flag saying whether the event comes from bootstrap of a write function. */ - private final boolean isBootstrap; + private boolean bootstrap; /** * Creates an event. @@ -57,27 +57,30 @@ public class WriteMetadataEvent implements OperatorEvent { * @param taskID The task ID * @param instantTime The instant time under which to write the data * @param writeStatuses The write statues list - * @param isLastBatch Whether the event reports the last batch + * @param lastBatch Whether the event reports the last batch * within an checkpoint interval, * if true, the whole data set of the checkpoint * has been flushed successfully - * @param isBootstrap Whether the event comes from the bootstrap + * @param bootstrap Whether the event comes from the bootstrap */ private WriteMetadataEvent( int taskID, String instantTime, List writeStatuses, - boolean isLastBatch, - boolean isEndInput, - boolean isBootstrap) { + boolean lastBatch, + boolean endInput, + boolean bootstrap) { this.taskID = taskID; this.instantTime = instantTime; this.writeStatuses = new ArrayList<>(writeStatuses); - this.isLastBatch = isLastBatch; - this.isEndInput = isEndInput; - this.isBootstrap = isBootstrap; + this.lastBatch = lastBatch; + this.endInput = endInput; + this.bootstrap = bootstrap; } + // default constructor for efficient serialization + public WriteMetadataEvent() {} + /** * Returns the builder for {@link WriteMetadataEvent}. */ @@ -89,24 +92,48 @@ public class WriteMetadataEvent implements OperatorEvent { return writeStatuses; } + public void setWriteStatuses(List writeStatuses) { + this.writeStatuses = writeStatuses; + } + public int getTaskID() { return taskID; } + public void setTaskID(int taskID) { + this.taskID = taskID; + } + public String getInstantTime() { return instantTime; } - public boolean isLastBatch() { - return isLastBatch; + public void setInstantTime(String instantTime) { + this.instantTime = instantTime; } public boolean isEndInput() { - return isEndInput; + return endInput; + } + + public void setEndInput(boolean endInput) { + this.endInput = endInput; } public boolean isBootstrap() { - return isBootstrap; + return bootstrap; + } + + public void setBootstrap(boolean bootstrap) { + this.bootstrap = bootstrap; + } + + public boolean isLastBatch() { + return lastBatch; + } + + public void setLastBatch(boolean lastBatch) { + this.lastBatch = lastBatch; } /** @@ -118,7 +145,7 @@ public class WriteMetadataEvent implements OperatorEvent { ValidationUtils.checkArgument(this.taskID == other.taskID); // the instant time could be monotonically increasing this.instantTime = other.instantTime; - this.isLastBatch |= other.isLastBatch; // true if one of the event isLastBatch true. + this.lastBatch |= other.lastBatch; // true if one of the event isLastBatch true. List statusList = new ArrayList<>(); statusList.addAll(this.writeStatuses); statusList.addAll(other.writeStatuses); @@ -129,7 +156,7 @@ public class WriteMetadataEvent implements OperatorEvent { * Returns whether the event is ready to commit. */ public boolean isReady(String currentInstant) { - return isLastBatch && this.instantTime.equals(currentInstant); + return lastBatch && this.instantTime.equals(currentInstant); } // ------------------------------------------------------------------------- @@ -143,15 +170,15 @@ public class WriteMetadataEvent implements OperatorEvent { private List writeStatus; private Integer taskID; private String instantTime; - private boolean isLastBatch = false; - private boolean isEndInput = false; - private boolean isBootstrap = false; + private boolean lastBatch = false; + private boolean endInput = false; + private boolean bootstrap = false; public WriteMetadataEvent build() { Objects.requireNonNull(taskID); Objects.requireNonNull(instantTime); Objects.requireNonNull(writeStatus); - return new WriteMetadataEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput, isBootstrap); + return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap); } public Builder taskID(int taskID) { @@ -169,18 +196,18 @@ public class WriteMetadataEvent implements OperatorEvent { return this; } - public Builder isLastBatch(boolean isLastBatch) { - this.isLastBatch = isLastBatch; + public Builder lastBatch(boolean lastBatch) { + this.lastBatch = lastBatch; return this; } - public Builder isEndInput(boolean isEndInput) { - this.isEndInput = isEndInput; + public Builder endInput(boolean endInput) { + this.endInput = endInput; return this; } - public Builder isBootstrap(boolean isBootstrap) { - this.isBootstrap = isBootstrap; + public Builder bootstrap(boolean bootstrap) { + this.bootstrap = bootstrap; return this; } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 612bb7966..2fd0ca404 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -75,14 +75,14 @@ public class TestStreamWriteOperatorCoordinator { .taskID(0) .instantTime("") .writeStatus(Collections.emptyList()) - .isBootstrap(true) + .bootstrap(true) .build(); final WriteMetadataEvent event1 = WriteMetadataEvent.builder() .taskID(1) .instantTime("") .writeStatus(Collections.emptyList()) - .isBootstrap(true) + .bootstrap(true) .build(); coordinator.handleEventFromOperator(0, event0); @@ -106,7 +106,7 @@ public class TestStreamWriteOperatorCoordinator { .taskID(0) .instantTime(instant) .writeStatus(Collections.singletonList(writeStatus)) - .isLastBatch(true) + .lastBatch(true) .build(); WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); @@ -116,7 +116,7 @@ public class TestStreamWriteOperatorCoordinator { .taskID(1) .instantTime(instant) .writeStatus(Collections.singletonList(writeStatus1)) - .isLastBatch(true) + .lastBatch(true) .build(); coordinator.handleEventFromOperator(0, event0); coordinator.handleEventFromOperator(1, event1); @@ -184,7 +184,7 @@ public class TestStreamWriteOperatorCoordinator { .taskID(1) .instantTime(instant) .writeStatus(Collections.singletonList(writeStatus1)) - .isLastBatch(true) + .lastBatch(true) .build(); coordinator.handleEventFromOperator(1, event1); assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2), @@ -207,7 +207,7 @@ public class TestStreamWriteOperatorCoordinator { .taskID(0) .instantTime("") .writeStatus(Collections.emptyList()) - .isBootstrap(true) + .bootstrap(true) .build(); coordinator.handleEventFromOperator(0, event0); @@ -223,7 +223,7 @@ public class TestStreamWriteOperatorCoordinator { .taskID(0) .instantTime(instant) .writeStatus(Collections.singletonList(writeStatus)) - .isLastBatch(true) + .lastBatch(true) .build(); coordinator.handleEventFromOperator(0, event1);