diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index ddfe22ac9..a966fee1d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.FileIOUtils;
@@ -248,16 +249,19 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
       int instantsInPreviousFile = instantsInRange.size();
       // Read the avro blocks
       while (reader.hasNext()) {
-        HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-        // TODO If we can store additional metadata in datablock, we can skip parsing records
-        // (such as startTime, endTime of records in the block)
-        try (ClosableIterator<IndexedRecord> itr = blk.getRecordItr()) {
-          StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
-              // Filter blocks in desired time window
-              .filter(r -> commitsFilter.apply((GenericRecord) r))
-              .map(r -> readCommit((GenericRecord) r, loadInstantDetails))
-              .filter(c -> filter == null || filter.isInRange(c))
-              .forEach(instantsInRange::add);
+        HoodieLogBlock block = reader.next();
+        if (block instanceof HoodieAvroDataBlock) {
+          HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
+          // TODO If we can store additional metadata in datablock, we can skip parsing records
+          // (such as startTime, endTime of records in the block)
+          try (ClosableIterator<IndexedRecord> itr = avroBlock.getRecordItr()) {
+            StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
+                // Filter blocks in desired time window
+                .filter(r -> commitsFilter.apply((GenericRecord) r))
+                .map(r -> readCommit((GenericRecord) r, loadInstantDetails))
+                .filter(c -> filter == null || filter.isInRange(c))
+                .forEach(instantsInRange::add);
+          }
         }
       }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index c4f2e771c..b5ec08a58 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.utils.HiveSyncContext;
@@ -43,7 +42,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
-import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -431,31 +429,6 @@ public class StreamWriteOperatorCoordinator
     addEventToBuffer(event);
   }
 
-  /**
-   * The coordinator reuses the instant if there is no data for this round of checkpoint,
-   * sends the commit ack events to unblock the flushing.
-   */
-  private void sendCommitAckEvents(long checkpointId) {
-    CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
-        .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
-        .toArray(CompletableFuture[]::new);
-    CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
-      if (!sendToFinishedTasks(error)) {
-        throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
-      }
-    });
-  }
-
-  /**
-   * Decides whether the given exception is caused by sending events to FINISHED tasks.
-   *
-   * <p>Ugly impl: the exception may change in the future.
-   */
-  private static boolean sendToFinishedTasks(Throwable throwable) {
-    return throwable.getCause() instanceof TaskNotRunningException
-        || throwable.getCause().getMessage().contains("running");
-  }
-
   /**
    * Commits the instant.
    */
@@ -483,8 +456,6 @@ public class StreamWriteOperatorCoordinator
     if (writeResults.size() == 0) {
       // No data has written, reset the buffer and returns early
       reset();
-      // Send commit ack event to the write function to unblock the flushing
-      sendCommitAckEvents(checkpointId);
       return false;
     }
     doCommit(instant, writeResults);
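
Note on the HoodieArchivedTimeline change: the old loop unconditionally cast every block returned by the log reader to HoodieAvroDataBlock, so any non-Avro block in an archived log file (a corrupt block, for instance) would abort the scan with a ClassCastException. The patch reads the block as the HoodieLogBlock base type, processes it only when it is actually an Avro data block, and skips everything else. A minimal, self-contained sketch of that guard pattern, using hypothetical stand-in types rather than Hudi's real reader API:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TypedBlockScan {
  interface LogBlock {}

  static class AvroDataBlock implements LogBlock {
    List<String> records() { return List.of("commit-1", "commit-2"); }
  }

  static class CorruptBlock implements LogBlock {}

  static List<String> readRecords(Iterator<LogBlock> reader) {
    List<String> out = new ArrayList<>();
    while (reader.hasNext()) {
      LogBlock block = reader.next();
      // A bare (AvroDataBlock) cast here would throw ClassCastException on a
      // CorruptBlock; the instanceof guard turns non-data blocks into a no-op.
      if (block instanceof AvroDataBlock) {
        out.addAll(((AvroDataBlock) block).records());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<LogBlock> blocks = List.of(new AvroDataBlock(), new CorruptBlock());
    System.out.println(readRecords(blocks.iterator())); // [commit-1, commit-2]
  }
}
```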
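
Note on the StreamWriteOperatorCoordinator change: when a checkpoint produces no data, the coordinator now just resets its buffer and returns instead of broadcasting CommitAckEvent to the write tasks, which lets the whole sendCommitAckEvents/sendToFinishedTasks pair be deleted. That pair was fragile by its own admission ("Ugly impl"): it had to guess from the exception type, and even from the message text, whether a send failed merely because the target task had already FINISHED. A sketch of the underlying fan-out pattern with hypothetical names, showing where that error classification sat:

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class AckBroadcast {
  // Stand-in for gateway.sendEvent(...): completes once the event is delivered.
  static CompletableFuture<Void> send(String gateway, long checkpointId) {
    return CompletableFuture.runAsync(
        () -> System.out.println(gateway + " acked checkpoint " + checkpointId));
  }

  public static void main(String[] args) {
    String[] gateways = {"task-0", null, "task-2"}; // null = subtask not registered
    CompletableFuture<?>[] futures = Arrays.stream(gateways)
        .filter(Objects::nonNull)
        .map(gw -> send(gw, 42L))
        .toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
      if (error != null) {
        // The deleted Hudi code inspected error.getCause() at this point and
        // swallowed "task already FINISHED" failures; everything else was fatal.
        throw new RuntimeException("ack broadcast failed", error);
      }
    }).join();
  }
}
```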