1
0

[HUDI-2084] Resend the uncommitted write metadata when start up (#3168)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
yuzhaojing
2021-06-29 08:53:52 +08:00
committed by GitHub
parent 039aeb6dce
commit 37b7c65d8a
13 changed files with 387 additions and 166 deletions

View File

@@ -58,6 +58,7 @@ import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers; import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@@ -174,7 +175,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
/** /**
* Removes all existing records from the partitions affected and inserts the given HoodieRecords, into the table. * Removes all existing records from the partitions affected and inserts the given HoodieRecords, into the table.
* *
* @param records HoodieRecords to insert * @param records HoodieRecords to insert
* @param instantTime Instant time of the commit * @param instantTime Instant time of the commit
* @return list of WriteStatus to inspect errors and counts * @return list of WriteStatus to inspect errors and counts
*/ */
@@ -194,7 +195,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
/** /**
* Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table. * Removes all existing records of the Hoodie table and inserts the given HoodieRecords, into the table.
* *
* @param records HoodieRecords to insert * @param records HoodieRecords to insert
* @param instantTime Instant time of the commit * @param instantTime Instant time of the commit
* @return list of WriteStatus to inspect errors and counts * @return list of WriteStatus to inspect errors and counts
*/ */
@@ -235,7 +236,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table = HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.DELETE, instantTime); getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient()); preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context,instantTime, keys); HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context, instantTime, keys);
return postWrite(result, instantTime, table); return postWrite(result, instantTime, table);
} }
@@ -391,11 +392,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
/** /**
* Get or create a new write handle in order to reuse the file handles. * Get or create a new write handle in order to reuse the file handles.
* *
* @param record The first record in the bucket * @param record The first record in the bucket
* @param config Write config * @param config Write config
* @param instantTime The instant time * @param instantTime The instant time
* @param table The table * @param table The table
* @param recordItr Record iterator * @param recordItr Record iterator
* @return Existing write handle or create a new one * @return Existing write handle or create a new one
*/ */
private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle( private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
@@ -454,7 +455,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
} }
public String getLastPendingInstant(String actionType) { public String getLastPendingInstant(String actionType) {
HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested(); HoodieTimeline unCompletedTimeline = FlinkClientUtil.createMetaClient(basePath)
.getCommitsTimeline().filterInflightsAndRequested();
return unCompletedTimeline.getInstants() return unCompletedTimeline.getInstants()
.filter(x -> x.getAction().equals(actionType)) .filter(x -> x.getAction().equals(actionType))
.map(HoodieInstant::getTimestamp) .map(HoodieInstant::getTimestamp)
@@ -465,7 +467,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
public String getLastCompletedInstant(HoodieTableType tableType) { public String getLastCompletedInstant(HoodieTableType tableType) {
final String commitType = CommitUtils.getCommitActionType(tableType); final String commitType = CommitUtils.getCommitActionType(tableType);
HoodieTimeline completedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline completedTimeline = FlinkClientUtil.createMetaClient(basePath)
.getCommitsTimeline().filterCompletedInstants();
return completedTimeline.getInstants() return completedTimeline.getInstants()
.filter(x -> x.getAction().equals(commitType)) .filter(x -> x.getAction().equals(commitType))
.map(HoodieInstant::getTimestamp) .map(HoodieInstant::getTimestamp)
@@ -475,8 +478,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
} }
public void transitionRequestedToInflight(String commitType, String inFlightInstant) { public void transitionRequestedToInflight(String commitType, String inFlightInstant) {
HoodieFlinkTable<T> table = getHoodieTable(); HoodieActiveTimeline activeTimeline = FlinkClientUtil.createMetaClient(basePath).getActiveTimeline();
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant); HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant);
activeTimeline.transitionRequestedToInflight(requested, Option.empty(), activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
config.shouldAllowMultiWriteOnSameInstant()); config.shouldAllowMultiWriteOnSameInstant());

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.util; package org.apache.hudi.util;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@@ -29,6 +31,13 @@ import java.io.File;
*/ */
public class FlinkClientUtil { public class FlinkClientUtil {
/**
* Creates the meta client.
*/
public static HoodieTableMetaClient createMetaClient(String basePath) {
return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build();
}
/** /**
* Parses the file name from path. * Parses the file name from path.
*/ */

View File

@@ -32,11 +32,14 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -54,6 +57,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@@ -76,20 +80,18 @@ import java.util.stream.Collectors;
* starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always * starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always
* start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists. * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
* *
* <p>In order to improve the throughput, The function process thread does not block data buffering * <p>The function process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer until
* after the checkpoint thread starts flushing the existing data buffer. So there is possibility that the next checkpoint * the current checkpoint succeed and the coordinator starts a new instant. Any error triggers the job failure during the metadata committing,
* batch was written to current checkpoint. When a checkpoint failure triggers the write rollback, there may be some duplicate records * when the job recovers from a failure, the write function re-send the write metadata to the coordinator to see if these metadata
* (e.g. the eager write batch), the semantics is still correct using the UPSERT operation. * can re-commit, thus if unexpected error happens during the instant committing, the coordinator would retry to commit when the job
* recovers.
* *
* <p><h2>Fault Tolerance</h2> * <p><h2>Fault Tolerance</h2>
* *
* <p>The operator coordinator checks and commits the last instant then starts a new one when a checkpoint finished successfully. * <p>The operator coordinator checks and commits the last instant then starts a new one after a checkpoint finished successfully.
* The operator rolls back the written data and throws to trigger a failover when any error occurs. * It rolls back any inflight instant before it starts a new instant, this means one hoodie instant only span one checkpoint,
* This means one Hoodie instant may span one or more checkpoints(some checkpoints notifications may be skipped). * the write function blocks data buffer flushing for the configured checkpoint timeout
* If a checkpoint timed out, the next checkpoint would help to rewrite the left buffer data (clean the buffer in the last * before it throws exception, any checkpoint failure would finally trigger the job failure.
* step of the #flushBuffer method).
*
* <p>The operator coordinator would try several times when committing the write status.
* *
* <p>Note: The function task requires the input stream be shuffled by the file IDs. * <p>Note: The function task requires the input stream be shuffled by the file IDs.
* *
@@ -162,6 +164,16 @@ public class StreamWriteFunction<K, I, O>
*/ */
private volatile boolean confirming = false; private volatile boolean confirming = false;
/**
* List state of the write metadata events.
*/
private transient ListState<WriteMetadataEvent> writeMetadataState;
/**
* Write status list for the current checkpoint.
*/
private List<WriteStatus> writeStatuses;
/** /**
* Constructs a StreamingSinkFunction. * Constructs a StreamingSinkFunction.
* *
@@ -173,27 +185,43 @@ public class StreamWriteFunction<K, I, O>
@Override @Override
public void open(Configuration parameters) throws IOException { public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.tracer = new TotalSizeTracer(this.config); this.tracer = new TotalSizeTracer(this.config);
initBuffer(); initBuffer();
initWriteFunction(); initWriteFunction();
} }
@Override @Override
public void initializeState(FunctionInitializationContext context) { public void initializeState(FunctionInitializationContext context) throws Exception {
// no operation this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.writeStatuses = new ArrayList<>();
this.writeMetadataState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
"write-metadata-state",
TypeInformation.of(WriteMetadataEvent.class)
));
if (context.isRestored()) {
restoreWriteMetadata();
} else {
sendBootstrapEvent();
}
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
} }
@Override @Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
// Based on the fact that the coordinator starts the checkpoint first, // Based on the fact that the coordinator starts the checkpoint first,
// it would check the validity. // it would check the validity.
// wait for the buffer data flush out and request a new instant // wait for the buffer data flush out and request a new instant
flushRemaining(false); flushRemaining(false);
// Reload the snapshot state as the current state.
reloadWriteMetaState();
} }
@Override @Override
@@ -215,6 +243,7 @@ public class StreamWriteFunction<K, I, O>
public void endInput() { public void endInput() {
flushRemaining(true); flushRemaining(true);
this.writeClient.cleanHandles(); this.writeClient.cleanHandles();
this.writeStatuses.clear();
} }
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@@ -274,6 +303,49 @@ public class StreamWriteFunction<K, I, O>
} }
} }
private void restoreWriteMetadata() throws Exception {
String lastInflight = this.writeClient.getLastPendingInstant(this.actionType);
boolean eventSent = false;
for (WriteMetadataEvent event : this.writeMetadataState.get()) {
if (Objects.equals(lastInflight, event.getInstantTime())) {
// The checkpoint succeed but the meta does not commit,
// re-commit the inflight instant
this.eventGateway.sendEventToCoordinator(event);
LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
eventSent = true;
}
}
if (!eventSent) {
sendBootstrapEvent();
}
}
private void sendBootstrapEvent() {
WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.writeStatus(Collections.emptyList())
.instantTime("")
.isBootstrap(true)
.build();
this.eventGateway.sendEventToCoordinator(event);
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}
/**
* Reload the write metadata state as the current checkpoint.
*/
private void reloadWriteMetaState() throws Exception {
this.writeMetadataState.clear();
WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(new ArrayList<>(writeStatuses))
.isBootstrap(true)
.build();
this.writeMetadataState.add(event);
writeStatuses.clear();
}
/** /**
* Represents a data item in the buffer, this is needed to reduce the * Represents a data item in the buffer, this is needed to reduce the
* memory footprint. * memory footprint.
@@ -477,23 +549,23 @@ public class StreamWriteFunction<K, I, O>
bucket.records.add(item); bucket.records.add(item);
} }
@SuppressWarnings("unchecked, rawtypes") private boolean hasData() {
private boolean flushBucket(DataBucket bucket) { return this.buckets.size() > 0
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}
private String instantToWrite() {
String instant = this.writeClient.getLastPendingInstant(this.actionType); String instant = this.writeClient.getLastPendingInstant(this.actionType);
if (instant == null) {
// in case there are empty checkpoints that has no input data
LOG.info("No inflight instant when flushing data, skip.");
return false;
}
// if exactly-once semantics turns on, // if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits. // waits for the checkpoint notification until the checkpoint timeout threshold hits.
if (confirming) { if (confirming) {
long waitingTime = 0L; long waitingTime = 0L;
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT); long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
long interval = 500L; long interval = 500L;
while (instant == null || instant.equals(this.currentInstant)) { // wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
while (instant == null || (instant.equals(this.currentInstant) && hasData())) {
// sleep for a while // sleep for a while
try { try {
if (waitingTime > ckpTimeout) { if (waitingTime > ckpTimeout) {
@@ -511,6 +583,18 @@ public class StreamWriteFunction<K, I, O>
// successfully. // successfully.
confirming = false; confirming = false;
} }
return instant;
}
@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite();
if (instant == null) {
// in case there are empty checkpoints that has no input data
LOG.info("No inflight instant when flushing data, skip.");
return false;
}
List<HoodieRecord> records = bucket.writeBuffer(); List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
@@ -520,20 +604,22 @@ public class StreamWriteFunction<K, I, O>
bucket.preWrite(records); bucket.preWrite(records);
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
records.clear(); records.clear();
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID) .taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant. .instantTime(instant) // the write instant may shift but the event still use the currentInstant.
.writeStatus(writeStatus) .writeStatus(writeStatus)
.isLastBatch(false) .isLastBatch(false)
.isEndInput(false) .isEndInput(false)
.build(); .build();
this.eventGateway.sendEventToCoordinator(event); this.eventGateway.sendEventToCoordinator(event);
writeStatuses.addAll(writeStatus);
return true; return true;
} }
@SuppressWarnings("unchecked, rawtypes") @SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean isEndInput) { private void flushRemaining(boolean isEndInput) {
this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType); this.currentInstant = instantToWrite();
if (this.currentInstant == null) { if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data // in case there are empty checkpoints that has no input data
throw new HoodieException("No inflight instant when flushing data!"); throw new HoodieException("No inflight instant when flushing data!");
@@ -560,17 +646,20 @@ public class StreamWriteFunction<K, I, O>
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
writeStatus = Collections.emptyList(); writeStatus = Collections.emptyList();
} }
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder() final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID) .taskID(taskID)
.instantTime(currentInstant) .instantTime(currentInstant)
.writeStatus(writeStatus) .writeStatus(writeStatus)
.isLastBatch(true) .isLastBatch(true)
.isEndInput(isEndInput) .isEndInput(isEndInput)
.build(); .build();
this.eventGateway.sendEventToCoordinator(event); this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear(); this.buckets.clear();
this.tracer.reset(); this.tracer.reset();
this.writeClient.cleanHandles(); this.writeClient.cleanHandles();
this.writeStatuses.addAll(writeStatus);
// blocks flushing until the coordinator starts a new instant
this.confirming = true; this.confirming = true;
} }
} }

View File

@@ -51,7 +51,7 @@ public class StreamWriteOperator<I>
} }
@Override @Override
public void endInput() throws Exception { public void endInput() {
sinkFunction.endInput(); sinkFunction.endInput();
} }
} }

View File

@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.CoordinatorExecutor; import org.apache.hudi.sink.utils.CoordinatorExecutor;
import org.apache.hudi.sink.utils.HiveSyncContext; import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.sink.utils.NonThrownExecutor;
@@ -97,7 +97,7 @@ public class StreamWriteOperatorCoordinator
* Event buffer for one round of checkpointing. When all the elements are non-null and have the same * Event buffer for one round of checkpointing. When all the elements are non-null and have the same
* write instant, then the instant succeed and we can commit it. * write instant, then the instant succeed and we can commit it.
*/ */
private transient BatchWriteSuccessEvent[] eventBuffer; private transient WriteMetadataEvent[] eventBuffer;
/** /**
* Task number of the operator. * Task number of the operator.
@@ -152,8 +152,6 @@ public class StreamWriteOperatorCoordinator
this.tableState = TableState.create(conf); this.tableState = TableState.create(conf);
// init table, create it if not exists. // init table, create it if not exists.
initTableIfNotExists(this.conf); initTableIfNotExists(this.conf);
// start a new instant
startInstant();
// start the executor // start the executor
this.executor = new CoordinatorExecutor(this.context, LOG); this.executor = new CoordinatorExecutor(this.context, LOG);
// start the executor if required // start the executor if required
@@ -201,7 +199,7 @@ public class StreamWriteOperatorCoordinator
// for streaming mode, commits the ever received events anyway, // for streaming mode, commits the ever received events anyway,
// the stream write task snapshot and flush the data buffer synchronously in sequence, // the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract) // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
final boolean committed = commitInstant(); final boolean committed = commitInstant(this.instant);
if (committed) { if (committed) {
// if async compaction is on, schedule the compaction // if async compaction is on, schedule the compaction
if (asyncCompaction) { if (asyncCompaction) {
@@ -216,30 +214,8 @@ public class StreamWriteOperatorCoordinator
); );
} }
private void syncHiveIfEnabled() {
if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
}
}
/**
* Sync hoodie table metadata to Hive metastore.
*/
public void syncHive() {
hiveSyncContext.hiveSyncTool().syncHoodieTable();
}
private void startInstant() {
final String instant = HoodieActiveTimeline.createNewInstantTime();
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
this.instant = instant;
this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
}
@Override @Override
public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) { public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
// no operation // no operation
} }
@@ -248,27 +224,17 @@ public class StreamWriteOperatorCoordinator
executor.execute( executor.execute(
() -> { () -> {
// no event to handle // no event to handle
ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent, ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
"The coordinator can only handle BatchWriteSuccessEvent"); "The coordinator can only handle WriteMetaEvent");
BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;
// the write task does not block after checkpointing(and before it receives a checkpoint success event), if (event.isBootstrap()) {
// if it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint handleBootstrapEvent(event);
// success event, the data buffer would flush with an older instant time. } else if (event.isEndInput()) {
ValidationUtils.checkState( handleEndInputEvent(event);
HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
String.format("Receive an unexpected event for instant %s from task %d",
event.getInstantTime(), event.getTaskID()));
if (this.eventBuffer[event.getTaskID()] != null) {
this.eventBuffer[event.getTaskID()].mergeWith(event);
} else { } else {
this.eventBuffer[event.getTaskID()] = event; handleWriteMetaEvent(event);
} }
if (event.isEndInput() && allEventsReceived()) { }, "handle write metadata event for instant %s", this.instant
// start to commit the instant.
commitInstant();
// no compaction scheduling for batch mode
}
}, "handle write success event for instant %s", this.instant
); );
} }
@@ -291,22 +257,108 @@ public class StreamWriteOperatorCoordinator
this.hiveSyncContext = HiveSyncContext.create(conf); this.hiveSyncContext = HiveSyncContext.create(conf);
} }
private void reset() { private void syncHiveIfEnabled() {
this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism]; if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
}
} }
/** Checks the buffer is ready to commit. */ /**
* Sync hoodie table metadata to Hive metastore.
*/
public void syncHive() {
hiveSyncContext.hiveSyncTool().syncHoodieTable();
}
private void reset() {
this.eventBuffer = new WriteMetadataEvent[this.parallelism];
}
/**
* Checks the buffer is ready to commit.
*/
private boolean allEventsReceived() { private boolean allEventsReceived() {
return Arrays.stream(eventBuffer) return Arrays.stream(eventBuffer)
.allMatch(event -> event != null && event.isReady(this.instant)); .allMatch(event -> event != null && event.isReady(this.instant));
} }
private void addEventToBuffer(WriteMetadataEvent event) {
if (this.eventBuffer[event.getTaskID()] != null) {
this.eventBuffer[event.getTaskID()].mergeWith(event);
} else {
this.eventBuffer[event.getTaskID()] = event;
}
}
private void startInstant() {
final String instant = HoodieActiveTimeline.createNewInstantTime();
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
this.instant = instant;
this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
}
/**
* Initializes the instant.
*
* <p>Recommits the last inflight instant if the write metadata checkpoint successfully
* but was not committed due to some rare cases.
*
* <p>Starts a new instant, a writer can not flush data buffer
* until it finds a new inflight instant on the timeline.
*/
private void initInstant(String instant) {
HoodieTimeline completedTimeline =
StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants();
executor.execute(() -> {
if (instant.equals("") || completedTimeline.containsInstant(instant)) {
// the last instant committed successfully
reset();
} else {
LOG.info("Recommit instant {}", instant);
commitInstant(instant);
}
// starts a new instant
startInstant();
}, "initialize instant %s", instant);
}
private void handleBootstrapEvent(WriteMetadataEvent event) {
addEventToBuffer(event);
if (Arrays.stream(eventBuffer).allMatch(Objects::nonNull)) {
// start to initialize the instant.
initInstant(event.getInstantTime());
}
}
private void handleEndInputEvent(WriteMetadataEvent event) {
addEventToBuffer(event);
if (allEventsReceived()) {
// start to commit the instant.
commitInstant(this.instant);
// no compaction scheduling for batch mode
}
}
private void handleWriteMetaEvent(WriteMetadataEvent event) {
// the write task does not block after checkpointing(and before it receives a checkpoint success event),
// if it checkpoints succeed then flushes the data buffer again before this coordinator receives a checkpoint
// success event, the data buffer would flush with an older instant time.
ValidationUtils.checkState(
HoodieTimeline.compareTimestamps(this.instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
String.format("Receive an unexpected event for instant %s from task %d",
event.getInstantTime(), event.getTaskID()));
addEventToBuffer(event);
}
/** /**
* Commits the instant. * Commits the instant.
* *
* @return true if the write statuses are committed successfully. * @return true if the write statuses are committed successfully.
*/ */
private boolean commitInstant() { private boolean commitInstant(String instant) {
if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
// The last checkpoint finished successfully. // The last checkpoint finished successfully.
return false; return false;
@@ -314,7 +366,7 @@ public class StreamWriteOperatorCoordinator
List<WriteStatus> writeResults = Arrays.stream(eventBuffer) List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(BatchWriteSuccessEvent::getWriteStatuses) .map(WriteMetadataEvent::getWriteStatuses)
.flatMap(Collection::stream) .flatMap(Collection::stream)
.collect(Collectors.toList()); .collect(Collectors.toList());
@@ -323,13 +375,15 @@ public class StreamWriteOperatorCoordinator
reset(); reset();
return false; return false;
} }
doCommit(writeResults); doCommit(instant, writeResults);
return true; return true;
} }
/** Performs the actual commit action. */ /**
* Performs the actual commit action.
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void doCommit(List<WriteStatus> writeResults) { private void doCommit(String instant, List<WriteStatus> writeResults) {
// commit or rollback // commit or rollback
long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L); long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L); long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
@@ -345,13 +399,13 @@ public class StreamWriteOperatorCoordinator
final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite final Map<String, List<String>> partitionToReplacedFileIds = tableState.isOverwrite
? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults) ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults)
: Collections.emptyMap(); : Collections.emptyMap();
boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata), boolean success = writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),
tableState.commitAction, partitionToReplacedFileIds); tableState.commitAction, partitionToReplacedFileIds);
if (success) { if (success) {
reset(); reset();
LOG.info("Commit instant [{}] success!", this.instant); LOG.info("Commit instant [{}] success!", instant);
} else { } else {
throw new HoodieException(String.format("Commit instant [%s] failed!", this.instant)); throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
} }
} else { } else {
LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); LOG.error("Error when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
@@ -364,13 +418,13 @@ public class StreamWriteOperatorCoordinator
} }
}); });
// Rolls back instant // Rolls back instant
writeClient.rollback(this.instant); writeClient.rollback(instant);
throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.instant)); throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", instant));
} }
} }
@VisibleForTesting @VisibleForTesting
public BatchWriteSuccessEvent[] getEventBuffer() { public WriteMetadataEvent[] getEventBuffer() {
return eventBuffer; return eventBuffer;
} }

View File

@@ -58,7 +58,7 @@ public class HoodieFlinkCompactor {
Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg); Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
// create metaClient // create metaClient
HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf); HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
// get the table name // get the table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());

View File

@@ -30,13 +30,14 @@ import java.util.Objects;
/** /**
* An operator event to mark successful checkpoint batch write. * An operator event to mark successful checkpoint batch write.
*/ */
public class BatchWriteSuccessEvent implements OperatorEvent { public class WriteMetadataEvent implements OperatorEvent {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private List<WriteStatus> writeStatuses; private List<WriteStatus> writeStatuses;
private final int taskID; private final int taskID;
private String instantTime; private String instantTime;
private boolean isLastBatch; private boolean isLastBatch;
/** /**
* Flag saying whether the event comes from the end of input, e.g. the source * Flag saying whether the event comes from the end of input, e.g. the source
* is bounded, there are two cases in which this flag should be set to true: * is bounded, there are two cases in which this flag should be set to true:
@@ -45,6 +46,11 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
*/ */
private final boolean isEndInput; private final boolean isEndInput;
/**
* Flag saying whether the event comes from bootstrap of a write function.
*/
private final boolean isBootstrap;
/** /**
* Creates an event. * Creates an event.
* *
@@ -55,22 +61,25 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
* within an checkpoint interval, * within an checkpoint interval,
* if true, the whole data set of the checkpoint * if true, the whole data set of the checkpoint
* has been flushed successfully * has been flushed successfully
* @param isBootstrap Whether the event comes from the bootstrap
*/ */
private BatchWriteSuccessEvent( private WriteMetadataEvent(
int taskID, int taskID,
String instantTime, String instantTime,
List<WriteStatus> writeStatuses, List<WriteStatus> writeStatuses,
boolean isLastBatch, boolean isLastBatch,
boolean isEndInput) { boolean isEndInput,
boolean isBootstrap) {
this.taskID = taskID; this.taskID = taskID;
this.instantTime = instantTime; this.instantTime = instantTime;
this.writeStatuses = new ArrayList<>(writeStatuses); this.writeStatuses = new ArrayList<>(writeStatuses);
this.isLastBatch = isLastBatch; this.isLastBatch = isLastBatch;
this.isEndInput = isEndInput; this.isEndInput = isEndInput;
this.isBootstrap = isBootstrap;
} }
/** /**
* Returns the builder for {@link BatchWriteSuccessEvent}. * Returns the builder for {@link WriteMetadataEvent}.
*/ */
public static Builder builder() { public static Builder builder() {
return new Builder(); return new Builder();
@@ -96,12 +105,16 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
return isEndInput; return isEndInput;
} }
public boolean isBootstrap() {
return isBootstrap;
}
/** /**
* Merges this event with given {@link BatchWriteSuccessEvent} {@code other}. * Merges this event with given {@link WriteMetadataEvent} {@code other}.
* *
* @param other The event to be merged * @param other The event to be merged
*/ */
public void mergeWith(BatchWriteSuccessEvent other) { public void mergeWith(WriteMetadataEvent other) {
ValidationUtils.checkArgument(this.taskID == other.taskID); ValidationUtils.checkArgument(this.taskID == other.taskID);
// the instant time could be monotonically increasing // the instant time could be monotonically increasing
this.instantTime = other.instantTime; this.instantTime = other.instantTime;
@@ -112,7 +125,9 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
this.writeStatuses = statusList; this.writeStatuses = statusList;
} }
/** Returns whether the event is ready to commit. */ /**
* Returns whether the event is ready to commit.
*/
public boolean isReady(String currentInstant) { public boolean isReady(String currentInstant) {
return isLastBatch && this.instantTime.equals(currentInstant); return isLastBatch && this.instantTime.equals(currentInstant);
} }
@@ -122,7 +137,7 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/** /**
* Builder for {@link BatchWriteSuccessEvent}. * Builder for {@link WriteMetadataEvent}.
*/ */
public static class Builder { public static class Builder {
private List<WriteStatus> writeStatus; private List<WriteStatus> writeStatus;
@@ -130,12 +145,13 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
private String instantTime; private String instantTime;
private boolean isLastBatch = false; private boolean isLastBatch = false;
private boolean isEndInput = false; private boolean isEndInput = false;
private boolean isBootstrap = false;
public BatchWriteSuccessEvent build() { public WriteMetadataEvent build() {
Objects.requireNonNull(taskID); Objects.requireNonNull(taskID);
Objects.requireNonNull(instantTime); Objects.requireNonNull(instantTime);
Objects.requireNonNull(writeStatus); Objects.requireNonNull(writeStatus);
return new BatchWriteSuccessEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput); return new WriteMetadataEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput, isBootstrap);
} }
public Builder taskID(int taskID) { public Builder taskID(int taskID) {
@@ -162,5 +178,10 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
this.isEndInput = isEndInput; this.isEndInput = isEndInput;
return this; return this;
} }
public Builder isBootstrap(boolean isBootstrap) {
this.isBootstrap = isBootstrap;
return this;
}
} }
} }

View File

@@ -26,11 +26,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hudi.table.HoodieFlinkTable;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -43,13 +43,6 @@ public class CompactionUtil {
private static final Logger LOG = LoggerFactory.getLogger(CompactionUtil.class); private static final Logger LOG = LoggerFactory.getLogger(CompactionUtil.class);
/**
* Creates the metaClient.
*/
public static HoodieTableMetaClient createMetaClient(Configuration conf) {
return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build();
}
/** /**
* Gets compaction Instant time. * Gets compaction Instant time.
*/ */
@@ -72,7 +65,7 @@ public class CompactionUtil {
* Sets up the avro schema string into the give configuration {@code conf} * Sets up the avro schema string into the give configuration {@code conf}
* through reading from the hoodie table metadata. * through reading from the hoodie table metadata.
* *
* @param conf The configuration * @param conf The configuration
*/ */
public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception { public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);

View File

@@ -154,7 +154,7 @@ public class StreamerUtil {
.withMaxMemoryMaxSize( .withMaxMemoryMaxSize(
conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L, conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L,
conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L
).build()) ).build())
.forTable(conf.getString(FlinkOptions.TABLE_NAME)) .forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withStorageConfig(HoodieStorageConfig.newBuilder() .withStorageConfig(HoodieStorageConfig.newBuilder()
.logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
@@ -221,13 +221,16 @@ public class StreamerUtil {
// some of the filesystems release the handles in #close method. // some of the filesystems release the handles in #close method.
} }
/** Generates the bucket ID using format {partition path}_{fileID}. */ /**
* Generates the bucket ID using format {partition path}_{fileID}.
*/
public static String generateBucketKey(String partitionPath, String fileId) { public static String generateBucketKey(String partitionPath, String fileId) {
return String.format("%s_%s", partitionPath, fileId); return String.format("%s_%s", partitionPath, fileId);
} }
/** /**
* Returns whether needs to schedule the async compaction. * Returns whether needs to schedule the async compaction.
*
* @param conf The flink configuration. * @param conf The flink configuration.
*/ */
public static boolean needsAsyncCompaction(Configuration conf) { public static boolean needsAsyncCompaction(Configuration conf) {
@@ -237,6 +240,13 @@ public class StreamerUtil {
&& conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED); && conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED);
} }
/**
* Creates the meta client.
*/
public static HoodieTableMetaClient createMetaClient(Configuration conf) {
return HoodieTableMetaClient.builder().setBasePath(conf.getString(FlinkOptions.PATH)).setConf(FlinkClientUtil.getHadoopConf()).build();
}
/** /**
* Creates the Flink write client. * Creates the Flink write client.
*/ */

View File

@@ -200,7 +200,7 @@ public class StreamWriteITCase extends TestLogger {
conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
// create metaClient // create metaClient
HoodieTableMetaClient metaClient = CompactionUtil.createMetaClient(conf); HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
// set the table name // set the table name
conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName()); conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());

View File

@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestConfigurations;
@@ -70,6 +70,23 @@ public class TestStreamWriteOperatorCoordinator {
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context); TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context);
coordinator.start(); coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context)); coordinator.setExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.builder()
.taskID(0)
.instantTime("")
.writeStatus(Collections.emptyList())
.isBootstrap(true)
.build();
final WriteMetadataEvent event1 = WriteMetadataEvent.builder()
.taskID(1)
.instantTime("")
.writeStatus(Collections.emptyList())
.isBootstrap(true)
.build();
coordinator.handleEventFromOperator(0, event0);
coordinator.handleEventFromOperator(1, event1);
} }
@AfterEach @AfterEach
@@ -85,7 +102,7 @@ public class TestStreamWriteOperatorCoordinator {
WriteStatus writeStatus = new WriteStatus(true, 0.1D); WriteStatus writeStatus = new WriteStatus(true, 0.1D);
writeStatus.setPartitionPath("par1"); writeStatus.setPartitionPath("par1");
writeStatus.setStat(new HoodieWriteStat()); writeStatus.setStat(new HoodieWriteStat());
OperatorEvent event0 = BatchWriteSuccessEvent.builder() OperatorEvent event0 = WriteMetadataEvent.builder()
.taskID(0) .taskID(0)
.instantTime(instant) .instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus)) .writeStatus(Collections.singletonList(writeStatus))
@@ -95,7 +112,7 @@ public class TestStreamWriteOperatorCoordinator {
WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
writeStatus1.setPartitionPath("par2"); writeStatus1.setPartitionPath("par2");
writeStatus1.setStat(new HoodieWriteStat()); writeStatus1.setStat(new HoodieWriteStat());
OperatorEvent event1 = BatchWriteSuccessEvent.builder() OperatorEvent event1 = WriteMetadataEvent.builder()
.taskID(1) .taskID(1)
.instantTime(instant) .instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus1)) .writeStatus(Collections.singletonList(writeStatus1))
@@ -132,7 +149,7 @@ public class TestStreamWriteOperatorCoordinator {
public void testReceiveInvalidEvent() { public void testReceiveInvalidEvent() {
CompletableFuture<byte[]> future = new CompletableFuture<>(); CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future); coordinator.checkpointCoordinator(1, future);
OperatorEvent event = BatchWriteSuccessEvent.builder() OperatorEvent event = WriteMetadataEvent.builder()
.taskID(0) .taskID(0)
.instantTime("abc") .instantTime("abc")
.writeStatus(Collections.emptyList()) .writeStatus(Collections.emptyList())
@@ -147,7 +164,7 @@ public class TestStreamWriteOperatorCoordinator {
final CompletableFuture<byte[]> future = new CompletableFuture<>(); final CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future); coordinator.checkpointCoordinator(1, future);
String instant = coordinator.getInstant(); String instant = coordinator.getInstant();
OperatorEvent event = BatchWriteSuccessEvent.builder() OperatorEvent event = WriteMetadataEvent.builder()
.taskID(0) .taskID(0)
.instantTime(instant) .instantTime(instant)
.writeStatus(Collections.emptyList()) .writeStatus(Collections.emptyList())
@@ -163,7 +180,7 @@ public class TestStreamWriteOperatorCoordinator {
WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
writeStatus1.setPartitionPath("par2"); writeStatus1.setPartitionPath("par2");
writeStatus1.setStat(new HoodieWriteStat()); writeStatus1.setStat(new HoodieWriteStat());
OperatorEvent event1 = BatchWriteSuccessEvent.builder() OperatorEvent event1 = WriteMetadataEvent.builder()
.taskID(1) .taskID(1)
.instantTime(instant) .instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus1)) .writeStatus(Collections.singletonList(writeStatus1))
@@ -186,20 +203,30 @@ public class TestStreamWriteOperatorCoordinator {
coordinator.start(); coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context)); coordinator.setExecutor(new MockCoordinatorExecutor(context));
final WriteMetadataEvent event0 = WriteMetadataEvent.builder()
.taskID(0)
.instantTime("")
.writeStatus(Collections.emptyList())
.isBootstrap(true)
.build();
coordinator.handleEventFromOperator(0, event0);
String instant = coordinator.getInstant(); String instant = coordinator.getInstant();
assertNotEquals("", instant); assertNotEquals("", instant);
WriteStatus writeStatus = new WriteStatus(true, 0.1D); WriteStatus writeStatus = new WriteStatus(true, 0.1D);
writeStatus.setPartitionPath("par1"); writeStatus.setPartitionPath("par1");
writeStatus.setStat(new HoodieWriteStat()); writeStatus.setStat(new HoodieWriteStat());
OperatorEvent event0 = BatchWriteSuccessEvent.builder()
OperatorEvent event1 = WriteMetadataEvent.builder()
.taskID(0) .taskID(0)
.instantTime(instant) .instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus)) .writeStatus(Collections.singletonList(writeStatus))
.isLastBatch(true) .isLastBatch(true)
.build(); .build();
coordinator.handleEventFromOperator(0, event0); coordinator.handleEventFromOperator(0, event1);
// never throw for hive synchronization now // never throw for hive synchronization now
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1)); assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestConfigurations;
@@ -135,8 +135,8 @@ public class TestWriteCopyOnWrite {
String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType()); String instant = funcWrapper.getWriteClient().getLastPendingInstant(getTableType());
final OperatorEvent nextEvent = funcWrapper.getNextEvent(); final OperatorEvent nextEvent = funcWrapper.getNextEvent();
MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); MatcherAssert.assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses(); List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
assertNotNull(writeStatuses); assertNotNull(writeStatuses);
MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files MatcherAssert.assertThat(writeStatuses.size(), is(4)); // write 4 partition files
assertThat(writeStatuses.stream() assertThat(writeStatuses.stream()
@@ -162,8 +162,8 @@ public class TestWriteCopyOnWrite {
assertNotEquals(instant, instant2); assertNotEquals(instant, instant2);
final OperatorEvent nextEvent2 = funcWrapper.getNextEvent(); final OperatorEvent nextEvent2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent2, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent2, instanceOf(WriteMetadataEvent.class));
List<WriteStatus> writeStatuses2 = ((BatchWriteSuccessEvent) nextEvent2).getWriteStatuses(); List<WriteStatus> writeStatuses2 = ((WriteMetadataEvent) nextEvent2).getWriteStatuses();
assertNotNull(writeStatuses2); assertNotNull(writeStatuses2);
assertThat(writeStatuses2.size(), is(0)); // write empty statuses assertThat(writeStatuses2.size(), is(0)); // write empty statuses
@@ -191,8 +191,8 @@ public class TestWriteCopyOnWrite {
assertNotNull(instant); assertNotNull(instant);
final OperatorEvent nextEvent = funcWrapper.getNextEvent(); final OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
List<WriteStatus> writeStatuses = ((BatchWriteSuccessEvent) nextEvent).getWriteStatuses(); List<WriteStatus> writeStatuses = ((WriteMetadataEvent) nextEvent).getWriteStatuses();
assertNotNull(writeStatuses); assertNotNull(writeStatuses);
assertThat(writeStatuses.size(), is(0)); // no data write assertThat(writeStatuses.size(), is(0)); // no data write
@@ -210,7 +210,9 @@ public class TestWriteCopyOnWrite {
} }
// this returns early because there is no inflight instant // this returns early because there is no inflight instant
funcWrapper.checkpointFunction(2); assertThrows(HoodieException.class,
() -> funcWrapper.checkpointFunction(2),
"Timeout(0ms) while waiting for");
// do not sent the write event and fails the checkpoint, // do not sent the write event and fails the checkpoint,
// behaves like the last checkpoint is successful. // behaves like the last checkpoint is successful.
funcWrapper.checkpointFails(2); funcWrapper.checkpointFails(2);
@@ -232,7 +234,7 @@ public class TestWriteCopyOnWrite {
.getLastPendingInstant(getTableType()); .getLastPendingInstant(getTableType());
final OperatorEvent nextEvent = funcWrapper.getNextEvent(); final OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -262,7 +264,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(1); funcWrapper.checkpointFunction(1);
OperatorEvent nextEvent = funcWrapper.getNextEvent(); OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -298,7 +300,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(1); funcWrapper.checkpointFunction(1);
OperatorEvent nextEvent = funcWrapper.getNextEvent(); OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -318,7 +320,7 @@ public class TestWriteCopyOnWrite {
.getLastPendingInstant(getTableType()); .getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent(); nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -343,7 +345,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(1); funcWrapper.checkpointFunction(1);
OperatorEvent nextEvent = funcWrapper.getNextEvent(); OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -363,7 +365,7 @@ public class TestWriteCopyOnWrite {
.getLastPendingInstant(getTableType()); .getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent(); nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -408,7 +410,7 @@ public class TestWriteCopyOnWrite {
final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
final OperatorEvent event2 = funcWrapper.getNextEvent(); final OperatorEvent event2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event1); funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
funcWrapper.getCoordinator().handleEventFromOperator(0, event2); funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
@@ -470,7 +472,7 @@ public class TestWriteCopyOnWrite {
final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first
final OperatorEvent event2 = funcWrapper.getNextEvent(); final OperatorEvent event2 = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event1); funcWrapper.getCoordinator().handleEventFromOperator(0, event1);
funcWrapper.getCoordinator().handleEventFromOperator(0, event2); funcWrapper.getCoordinator().handleEventFromOperator(0, event2);
@@ -534,7 +536,7 @@ public class TestWriteCopyOnWrite {
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event); funcWrapper.getCoordinator().handleEventFromOperator(0, event);
} }
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -592,7 +594,7 @@ public class TestWriteCopyOnWrite {
funcWrapper.checkpointFunction(1); funcWrapper.checkpointFunction(1);
OperatorEvent nextEvent = funcWrapper.getNextEvent(); OperatorEvent nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
@@ -634,7 +636,7 @@ public class TestWriteCopyOnWrite {
.getLastPendingInstant(getTableType()); .getLastPendingInstant(getTableType());
nextEvent = funcWrapper.getNextEvent(); nextEvent = funcWrapper.getNextEvent();
assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", nextEvent, instanceOf(WriteMetadataEvent.class));
checkWrittenData(tempFile, EXPECTED2); checkWrittenData(tempFile, EXPECTED2);
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
@@ -673,7 +675,7 @@ public class TestWriteCopyOnWrite {
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first final OperatorEvent event = funcWrapper.getNextEvent(); // remove the first event first
assertThat("The operator expect to send an event", event, instanceOf(BatchWriteSuccessEvent.class)); assertThat("The operator expect to send an event", event, instanceOf(WriteMetadataEvent.class));
funcWrapper.getCoordinator().handleEventFromOperator(0, event); funcWrapper.getCoordinator().handleEventFromOperator(0, event);
} }

View File

@@ -27,7 +27,7 @@ import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator; import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bootstrap.BootstrapFunction; import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.bootstrap.IndexRecord;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent; import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction; import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketAssignOperator; import org.apache.hudi.sink.partitioner.BucketAssignOperator;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction; import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -70,15 +70,25 @@ public class StreamWriteFunctionWrapper<I> {
private final StreamWriteOperatorCoordinator coordinator; private final StreamWriteOperatorCoordinator coordinator;
private final MockFunctionInitializationContext functionInitializationContext; private final MockFunctionInitializationContext functionInitializationContext;
/** Function that converts row data to HoodieRecord. */ /**
* Function that converts row data to HoodieRecord.
*/
private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction; private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
/** Function that load index in state. */ /**
* Function that load index in state.
*/
private BootstrapFunction<HoodieRecord<?>, HoodieRecord<?>> bootstrapFunction; private BootstrapFunction<HoodieRecord<?>, HoodieRecord<?>> bootstrapFunction;
/** Function that assigns bucket ID. */ /**
* Function that assigns bucket ID.
*/
private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction; private BucketAssignFunction<String, HoodieRecord<?>, HoodieRecord<?>> bucketAssignerFunction;
/** BucketAssignOperator context. **/ /**
* BucketAssignOperator context.
**/
private MockBucketAssignOperatorContext bucketAssignOperatorContext; private MockBucketAssignOperatorContext bucketAssignOperatorContext;
/** Stream write function. */ /**
* Stream write function.
*/
private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction; private StreamWriteFunction<Object, HoodieRecord<?>, Object> writeFunction;
private CompactFunctionWrapper compactFunctionWrapper; private CompactFunctionWrapper compactFunctionWrapper;
@@ -133,8 +143,12 @@ public class StreamWriteFunctionWrapper<I> {
writeFunction = new StreamWriteFunction<>(conf); writeFunction = new StreamWriteFunction<>(conf);
writeFunction.setRuntimeContext(runtimeContext); writeFunction.setRuntimeContext(runtimeContext);
writeFunction.setOperatorEventGateway(gateway); writeFunction.setOperatorEventGateway(gateway);
writeFunction.initializeState(this.functionInitializationContext);
writeFunction.open(conf); writeFunction.open(conf);
// handle the bootstrap event
coordinator.handleEventFromOperator(0, getNextEvent());
if (asyncCompaction) { if (asyncCompaction) {
compactFunctionWrapper.openFunction(); compactFunctionWrapper.openFunction();
} }
@@ -184,7 +198,7 @@ public class StreamWriteFunctionWrapper<I> {
writeFunction.processElement(hoodieRecords[0], null, null); writeFunction.processElement(hoodieRecords[0], null, null);
} }
public BatchWriteSuccessEvent[] getEventBuffer() { public WriteMetadataEvent[] getEventBuffer() {
return this.coordinator.getEventBuffer(); return this.coordinator.getEventBuffer();
} }
@@ -201,7 +215,7 @@ public class StreamWriteFunctionWrapper<I> {
return this.writeFunction.getWriteClient(); return this.writeFunction.getWriteClient();
} }
public void checkpointFunction(long checkpointId) { public void checkpointFunction(long checkpointId) throws Exception {
// checkpoint the coordinator first // checkpoint the coordinator first
this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>()); this.coordinator.checkpointCoordinator(checkpointId, new CompletableFuture<>());
bucketAssignerFunction.snapshotState(null); bucketAssignerFunction.snapshotState(null);