1
0

[HUDI-2132] Make coordinator events as POJO for efficient serialization (#3223)

This commit is contained in:
Danny Chan
2021-07-06 09:02:38 +08:00
committed by GitHub
parent 650c4455c6
commit 32bd8ce088
4 changed files with 72 additions and 42 deletions

View File

@@ -327,7 +327,7 @@ public class StreamWriteFunction<K, I, O>
.taskID(taskID) .taskID(taskID)
.writeStatus(Collections.emptyList()) .writeStatus(Collections.emptyList())
.instantTime("") .instantTime("")
.isBootstrap(true) .bootstrap(true)
.build(); .build();
this.eventGateway.sendEventToCoordinator(event); this.eventGateway.sendEventToCoordinator(event);
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
@@ -342,7 +342,7 @@ public class StreamWriteFunction<K, I, O>
.taskID(taskID) .taskID(taskID)
.instantTime(currentInstant) .instantTime(currentInstant)
.writeStatus(new ArrayList<>(writeStatuses)) .writeStatus(new ArrayList<>(writeStatuses))
.isBootstrap(true) .bootstrap(true)
.build(); .build();
this.writeMetadataState.add(event); this.writeMetadataState.add(event);
writeStatuses.clear(); writeStatuses.clear();
@@ -617,8 +617,8 @@ public class StreamWriteFunction<K, I, O>
.taskID(taskID) .taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant. .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
.writeStatus(writeStatus) .writeStatus(writeStatus)
.isLastBatch(false) .lastBatch(false)
.isEndInput(false) .endInput(false)
.build(); .build();
this.eventGateway.sendEventToCoordinator(event); this.eventGateway.sendEventToCoordinator(event);
@@ -627,7 +627,7 @@ public class StreamWriteFunction<K, I, O>
} }
@SuppressWarnings("unchecked, rawtypes") @SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean isEndInput) { private void flushRemaining(boolean endInput) {
this.currentInstant = instantToWrite(hasData()); this.currentInstant = instantToWrite(hasData());
if (this.currentInstant == null) { if (this.currentInstant == null) {
// in case there are empty checkpoints that have no input data // in case there are empty checkpoints that have no input data
@@ -659,8 +659,8 @@ public class StreamWriteFunction<K, I, O>
.taskID(taskID) .taskID(taskID)
.instantTime(currentInstant) .instantTime(currentInstant)
.writeStatus(writeStatus) .writeStatus(writeStatus)
.isLastBatch(true) .lastBatch(true)
.isEndInput(isEndInput) .endInput(endInput)
.build(); .build();
this.eventGateway.sendEventToCoordinator(event); this.eventGateway.sendEventToCoordinator(event);

View File

@@ -28,6 +28,9 @@ public class CommitAckEvent implements OperatorEvent {
private static final CommitAckEvent INSTANCE = new CommitAckEvent(); private static final CommitAckEvent INSTANCE = new CommitAckEvent();
// default constructor for efficient serialization
public CommitAckEvent() {}
public static CommitAckEvent getInstance() { public static CommitAckEvent getInstance() {
return INSTANCE; return INSTANCE;
} }

View File

@@ -34,9 +34,9 @@ public class WriteMetadataEvent implements OperatorEvent {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private List<WriteStatus> writeStatuses; private List<WriteStatus> writeStatuses;
private final int taskID; private int taskID;
private String instantTime; private String instantTime;
private boolean isLastBatch; private boolean lastBatch;
/** /**
* Flag saying whether the event comes from the end of input, e.g. the source * Flag saying whether the event comes from the end of input, e.g. the source
@@ -44,12 +44,12 @@ public class WriteMetadataEvent implements OperatorEvent {
* 1. batch execution mode * 1. batch execution mode
* 2. bounded stream source such as VALUES * 2. bounded stream source such as VALUES
*/ */
private final boolean isEndInput; private boolean endInput;
/** /**
* Flag saying whether the event comes from bootstrap of a write function. * Flag saying whether the event comes from bootstrap of a write function.
*/ */
private final boolean isBootstrap; private boolean bootstrap;
/** /**
* Creates an event. * Creates an event.
@@ -57,27 +57,30 @@ public class WriteMetadataEvent implements OperatorEvent {
* @param taskID The task ID * @param taskID The task ID
* @param instantTime The instant time under which to write the data * @param instantTime The instant time under which to write the data
* @param writeStatuses The write status list * @param writeStatuses The write status list
* @param isLastBatch Whether the event reports the last batch * @param lastBatch Whether the event reports the last batch
* within a checkpoint interval, * within a checkpoint interval,
* if true, the whole data set of the checkpoint * if true, the whole data set of the checkpoint
* has been flushed successfully * has been flushed successfully
* @param isBootstrap Whether the event comes from the bootstrap * @param bootstrap Whether the event comes from the bootstrap
*/ */
private WriteMetadataEvent( private WriteMetadataEvent(
int taskID, int taskID,
String instantTime, String instantTime,
List<WriteStatus> writeStatuses, List<WriteStatus> writeStatuses,
boolean isLastBatch, boolean lastBatch,
boolean isEndInput, boolean endInput,
boolean isBootstrap) { boolean bootstrap) {
this.taskID = taskID; this.taskID = taskID;
this.instantTime = instantTime; this.instantTime = instantTime;
this.writeStatuses = new ArrayList<>(writeStatuses); this.writeStatuses = new ArrayList<>(writeStatuses);
this.isLastBatch = isLastBatch; this.lastBatch = lastBatch;
this.isEndInput = isEndInput; this.endInput = endInput;
this.isBootstrap = isBootstrap; this.bootstrap = bootstrap;
} }
// default constructor for efficient serialization
public WriteMetadataEvent() {}
/** /**
* Returns the builder for {@link WriteMetadataEvent}. * Returns the builder for {@link WriteMetadataEvent}.
*/ */
@@ -89,24 +92,48 @@ public class WriteMetadataEvent implements OperatorEvent {
return writeStatuses; return writeStatuses;
} }
public void setWriteStatuses(List<WriteStatus> writeStatuses) {
this.writeStatuses = writeStatuses;
}
public int getTaskID() { public int getTaskID() {
return taskID; return taskID;
} }
public void setTaskID(int taskID) {
this.taskID = taskID;
}
public String getInstantTime() { public String getInstantTime() {
return instantTime; return instantTime;
} }
public boolean isLastBatch() { public void setInstantTime(String instantTime) {
return isLastBatch; this.instantTime = instantTime;
} }
public boolean isEndInput() { public boolean isEndInput() {
return isEndInput; return endInput;
}
public void setEndInput(boolean endInput) {
this.endInput = endInput;
} }
public boolean isBootstrap() { public boolean isBootstrap() {
return isBootstrap; return bootstrap;
}
public void setBootstrap(boolean bootstrap) {
this.bootstrap = bootstrap;
}
public boolean isLastBatch() {
return lastBatch;
}
public void setLastBatch(boolean lastBatch) {
this.lastBatch = lastBatch;
} }
/** /**
@@ -118,7 +145,7 @@ public class WriteMetadataEvent implements OperatorEvent {
ValidationUtils.checkArgument(this.taskID == other.taskID); ValidationUtils.checkArgument(this.taskID == other.taskID);
// the instant time could be monotonically increasing // the instant time could be monotonically increasing
this.instantTime = other.instantTime; this.instantTime = other.instantTime;
this.isLastBatch |= other.isLastBatch; // true if one of the event isLastBatch true. this.lastBatch |= other.lastBatch; // true if one of the event isLastBatch true.
List<WriteStatus> statusList = new ArrayList<>(); List<WriteStatus> statusList = new ArrayList<>();
statusList.addAll(this.writeStatuses); statusList.addAll(this.writeStatuses);
statusList.addAll(other.writeStatuses); statusList.addAll(other.writeStatuses);
@@ -129,7 +156,7 @@ public class WriteMetadataEvent implements OperatorEvent {
* Returns whether the event is ready to commit. * Returns whether the event is ready to commit.
*/ */
public boolean isReady(String currentInstant) { public boolean isReady(String currentInstant) {
return isLastBatch && this.instantTime.equals(currentInstant); return lastBatch && this.instantTime.equals(currentInstant);
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@@ -143,15 +170,15 @@ public class WriteMetadataEvent implements OperatorEvent {
private List<WriteStatus> writeStatus; private List<WriteStatus> writeStatus;
private Integer taskID; private Integer taskID;
private String instantTime; private String instantTime;
private boolean isLastBatch = false; private boolean lastBatch = false;
private boolean isEndInput = false; private boolean endInput = false;
private boolean isBootstrap = false; private boolean bootstrap = false;
public WriteMetadataEvent build() { public WriteMetadataEvent build() {
Objects.requireNonNull(taskID); Objects.requireNonNull(taskID);
Objects.requireNonNull(instantTime); Objects.requireNonNull(instantTime);
Objects.requireNonNull(writeStatus); Objects.requireNonNull(writeStatus);
return new WriteMetadataEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput, isBootstrap); return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap);
} }
public Builder taskID(int taskID) { public Builder taskID(int taskID) {
@@ -169,18 +196,18 @@ public class WriteMetadataEvent implements OperatorEvent {
return this; return this;
} }
public Builder isLastBatch(boolean isLastBatch) { public Builder lastBatch(boolean lastBatch) {
this.isLastBatch = isLastBatch; this.lastBatch = lastBatch;
return this; return this;
} }
public Builder isEndInput(boolean isEndInput) { public Builder endInput(boolean endInput) {
this.isEndInput = isEndInput; this.endInput = endInput;
return this; return this;
} }
public Builder isBootstrap(boolean isBootstrap) { public Builder bootstrap(boolean bootstrap) {
this.isBootstrap = isBootstrap; this.bootstrap = bootstrap;
return this; return this;
} }
} }

View File

@@ -75,14 +75,14 @@ public class TestStreamWriteOperatorCoordinator {
.taskID(0) .taskID(0)
.instantTime("") .instantTime("")
.writeStatus(Collections.emptyList()) .writeStatus(Collections.emptyList())
.isBootstrap(true) .bootstrap(true)
.build(); .build();
final WriteMetadataEvent event1 = WriteMetadataEvent.builder() final WriteMetadataEvent event1 = WriteMetadataEvent.builder()
.taskID(1) .taskID(1)
.instantTime("") .instantTime("")
.writeStatus(Collections.emptyList()) .writeStatus(Collections.emptyList())
.isBootstrap(true) .bootstrap(true)
.build(); .build();
coordinator.handleEventFromOperator(0, event0); coordinator.handleEventFromOperator(0, event0);
@@ -106,7 +106,7 @@ public class TestStreamWriteOperatorCoordinator {
.taskID(0) .taskID(0)
.instantTime(instant) .instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus)) .writeStatus(Collections.singletonList(writeStatus))
.isLastBatch(true) .lastBatch(true)
.build(); .build();
WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
@@ -116,7 +116,7 @@ public class TestStreamWriteOperatorCoordinator {
.taskID(1) .taskID(1)
.instantTime(instant) .instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus1)) .writeStatus(Collections.singletonList(writeStatus1))
.isLastBatch(true) .lastBatch(true)
.build(); .build();
coordinator.handleEventFromOperator(0, event0); coordinator.handleEventFromOperator(0, event0);
coordinator.handleEventFromOperator(1, event1); coordinator.handleEventFromOperator(1, event1);
@@ -184,7 +184,7 @@ public class TestStreamWriteOperatorCoordinator {
.taskID(1) .taskID(1)
.instantTime(instant) .instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus1)) .writeStatus(Collections.singletonList(writeStatus1))
.isLastBatch(true) .lastBatch(true)
.build(); .build();
coordinator.handleEventFromOperator(1, event1); coordinator.handleEventFromOperator(1, event1);
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2), assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2),
@@ -207,7 +207,7 @@ public class TestStreamWriteOperatorCoordinator {
.taskID(0) .taskID(0)
.instantTime("") .instantTime("")
.writeStatus(Collections.emptyList()) .writeStatus(Collections.emptyList())
.isBootstrap(true) .bootstrap(true)
.build(); .build();
coordinator.handleEventFromOperator(0, event0); coordinator.handleEventFromOperator(0, event0);
@@ -223,7 +223,7 @@ public class TestStreamWriteOperatorCoordinator {
.taskID(0) .taskID(0)
.instantTime(instant) .instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus)) .writeStatus(Collections.singletonList(writeStatus))
.isLastBatch(true) .lastBatch(true)
.build(); .build();
coordinator.handleEventFromOperator(0, event1); coordinator.handleEventFromOperator(0, event1);