[HUDI-4403] Fix the end input metadata for bounded source (#6116)
This commit is contained in:
@@ -408,6 +408,16 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
|
|||||||
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
|
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanWriteHandles() {
|
||||||
|
if (freshInstant(currentInstant)) {
|
||||||
|
// In rare cases, when a checkpoint was aborted and the instant time
|
||||||
|
// is reused, the merge handle generates a new file name
|
||||||
|
// with the reused instant time of last checkpoint, the write handles
|
||||||
|
// should be kept and reused to avoid data loss.
|
||||||
|
this.writeClient.cleanHandles();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked, rawtypes")
|
@SuppressWarnings("unchecked, rawtypes")
|
||||||
private boolean flushBucket(DataBucket bucket) {
|
private boolean flushBucket(DataBucket bucket) {
|
||||||
String instant = instantToWrite(true);
|
String instant = instantToWrite(true);
|
||||||
@@ -479,7 +489,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
|
|||||||
this.eventGateway.sendEventToCoordinator(event);
|
this.eventGateway.sendEventToCoordinator(event);
|
||||||
this.buckets.clear();
|
this.buckets.clear();
|
||||||
this.tracer.reset();
|
this.tracer.reset();
|
||||||
this.writeClient.cleanHandles();
|
cleanWriteHandles();
|
||||||
this.writeStatuses.addAll(writeStatus);
|
this.writeStatuses.addAll(writeStatus);
|
||||||
// blocks flushing until the coordinator starts a new instant
|
// blocks flushing until the coordinator starts a new instant
|
||||||
this.confirming = true;
|
this.confirming = true;
|
||||||
|
|||||||
@@ -365,7 +365,10 @@ public class StreamWriteOperatorCoordinator
|
|||||||
*/
|
*/
|
||||||
private boolean allEventsReceived() {
|
private boolean allEventsReceived() {
|
||||||
return Arrays.stream(eventBuffer)
|
return Arrays.stream(eventBuffer)
|
||||||
.allMatch(event -> event != null && event.isReady(this.instant));
|
// we do not use event.isReady to check the instant
|
||||||
|
// because the write task may send an event eagerly for empty
|
||||||
|
// data set, the event may have a timestamp of last committed instant.
|
||||||
|
.allMatch(event -> event != null && event.isLastBatch());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addEventToBuffer(WriteMetadataEvent event) {
|
private void addEventToBuffer(WriteMetadataEvent event) {
|
||||||
@@ -425,12 +428,14 @@ public class StreamWriteOperatorCoordinator
|
|||||||
addEventToBuffer(event);
|
addEventToBuffer(event);
|
||||||
if (allEventsReceived()) {
|
if (allEventsReceived()) {
|
||||||
// start to commit the instant.
|
// start to commit the instant.
|
||||||
commitInstant(this.instant);
|
boolean committed = commitInstant(this.instant);
|
||||||
// The executor thread inherits the classloader of the #handleEventFromOperator
|
if (committed) {
|
||||||
// caller, which is a AppClassLoader.
|
// The executor thread inherits the classloader of the #handleEventFromOperator
|
||||||
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
|
// caller, which is a AppClassLoader.
|
||||||
// sync Hive synchronously if it is enabled in batch mode.
|
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
|
||||||
syncHive();
|
// sync Hive synchronously if it is enabled in batch mode.
|
||||||
|
syncHive();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -474,8 +479,8 @@ public class StreamWriteOperatorCoordinator
|
|||||||
/**
|
/**
|
||||||
* Commits the instant.
|
* Commits the instant.
|
||||||
*/
|
*/
|
||||||
private void commitInstant(String instant) {
|
private boolean commitInstant(String instant) {
|
||||||
commitInstant(instant, -1);
|
return commitInstant(instant, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -243,6 +243,13 @@ public abstract class AbstractStreamWriteFunction<I>
|
|||||||
return this.ckpMetadata.lastPendingInstant();
|
return this.ckpMetadata.lastPendingInstant();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether the instant is fresh (i.e. not aborted).
|
||||||
|
*/
|
||||||
|
protected boolean freshInstant(String instant) {
|
||||||
|
return !this.ckpMetadata.isAborted(instant);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prepares the instant time to write with for next checkpoint.
|
* Prepares the instant time to write with for next checkpoint.
|
||||||
*
|
*
|
||||||
@@ -279,6 +286,6 @@ public abstract class AbstractStreamWriteFunction<I>
|
|||||||
* Returns whether the pending instant is invalid to write with.
|
* Returns whether the pending instant is invalid to write with.
|
||||||
*/
|
*/
|
||||||
private boolean invalidInstant(String instant, boolean hasData) {
|
private boolean invalidInstant(String instant, boolean hasData) {
|
||||||
return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant);
|
return instant.equals(this.currentInstant) && hasData && freshInstant(instant);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -358,9 +358,9 @@ public class Pipelines {
|
|||||||
* The whole pipeline looks like the following:
|
* The whole pipeline looks like the following:
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
* /=== | task1 | ===\
|
* /=== | task1 | ===\
|
||||||
* | plan generation | ===> hash | commit |
|
* | plan generation | ===> hash | commit |
|
||||||
* \=== | task2 | ===/
|
* \=== | task2 | ===/
|
||||||
*
|
*
|
||||||
* Note: both the compaction plan generation task and commission task are singleton.
|
* Note: both the compaction plan generation task and commission task are singleton.
|
||||||
* </pre>
|
* </pre>
|
||||||
@@ -374,6 +374,8 @@ public class Pipelines {
|
|||||||
TypeInformation.of(CompactionPlanEvent.class),
|
TypeInformation.of(CompactionPlanEvent.class),
|
||||||
new CompactionPlanOperator(conf))
|
new CompactionPlanOperator(conf))
|
||||||
.setParallelism(1) // plan generate must be singleton
|
.setParallelism(1) // plan generate must be singleton
|
||||||
|
// make the distribution strategy deterministic to avoid concurrent modifications
|
||||||
|
// on the same bucket files
|
||||||
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
|
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
|
||||||
.transform("compact_task",
|
.transform("compact_task",
|
||||||
TypeInformation.of(CompactionCommitEvent.class),
|
TypeInformation.of(CompactionCommitEvent.class),
|
||||||
@@ -393,9 +395,9 @@ public class Pipelines {
|
|||||||
* The whole pipeline looks like the following:
|
* The whole pipeline looks like the following:
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
* /=== | task1 | ===\
|
* /=== | task1 | ===\
|
||||||
* | plan generation | ===> hash | commit |
|
* | plan generation | ===> hash | commit |
|
||||||
* \=== | task2 | ===/
|
* \=== | task2 | ===/
|
||||||
*
|
*
|
||||||
* Note: both the clustering plan generation task and commission task are singleton.
|
* Note: both the clustering plan generation task and commission task are singleton.
|
||||||
* </pre>
|
* </pre>
|
||||||
@@ -410,9 +412,11 @@ public class Pipelines {
|
|||||||
TypeInformation.of(ClusteringPlanEvent.class),
|
TypeInformation.of(ClusteringPlanEvent.class),
|
||||||
new ClusteringPlanOperator(conf))
|
new ClusteringPlanOperator(conf))
|
||||||
.setParallelism(1) // plan generate must be singleton
|
.setParallelism(1) // plan generate must be singleton
|
||||||
.keyBy(plan -> plan.getClusteringGroupInfo().getOperations()
|
.keyBy(plan ->
|
||||||
.stream().map(ClusteringOperation::getFileId)
|
// make the distribution strategy deterministic to avoid concurrent modifications
|
||||||
.collect(Collectors.joining()))
|
// on the same bucket files
|
||||||
|
plan.getClusteringGroupInfo().getOperations()
|
||||||
|
.stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
|
||||||
.transform("clustering_task",
|
.transform("clustering_task",
|
||||||
TypeInformation.of(ClusteringCommitEvent.class),
|
TypeInformation.of(ClusteringCommitEvent.class),
|
||||||
new ClusteringOperator(conf, rowType))
|
new ClusteringOperator(conf, rowType))
|
||||||
|
|||||||
@@ -248,21 +248,8 @@ public class ITTestDataStreamWrite extends TestLogger {
|
|||||||
Pipelines.clean(conf, pipeline);
|
Pipelines.clean(conf, pipeline);
|
||||||
Pipelines.compact(conf, pipeline);
|
Pipelines.compact(conf, pipeline);
|
||||||
}
|
}
|
||||||
JobClient client = execEnv.executeAsync(jobName);
|
|
||||||
if (isMor) {
|
|
||||||
if (client.getJobStatus().get() != JobStatus.FAILED) {
|
|
||||||
try {
|
|
||||||
TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
|
|
||||||
client.cancel();
|
|
||||||
} catch (Throwable var1) {
|
|
||||||
// ignored
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// wait for the streaming job to finish
|
|
||||||
client.getJobExecutionResult().get();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
execute(execEnv, isMor, jobName);
|
||||||
TestData.checkWrittenDataCOW(tempFile, expected);
|
TestData.checkWrittenDataCOW(tempFile, expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -322,17 +309,14 @@ public class ITTestDataStreamWrite extends TestLogger {
|
|||||||
execEnv.addOperator(pipeline.getTransformation());
|
execEnv.addOperator(pipeline.getTransformation());
|
||||||
|
|
||||||
Pipelines.cluster(conf, rowType, pipeline);
|
Pipelines.cluster(conf, rowType, pipeline);
|
||||||
JobClient client = execEnv.executeAsync(jobName);
|
execEnv.execute(jobName);
|
||||||
|
|
||||||
// wait for the streaming job to finish
|
|
||||||
client.getJobExecutionResult().get();
|
|
||||||
|
|
||||||
TestData.checkWrittenDataCOW(tempFile, expected);
|
TestData.checkWrittenDataCOW(tempFile, expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
|
public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
|
||||||
JobClient client = execEnv.executeAsync(jobName);
|
|
||||||
if (isMor) {
|
if (isMor) {
|
||||||
|
JobClient client = execEnv.executeAsync(jobName);
|
||||||
if (client.getJobStatus().get() != JobStatus.FAILED) {
|
if (client.getJobStatus().get() != JobStatus.FAILED) {
|
||||||
try {
|
try {
|
||||||
TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
|
TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
|
||||||
@@ -343,7 +327,7 @@ public class ITTestDataStreamWrite extends TestLogger {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// wait for the streaming job to finish
|
// wait for the streaming job to finish
|
||||||
client.getJobExecutionResult().get();
|
execEnv.execute(jobName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -451,5 +435,4 @@ public class ITTestDataStreamWrite extends TestLogger {
|
|||||||
execute(execEnv, true, "Api_Sink_Test");
|
execute(execEnv, true, "Api_Sink_Test");
|
||||||
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
|
TestData.checkWrittenDataCOW(tempFile, EXPECTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user