1
0

[HUDI-2376] Add pipeline for Append mode (#3573)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
yuzhaojing
2021-09-02 16:32:40 +08:00
committed by GitHub
parent 21fd6edfe7
commit 7a1bd225ca
42 changed files with 946 additions and 443 deletions

View File

@@ -222,10 +222,10 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ");
public static final ConfigOption<Boolean> INSERT_DEDUP = ConfigOptions
.key("write.insert.deduplicate")
.booleanType()
.defaultValue(true)
.withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true");
.key("write.insert.deduplicate")
.booleanType()
.defaultValue(true)
.withDescription("Whether to deduplicate for INSERT operation, if disabled, writes the base files directly, default true");
public static final ConfigOption<String> OPERATION = ConfigOptions
.key("write.operation")
@@ -370,6 +370,27 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(1024)
.withDescription("Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB");
public static final ConfigOption<Integer> WRITE_PARQUET_BLOCK_SIZE = ConfigOptions
.key("write.parquet.block.size")
.intType()
.defaultValue(120)
.withDescription("Parquet RowGroup size. It's recommended to make this large enough that scan costs can be"
+ " amortized by packing enough column values into a single row group.");
public static final ConfigOption<Integer> WRITE_PARQUET_MAX_FILE_SIZE = ConfigOptions
.key("write.parquet.max.file.size")
.intType()
.defaultValue(120)
.withDescription("Target size for parquet files produced by Hudi write phases. "
+ "For DFS, this needs to be aligned with the underlying filesystem block size for optimal performance.");
public static final ConfigOption<Integer> WRITE_PARQUET_PAGE_SIZE = ConfigOptions
.key("hoodie.parquet.page.size")
.intType()
.defaultValue(1)
.withDescription("Parquet page size. Page is the unit of read within a parquet file. "
+ "Within a block, pages are compressed separately.");
public static final ConfigOption<Integer> WRITE_MERGE_MAX_MEMORY = ConfigOptions
.key("write.merge.max_memory")
.intType()

View File

@@ -18,38 +18,26 @@
package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ObjectSizeCalculator;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +49,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -101,9 +88,7 @@ import java.util.stream.Collectors;
* @param <I> Type of the input record
* @see StreamWriteOperatorCoordinator
*/
public class StreamWriteFunction<K, I, O>
extends KeyedProcessFunction<K, I, O>
implements CheckpointedFunction {
public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
private static final long serialVersionUID = 1L;
@@ -114,76 +99,20 @@ public class StreamWriteFunction<K, I, O>
*/
private transient Map<String, DataBucket> buckets;
/**
* Config options.
*/
private final Configuration config;
/**
* Id of current subtask.
*/
private int taskID;
/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
/**
* The REQUESTED instant we write the data.
*/
private volatile String currentInstant;
/**
* Gateway to send operator events to the operator coordinator.
*/
private transient OperatorEventGateway eventGateway;
/**
* Commit action type.
*/
private transient String actionType;
/**
* Total size tracer.
*/
private transient TotalSizeTracer tracer;
/**
* Flag saying whether the write task is waiting for the checkpoint success notification
* after it finished a checkpoint.
*
* <p>The flag is needed because the write task does not block during the waiting time interval,
* some data buckets still flush out with old instant time. There are two cases that the flush may produce
* corrupted files if the old instant is committed successfully:
* 1) the write handle was writing data but interrupted, left a corrupted parquet file;
* 2) the write handle finished the write but was not closed, left an empty parquet file.
*
* <p>To solve, when this flag was set to true, we block the data flushing thus the #processElement method,
* the flag was reset to false if the task receives the checkpoint success event or the latest inflight instant
* time changed(the last instant committed successfully).
*/
private volatile boolean confirming = false;
/**
* List state of the write metadata events.
*/
private transient ListState<WriteMetadataEvent> writeMetadataState;
/**
* Write status list for the current checkpoint.
*/
private List<WriteStatus> writeStatuses;
/**
* Constructs a StreamingSinkFunction.
*
* @param config The config options
*/
public StreamWriteFunction(Configuration config) {
this.config = config;
super(config);
}
@Override
@@ -194,42 +123,15 @@ public class StreamWriteFunction<K, I, O>
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.writeStatuses = new ArrayList<>();
this.writeMetadataState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
"write-metadata-state",
TypeInformation.of(WriteMetadataEvent.class)
));
this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
if (context.isRestored()) {
restoreWriteMetadata();
} else {
sendBootstrapEvent();
}
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
public void snapshotState() {
// Based on the fact that the coordinator starts the checkpoint first,
// it would check the validity.
// wait for the buffer data flush out and request a new instant
flushRemaining(false);
// Reload the snapshot state as the current state.
reloadWriteMetaState();
}
@Override
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {
bufferRecord((HoodieRecord<?>) value);
}
@@ -264,21 +166,6 @@ public class StreamWriteFunction<K, I, O>
return ret;
}
@VisibleForTesting
@SuppressWarnings("rawtypes")
public HoodieFlinkWriteClient getWriteClient() {
return writeClient;
}
@VisibleForTesting
public boolean isConfirming() {
return this.confirming;
}
public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
this.eventGateway = operatorEventGateway;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -307,49 +194,6 @@ public class StreamWriteFunction<K, I, O>
}
}
private void restoreWriteMetadata() throws Exception {
String lastInflight = this.writeClient.getLastPendingInstant(this.actionType);
boolean eventSent = false;
for (WriteMetadataEvent event : this.writeMetadataState.get()) {
if (Objects.equals(lastInflight, event.getInstantTime())) {
// The checkpoint succeed but the meta does not commit,
// re-commit the inflight instant
this.eventGateway.sendEventToCoordinator(event);
LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
eventSent = true;
}
}
if (!eventSent) {
sendBootstrapEvent();
}
}
private void sendBootstrapEvent() {
this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}
/**
* Reload the write metadata state as the current checkpoint.
*/
private void reloadWriteMetaState() throws Exception {
this.writeMetadataState.clear();
WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(new ArrayList<>(writeStatuses))
.bootstrap(true)
.build();
this.writeMetadataState.add(event);
writeStatuses.clear();
}
public void handleOperatorEvent(OperatorEvent event) {
ValidationUtils.checkArgument(event instanceof CommitAckEvent,
"The write function can only handle CommitAckEvent");
this.confirming = false;
}
/**
* Represents a data item in the buffer, this is needed to reduce the
* memory footprint.
@@ -562,32 +406,6 @@ public class StreamWriteFunction<K, I, O>
&& this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
}
private String instantToWrite(boolean hasData) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder()
.timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
.action("instant initialize")
.build();
while (confirming) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while
timeWait.waitFor();
// refresh the inflight instant
instant = this.writeClient.getLastPendingInstant(this.actionType);
} else {
// the inflight instant changed, which means the last instant was committed
// successfully.
confirming = false;
}
}
return instant;
}
@SuppressWarnings("unchecked, rawtypes")
private boolean flushBucket(DataBucket bucket) {
String instant = instantToWrite(true);

View File

@@ -18,12 +18,10 @@
package org.apache.hudi.sink;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
/**
@@ -31,27 +29,13 @@ import org.apache.flink.streaming.api.operators.StreamSink;
*
* @param <I> The input type
*/
public class StreamWriteOperator<I>
extends KeyedProcessOperator<Object, I, Object>
implements OperatorEventHandler, BoundedOneInput {
private final StreamWriteFunction<Object, I, Object> sinkFunction;
public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {
public StreamWriteOperator(Configuration conf) {
super(new StreamWriteFunction<>(conf));
this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
}
@Override
public void handleOperatorEvent(OperatorEvent event) {
this.sinkFunction.handleOperatorEvent(event);
}
void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
sinkFunction.setOperatorEventGateway(operatorEventGateway);
}
@Override
public void endInput() {
sinkFunction.endInput();
public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf));
}
}

View File

@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.append;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import java.util.List;
/**
* Sink function to write the data to the underneath filesystem.
*
* <p>The function writes base files directly for each checkpoint,
* the file may roll over when its size hits the configured threshold.
*
* @param <I> Type of the input record
* @see StreamWriteOperatorCoordinator
*/
public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
private static final long serialVersionUID = 1L;
/**
* Helper class for log mode.
*/
private transient BulkInsertWriterHelper writerHelper;
/**
* Table row type.
*/
private final RowType rowType;
/**
* Constructs an AppendWriteFunction.
*
* @param config The config options
*/
public AppendWriteFunction(Configuration config, RowType rowType) {
super(config);
this.rowType = rowType;
}
@Override
public void snapshotState() {
// Based on the fact that the coordinator starts the checkpoint first,
// it would check the validity.
// wait for the buffer data flush out and request a new instant
flushData(false);
// nullify the write helper for next ckp
this.writerHelper = null;
}
@Override
public void processElement(I value, Context ctx, Collector<Object> out) throws Exception {
if (this.writerHelper == null) {
initWriterHelper();
}
this.writerHelper.write((RowData) value);
}
@Override
public void close() {
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
}
/**
* End input action for batch source.
*/
public void endInput() {
flushData(true);
this.writeClient.cleanHandles();
this.writeStatuses.clear();
}
// -------------------------------------------------------------------------
// GetterSetter
// -------------------------------------------------------------------------
@VisibleForTesting
public BulkInsertWriterHelper getWriterHelper() {
return this.writerHelper;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private void initWriterHelper() {
this.currentInstant = instantToWrite(true);
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
throw new HoodieException("No inflight instant when flushing data!");
}
this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
this.currentInstant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType);
}
private void flushData(boolean endInput) {
final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(this.writerHelper.getInstantTime())
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(endInput)
.build();
this.eventGateway.sendEventToCoordinator(event);
}
}

View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.append;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
/**
* Operator for {@link AppendWriteFunction}.
*
* @param <I> The input type
*/
public class AppendWriteOperator<I> extends AbstractWriteOperator<I> {
public AppendWriteOperator(Configuration conf, RowType rowType) {
super(new AppendWriteFunction<>(conf, rowType));
}
public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType) {
return WriteOperatorFactory.instance(conf, new AppendWriteOperator<>(conf, rowType));
}
}

View File

@@ -19,21 +19,20 @@
package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
@@ -43,7 +42,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* Sink function to write the data to the underneath filesystem.
@@ -55,8 +53,8 @@ import java.util.stream.Collectors;
* @param <I> Type of the input record
* @see StreamWriteOperatorCoordinator
*/
public class BulkInsertWriteFunction<I, O>
extends ProcessFunction<I, O> {
public class BulkInsertWriteFunction<I>
extends AbstractWriteFunction<I> {
private static final long serialVersionUID = 1L;
@@ -126,7 +124,7 @@ public class BulkInsertWriteFunction<I, O>
}
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
public void processElement(I value, Context ctx, Collector<Object> out) throws IOException {
this.writerHelper.write((RowData) value);
}
@@ -142,14 +140,8 @@ public class BulkInsertWriteFunction<I, O>
* End input action for batch source.
*/
public void endInput() {
final List<WriteStatus> writeStatus;
try {
this.writerHelper.close();
writeStatus = this.writerHelper.getWriteStatuses().stream()
.map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList());
} catch (IOException e) {
throw new HoodieException("Error collect the write status for task [" + this.taskID + "]");
}
final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(this.writerHelper.getInstantTime())
@@ -160,17 +152,9 @@ public class BulkInsertWriteFunction<I, O>
this.eventGateway.sendEventToCoordinator(event);
}
/**
* Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}.
*/
private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) {
WriteStatus writeStatus = new WriteStatus(false, 0.1);
writeStatus.setStat(internalWriteStatus.getStat());
writeStatus.setFileId(internalWriteStatus.getFileId());
writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
return writeStatus;
@Override
public void handleOperatorEvent(OperatorEvent event) {
// no operation
}
// -------------------------------------------------------------------------

View File

@@ -18,24 +18,12 @@
package org.apache.hudi.sink.bulk;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
/**
@@ -44,13 +32,11 @@ import org.apache.flink.table.types.logical.RowType;
* @param <I> The input type
*/
public class BulkInsertWriteOperator<I>
extends ProcessOperator<I, Object>
implements OperatorEventHandler, BoundedOneInput {
private final BulkInsertWriteFunction<I, Object> sinkFunction;
extends AbstractWriteOperator<I>
implements BoundedOneInput {
public BulkInsertWriteOperator(Configuration conf, RowType rowType) {
super(new BulkInsertWriteFunction<>(conf, rowType));
this.sinkFunction = (BulkInsertWriteFunction<I, Object>) getUserFunction();
}
@Override
@@ -58,58 +44,7 @@ public class BulkInsertWriteOperator<I>
// no operation
}
void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
sinkFunction.setOperatorEventGateway(operatorEventGateway);
}
@Override
public void endInput() {
sinkFunction.endInput();
}
public static OperatorFactory<RowData> getFactory(Configuration conf, RowType rowType) {
return new OperatorFactory<>(conf, rowType);
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
public static class OperatorFactory<I>
extends SimpleUdfStreamOperatorFactory<Object>
implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
private static final long serialVersionUID = 1L;
private final BulkInsertWriteOperator<I> operator;
private final Configuration conf;
public OperatorFactory(Configuration conf, RowType rowType) {
super(new BulkInsertWriteOperator<>(conf, rowType));
this.operator = (BulkInsertWriteOperator<I>) getOperator();
this.conf = conf;
}
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
final OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID));
this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
this.operator.setProcessingTimeService(this.processingTimeService);
eventDispatcher.registerEventHandler(operatorID, operator);
return (T) operator;
}
@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
}
@Override
public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
super.setProcessingTimeService(processingTimeService);
}
public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType) {
return WriteOperatorFactory.instance(conf, new BulkInsertWriteOperator<>(conf, rowType));
}
}

View File

@@ -19,9 +19,11 @@
package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.table.HoodieTable;
@@ -39,6 +41,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* Helper class for bulk insert used by Flink.
@@ -101,7 +104,7 @@ public class BulkInsertWriterHelper {
}
}
public List<HoodieInternalWriteStatus> getWriteStatuses() throws IOException {
public List<HoodieInternalWriteStatus> getHoodieWriteStatuses() throws IOException {
close();
return writeStatusList;
}
@@ -172,5 +175,27 @@ public class BulkInsertWriterHelper {
return new RowType(false, mergedFields);
}
public List<WriteStatus> getWriteStatuses(int taskID) {
try {
return getHoodieWriteStatuses().stream()
.map(BulkInsertWriterHelper::toWriteStatus).collect(Collectors.toList());
} catch (IOException e) {
throw new HoodieException("Error collect the write status for task [" + taskID + "]");
}
}
/**
* Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}.
*/
private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) {
WriteStatus writeStatus = new WriteStatus(false, 0.1);
writeStatus.setStat(internalWriteStatus.getStat());
writeStatus.setFileId(internalWriteStatus.getFileId());
writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
return writeStatus;
}
}

View File

@@ -35,7 +35,6 @@ import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.common;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* Base infrastructures for streaming writer function.
*
* @param <I> Type of the input record
* @see StreamWriteOperatorCoordinator
*/
public abstract class AbstractStreamWriteFunction<I>
extends AbstractWriteFunction<I>
implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamWriteFunction.class);
/**
* Config options.
*/
protected final Configuration config;
/**
* Id of current subtask.
*/
protected int taskID;
/**
* Write Client.
*/
protected transient HoodieFlinkWriteClient writeClient;
/**
* The REQUESTED instant we write the data.
*/
protected volatile String currentInstant;
/**
* Gateway to send operator events to the operator coordinator.
*/
protected transient OperatorEventGateway eventGateway;
/**
* Commit action type.
*/
protected transient String actionType;
/**
* Flag saying whether the write task is waiting for the checkpoint success notification
* after it finished a checkpoint.
*
* <p>The flag is needed because the write task does not block during the waiting time interval,
* some data buckets still flush out with old instant time. There are two cases that the flush may produce
* corrupted files if the old instant is committed successfully:
* 1) the write handle was writing data but interrupted, left a corrupted parquet file;
* 2) the write handle finished the write but was not closed, left an empty parquet file.
*
* <p>To solve, when this flag was set to true, we block the data flushing thus the #processElement method,
* the flag was reset to false if the task receives the checkpoint success event or the latest inflight instant
* time changed(the last instant committed successfully).
*/
protected volatile boolean confirming = false;
/**
* List state of the write metadata events.
*/
private transient ListState<WriteMetadataEvent> writeMetadataState;
/**
* Write status list for the current checkpoint.
*/
protected List<WriteStatus> writeStatuses;
/**
* Constructs a StreamWriteFunctionBase.
*
* @param config The config options
*/
public AbstractStreamWriteFunction(Configuration config) {
this.config = config;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.writeStatuses = new ArrayList<>();
this.writeMetadataState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
"write-metadata-state",
TypeInformation.of(WriteMetadataEvent.class)
));
this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
if (context.isRestored()) {
restoreWriteMetadata();
} else {
sendBootstrapEvent();
}
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
snapshotState();
// Reload the snapshot state as the current state.
reloadWriteMetaState();
}
public abstract void snapshotState();
// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
@VisibleForTesting
@SuppressWarnings("rawtypes")
public HoodieFlinkWriteClient getWriteClient() {
return writeClient;
}
@VisibleForTesting
public boolean isConfirming() {
return this.confirming;
}
public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
this.eventGateway = operatorEventGateway;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private void restoreWriteMetadata() throws Exception {
String lastInflight = this.writeClient.getLastPendingInstant(this.actionType);
boolean eventSent = false;
for (WriteMetadataEvent event : this.writeMetadataState.get()) {
if (Objects.equals(lastInflight, event.getInstantTime())) {
// The checkpoint succeed but the meta does not commit,
// re-commit the inflight instant
this.eventGateway.sendEventToCoordinator(event);
LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
eventSent = true;
}
}
if (!eventSent) {
sendBootstrapEvent();
}
}
private void sendBootstrapEvent() {
this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}
/**
* Reload the write metadata state as the current checkpoint.
*/
private void reloadWriteMetaState() throws Exception {
this.writeMetadataState.clear();
WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(new ArrayList<>(writeStatuses))
.bootstrap(true)
.build();
this.writeMetadataState.add(event);
writeStatuses.clear();
}
public void handleOperatorEvent(OperatorEvent event) {
ValidationUtils.checkArgument(event instanceof CommitAckEvent,
"The write function can only handle CommitAckEvent");
this.confirming = false;
}
/**
* Prepares the instant time to write with for next checkpoint.
*
* @param hasData Whether the task has buffering data
* @return The instant time
*/
protected String instantToWrite(boolean hasData) {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder()
.timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
.action("instant initialize")
.build();
while (confirming) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while
timeWait.waitFor();
// refresh the inflight instant
instant = this.writeClient.getLastPendingInstant(this.actionType);
} else {
// the inflight instant changed, which means the last instant was committed
// successfully.
confirming = false;
}
}
return instant;
}
}

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.common;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
/**
* Base class for write function.
*
* @param <I> the input type
*/
public abstract class AbstractWriteFunction<I> extends ProcessFunction<I, Object> implements BoundedOneInput {
/**
* Sets up the event gateway.
*/
public abstract void setOperatorEventGateway(OperatorEventGateway operatorEventGateway);
/**
* Invoked when bounded source ends up.
*/
public abstract void endInput();
/**
* Handles the operator event sent by the coordinator.
* @param event The event
*/
public abstract void handleOperatorEvent(OperatorEvent event);
}

View File

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.common;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ProcessOperator;
/**
* Base class for write operator.
*
* @param <I> the input type
*/
public abstract class AbstractWriteOperator<I>
extends ProcessOperator<I, Object>
implements OperatorEventHandler, BoundedOneInput {
private final AbstractWriteFunction<I> function;
public AbstractWriteOperator(AbstractWriteFunction<I> function) {
super(function);
this.function = function;
}
public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
this.function.setOperatorEventGateway(operatorEventGateway);
}
@Override
public void endInput() {
this.function.endInput();
}
@Override
public void handleOperatorEvent(OperatorEvent evt) {
this.function.handleOperatorEvent(evt);
}
}

View File

@@ -16,7 +16,10 @@
* limitations under the License.
*/
package org.apache.hudi.sink;
package org.apache.hudi.sink.common;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -31,20 +34,24 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
/**
* Factory class for {@link StreamWriteOperator}.
*/
public class StreamWriteOperatorFactory<I>
public class WriteOperatorFactory<I>
extends SimpleUdfStreamOperatorFactory<Object>
implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
private static final long serialVersionUID = 1L;
private final StreamWriteOperator<I> operator;
private final AbstractWriteOperator<I> operator;
private final Configuration conf;
public StreamWriteOperatorFactory(Configuration conf) {
super(new StreamWriteOperator<>(conf));
this.operator = (StreamWriteOperator<I>) getOperator();
public WriteOperatorFactory(Configuration conf, AbstractWriteOperator<I> operator) {
super(operator);
this.operator = operator;
this.conf = conf;
}
public static <I> WriteOperatorFactory<I> instance(Configuration conf, AbstractWriteOperator<I> operator) {
return new WriteOperatorFactory<>(conf, operator);
}
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {

View File

@@ -29,7 +29,8 @@ public class CommitAckEvent implements OperatorEvent {
private static final CommitAckEvent INSTANCE = new CommitAckEvent();
// default constructor for efficient serialization
public CommitAckEvent() {}
public CommitAckEvent() {
}
public static CommitAckEvent getInstance() {
return INSTANCE;

View File

@@ -61,7 +61,7 @@ public class DeltaWriteProfile extends WriteProfile {
// If we can index log files, we can add more inserts to log files for fileIds including those under
// pending compaction.
List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
.collect(Collectors.toList());
.collect(Collectors.toList());
for (FileSlice fileSlice : allFileSlices) {
if (isSmallFile(fileSlice)) {
allSmallFileSlices.add(fileSlice);

View File

@@ -52,7 +52,8 @@ public class WriteProfiles {
private static final Map<String, WriteProfile> PROFILES = new HashMap<>();
private WriteProfiles() {}
private WriteProfiles() {
}
public static synchronized WriteProfile singleton(
boolean ignoreSmallFiles,
@@ -104,7 +105,6 @@ public class WriteProfiles {
* @param basePath Table base path
* @param metadata The metadata
* @param fs The filesystem
*
* @return the commit file status list
*/
private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
@@ -143,7 +143,6 @@ public class WriteProfiles {
* @param basePath The table base path
* @param instant The hoodie instant
* @param timeline The timeline
*
* @return the commit metadata or empty if any error occurs
*/
public static Option<HoodieCommitMetadata> getCommitMetadataSafely(
@@ -172,7 +171,6 @@ public class WriteProfiles {
* @param basePath The table base path
* @param instant The hoodie instant
* @param timeline The timeline
*
* @return the commit metadata
*/
public static HoodieCommitMetadata getCommitMetadata(

View File

@@ -29,7 +29,8 @@ import org.apache.flink.table.types.logical.RowType;
* Utilities for {@link RowDataToHoodieFunction}.
*/
public abstract class RowDataToHoodieFunctions {
private RowDataToHoodieFunctions() {}
private RowDataToHoodieFunctions() {
}
/**
* Creates a {@link RowDataToHoodieFunction} instance based on the given configuration.

View File

@@ -28,6 +28,7 @@ public interface Transformer {
/**
* Transform source DataStream to target DataStream.
*
* @param source
*/
DataStream<RowData> apply(DataStream<RowData> source);

View File

@@ -21,12 +21,14 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -41,6 +43,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -52,7 +55,7 @@ import org.apache.flink.table.types.logical.RowType;
public class Pipelines {
public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
if (partitionFields.length > 0) {
@@ -80,9 +83,17 @@ public class Pipelines {
operatorFactory)
// follow the parallelism of upstream operators to avoid shuffle
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits");
.addSink(DummySink.INSTANCE);
}
public static DataStreamSink<Object> append(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
return dataStream
.transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.addSink(DummySink.INSTANCE);
}
public static DataStream<HoodieRecord> bootstrap(
@@ -143,7 +154,7 @@ public class Pipelines {
}
public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
@@ -180,4 +191,12 @@ public class Pipelines {
.setParallelism(1)
.name("clean_commits");
}
/**
* Dummy sink that does nothing.
*/
public static class DummySink implements SinkFunction<Object> {
private static final long serialVersionUID = 1L;
public static DummySink INSTANCE = new DummySink();
}
}

View File

@@ -270,16 +270,16 @@ public class StreamReadMonitoringFunction
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
basePath, logPaths, commitToIssue,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
}).collect(Collectors.toList()))
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
basePath, logPaths, commitToIssue,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());

View File

@@ -142,22 +142,22 @@ public class FlinkStreamerConfig extends Configuration {
public Integer writeTaskNum = 4;
@Parameter(names = {"--partition-default-name"},
description = "The default partition name in case the dynamic partition column value is null/empty string")
description = "The default partition name in case the dynamic partition column value is null/empty string")
public String partitionDefaultName = "__DEFAULT_PARTITION__";
@Parameter(names = {"--index-bootstrap-enabled"},
description = "Whether to bootstrap the index state from existing hoodie table, default false")
description = "Whether to bootstrap the index state from existing hoodie table, default false")
public Boolean indexBootstrapEnabled = false;
@Parameter(names = {"--index-state-ttl"}, description = "Index state ttl in days, default 1.5 day")
public Double indexStateTtl = 1.5D;
@Parameter(names = {"--index-global-enabled"}, description = "Whether to update index for the old partition path "
+ "if same key record with different partition path came in, default false")
+ "if same key record with different partition path came in, default false")
public Boolean indexGlobalEnabled = false;
@Parameter(names = {"--index-partition-regex"},
description = "Whether to load partitions in state if partition path matching default *")
description = "Whether to load partitions in state if partition path matching default *")
public String indexPartitionRegex = ".*";
@Parameter(names = {"--source-avro-schema-path"}, description = "Source avro schema file path, the parsed schema is used for deserialization")
@@ -167,8 +167,8 @@ public class FlinkStreamerConfig extends Configuration {
public String sourceAvroSchema = "";
@Parameter(names = {"--utc-timezone"}, description = "Use UTC timezone or local timezone to the conversion between epoch"
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
+ " use UTC timezone, by default true")
+ " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x"
+ " use UTC timezone, by default true")
public Boolean utcTimezone = true;
@Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
@@ -180,18 +180,18 @@ public class FlinkStreamerConfig extends Configuration {
public Boolean hiveStylePartitioning = false;
@Parameter(names = {"--write-task-max-size"}, description = "Maximum memory in MB for a write task, when the threshold hits,\n"
+ "it flushes the max size data bucket to avoid OOM, default 1GB")
+ "it flushes the max size data bucket to avoid OOM, default 1GB")
public Double writeTaskMaxSize = 1024D;
@Parameter(names = {"--write-batch-size"},
description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB")
description = "Batch buffer size in MB to flush data into the underneath filesystem, default 64MB")
public Double writeBatchSize = 64D;
@Parameter(names = {"--write-log-block-size"}, description = "Max log block size in MB for log file, default 128MB")
public Integer writeLogBlockSize = 128;
@Parameter(names = {"--write-log-max-size"},
description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB")
description = "Maximum size allowed in MB for a log file before it is rolled over to the next version, default 1GB")
public Integer writeLogMaxSize = 1024;
@Parameter(names = {"--write-merge-max-memory"}, description = "Max memory in MB for merge, default 100MB")
@@ -204,11 +204,11 @@ public class FlinkStreamerConfig extends Configuration {
public Integer compactionTasks = 10;
@Parameter(names = {"--compaction-trigger-strategy"},
description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
+ "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
+ "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
+ "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
+ "Default is 'num_commits'")
description = "Strategy to trigger compaction, options are 'num_commits': trigger compaction when reach N delta commits;\n"
+ "'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;\n"
+ "'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;\n"
+ "'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied.\n"
+ "Default is 'num_commits'")
public String compactionTriggerStrategy = FlinkOptions.NUM_COMMITS;
@Parameter(names = {"--compaction-delta-commits"}, description = "Max delta commits needed to trigger compaction, default 5 commits")
@@ -227,16 +227,16 @@ public class FlinkStreamerConfig extends Configuration {
public Boolean cleanAsyncEnabled = true;
@Parameter(names = {"--clean-retain-commits"},
description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
description = "Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10")
public Integer cleanRetainCommits = 10;
@Parameter(names = {"--archive-max-commits"},
description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
description = "Max number of commits to keep before archiving older commits into a sequential log, default 30")
public Integer archiveMaxCommits = 30;
@Parameter(names = {"--archive-min-commits"},
description = "Min number of commits to keep before archiving older commits into a sequential log, default 20")
description = "Min number of commits to keep before archiving older commits into a sequential log, default 20")
public Integer archiveMinCommits = 20;
@Parameter(names = {"--hive-sync-enable"}, description = "Asynchronously sync Hive meta to HMS, default false")
@@ -270,7 +270,7 @@ public class FlinkStreamerConfig extends Configuration {
public String hiveSyncPartitionFields = "";
@Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, "
+ "default 'SlashEncodedDayPartitionValueExtractor'")
+ "default 'SlashEncodedDayPartitionValueExtractor'")
public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
@@ -289,7 +289,7 @@ public class FlinkStreamerConfig extends Configuration {
public Boolean hiveSyncSkipRoSuffix = false;
@Parameter(names = {"--hive-sync-support-timestamp"}, description = "INT64 with original type TIMESTAMP_MICROS is converted to hive timestamp type.\n"
+ "Disabled by default for backward compatibility.")
+ "Disabled by default for backward compatibility.")
public Boolean hiveSyncSupportTimestamp = false;

View File

@@ -105,7 +105,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
/**
* The sanity check.
*
* @param conf The table options
* @param conf The table options
* @param schema The table schema
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
@@ -217,7 +217,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
/**
* Sets up the hive options from the table definition.
* */
*/
private static void setupHiveOptions(Configuration conf) {
if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
&& FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) {

View File

@@ -77,10 +77,17 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism();
DataStream<Object> pipeline;
// Append mode
if (StreamerUtil.allowDuplicateInserts(conf)) {
return Pipelines.append(conf, rowType, dataStream);
}
// bootstrap
final DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded());
// write pipeline
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
// compaction
if (StreamerUtil.needsAsyncCompaction(conf)) {
return Pipelines.compact(conf, pipeline);

View File

@@ -303,7 +303,8 @@ public class HoodieTableSource implements
.collect(Collectors.toList()));
return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
}).collect(Collectors.toList()); })
}).collect(Collectors.toList());
})
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

View File

@@ -283,11 +283,11 @@ public class FilePathUtils {
*
* <p>The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}].
*
* @param path The base path
* @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
* @param path The base path
* @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
* @param defaultParName The default partition name for nulls
* @param hivePartition Whether the partition path is in Hive style
* @param hivePartition Whether the partition path is in Hive style
*/
public static List<Map<String, String>> getPartitions(
Path path,
@@ -338,9 +338,9 @@ public class FilePathUtils {
/**
* Returns all the file paths that is the parents of the data files.
*
* @param path The base path
* @param conf The Flink configuration
* @param hadoopConf The hadoop configuration
* @param path The base path
* @param conf The Flink configuration
* @param hadoopConf The hadoop configuration
* @param partitionKeys The partition key list
*/
public static Path[] getReadPaths(
@@ -362,11 +362,10 @@ public class FilePathUtils {
/**
* Transforms the given partition key value mapping to read paths.
*
* @param path The base path
* @param partitionKeys The partition key list
* @param path The base path
* @param partitionKeys The partition key list
* @param partitionPaths The partition key value mapping
* @param hivePartition Whether the partition path is in Hive style
*
* @param hivePartition Whether the partition path is in Hive style
* @see #getReadPaths
*/
public static Path[] partitionPath2ReadPath(
@@ -384,10 +383,9 @@ public class FilePathUtils {
/**
* Transforms the given partition key value mapping to relative partition paths.
*
* @param partitionKeys The partition key list
* @param partitionKeys The partition key list
* @param partitionPaths The partition key value mapping
* @param hivePartition Whether the partition path is in Hive style
*
* @param hivePartition Whether the partition path is in Hive style
* @see #getReadPaths
*/
public static Set<String> toRelativePartitionPaths(

View File

@@ -296,7 +296,8 @@ public abstract class AbstractColumnReader<V extends WritableColumnVector>
/**
* After read a page, we may need some initialization.
*/
protected void afterReadPage() {}
protected void afterReadPage() {
}
/**
* Support lazy dictionary ids decode. See more in {@link ParquetDictionary}.

View File

@@ -97,15 +97,15 @@ public class MergeOnReadInputSplit implements InputSplit {
@Override
public String toString() {
return "MergeOnReadInputSplit{"
+ "splitNum=" + splitNum
+ ", basePath=" + basePath
+ ", logPaths=" + logPaths
+ ", latestCommit='" + latestCommit + '\''
+ ", tablePath='" + tablePath + '\''
+ ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes
+ ", mergeType='" + mergeType + '\''
+ ", instantRange=" + instantRange
+ '}';
+ "splitNum=" + splitNum
+ ", basePath=" + basePath
+ ", logPaths=" + logPaths
+ ", latestCommit='" + latestCommit + '\''
+ ", tablePath='" + tablePath + '\''
+ ", maxCompactionMemoryInBytes=" + maxCompactionMemoryInBytes
+ ", mergeType='" + mergeType + '\''
+ ", instantRange=" + instantRange
+ '}';
}
}

View File

@@ -106,7 +106,6 @@ public class MergeOnReadTableState implements Serializable {
*
* @param pkOffsets the pk offsets in required row type
* @return pk field logical types
*
* @see #getPkOffsetsInRequired()
*/
public LogicalType[] getPkTypes(int[] pkOffsets) {

View File

@@ -162,7 +162,7 @@ public class AvroSchemaConverter {
* <p>Use "record" as the type name.
*
* @param schema the schema type, usually it should be the top level record type, e.g. not a
* nested type
* nested type
* @return Avro's {@link Schema} matching this logical type.
*/
public static Schema convertToSchema(LogicalType schema) {
@@ -176,7 +176,7 @@ public class AvroSchemaConverter {
* schema. Nested record type that only differs with type name is still compatible.
*
* @param logicalType logical type
* @param rowName the record name
* @param rowName the record name
* @return Avro's {@link Schema} matching this logical type.
*/
public static Schema convertToSchema(LogicalType logicalType, String rowName) {
@@ -315,7 +315,9 @@ public class AvroSchemaConverter {
return valueType;
}
/** Returns schema with nullable true. */
/**
* Returns schema with nullable true.
*/
private static Schema nullableSchema(Schema schema) {
return schema.isNullable()
? schema

View File

@@ -181,6 +181,9 @@ public class StreamerUtil {
.withStorageConfig(HoodieStorageConfig.newBuilder()
.logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
.logFileMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024)
.parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024)
.parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024)
.parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED))