1
0

[minor] Checks the data block type for archived timeline (#5106)

This commit is contained in:
Danny Chan
2022-03-24 14:10:43 +08:00
committed by GitHub
parent 52f0498330
commit a1c42fcc07
2 changed files with 14 additions and 39 deletions

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -248,16 +249,19 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
int instantsInPreviousFile = instantsInRange.size();
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
// TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block)
try (ClosableIterator<IndexedRecord> itr = blk.getRecordItr()) {
StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
// Filter blocks in desired time window
.filter(r -> commitsFilter.apply((GenericRecord) r))
.map(r -> readCommit((GenericRecord) r, loadInstantDetails))
.filter(c -> filter == null || filter.isInRange(c))
.forEach(instantsInRange::add);
HoodieLogBlock block = reader.next();
if (block instanceof HoodieAvroDataBlock) {
HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
// TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block)
try (ClosableIterator<IndexedRecord> itr = avroBlock.getRecordItr()) {
StreamSupport.stream(Spliterators.spliteratorUnknownSize(itr, Spliterator.IMMUTABLE), true)
// Filter blocks in desired time window
.filter(r -> commitsFilter.apply((GenericRecord) r))
.map(r -> readCommit((GenericRecord) r, loadInstantDetails))
.filter(c -> filter == null || filter.isInRange(c))
.forEach(instantsInRange::add);
}
}
}

View File

@@ -30,7 +30,6 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.HiveSyncContext;
@@ -43,7 +42,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -431,31 +429,6 @@ public class StreamWriteOperatorCoordinator
addEventToBuffer(event);
}
/**
* The coordinator reuses the instant if there is no data for this round of checkpoint,
* sends the commit ack events to unblock the flushing.
*/
private void sendCommitAckEvents(long checkpointId) {
CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
.map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
.toArray(CompletableFuture<?>[]::new);
CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
if (!sendToFinishedTasks(error)) {
throw new HoodieException("Error while waiting for the commit ack events to finish sending", error);
}
});
}
/**
* Decides whether the given exception is caused by sending events to FINISHED tasks.
*
* <p>Ugly impl: the exception may change in the future.
*/
private static boolean sendToFinishedTasks(Throwable throwable) {
return throwable.getCause() instanceof TaskNotRunningException
|| throwable.getCause().getMessage().contains("running");
}
/**
* Commits the instant.
*/
@@ -483,8 +456,6 @@ public class StreamWriteOperatorCoordinator
if (writeResults.size() == 0) {
// No data has written, reset the buffer and returns early
reset();
// Send commit ack event to the write function to unblock the flushing
sendCommitAckEvents(checkpointId);
return false;
}
doCommit(instant, writeResults);