1
0

[HUDI-1757] Assigns the buckets by record key for Flink writer (#2757)

Currently we assign the buckets by record partition path which could
cause hotspot if the partition field is datetime type. Changes to assign
buckets by grouping the records with their key first; the assignment is
valid only if there is no conflict (two tasks writing to the same bucket).

This patch also changes the coordinator execution to be asynchronous.
This commit is contained in:
Danny Chan
2021-04-06 19:06:41 +08:00
committed by GitHub
parent 920537cac8
commit 9c369c607d
25 changed files with 638 additions and 400 deletions

View File

@@ -255,7 +255,7 @@ public class FlinkOptions {
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
.key("write.batch.size.MB")
.doubleType()
.defaultValue(128D) // 128MB
.defaultValue(2D) // 2MB
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem");
// ------------------------------------------------------------------------
@@ -294,6 +294,12 @@ public class FlinkOptions {
.defaultValue(3600) // default 1 hour
.withDescription("Max delta seconds time needed to trigger compaction, default 1 hour");
public static final ConfigOption<Integer> COMPACTION_MAX_MEMORY = ConfigOptions
.key("compaction.max_memory")
.intType()
.defaultValue(100) // default 100 MB
.withDescription("Max memory in MB for compaction spillable map, default 100MB");
public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
.key("clean.async.enabled")
.booleanType()

View File

@@ -30,6 +30,8 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Sink function that cleans the old commits.
@@ -40,6 +42,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
*/
public class CleanFunction<T> extends AbstractRichFunction
implements SinkFunction<T>, CheckpointedFunction, CheckpointListener {
private static final Logger LOG = LoggerFactory.getLogger(CleanFunction.class);
private final Configuration conf;
private HoodieFlinkWriteClient writeClient;
@@ -56,7 +60,7 @@ public class CleanFunction<T> extends AbstractRichFunction
super.open(parameters);
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.executor = new NonThrownExecutor();
this.executor = new NonThrownExecutor(LOG);
}
}
@@ -70,7 +74,7 @@ public class CleanFunction<T> extends AbstractRichFunction
// ensure to switch the isCleaning flag
this.isCleaning = false;
}
}, "wait for cleaning finish", "");
}, "wait for cleaning finish");
}
}

View File

@@ -31,9 +31,9 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -50,8 +50,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
/**
@@ -103,21 +101,6 @@ public class StreamWriteFunction<K, I, O>
*/
private transient Map<String, DataBucket> buckets;
/**
* The buffer lock to control data buffering/flushing.
*/
private transient ReentrantLock bufferLock;
/**
* The condition to decide whether to add new records into the buffer.
*/
private transient Condition addToBufferCondition;
/**
* Flag saying whether there is an on-going checkpoint.
*/
private volatile boolean onCheckpointing = false;
/**
* Config options.
*/
@@ -169,32 +152,15 @@ public class StreamWriteFunction<K, I, O>
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
bufferLock.lock();
try {
// Based on the fact that the coordinator starts the checkpoint first,
// it would check the validity.
this.onCheckpointing = true;
// wait for the buffer data flush out and request a new instant
flushRemaining(false);
// signal the task thread to start buffering
addToBufferCondition.signal();
} finally {
this.onCheckpointing = false;
bufferLock.unlock();
}
// Based on the fact that the coordinator starts the checkpoint first,
// it would check the validity.
// wait for the buffer data flush out and request a new instant
flushRemaining(false);
}
@Override
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) throws Exception {
bufferLock.lock();
try {
if (onCheckpointing) {
addToBufferCondition.await();
}
bufferRecord(value);
} finally {
bufferLock.unlock();
}
bufferRecord(value);
}
@Override
@@ -247,8 +213,6 @@ public class StreamWriteFunction<K, I, O>
private void initBuffer() {
this.buckets = new LinkedHashMap<>();
this.bufferLock = new ReentrantLock();
this.addToBufferCondition = this.bufferLock.newCondition();
}
private void initWriteFunction() {

View File

@@ -26,27 +26,20 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
import org.apache.hudi.sink.utils.CoordinatorExecutor;
import org.apache.hudi.sink.utils.HiveSyncContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.Preconditions;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -64,7 +57,7 @@ import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
* <p>This coordinator starts a new instant when a new checkpoint starts. It commits the instant when all the
* operator tasks write the buffer successfully for a round of checkpoint.
*
* <p>If there is no data for a round of checkpointing, it rolls back the metadata.
* <p>If there is no data for a round of checkpointing, it resets the events buffer and returns early.
*
* @see StreamWriteFunction for the work flow and semantics
*/
@@ -77,20 +70,20 @@ public class StreamWriteOperatorCoordinator
*/
private final Configuration conf;
/**
* Coordinator context.
*/
private final Context context;
/**
* Write client.
*/
private transient HoodieFlinkWriteClient writeClient;
/**
* Current data buffering checkpoint.
*/
private long inFlightCheckpoint = -1;
/**
* Current REQUESTED instant, for validation.
*/
private String instant = "";
private volatile String instant = "";
/**
* Event buffer for one round of checkpointing. When all the elements are non-null and have the same
@@ -111,7 +104,12 @@ public class StreamWriteOperatorCoordinator
/**
* A single-thread executor to handle all the asynchronous jobs of the coordinator.
*/
private NonThrownExecutor executor;
private CoordinatorExecutor executor;
/**
* A single-thread executor to handle asynchronous hive sync.
*/
private NonThrownExecutor hiveSyncExecutor;
/**
* Context that holds variables for asynchronous hive sync.
@@ -121,14 +119,15 @@ public class StreamWriteOperatorCoordinator
/**
* Constructs a StreamingSinkOperatorCoordinator.
*
* @param conf The config options
* @param parallelism The operator task number
* @param conf The config options
* @param context The coordinator context
*/
public StreamWriteOperatorCoordinator(
Configuration conf,
int parallelism) {
Context context) {
this.conf = conf;
this.parallelism = parallelism;
this.context = context;
this.parallelism = context.currentParallelism();
this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
}
@@ -142,6 +141,8 @@ public class StreamWriteOperatorCoordinator
initTableIfNotExists(this.conf);
// start a new instant
startInstant();
// start the executor
this.executor = new CoordinatorExecutor(this.context, LOG);
// start the executor if required
if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
initHiveSync();
@@ -162,38 +163,46 @@ public class StreamWriteOperatorCoordinator
@Override
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
try {
this.inFlightCheckpoint = checkpointId;
result.complete(writeCheckpointBytes());
} catch (Throwable throwable) {
// when a checkpoint fails, throws directly.
result.completeExceptionally(
new CompletionException(
String.format("Failed to checkpoint Instant %s for source %s",
this.instant, this.getClass().getSimpleName()), throwable));
}
executor.execute(
() -> {
try {
result.complete(new byte[0]);
} catch (Throwable throwable) {
// when a checkpoint fails, throws directly.
result.completeExceptionally(
new CompletionException(
String.format("Failed to checkpoint Instant %s for source %s",
this.instant, this.getClass().getSimpleName()), throwable));
}
}, "taking checkpoint %d", checkpointId
);
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
// start to commit the instant.
final String errorMsg = String.format("Instant [%s] has a complete checkpoint [%d],\n"
+ "but the coordinator has not received full write success events,\n"
+ "rolls back the instant and rethrow", this.instant, checkpointId);
checkAndForceCommit(errorMsg);
// if async compaction is on, schedule the compaction
if (needsScheduleCompaction) {
writeClient.scheduleCompaction(Option.empty());
}
executor.execute(
() -> {
// for streaming mode, commits the ever received events anyway,
// the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
final boolean committed = commitInstant();
if (committed) {
// if async compaction is on, schedule the compaction
if (needsScheduleCompaction) {
writeClient.scheduleCompaction(Option.empty());
}
// start new instant.
startInstant();
}
}, "commits the instant %s", this.instant
);
// sync Hive if is enabled
syncHiveIfEnabled();
// start new instant.
startInstant();
}
private void syncHiveIfEnabled() {
if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
this.executor.execute(this::syncHive, "sync hive metadata", this.instant);
this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
}
}
@@ -211,43 +220,38 @@ public class StreamWriteOperatorCoordinator
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
}
public void notifyCheckpointAborted(long checkpointId) {
Preconditions.checkState(inFlightCheckpoint == checkpointId,
"The aborted checkpoint should always be the last checkpoint");
checkAndForceCommit("The last checkpoint was aborted, roll back the last write and throw");
}
@Override
public void resetToCheckpoint(long checkpointID, @Nullable byte[] checkpointData) throws Exception {
if (checkpointData != null) {
// restore when any checkpoint completed
deserializeCheckpointAndRestore(checkpointData);
}
// no operation
}
@Override
public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
// no event to handle
ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
"The coordinator can only handle BatchWriteSuccessEvent");
BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
// the write task does not block after checkpointing(and before it receives a checkpoint success event),
// if its checkpoint succeeds and it then flushes the data buffer again before this coordinator receives a checkpoint
// success event, the data buffer would flush with an older instant time.
ValidationUtils.checkState(
HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
String.format("Receive an unexpected event for instant %s from task %d",
event.getInstantTime(), event.getTaskID()));
if (this.eventBuffer[event.getTaskID()] != null) {
this.eventBuffer[event.getTaskID()].mergeWith(event);
} else {
this.eventBuffer[event.getTaskID()] = event;
}
if (event.isEndInput() && checkReady()) {
// start to commit the instant.
doCommit();
// no compaction scheduling for batch mode
}
executor.execute(
() -> {
// no event to handle
ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
"The coordinator can only handle BatchWriteSuccessEvent");
BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
// the write task does not block after checkpointing(and before it receives a checkpoint success event),
// if its checkpoint succeeds and it then flushes the data buffer again before this coordinator receives a checkpoint
// success event, the data buffer would flush with an older instant time.
ValidationUtils.checkState(
HoodieTimeline.compareTimestamps(instant, HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
String.format("Receive an unexpected event for instant %s from task %d",
event.getInstantTime(), event.getTaskID()));
if (this.eventBuffer[event.getTaskID()] != null) {
this.eventBuffer[event.getTaskID()].mergeWith(event);
} else {
this.eventBuffer[event.getTaskID()] = event;
}
if (event.isEndInput() && allEventsReceived()) {
// start to commit the instant.
commitInstant();
// no compaction scheduling for batch mode
}
}, "handle write success event for instant %s", this.instant
);
}
@Override
@@ -265,85 +269,31 @@ public class StreamWriteOperatorCoordinator
// -------------------------------------------------------------------------
private void initHiveSync() {
this.executor = new NonThrownExecutor();
this.hiveSyncExecutor = new NonThrownExecutor(LOG);
this.hiveSyncContext = HiveSyncContext.create(conf);
}
static byte[] readBytes(DataInputStream in, int size) throws IOException {
byte[] bytes = new byte[size];
in.readFully(bytes);
return bytes;
}
/**
* Serialize the coordinator state. The current implementation may not be super efficient,
* but it should not matter that much because most of the state should be rather small.
* Large states themselves may already be a problem regardless of how the serialization
* is implemented.
*
* @return A byte array containing the serialized state of the source coordinator.
* @throws IOException When something goes wrong in serialization.
*/
private byte[] writeCheckpointBytes() throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputViewStreamWrapper(baos)) {
out.writeLong(this.inFlightCheckpoint);
byte[] serializedInstant = this.instant.getBytes();
out.writeInt(serializedInstant.length);
out.write(serializedInstant);
out.flush();
return baos.toByteArray();
}
}
/**
* Restore the state of this source coordinator from the state bytes.
*
* @param bytes The checkpoint bytes that was returned from {@link #writeCheckpointBytes()}
* @throws Exception When the deserialization failed.
*/
private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception {
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
DataInputStream in = new DataInputViewStreamWrapper(bais)) {
long checkpointID = in.readLong();
int serializedInstantSize = in.readInt();
byte[] serializedInstant = readBytes(in, serializedInstantSize);
this.inFlightCheckpoint = checkpointID;
this.instant = new String(serializedInstant);
}
}
private void reset() {
this.instant = "";
this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
}
private void checkAndForceCommit(String errMsg) {
if (!checkReady()) {
// forced but still has inflight instant
String inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE));
if (inflightInstant != null) {
assert inflightInstant.equals(this.instant);
writeClient.rollback(this.instant);
throw new HoodieException(errMsg);
}
if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
// The last checkpoint finished successfully.
return;
}
}
doCommit();
}
/** Checks the buffer is ready to commit. */
private boolean checkReady() {
private boolean allEventsReceived() {
return Arrays.stream(eventBuffer)
.allMatch(event -> event != null && event.isReady(this.instant));
}
/** Performs the actual commit action. */
private void doCommit() {
/**
* Commits the instant.
*
* @return true if the write statuses are committed successfully.
*/
private boolean commitInstant() {
if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
// The last checkpoint finished successfully.
return false;
}
List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
.filter(Objects::nonNull)
.map(BatchWriteSuccessEvent::getWriteStatuses)
@@ -351,12 +301,16 @@ public class StreamWriteOperatorCoordinator
.collect(Collectors.toList());
if (writeResults.size() == 0) {
// No data has written, clear the metadata file
this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.instant);
// No data has written, reset the buffer and returns early
reset();
return;
return false;
}
doCommit(writeResults);
return true;
}
/** Performs the actual commit action. */
private void doCommit(List<WriteStatus> writeResults) {
// commit or rollback
long totalErrorRecords = writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
long totalRecords = writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);
@@ -371,7 +325,6 @@ public class StreamWriteOperatorCoordinator
boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata));
if (success) {
writeClient.postCommit(this.instant);
reset();
LOG.info("Commit instant [{}] success!", this.instant);
} else {
@@ -409,6 +362,19 @@ public class StreamWriteOperatorCoordinator
return writeClient;
}
@VisibleForTesting
public Context getContext() {
return context;
}
@VisibleForTesting
public void setExecutor(CoordinatorExecutor executor) throws Exception {
if (this.executor != null) {
this.executor.close();
}
this.executor = executor;
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
@@ -432,7 +398,7 @@ public class StreamWriteOperatorCoordinator
@Override
public OperatorCoordinator create(Context context) {
return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism());
return new StreamWriteOperatorCoordinator(this.conf, context);
}
}
}

View File

@@ -21,13 +21,17 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
@@ -36,6 +40,7 @@ import java.util.List;
* In order to execute scalable, the input should shuffle by the compact event {@link CompactionPlanEvent}.
*/
public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEvent, CompactionCommitEvent> {
private static final Logger LOG = LoggerFactory.getLogger(CompactFunction.class);
/**
* Config options.
@@ -52,6 +57,11 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
*/
private int taskID;
/**
* Executor service to execute the compaction task.
*/
private transient NonThrownExecutor executor;
public CompactFunction(Configuration conf) {
this.conf = conf;
}
@@ -60,23 +70,33 @@ public class CompactFunction extends KeyedProcessFunction<Long, CompactionPlanEv
public void open(Configuration parameters) throws Exception {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.executor = new NonThrownExecutor(LOG);
}
@Override
public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
final String instantTime = event.getCompactionInstantTime();
final CompactionOperation compactionOperation = event.getOperation();
// executes the compaction task asynchronously to not block the checkpoint barrier propagate.
executor.execute(
() -> {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
this.writeClient.getConfig(),
this.writeClient.getEngineContext(),
this.writeClient.getHoodieTable().getMetaClient()),
this.writeClient.getHoodieTable().getMetaClient(),
this.writeClient.getConfig(),
compactionOperation,
instantTime);
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}, "Execute compaction for instant %s from task %d", instantTime, taskID
);
}
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
this.writeClient.getConfig(),
this.writeClient.getEngineContext(),
this.writeClient.getHoodieTable().getMetaClient()),
this.writeClient.getHoodieTable().getMetaClient(),
this.writeClient.getConfig(),
compactionOperation,
instantTime);
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
@VisibleForTesting
public void setExecutor(NonThrownExecutor executor) {
this.executor = executor;
}
}

View File

@@ -23,8 +23,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.sink.CleanFunction;
@@ -37,8 +35,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -67,13 +66,9 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
/**
* Buffer to collect the event from each compact task {@code CompactFunction}.
* The key is the instant time.
*/
private transient List<CompactionCommitEvent> commitBuffer;
/**
* Current on-going compaction instant time.
*/
private String compactionInstantTime;
private transient Map<String, List<CompactionCommitEvent>> commitBuffer;
public CompactionCommitSink(Configuration conf) {
super(conf);
@@ -84,36 +79,32 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
this.commitBuffer = new ArrayList<>();
this.commitBuffer = new HashMap<>();
}
@Override
public void invoke(CompactionCommitEvent event, Context context) throws Exception {
if (compactionInstantTime == null) {
compactionInstantTime = event.getInstant();
} else if (!event.getInstant().equals(compactionInstantTime)) {
// last compaction still not finish, rolls it back
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(this.compactionInstantTime);
writeClient.rollbackInflightCompaction(inflightInstant);
this.compactionInstantTime = event.getInstant();
}
this.commitBuffer.add(event);
commitIfNecessary();
final String instant = event.getInstant();
commitBuffer.computeIfAbsent(instant, k -> new ArrayList<>())
.add(event);
commitIfNecessary(instant, commitBuffer.get(instant));
}
/**
* Condition to commit: the commit buffer has equal size with the compaction plan operations
* and all the compact commit event {@link CompactionCommitEvent} has the same compaction instant time.
*
* @param instant Compaction commit instant time
* @param events Commit events ever received for the instant
*/
private void commitIfNecessary() throws IOException {
private void commitIfNecessary(String instant, List<CompactionCommitEvent> events) throws IOException {
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
this.writeClient.getHoodieTable().getMetaClient(), compactionInstantTime);
boolean isReady = compactionPlan.getOperations().size() == commitBuffer.size()
&& commitBuffer.stream().allMatch(event -> event != null && Objects.equals(event.getInstant(), compactionInstantTime));
this.writeClient.getHoodieTable().getMetaClient(), instant);
boolean isReady = compactionPlan.getOperations().size() == events.size();
if (!isReady) {
return;
}
List<WriteStatus> statuses = this.commitBuffer.stream()
List<WriteStatus> statuses = events.stream()
.map(CompactionCommitEvent::getWriteStatuses)
.flatMap(Collection::stream)
.collect(Collectors.toList());
@@ -127,16 +118,15 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, writeClient.getConfig().getSchema());
this.writeClient.completeCompaction(
metadata, statuses, this.writeClient.getHoodieTable(), compactionInstantTime);
metadata, statuses, this.writeClient.getHoodieTable(), instant);
}
// commit the compaction
this.writeClient.commitCompaction(compactionInstantTime, statuses, Option.empty());
this.writeClient.commitCompaction(instant, statuses, Option.empty());
// reset the status
reset();
reset(instant);
}
private void reset() {
this.commitBuffer.clear();
this.compactionInstantTime = null;
private void reset(String instant) {
this.commitBuffer.remove(instant);
}
}

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -56,8 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* The function to build the write profile incrementally for records within a checkpoint,
@@ -106,29 +103,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
private final boolean isChangingRecords;
/**
* All the partition paths when the task starts. It is used to help checking whether all the partitions
* are loaded into the state.
*/
private transient Set<String> initialPartitionsToLoad;
/**
* State to book-keep which partition is loaded into the index state {@code indexState}.
*/
private MapState<String, Integer> partitionLoadState;
/**
* Whether all partitions are loaded, if it is true,
* we can only check the state for locations.
*/
private boolean allPartitionsLoaded = false;
/**
* Flag saying whether to check that all the partitions are loaded.
* So that there is chance that flag {@code allPartitionsLoaded} becomes true.
*/
private boolean checkPartition = true;
public BucketAssignFunction(Configuration conf) {
this.conf = conf;
this.isChangingRecords = WriteOperationType.isChangingRecords(
@@ -144,12 +123,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
new SerializableConfiguration(this.hadoopConf),
new FlinkTaskContextSupplier(getRuntimeContext()));
this.bucketAssigner = BucketAssigners.create(
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
context,
writeConfig);
// initialize and check the partitions load state
loadInitialPartitions();
}
@Override
@@ -181,15 +159,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
final BucketInfo bucketInfo;
final HoodieRecordLocation location;
// Checks whether all the partitions are loaded first.
if (checkPartition && !allPartitionsLoaded) {
checkPartitionsLoaded();
checkPartition = false;
}
if (!allPartitionsLoaded
&& initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) // this is an existing partition
&& !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
if (!partitionLoadState.contains(hoodieKey.getPartitionPath())) {
// If the partition records are never loaded, load the records first.
loadRecords(hoodieKey.getPartitionPath());
}
@@ -226,9 +196,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
public void notifyCheckpointComplete(long l) {
// Refresh the table state when there are new commits.
this.bucketAssigner.refreshTable();
if (!allPartitionsLoaded) {
checkPartition = true;
}
}
/**
@@ -241,12 +208,21 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
List<HoodieBaseFile> latestBaseFiles =
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
for (HoodieBaseFile baseFile : latestBaseFiles) {
List<HoodieKey> hoodieKeys =
ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
hoodieKeys.forEach(hoodieKey -> {
try {
this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
// Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
// the input records is shuffled by record key
boolean shouldLoad = KeyGroupRangeAssignment.assignKeyToParallelOperator(
hoodieKey.getRecordKey(), maxParallelism, parallelism) == taskID;
if (shouldLoad) {
this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
}
} catch (Exception e) {
throw new HoodieIOException("Error when load record keys from file: " + baseFile);
}
@@ -256,49 +232,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
partitionLoadState.put(partitionPath, 0);
}
/**
 * Computes the set of existing table partitions that this subtask is responsible
 * for bootstrapping into the index state.
 */
private void loadInitialPartitions() {
  final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
  final int maxNumTasks = getRuntimeContext().getMaxNumberOfParallelSubtasks();
  final int thisTaskId = getRuntimeContext().getIndexOfThisSubtask();
  final String basePath = this.conf.getString(FlinkOptions.PATH);
  List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.context, basePath, false, false, false);
  // Mirrors the key routing of org.apache.flink.streaming.api.datastream.KeyedStream:
  // keep only the partitions whose key group maps to this subtask.
  this.initialPartitionsToLoad = partitionPaths.stream()
      .filter(partitionPath ->
          KeyGroupRangeAssignment.assignKeyToParallelOperator(partitionPath, maxNumTasks, numTasks) == thisTaskId)
      .collect(Collectors.toSet());
}
/**
 * Checks whether all the partitions of the table are loaded into the state,
 * set the flag {@code allPartitionsLoaded} to true if it is.
 *
 * @throws HoodieException if the partition load state cannot be queried
 */
private void checkPartitionsLoaded() {
  for (String partition : this.initialPartitionsToLoad) {
    try {
      if (!this.partitionLoadState.contains(partition)) {
        // at least one assigned partition is not loaded yet, keep the flag false
        return;
      }
    } catch (Exception e) {
      // Fix: the old message claimed the error was "ignored" although it is
      // rethrown below; log an accurate message and surface the failure.
      LOG.warn("Error when checking whether all partitions are loaded", e);
      throw new HoodieException(e);
    }
  }
  this.allPartitionsLoaded = true;
}
/**
 * Returns whether all the partitions assigned to this task have been
 * loaded into the index state. Exposed for testing.
 */
@VisibleForTesting
public boolean isAllPartitionsLoaded() {
  return this.allPartitionsLoaded;
}
/**
 * Clears the index state and recomputes the partitions this task should load,
 * forcing a fresh bootstrap of the index. Exposed for testing.
 */
@VisibleForTesting
public void clearIndexState() {
  // reset the flag first so subsequent records re-trigger the partition loading
  this.allPartitionsLoaded = false;
  this.indexState.clear();
  loadInitialPartitions();
}
@VisibleForTesting

View File

@@ -60,6 +60,16 @@ import java.util.stream.Collectors;
public class BucketAssigner {
private static final Logger LOG = LogManager.getLogger(BucketAssigner.class);
/**
* Task ID.
*/
private final int taskID;
/**
* Number of tasks.
*/
private final int numTasks;
/**
* Remembers what type each bucket is for later.
*/
@@ -104,12 +114,16 @@ public class BucketAssigner {
private final Map<String, NewFileAssignState> newFileAssignStates;
public BucketAssigner(
int taskID,
int numTasks,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
bucketInfoMap = new HashMap<>();
partitionSmallFilesMap = new HashMap<>();
smallFileAssignStates = new HashMap<>();
newFileAssignStates = new HashMap<>();
this.taskID = taskID;
this.numTasks = numTasks;
this.context = context;
this.config = config;
this.table = HoodieFlinkTable.create(this.config, this.context);
@@ -187,7 +201,7 @@ public class BucketAssigner {
if (partitionSmallFilesMap.containsKey(partitionPath)) {
return partitionSmallFilesMap.get(partitionPath);
}
List<SmallFile> smallFiles = getSmallFiles(partitionPath);
List<SmallFile> smallFiles = smallFilesOfThisTask(getSmallFiles(partitionPath));
if (smallFiles.size() > 0) {
LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
partitionSmallFilesMap.put(partitionPath, smallFiles);
@@ -240,6 +254,15 @@ public class BucketAssigner {
return smallFileLocations;
}
/**
 * Returns the subset of the given small files that this task writes inserts to.
 *
 * <p>The files are split round-robin among all the tasks: task {@code i} takes
 * elements {@code i, i + numTasks, i + 2 * numTasks, ...} so that no two tasks
 * ever share a small file.
 */
private List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
  List<SmallFile> assigned = new ArrayList<>();
  int idx = taskID;
  while (idx < smallFiles.size()) {
    assigned.add(smallFiles.get(idx));
    idx += numTasks;
  }
  return assigned;
}
/**
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.

View File

@@ -33,20 +33,24 @@ public abstract class BucketAssigners {
/**
* Creates a {@code BucketAssigner}.
*
* @param taskID The task ID
* @param numTasks The number of tasks
* @param tableType The table type
* @param context The engine context
* @param config The configuration
* @return the bucket assigner instance
*/
public static BucketAssigner create(
int taskID,
int numTasks,
HoodieTableType tableType,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
switch (tableType) {
case COPY_ON_WRITE:
return new BucketAssigner(context, config);
return new BucketAssigner(taskID, numTasks, context, config);
case MERGE_ON_READ:
return new DeltaBucketAssigner(context, config);
return new DeltaBucketAssigner(taskID, numTasks, context, config);
default:
throw new AssertionError();
}

View File

@@ -40,8 +40,12 @@ import java.util.stream.Collectors;
* <p>Note: assumes the index can always index log files for Flink write.
*/
public class DeltaBucketAssigner extends BucketAssigner {
public DeltaBucketAssigner(HoodieFlinkEngineContext context, HoodieWriteConfig config) {
super(context, config);
public DeltaBucketAssigner(
int taskID,
int numTasks,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
super(taskID, numTasks, context, config);
}
@Override
@@ -77,11 +81,13 @@ public class DeltaBucketAssigner extends BucketAssigner {
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
} else {
HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> {
// in case there is something error, and the file slice has no log file
sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
FSUtils.getFileIdFromLogPath(logFile.getPath()));
sf.sizeBytes = getTotalFileSize(smallFileSlice);
smallFileLocations.add(sf);
});
}
}
}

View File

@@ -23,6 +23,9 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.util.RowDataToAvroConverters;
@@ -36,7 +39,11 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Constructor;
/**
* Function that transforms RowData to HoodieRecord.
@@ -63,6 +70,11 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
*/
private transient KeyGenerator keyGenerator;
/**
* Utilities to create hoodie pay load instance.
*/
private transient PayloadCreation payloadCreation;
/**
* Config options.
*/
@@ -79,6 +91,7 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
this.avroSchema = StreamerUtil.getSourceSchema(this.config);
this.converter = RowDataToAvroConverters.createConverter(this.rowType);
this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
this.payloadCreation = PayloadCreation.instance(config);
}
@SuppressWarnings("unchecked")
@@ -95,19 +108,61 @@ public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?
* @throws IOException if error occurs
*/
@SuppressWarnings("rawtypes")
private HoodieRecord toHoodieRecord(I record) throws IOException {
boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
|| WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
private HoodieRecord toHoodieRecord(I record) throws Exception {
GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
final HoodieKey hoodieKey = keyGenerator.getKey(gr);
// nullify the payload insert data to mark the record as a DELETE
gr = record.getRowKind() == RowKind.DELETE ? null : gr;
HoodieRecordPayload payload = shouldCombine
? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
: StreamerUtil.createPayload(payloadClazz, gr);
final boolean isDelete = record.getRowKind() == RowKind.DELETE;
HoodieRecordPayload payload = payloadCreation.createPayload(gr, isDelete);
return new HoodieRecord<>(hoodieKey, payload);
}
/**
 * Util to create hoodie pay load instance.
 *
 * <p>Resolves the payload constructor once (reflection is expensive) and reuses
 * it for every record. Built lazily in {@code open()}, so the non-serializable
 * {@link Constructor} field is never actually serialized.
 */
private static class PayloadCreation implements Serializable {
  private static final long serialVersionUID = 1L;

  // Whether the payload carries an ordering value for pre-combining.
  private final boolean shouldCombine;
  // Cached constructor of the configured payload class.
  private final Constructor<?> constructor;
  // Pre-combine field name; non-null only when shouldCombine is true.
  private final String preCombineField;

  private PayloadCreation(
      boolean shouldCombine,
      Constructor<?> constructor,
      @Nullable String preCombineField) {
    this.shouldCombine = shouldCombine;
    this.constructor = constructor;
    this.preCombineField = preCombineField;
  }

  /**
   * Creates a {@code PayloadCreation} from the given Flink configuration.
   *
   * @throws Exception if the payload class or its expected constructor cannot be resolved
   */
  public static PayloadCreation instance(Configuration conf) throws Exception {
    boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
        || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
    String preCombineField = null;
    final Class<?>[] argTypes;
    final Constructor<?> constructor;
    if (shouldCombine) {
      preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);
      argTypes = new Class<?>[] {GenericRecord.class, Comparable.class};
    } else {
      argTypes = new Class<?>[] {Option.class};
    }
    final String clazz = conf.getString(FlinkOptions.PAYLOAD_CLASS);
    constructor = ReflectionUtils.getClass(clazz).getConstructor(argTypes);
    return new PayloadCreation(shouldCombine, constructor, preCombineField);
  }

  /**
   * Creates the payload for the given avro record; a DELETE record gets an
   * empty/null insert value so that the payload is interpreted as a deletion.
   *
   * @param record   the avro record (never null, also used to read the ordering value)
   * @param isDelete whether the row represents a deletion
   */
  public HoodieRecordPayload<?> createPayload(GenericRecord record, boolean isDelete) throws Exception {
    if (shouldCombine) {
      ValidationUtils.checkState(preCombineField != null);
      Comparable<?> orderingVal = (Comparable<?>) HoodieAvroUtils.getNestedFieldVal(record,
          preCombineField, false);
      return (HoodieRecordPayload<?>) constructor.newInstance(
          isDelete ? null : record, orderingVal);
    } else {
      // Fix: honor the DELETE flag in the non-combine path as well; the old
      // code always passed Option.of(record), so delete rows were written
      // with their insert value instead of being nullified.
      return (HoodieRecordPayload<?>) this.constructor.newInstance(
          isDelete ? Option.empty() : Option.of(record));
    }
  }
}
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.utils;
import org.apache.hudi.exception.HoodieException;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
/**
 * Coordinator executor that executes the tasks asynchronously, it fails the job
 * for any task exceptions.
 *
 * <p>We need this because the coordinator methods are called by
 * the Job Manager's main thread (mailbox thread), executes the methods asynchronously
 * to avoid blocking the main thread.
 */
public class CoordinatorExecutor extends NonThrownExecutor {
  // Coordinator context, used to fail the whole job when an action errors out.
  private final OperatorCoordinator.Context context;

  public CoordinatorExecutor(OperatorCoordinator.Context context, Logger logger) {
    super(logger);
    this.context = context;
  }

  @Override
  protected void exceptionHook(String actionString, Throwable t) {
    // Any throwable from an asynchronous action is escalated: fail the job
    // through the coordinator context instead of merely logging it.
    this.context.failJob(new HoodieException(actionString, t));
  }

  @Override
  public void close() throws Exception {
    // wait for the remaining tasks to finish.
    executor.shutdown();
    // We do not expect this to actually block for long. At this point, there should
    // be very few task running in the executor, if any.
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
  }
}

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.sink.utils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -31,15 +30,16 @@ import java.util.concurrent.TimeUnit;
* An executor service that catches all the throwable with logging.
*/
public class NonThrownExecutor implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NonThrownExecutor.class);
private final Logger logger;
/**
* A single-thread executor to handle all the asynchronous jobs.
*/
private final ExecutorService executor;
protected final ExecutorService executor;
public NonThrownExecutor() {
public NonThrownExecutor(Logger logger) {
this.executor = Executors.newSingleThreadExecutor();
this.logger = logger;
}
/**
@@ -48,24 +48,30 @@ public class NonThrownExecutor implements AutoCloseable {
public void execute(
final ThrowingRunnable<Throwable> action,
final String actionName,
final String instant) {
final Object... actionParams) {
executor.execute(
() -> {
final String actionString = String.format(actionName, actionParams);
try {
action.run();
LOG.info("Executor executes action [{}] for instant [{}] success!", actionName, instant);
logger.info("Executor executes action [{}] success!", actionString);
} catch (Throwable t) {
// if we have a JVM critical error, promote it immediately, there is a good
// chance the
// logging or job failing will not succeed any more
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
final String errMsg = String.format("Executor executes action [%s] error", actionName);
LOG.error(errMsg, t);
final String errMsg = String.format("Executor executes action [%s] error", actionString);
logger.error(errMsg, t);
exceptionHook(errMsg, t);
}
});
}
protected void exceptionHook(String errMsg, Throwable t) {
// for sub-class to override.
}
@Override
public void close() throws Exception {
if (executor != null) {

View File

@@ -69,8 +69,8 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
DataStream<Object> pipeline = dataStream
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
// Key-by partition path, to avoid multiple subtasks write to a partition at the same time
.keyBy(HoodieRecord::getPartitionPath)
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),

View File

@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
@@ -179,19 +180,6 @@ public class StreamerUtil {
}
}
/**
 * Create a payload class via reflection, do not ordering/precombine value.
 *
 * @param payloadClass fully-qualified name of the payload class to instantiate
 * @param record       the avro record wrapped as the payload's insert value
 * @throws IOException if the payload class cannot be instantiated
 */
public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record)
    throws IOException {
  try {
    // Uses the single-arg Option constructor, i.e. no ordering value is attached.
    return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
        new Class<?>[] {Option.class}, Option.of(record));
  } catch (Throwable e) {
    throw new IOException("Could not create payload for class: " + payloadClass, e);
  }
}
public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) {
return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
}
@@ -215,6 +203,12 @@ public class StreamerUtil {
// actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20)
.build())
.withMemoryConfig(
HoodieMemoryConfig.newBuilder()
.withMaxMemoryMaxSize(
conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L,
conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L
).build())
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withAutoCommit(false)
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));