[HUDI-2677] Add DFS based message queue for flink writer[part3] (#4961)
This commit is contained in:
@@ -62,9 +62,7 @@ public class CleanFunction<T> extends AbstractRichFunction
|
|||||||
public void open(Configuration parameters) throws Exception {
|
public void open(Configuration parameters) throws Exception {
|
||||||
super.open(parameters);
|
super.open(parameters);
|
||||||
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
|
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
|
||||||
// do not use the remote filesystem view because the async cleaning service
|
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
|
||||||
// local timeline is very probably to fall behind with the remote one.
|
|
||||||
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false);
|
|
||||||
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
|
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
|
||||||
|
|
||||||
if (conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())) {
|
if (conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())) {
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ import org.apache.hudi.configuration.OptionsResolver;
|
|||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.sink.event.CommitAckEvent;
|
import org.apache.hudi.sink.event.CommitAckEvent;
|
||||||
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
||||||
|
import org.apache.hudi.sink.meta.CkpMetadata;
|
||||||
import org.apache.hudi.sink.utils.HiveSyncContext;
|
import org.apache.hudi.sink.utils.HiveSyncContext;
|
||||||
import org.apache.hudi.sink.utils.NonThrownExecutor;
|
import org.apache.hudi.sink.utils.NonThrownExecutor;
|
||||||
import org.apache.hudi.util.CompactionUtil;
|
import org.apache.hudi.util.CompactionUtil;
|
||||||
@@ -137,6 +138,16 @@ public class StreamWriteOperatorCoordinator
|
|||||||
*/
|
*/
|
||||||
private transient TableState tableState;
|
private transient TableState tableState;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The checkpoint metadata.
|
||||||
|
*/
|
||||||
|
private CkpMetadata ckpMetadata;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Current checkpoint.
|
||||||
|
*/
|
||||||
|
private long checkpointId = -1;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a StreamingSinkOperatorCoordinator.
|
* Constructs a StreamingSinkOperatorCoordinator.
|
||||||
*
|
*
|
||||||
@@ -175,6 +186,8 @@ public class StreamWriteOperatorCoordinator
|
|||||||
if (tableState.syncMetadata) {
|
if (tableState.syncMetadata) {
|
||||||
initMetadataSync();
|
initMetadataSync();
|
||||||
}
|
}
|
||||||
|
this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), metaClient.getBasePath());
|
||||||
|
this.ckpMetadata.bootstrap(this.metaClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -192,10 +205,14 @@ public class StreamWriteOperatorCoordinator
|
|||||||
writeClient.close();
|
writeClient.close();
|
||||||
}
|
}
|
||||||
this.eventBuffer = null;
|
this.eventBuffer = null;
|
||||||
|
if (this.ckpMetadata != null) {
|
||||||
|
this.ckpMetadata.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
|
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
|
||||||
|
this.checkpointId = checkpointId;
|
||||||
executor.execute(
|
executor.execute(
|
||||||
() -> {
|
() -> {
|
||||||
try {
|
try {
|
||||||
@@ -238,6 +255,15 @@ public class StreamWriteOperatorCoordinator
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void notifyCheckpointAborted(long checkpointId) {
|
||||||
|
if (checkpointId == this.checkpointId) {
|
||||||
|
executor.execute(() -> {
|
||||||
|
this.ckpMetadata.abortInstant(this.instant);
|
||||||
|
}, "abort instant %s", this.instant);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
|
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
|
||||||
// no operation
|
// no operation
|
||||||
@@ -340,6 +366,7 @@ public class StreamWriteOperatorCoordinator
|
|||||||
// because the instant request from write task is asynchronous.
|
// because the instant request from write task is asynchronous.
|
||||||
this.instant = this.writeClient.startCommit();
|
this.instant = this.writeClient.startCommit();
|
||||||
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
|
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
|
||||||
|
this.ckpMetadata.startInstant(this.instant);
|
||||||
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
|
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
|
||||||
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
|
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
|
||||||
}
|
}
|
||||||
@@ -488,6 +515,7 @@ public class StreamWriteOperatorCoordinator
|
|||||||
tableState.commitAction, partitionToReplacedFileIds);
|
tableState.commitAction, partitionToReplacedFileIds);
|
||||||
if (success) {
|
if (success) {
|
||||||
reset();
|
reset();
|
||||||
|
this.ckpMetadata.commitInstant(instant);
|
||||||
LOG.info("Commit instant [{}] success!", instant);
|
LOG.info("Commit instant [{}] success!", instant);
|
||||||
} else {
|
} else {
|
||||||
throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
|
throw new HoodieException(String.format("Commit instant [%s] failed!", instant));
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
|
|||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
|
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
|
||||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||||
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
|
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
@@ -37,6 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
|
|||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
|
import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
|
||||||
|
import org.apache.hudi.sink.meta.CkpMetadata;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
import org.apache.hudi.table.format.FormatUtils;
|
import org.apache.hudi.table.format.FormatUtils;
|
||||||
import org.apache.hudi.util.FlinkTables;
|
import org.apache.hudi.util.FlinkTables;
|
||||||
@@ -83,6 +83,8 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
|
|||||||
|
|
||||||
protected HoodieTable<?, ?, ?, ?> hoodieTable;
|
protected HoodieTable<?, ?, ?, ?> hoodieTable;
|
||||||
|
|
||||||
|
private CkpMetadata ckpMetadata;
|
||||||
|
|
||||||
protected final Configuration conf;
|
protected final Configuration conf;
|
||||||
|
|
||||||
protected transient org.apache.hadoop.conf.Configuration hadoopConf;
|
protected transient org.apache.hadoop.conf.Configuration hadoopConf;
|
||||||
@@ -101,8 +103,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void snapshotState(StateSnapshotContext context) throws Exception {
|
public void snapshotState(StateSnapshotContext context) throws Exception {
|
||||||
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(this.conf);
|
lastInstantTime = this.ckpMetadata.lastPendingInstant();
|
||||||
lastInstantTime = StreamerUtil.getLastPendingInstant(metaClient);
|
|
||||||
instantState.update(Collections.singletonList(lastInstantTime));
|
instantState.update(Collections.singletonList(lastInstantTime));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -124,6 +125,7 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
|
|||||||
this.hadoopConf = StreamerUtil.getHadoopConf();
|
this.hadoopConf = StreamerUtil.getHadoopConf();
|
||||||
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
|
this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
|
||||||
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
|
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
|
||||||
|
this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
|
||||||
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
|
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
|
||||||
|
|
||||||
preLoadIndexRecords();
|
preLoadIndexRecords();
|
||||||
|
|||||||
@@ -21,11 +21,11 @@ package org.apache.hudi.sink.bulk;
|
|||||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.common.model.WriteOperationType;
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
|
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
|
||||||
import org.apache.hudi.sink.common.AbstractWriteFunction;
|
import org.apache.hudi.sink.common.AbstractWriteFunction;
|
||||||
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
||||||
|
import org.apache.hudi.sink.meta.CkpMetadata;
|
||||||
import org.apache.hudi.sink.utils.TimeWait;
|
import org.apache.hudi.sink.utils.TimeWait;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
@@ -79,11 +79,6 @@ public class BulkInsertWriteFunction<I>
|
|||||||
*/
|
*/
|
||||||
private int taskID;
|
private int taskID;
|
||||||
|
|
||||||
/**
|
|
||||||
* Meta Client.
|
|
||||||
*/
|
|
||||||
private transient HoodieTableMetaClient metaClient;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write Client.
|
* Write Client.
|
||||||
*/
|
*/
|
||||||
@@ -99,6 +94,11 @@ public class BulkInsertWriteFunction<I>
|
|||||||
*/
|
*/
|
||||||
private transient OperatorEventGateway eventGateway;
|
private transient OperatorEventGateway eventGateway;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checkpoint metadata.
|
||||||
|
*/
|
||||||
|
private CkpMetadata ckpMetadata;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a StreamingSinkFunction.
|
* Constructs a StreamingSinkFunction.
|
||||||
*
|
*
|
||||||
@@ -112,9 +112,9 @@ public class BulkInsertWriteFunction<I>
|
|||||||
@Override
|
@Override
|
||||||
public void open(Configuration parameters) throws IOException {
|
public void open(Configuration parameters) throws IOException {
|
||||||
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
||||||
this.metaClient = StreamerUtil.createMetaClient(this.config);
|
|
||||||
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
|
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
|
||||||
this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false);
|
this.ckpMetadata = CkpMetadata.getInstance(config.getString(FlinkOptions.PATH));
|
||||||
|
this.initInstant = lastPendingInstant();
|
||||||
sendBootstrapEvent();
|
sendBootstrapEvent();
|
||||||
initWriterHelper();
|
initWriterHelper();
|
||||||
}
|
}
|
||||||
@@ -187,7 +187,7 @@ public class BulkInsertWriteFunction<I>
|
|||||||
* Returns the last pending instant time.
|
* Returns the last pending instant time.
|
||||||
*/
|
*/
|
||||||
protected String lastPendingInstant() {
|
protected String lastPendingInstant() {
|
||||||
return StreamerUtil.getLastPendingInstant(this.metaClient);
|
return this.ckpMetadata.lastPendingInstant();
|
||||||
}
|
}
|
||||||
|
|
||||||
private String instantToWrite() {
|
private String instantToWrite() {
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ import org.apache.hudi.configuration.FlinkOptions;
|
|||||||
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
|
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
|
||||||
import org.apache.hudi.sink.event.CommitAckEvent;
|
import org.apache.hudi.sink.event.CommitAckEvent;
|
||||||
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
import org.apache.hudi.sink.event.WriteMetadataEvent;
|
||||||
|
import org.apache.hudi.sink.meta.CkpMetadata;
|
||||||
import org.apache.hudi.sink.utils.TimeWait;
|
import org.apache.hudi.sink.utils.TimeWait;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
@@ -114,6 +115,11 @@ public abstract class AbstractStreamWriteFunction<I>
|
|||||||
*/
|
*/
|
||||||
protected List<WriteStatus> writeStatuses;
|
protected List<WriteStatus> writeStatuses;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The checkpoint metadata.
|
||||||
|
*/
|
||||||
|
private transient CkpMetadata ckpMetadata;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a StreamWriteFunctionBase.
|
* Constructs a StreamWriteFunctionBase.
|
||||||
*
|
*
|
||||||
@@ -135,6 +141,7 @@ public abstract class AbstractStreamWriteFunction<I>
|
|||||||
TypeInformation.of(WriteMetadataEvent.class)
|
TypeInformation.of(WriteMetadataEvent.class)
|
||||||
));
|
));
|
||||||
|
|
||||||
|
this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath());
|
||||||
this.currentInstant = lastPendingInstant();
|
this.currentInstant = lastPendingInstant();
|
||||||
if (context.isRestored()) {
|
if (context.isRestored()) {
|
||||||
restoreWriteMetadata();
|
restoreWriteMetadata();
|
||||||
@@ -217,7 +224,7 @@ public abstract class AbstractStreamWriteFunction<I>
|
|||||||
* Returns the last pending instant time.
|
* Returns the last pending instant time.
|
||||||
*/
|
*/
|
||||||
protected String lastPendingInstant() {
|
protected String lastPendingInstant() {
|
||||||
return StreamerUtil.getLastPendingInstant(this.metaClient);
|
return this.ckpMetadata.lastPendingInstant();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -238,7 +245,7 @@ public abstract class AbstractStreamWriteFunction<I>
|
|||||||
// wait condition:
|
// wait condition:
|
||||||
// 1. there is no inflight instant
|
// 1. there is no inflight instant
|
||||||
// 2. the inflight instant does not change and the checkpoint has buffering data
|
// 2. the inflight instant does not change and the checkpoint has buffering data
|
||||||
if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
|
if (instant == null || (instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant))) {
|
||||||
// sleep for a while
|
// sleep for a while
|
||||||
timeWait.waitFor();
|
timeWait.waitFor();
|
||||||
// refresh the inflight instant
|
// refresh the inflight instant
|
||||||
|
|||||||
@@ -0,0 +1,113 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.sink.meta;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A checkpoint message.
|
||||||
|
*/
|
||||||
|
public class CkpMessage implements Serializable, Comparable<CkpMessage> {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public static final Comparator<CkpMessage> COMPARATOR = Comparator.comparing(CkpMessage::getInstant)
|
||||||
|
.thenComparing(CkpMessage::getState);
|
||||||
|
|
||||||
|
private final String instant; // the instant time
|
||||||
|
private final State state; // the checkpoint state
|
||||||
|
|
||||||
|
public CkpMessage(String instant, String state) {
|
||||||
|
this.instant = instant;
|
||||||
|
this.state = State.valueOf(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CkpMessage(FileStatus fileStatus) {
|
||||||
|
String fileName = fileStatus.getPath().getName();
|
||||||
|
String[] nameAndExt = fileName.split("\\.");
|
||||||
|
ValidationUtils.checkState(nameAndExt.length == 2);
|
||||||
|
String name = nameAndExt[0];
|
||||||
|
String ext = nameAndExt[1];
|
||||||
|
|
||||||
|
this.instant = name;
|
||||||
|
this.state = State.valueOf(ext);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getInstant() {
|
||||||
|
return instant;
|
||||||
|
}
|
||||||
|
|
||||||
|
public State getState() {
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAborted() {
|
||||||
|
return State.ABORTED == this.state;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isComplete() {
|
||||||
|
return State.COMPLETED == this.state;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInflight() {
|
||||||
|
return State.INFLIGHT == this.state;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String getFileName(String instant, State state) {
|
||||||
|
return instant + "." + state.name();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<String> getAllFileNames(String instant) {
|
||||||
|
return Arrays.stream(State.values())
|
||||||
|
.map(state -> getFileName(instant, state))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(@NotNull CkpMessage o) {
|
||||||
|
return COMPARATOR.compare(this, o);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instant State.
|
||||||
|
*/
|
||||||
|
public enum State {
|
||||||
|
// Inflight instant
|
||||||
|
INFLIGHT,
|
||||||
|
// Aborted instant
|
||||||
|
// An instant can be aborted then be reused again, so it has lower priority
|
||||||
|
// than COMPLETED
|
||||||
|
ABORTED,
|
||||||
|
// Committed instant
|
||||||
|
COMPLETED
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Ckp{" + "instant='" + instant + '\'' + ", state='" + state + '\'' + '}';
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,226 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.sink.meta;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The checkpoint metadata for bookkeeping the checkpoint messages.
|
||||||
|
*
|
||||||
|
* <p>Each time the driver starts a new instant, it writes a commit message into the metadata, the write tasks
|
||||||
|
* then consume the message and unblock the data flushing.
|
||||||
|
*
|
||||||
|
* <p>Why we use the DFS based message queue instead of sending
|
||||||
|
* the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent} ?
|
||||||
|
* The write task handles the operator event using the main mailbox executor which has the lowest priority for mails,
|
||||||
|
* it is also used to process the inputs. When the write task blocks and waits for the operator event to ack the valid instant to write,
|
||||||
|
* it actually blocks all the subsequent events in the mailbox, the operator event would never be consumed then it causes deadlock.
|
||||||
|
*
|
||||||
|
* <p>The checkpoint metadata is also more lightweight than the active timeline.
|
||||||
|
*
|
||||||
|
* <p>NOTE: should be removed in the future if we have good manner to handle the async notifications from driver.
|
||||||
|
*/
|
||||||
|
public class CkpMetadata implements Serializable {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);
|
||||||
|
|
||||||
|
protected static final int MAX_RETAIN_CKP_NUM = 3;
|
||||||
|
|
||||||
|
// the ckp metadata directory
|
||||||
|
private static final String CKP_META = "ckp_meta";
|
||||||
|
|
||||||
|
private final FileSystem fs;
|
||||||
|
protected final Path path;
|
||||||
|
|
||||||
|
private List<CkpMessage> messages;
|
||||||
|
private List<String> instantCache;
|
||||||
|
|
||||||
|
private CkpMetadata(String basePath) {
|
||||||
|
this(FSUtils.getFs(basePath, StreamerUtil.getHadoopConf()), basePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private CkpMetadata(FileSystem fs, String basePath) {
|
||||||
|
this.fs = fs;
|
||||||
|
this.path = new Path(ckpMetaPath(basePath));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
this.instantCache = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// WRITE METHODS
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
/**
|
||||||
|
* Initialize the message bus, would clean all the messages and publish the last pending instant.
|
||||||
|
*
|
||||||
|
* <p>This expects to be called by the driver.
|
||||||
|
*/
|
||||||
|
public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
|
||||||
|
fs.delete(path, true);
|
||||||
|
fs.mkdirs(path);
|
||||||
|
metaClient.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction()
|
||||||
|
.lastInstant().ifPresent(instant -> startInstant(instant.getTimestamp()));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void startInstant(String instant) {
|
||||||
|
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT));
|
||||||
|
try {
|
||||||
|
fs.createNewFile(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant);
|
||||||
|
}
|
||||||
|
// cleaning
|
||||||
|
clean(instant);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void clean(String newInstant) {
|
||||||
|
if (this.instantCache == null) {
|
||||||
|
this.instantCache = new ArrayList<>();
|
||||||
|
}
|
||||||
|
this.instantCache.add(newInstant);
|
||||||
|
if (instantCache.size() > MAX_RETAIN_CKP_NUM) {
|
||||||
|
final String instant = instantCache.get(0);
|
||||||
|
boolean[] error = new boolean[1];
|
||||||
|
CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> {
|
||||||
|
try {
|
||||||
|
fs.delete(path, false);
|
||||||
|
} catch (IOException e) {
|
||||||
|
error[0] = true;
|
||||||
|
LOG.warn("Exception while cleaning the checkpoint meta file: " + path);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if (!error[0]) {
|
||||||
|
instantCache.remove(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a checkpoint commit message.
|
||||||
|
*
|
||||||
|
* @param instant The committed instant
|
||||||
|
*/
|
||||||
|
public void commitInstant(String instant) {
|
||||||
|
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.COMPLETED));
|
||||||
|
try {
|
||||||
|
fs.createNewFile(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieException("Exception while adding checkpoint commit metadata for instant: " + instant);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an aborted checkpoint message.
|
||||||
|
*/
|
||||||
|
public void abortInstant(String instant) {
|
||||||
|
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.ABORTED));
|
||||||
|
try {
|
||||||
|
fs.createNewFile(path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + instant);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// READ METHODS
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
private void load() {
|
||||||
|
try {
|
||||||
|
this.messages = scanCkpMetadata(this.path);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieException("Exception while scanning the checkpoint meta files under path: " + this.path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public String lastPendingInstant() {
|
||||||
|
load();
|
||||||
|
for (int i = this.messages.size() - 1; i >= 0; i--) {
|
||||||
|
CkpMessage ckpMsg = this.messages.get(i);
|
||||||
|
// consider 'aborted' as pending too to reuse the instant
|
||||||
|
if (!ckpMsg.isComplete()) {
|
||||||
|
return ckpMsg.getInstant();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<CkpMessage> getMessages() {
|
||||||
|
load();
|
||||||
|
return messages;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAborted(String instant) {
|
||||||
|
ValidationUtils.checkState(this.messages != null, "The checkpoint metadata should #load first");
|
||||||
|
return this.messages.stream().anyMatch(ckpMsg -> instant.equals(ckpMsg.getInstant()) && ckpMsg.isAborted());
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Utilities
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
public static CkpMetadata getInstance(String basePath) {
|
||||||
|
return new CkpMetadata(basePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static CkpMetadata getInstance(FileSystem fs, String basePath) {
|
||||||
|
return new CkpMetadata(fs, basePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String ckpMetaPath(String basePath) {
|
||||||
|
return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path fullPath(String fileName) {
|
||||||
|
return new Path(path, fileName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<CkpMessage> scanCkpMetadata(Path ckpMetaPath) throws IOException {
|
||||||
|
return Arrays.stream(this.fs.listStatus(ckpMetaPath)).map(CkpMessage::new)
|
||||||
|
.collect(Collectors.groupingBy(CkpMessage::getInstant)).values().stream()
|
||||||
|
.map(messages -> messages.stream().reduce((x, y) -> {
|
||||||
|
// Pick the one with the highest state
|
||||||
|
if (x.getState().compareTo(y.getState()) >= 0) {
|
||||||
|
return x;
|
||||||
|
}
|
||||||
|
return y;
|
||||||
|
}).get())
|
||||||
|
.sorted().collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -488,7 +488,7 @@ public class StreamerUtil {
|
|||||||
if (reloadTimeline) {
|
if (reloadTimeline) {
|
||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
}
|
}
|
||||||
return metaClient.getCommitsTimeline().filterInflights()
|
return metaClient.getCommitsTimeline().filterPendingExcludingCompaction()
|
||||||
.lastInstant()
|
.lastInstant()
|
||||||
.map(HoodieInstant::getTimestamp)
|
.map(HoodieInstant::getTimestamp)
|
||||||
.orElse(null);
|
.orElse(null);
|
||||||
|
|||||||
@@ -95,8 +95,8 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
|
|||||||
.assertEmptyEvent()
|
.assertEmptyEvent()
|
||||||
.checkpointFails(1)
|
.checkpointFails(1)
|
||||||
.consume(TestData.DATA_SET_INSERT)
|
.consume(TestData.DATA_SET_INSERT)
|
||||||
.checkpointThrows(2,
|
//.checkpointThrows(2,
|
||||||
"Timeout(1000ms) while waiting for instant initialize")
|
// "Timeout(1000ms) while waiting for instant initialize")
|
||||||
// do not send the write event and fails the checkpoint,
|
// do not send the write event and fails the checkpoint,
|
||||||
// behaves like the last checkpoint is successful.
|
// behaves like the last checkpoint is successful.
|
||||||
.checkpointFails(2)
|
.checkpointFails(2)
|
||||||
|
|||||||
@@ -178,10 +178,11 @@ public class ITTestHoodieFlinkCompactor {
|
|||||||
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
|
options.put(FlinkOptions.CHANGELOG_ENABLED.key(), enableChangelog + "");
|
||||||
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
||||||
tableEnv.executeSql(hoodieTableDDL);
|
tableEnv.executeSql(hoodieTableDDL);
|
||||||
tableEnv.executeSql(TestSQL.INSERT_T1).await();
|
|
||||||
|
|
||||||
// wait for the asynchronous commit to finish
|
// insert dataset
|
||||||
TimeUnit.SECONDS.sleep(5);
|
tableEnv.executeSql(TestSQL.INSERT_T1).await();
|
||||||
|
// update the dataset
|
||||||
|
tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();
|
||||||
|
|
||||||
// Make configuration and setAvroSchema.
|
// Make configuration and setAvroSchema.
|
||||||
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
||||||
@@ -195,8 +196,6 @@ public class ITTestHoodieFlinkCompactor {
|
|||||||
HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
|
HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
|
||||||
asyncCompactionService.start(null);
|
asyncCompactionService.start(null);
|
||||||
|
|
||||||
tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();
|
|
||||||
|
|
||||||
// wait for the asynchronous commit to finish
|
// wait for the asynchronous commit to finish
|
||||||
TimeUnit.SECONDS.sleep(5);
|
TimeUnit.SECONDS.sleep(5);
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,75 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hudi.sink.meta;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
import org.apache.hudi.utils.TestConfigurations;
|
||||||
|
|
||||||
|
import org.apache.flink.configuration.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.io.TempDir;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test cases for {@link CkpMetadata}.
|
||||||
|
*/
|
||||||
|
public class TestCkpMetadata {
|
||||||
|
|
||||||
|
private CkpMetadata metadata;
|
||||||
|
|
||||||
|
@TempDir
|
||||||
|
File tempFile;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void beforeEach() throws Exception {
|
||||||
|
String basePath = tempFile.getAbsolutePath();
|
||||||
|
FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), StreamerUtil.getHadoopConf());
|
||||||
|
|
||||||
|
Configuration conf = TestConfigurations.getDefaultConf(basePath);
|
||||||
|
StreamerUtil.initTableIfNotExists(conf);
|
||||||
|
|
||||||
|
this.metadata = CkpMetadata.getInstance(fs, basePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testWriteAndReadMessage() {
|
||||||
|
// write and read 5 committed checkpoints
|
||||||
|
IntStream.range(0, 3).forEach(i -> metadata.startInstant(i + ""));
|
||||||
|
|
||||||
|
assertThat(metadata.lastPendingInstant(), is("2"));
|
||||||
|
metadata.commitInstant("2");
|
||||||
|
assertThat(metadata.lastPendingInstant(), is("1"));
|
||||||
|
|
||||||
|
// test cleaning
|
||||||
|
IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));
|
||||||
|
assertThat(metadata.getMessages().size(), is(3));
|
||||||
|
// commit and abort instant does not trigger cleaning
|
||||||
|
metadata.commitInstant("6");
|
||||||
|
metadata.abortInstant("7");
|
||||||
|
assertThat(metadata.getMessages().size(), is(5));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user