1
0

[HUDI-2209] Bulk insert for flink writer (#3334)

This commit is contained in:
Danny Chan
2021-07-27 10:58:23 +08:00
committed by GitHub
parent 024cf01f02
commit 9d2a65a6a6
26 changed files with 2000 additions and 83 deletions

View File

@@ -344,6 +344,22 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+ "waits for the instant commit success, only for internal use");
public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION = ConfigOptions
.key("sink.shuffle-by-partition.enable")
.booleanType()
.defaultValue(false)
.withDescription(
"The option to enable shuffle data by dynamic partition fields in sink"
+ " phase, this can greatly reduce the number of file for filesystem sink but may"
+ " lead data skew.");
// this is only for internal use
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_PARTITION_SORTED = ConfigOptions
.key("write.bulk_insert.partition.sorted")
.booleanType()
.defaultValue(false)
.withDescription("Whether the bulk insert write task input records are already sorted by the partition path");
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------
@@ -581,7 +597,9 @@ public class FlinkOptions extends HoodieConfig {
return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
}
/** Creates a new configuration that is initialized with the options of the given map. */
/**
* Creates a new configuration that is initialized with the options of the given map.
*/
public static Configuration fromMap(Map<String, String> map) {
final Configuration configuration = new Configuration();
map.forEach(configuration::setString);

View File

@@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
@@ -61,7 +62,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -568,24 +568,17 @@ public class StreamWriteFunction<K, I, O>
String instant = this.writeClient.getLastPendingInstant(this.actionType);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
long waitingTime = 0L;
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
long interval = 500L;
TimeWait timeWait = TimeWait.builder()
.timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
.action("instant initialize")
.build();
while (confirming) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while
try {
if (waitingTime > ckpTimeout) {
throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for instant " + instant + " to commit");
}
TimeUnit.MILLISECONDS.sleep(interval);
waitingTime += interval;
} catch (InterruptedException e) {
throw new HoodieException("Error while waiting for instant " + instant + " to commit", e);
}
timeWait.waitFor();
// refresh the inflight instant
instant = this.writeClient.getLastPendingInstant(this.actionType);
} else {

View File

@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
/**
* Factory class for {@link StreamWriteOperator}.
@@ -63,9 +62,4 @@ public class StreamWriteOperatorFactory<I>
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
}
@Override
public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
super.setProcessingTimeService(processingTimeService);
}
}

View File

@@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
* Sink function to write the data to the underneath filesystem.
*
* <p>The function should only be used in operation type {@link WriteOperationType#BULK_INSERT}.
*
* <p>Note: The function task requires the input stream be shuffled by partition path.
*
* @param <I> Type of the input record
* @see StreamWriteOperatorCoordinator
*/
public class BulkInsertWriteFunction<I, O>
extends ProcessFunction<I, O> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class);
/**
* Helper class for bulk insert mode.
*/
private transient BulkInsertWriterHelper writerHelper;
/**
* Config options.
*/
private final Configuration config;
/**
* Table row type.
*/
private final RowType rowType;
/**
* Id of current subtask.
*/
private int taskID;
/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
/**
* The initial inflight instant when start up.
*/
private volatile String initInstant;
/**
* Gateway to send operator events to the operator coordinator.
*/
private transient OperatorEventGateway eventGateway;
/**
* Commit action type.
*/
private transient String actionType;
/**
* Constructs a StreamingSinkFunction.
*
* @param config The config options
*/
public BulkInsertWriteFunction(Configuration config, RowType rowType) {
this.config = config;
this.rowType = rowType;
}
@Override
public void open(Configuration parameters) throws IOException {
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.initInstant = this.writeClient.getLastPendingInstant(this.actionType);
sendBootstrapEvent();
initWriterHelper();
}
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
this.writerHelper.write((RowData) value);
}
@Override
public void close() {
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
}
}
/**
* End input action for batch source.
*/
public void endInput() {
final List<WriteStatus> writeStatus;
try {
this.writerHelper.close();
writeStatus = this.writerHelper.getWriteStatuses().stream()
.map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList());
} catch (IOException e) {
throw new HoodieException("Error collect the write status for task [" + this.taskID + "]");
}
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(this.writerHelper.getInstantTime())
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(true)
.build();
this.eventGateway.sendEventToCoordinator(event);
}
/**
* Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}.
*/
private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) {
WriteStatus writeStatus = new WriteStatus(false, 0.1);
writeStatus.setStat(internalWriteStatus.getStat());
writeStatus.setFileId(internalWriteStatus.getFileId());
writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
return writeStatus;
}
// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
this.eventGateway = operatorEventGateway;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private void initWriterHelper() {
String instant = instantToWrite();
this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType);
}
private void sendBootstrapEvent() {
WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.writeStatus(Collections.emptyList())
.instantTime("")
.bootstrap(true)
.build();
this.eventGateway.sendEventToCoordinator(event);
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}
private String instantToWrite() {
String instant = this.writeClient.getLastPendingInstant(this.actionType);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
TimeWait timeWait = TimeWait.builder()
.timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
.action("instant initialize")
.build();
while (instant == null || instant.equals(this.initInstant)) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change
// sleep for a while
timeWait.waitFor();
// refresh the inflight instant
instant = this.writeClient.getLastPendingInstant(this.actionType);
}
return instant;
}
}

View File

@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bulk;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
/**
* Operator for bulk insert mode sink.
*
* @param <I> The input type
*/
public class BulkInsertWriteOperator<I>
extends ProcessOperator<I, Object>
implements OperatorEventHandler, BoundedOneInput {
private final BulkInsertWriteFunction<I, Object> sinkFunction;
public BulkInsertWriteOperator(Configuration conf, RowType rowType) {
super(new BulkInsertWriteFunction<>(conf, rowType));
this.sinkFunction = (BulkInsertWriteFunction<I, Object>) getUserFunction();
}
@Override
public void handleOperatorEvent(OperatorEvent event) {
// no operation
}
void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
sinkFunction.setOperatorEventGateway(operatorEventGateway);
}
@Override
public void endInput() {
sinkFunction.endInput();
}
public static OperatorFactory<RowData> getFactory(Configuration conf, RowType rowType) {
return new OperatorFactory<>(conf, rowType);
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
public static class OperatorFactory<I>
extends SimpleUdfStreamOperatorFactory<Object>
implements CoordinatedOperatorFactory<Object>, OneInputStreamOperatorFactory<I, Object> {
private static final long serialVersionUID = 1L;
private final BulkInsertWriteOperator<I> operator;
private final Configuration conf;
public OperatorFactory(Configuration conf, RowType rowType) {
super(new BulkInsertWriteOperator<>(conf, rowType));
this.operator = (BulkInsertWriteOperator<I>) getOperator();
this.conf = conf;
}
@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorParameters<Object> parameters) {
final OperatorID operatorID = parameters.getStreamConfig().getOperatorID();
final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher();
this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID));
this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
this.operator.setProcessingTimeService(this.processingTimeService);
eventDispatcher.registerEventHandler(operatorID, operator);
return (T) operator;
}
@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
}
@Override
public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
super.setProcessingTimeService(processingTimeService);
}
}
}

View File

@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bulk;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
 * Helper class for bulk insert used by Flink.
 *
 * <p>Buckets incoming {@link RowData} records into per-partition file handles and
 * rolls over to a new file once a handle can no longer accept writes.
 */
public class BulkInsertWriterHelper {

  private static final Logger LOG = LogManager.getLogger(BulkInsertWriterHelper.class);

  private final String instantTime;
  private final int taskPartitionId;
  private final long taskId;
  private final long taskEpochId;
  private final HoodieTable hoodieTable;
  private final HoodieWriteConfig writeConfig;
  private final RowType rowType;
  // primitive boolean: the flag is always assigned from config, never null
  private final boolean arePartitionRecordsSorted;
  private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
  private HoodieRowDataCreateHandle handle;
  private String lastKnownPartitionPath = null;
  private final String fileIdPrefix;
  private int numFilesWritten = 0;
  private final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
  private final RowDataKeyGen keyGen;

  public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
    this.hoodieTable = hoodieTable;
    this.writeConfig = writeConfig;
    this.instantTime = instantTime;
    this.taskPartitionId = taskPartitionId;
    this.taskId = taskId;
    this.taskEpochId = taskEpochId;
    this.rowType = addMetadataFields(rowType); // patch up with metadata fields
    this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED);
    this.fileIdPrefix = UUID.randomUUID().toString();
    this.keyGen = RowDataKeyGen.instance(conf, rowType);
  }

  /**
   * Returns the write instant time.
   */
  public String getInstantTime() {
    return this.instantTime;
  }

  /**
   * Writes one record, switching to a fresh handle when the partition path
   * changes or the current handle has reached its size threshold.
   */
  public void write(RowData record) throws IOException {
    try {
      String recordKey = keyGen.getRecordKey(record);
      String partitionPath = keyGen.getPartitionPath(record);

      if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
        LOG.info("Creating new file for partition path " + partitionPath);
        handle = getRowCreateHandle(partitionPath);
        lastKnownPartitionPath = partitionPath;
      }
      handle.write(recordKey, partitionPath, record);
    } catch (Throwable t) {
      LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
      throw t;
    }
  }

  /**
   * Closes any open handles and returns the accumulated write statuses.
   */
  public List<HoodieInternalWriteStatus> getWriteStatuses() throws IOException {
    close();
    return writeStatusList;
  }

  private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
    if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path
      // if records are sorted, we can close all existing handles
      if (arePartitionRecordsSorted) {
        close();
      }
      HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
          instantTime, taskPartitionId, taskId, taskEpochId, rowType);
      handles.put(partitionPath, rowCreateHandle);
    } else if (!handles.get(partitionPath).canWrite()) {
      // even if there is a handle to the partition path, it could have reached its max size threshold. So, we close the handle here and
      // create a new one.
      writeStatusList.add(handles.remove(partitionPath).close());
      HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
          instantTime, taskPartitionId, taskId, taskEpochId, rowType);
      handles.put(partitionPath, rowCreateHandle);
    }
    return handles.get(partitionPath);
  }

  /**
   * Closes all open handles; idempotent because the handle map is cleared.
   */
  public void close() throws IOException {
    for (HoodieRowDataCreateHandle rowCreateHandle : handles.values()) {
      writeStatusList.add(rowCreateHandle.close());
    }
    handles.clear();
    handle = null;
  }

  private String getNextFileId() {
    return String.format("%s-%d", fileIdPrefix, numFilesWritten++);
  }

  /**
   * Adds the Hoodie metadata fields to the given row type.
   */
  private static RowType addMetadataFields(RowType rowType) {
    List<RowType.RowField> mergedFields = new ArrayList<>();

    LogicalType metadataFieldType = DataTypes.STRING().getLogicalType();
    RowType.RowField commitTimeField =
        new RowType.RowField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, metadataFieldType, "commit time");
    RowType.RowField commitSeqnoField =
        new RowType.RowField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, metadataFieldType, "commit seqno");
    RowType.RowField recordKeyField =
        new RowType.RowField(HoodieRecord.RECORD_KEY_METADATA_FIELD, metadataFieldType, "record key");
    RowType.RowField partitionPathField =
        new RowType.RowField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, metadataFieldType, "partition path");
    RowType.RowField fileNameField =
        new RowType.RowField(HoodieRecord.FILENAME_METADATA_FIELD, metadataFieldType, "file name");

    mergedFields.add(commitTimeField);
    mergedFields.add(commitSeqnoField);
    mergedFields.add(recordKeyField);
    mergedFields.add(partitionPathField);
    mergedFields.add(fileNameField);
    mergedFields.addAll(rowType.getFields());

    return new RowType(false, mergedFields);
  }
}

View File

@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bulk;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.util.RowDataProjection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Arrays;
import java.util.List;
/**
 * Key generator for {@link RowData}: extracts the Hoodie record key and
 * partition path directly from Flink rows, mirroring the Avro key generators.
 *
 * <p>Single record-key / single partition-field configurations take an efficient
 * code path using a cached {@link RowData.FieldGetter}; composite keys go through
 * a {@link RowDataProjection}.
 */
public class RowDataKeyGen {

  // reference: NonpartitionedAvroKeyGenerator
  private static final String EMPTY_PARTITION = "";

  // reference: org.apache.hudi.keygen.KeyGenUtils
  private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
  private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";

  private static final String DEFAULT_PARTITION_PATH = "default";
  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";

  // comma-separated field name lists split from the config options
  // NOTE(review): field names are not trimmed — "a, b" would look up " b"; confirm callers pass no spaces
  private final String[] recordKeyFields;
  private final String[] partitionPathFields;

  // projections used only on the composite-key code paths (null otherwise)
  private final RowDataProjection recordKeyProjection;
  private final RowDataProjection partitionPathProjection;

  private final boolean hiveStylePartitioning;
  private final boolean encodePartitionPath;

  // efficient code path
  private boolean simpleRecordKey = false;
  private RowData.FieldGetter recordKeyFieldGetter;

  private boolean simplePartitionPath = false;
  private RowData.FieldGetter partitionPathFieldGetter;

  private boolean nonPartitioned;

  private RowDataKeyGen(
      String recordKeys,
      String partitionFields,
      RowType rowType,
      boolean hiveStylePartitioning,
      boolean encodePartitionPath) {
    this.recordKeyFields = recordKeys.split(",");
    this.partitionPathFields = partitionFields.split(",");

    List<String> fieldNames = rowType.getFieldNames();
    List<LogicalType> fieldTypes = rowType.getChildren();

    this.hiveStylePartitioning = hiveStylePartitioning;
    this.encodePartitionPath = encodePartitionPath;

    if (this.recordKeyFields.length == 1) {
      // efficient code path: a single key field is read with a direct field getter
      this.simpleRecordKey = true;
      int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]);
      this.recordKeyFieldGetter = RowData.createFieldGetter(fieldTypes.get(recordKeyIdx), recordKeyIdx);
      this.recordKeyProjection = null;
    } else {
      this.recordKeyProjection = getProjection(this.recordKeyFields, fieldNames, fieldTypes);
    }

    if (this.partitionPathFields.length == 1) {
      // efficient code path
      if (this.partitionPathFields[0].equals("")) {
        // empty partition field means a non-partitioned table
        this.nonPartitioned = true;
      } else {
        this.simplePartitionPath = true;
        int partitionPathIdx = fieldNames.indexOf(this.partitionPathFields[0]);
        this.partitionPathFieldGetter = RowData.createFieldGetter(fieldTypes.get(partitionPathIdx), partitionPathIdx);
      }
      this.partitionPathProjection = null;
    } else {
      this.partitionPathProjection = getProjection(this.partitionPathFields, fieldNames, fieldTypes);
    }
  }

  /**
   * Creates a key generator from the Flink config options and the table row type.
   */
  public static RowDataKeyGen instance(Configuration conf, RowType rowType) {
    return new RowDataKeyGen(conf.getString(FlinkOptions.RECORD_KEY_FIELD), conf.getString(FlinkOptions.PARTITION_PATH_FIELD),
        rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING));
  }

  /**
   * Returns the record key of the given row.
   *
   * @throws HoodieKeyException when the key value(s) are entirely null or empty
   */
  public String getRecordKey(RowData rowData) {
    if (this.simpleRecordKey) {
      return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), this.recordKeyFields[0]);
    } else {
      Object[] keyValues = this.recordKeyProjection.projectAsValues(rowData);
      return getRecordKey(keyValues, this.recordKeyFields);
    }
  }

  /**
   * Returns the partition path of the given row; empty for non-partitioned tables.
   */
  public String getPartitionPath(RowData rowData) {
    if (this.simplePartitionPath) {
      return getPartitionPath(partitionPathFieldGetter.getFieldOrNull(rowData),
          this.partitionPathFields[0], this.hiveStylePartitioning, this.encodePartitionPath);
    } else if (this.nonPartitioned) {
      return EMPTY_PARTITION;
    } else {
      Object[] partValues = this.partitionPathProjection.projectAsValues(rowData);
      return getRecordPartitionPath(partValues, this.partitionPathFields, this.hiveStylePartitioning, this.encodePartitionPath);
    }
  }

  // reference: org.apache.hudi.keygen.KeyGenUtils.getRecordKey
  private static String getRecordKey(Object[] keyValues, String[] keyFields) {
    boolean keyIsNullEmpty = true;
    StringBuilder recordKey = new StringBuilder();
    for (int i = 0; i < keyValues.length; i++) {
      String recordKeyField = keyFields[i];
      String recordKeyValue = StringUtils.objToString(keyValues[i]);
      if (recordKeyValue == null) {
        recordKey.append(recordKeyField).append(":").append(NULL_RECORDKEY_PLACEHOLDER).append(",");
      } else if (recordKeyValue.isEmpty()) {
        recordKey.append(recordKeyField).append(":").append(EMPTY_RECORDKEY_PLACEHOLDER).append(",");
      } else {
        recordKey.append(recordKeyField).append(":").append(recordKeyValue).append(",");
        keyIsNullEmpty = false;
      }
    }
    // drop the trailing comma appended by the loop
    recordKey.deleteCharAt(recordKey.length() - 1);
    if (keyIsNullEmpty) {
      throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: "
          + Arrays.toString(keyFields) + " cannot be entirely null or empty.");
    }
    return recordKey.toString();
  }

  // reference: org.apache.hudi.keygen.KeyGenUtils.getRecordPartitionPath
  private static String getRecordPartitionPath(
      Object[] partValues,
      String[] partFields,
      boolean hiveStylePartitioning,
      boolean encodePartitionPath) {
    StringBuilder partitionPath = new StringBuilder();
    for (int i = 0; i < partFields.length; i++) {
      String partField = partFields[i];
      String partValue = StringUtils.objToString(partValues[i]);
      if (partValue == null || partValue.isEmpty()) {
        partitionPath.append(hiveStylePartitioning ? partField + "=" + DEFAULT_PARTITION_PATH
            : DEFAULT_PARTITION_PATH);
      } else {
        if (encodePartitionPath) {
          partValue = PartitionPathEncodeUtils.escapePathName(partValue);
        }
        partitionPath.append(hiveStylePartitioning ? partField + "=" + partValue : partValue);
      }
      partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
    }
    // drop the trailing separator appended by the loop
    partitionPath.deleteCharAt(partitionPath.length() - 1);
    return partitionPath.toString();
  }

  // reference: org.apache.hudi.keygen.KeyGenUtils.getRecordKey
  public static String getRecordKey(Object recordKeyValue, String recordKeyField) {
    String recordKey = StringUtils.objToString(recordKeyValue);
    if (recordKey == null || recordKey.isEmpty()) {
      throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
    }
    return recordKey;
  }

  // reference: org.apache.hudi.keygen.KeyGenUtils.getPartitionPath
  public static String getPartitionPath(
      Object partValue,
      String partField,
      boolean hiveStylePartitioning,
      boolean encodePartitionPath) {
    String partitionPath = StringUtils.objToString(partValue);
    if (partitionPath == null || partitionPath.isEmpty()) {
      partitionPath = DEFAULT_PARTITION_PATH;
    }
    if (encodePartitionPath) {
      partitionPath = PartitionPathEncodeUtils.escapePathName(partitionPath);
    }
    if (hiveStylePartitioning) {
      partitionPath = partField + "=" + partitionPath;
    }
    return partitionPath;
  }

  /**
   * Returns the row data projection for the given field names and table schema.
   *
   * @param fields       The projected field names
   * @param schemaFields The table schema names
   * @param schemaTypes  The table schema types
   * @return the row data projection for the fields
   */
  private static RowDataProjection getProjection(String[] fields, List<String> schemaFields, List<LogicalType> schemaTypes) {
    int[] positions = getFieldPositions(fields, schemaFields);
    LogicalType[] types = Arrays.stream(positions).mapToObj(schemaTypes::get).toArray(LogicalType[]::new);
    return RowDataProjection.instance(types, positions);
  }

  /**
   * Returns the field positions of the given fields {@code fields} among all the fields {@code allFields}.
   */
  private static int[] getFieldPositions(String[] fields, List<String> allFields) {
    return Arrays.stream(fields).mapToInt(allFields::indexOf).toArray();
  }
}

View File

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.utils;
import org.apache.hudi.exception.HoodieException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Tool used for time waiting: sleeps in fixed intervals and throws once the
 * accumulated waiting time exceeds the configured timeout.
 *
 * <p>Not thread-safe: each waiting task should build its own instance.
 */
public class TimeWait {
  // Total timeout budget. The check in waitFor() compares it against the
  // millisecond accumulator below, so it is effectively in MILLISECONDS.
  private final long timeout;
  private final long interval;   // sleep duration per waitFor() call, in MILLISECONDS
  private final String action;   // description of the awaited action, used in error messages

  private long waitingTime = 0L; // total time waited so far, in milliseconds

  private TimeWait(long timeout, long interval, String action) {
    this.timeout = timeout;
    this.interval = interval;
    this.action = action;
  }

  public static Builder builder() {
    return new Builder();
  }

  /**
   * Waits for one interval, accumulating the waited time.
   *
   * @throws HoodieException if the accumulated waiting time already exceeds the
   *                         timeout, or if the sleep is interrupted (in which
   *                         case the thread's interrupt flag is restored)
   */
  public void waitFor() {
    if (waitingTime > timeout) {
      throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for " + action);
    }
    try {
      TimeUnit.MILLISECONDS.sleep(interval);
      waitingTime += interval;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so upstream code can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new HoodieException("Error while waiting for " + action, e);
    }
  }

  /**
   * Builder for {@link TimeWait}.
   */
  public static class Builder {
    private long timeout;
    private long interval;
    private String action;

    public Builder() {
      // NOTE(review): 3600 reads like a seconds value, but waitFor() treats the
      // timeout as milliseconds, i.e. the effective default is 3.6s — confirm
      // the intended unit with callers.
      this.timeout = 3600;
      this.interval = 500; // default sleep interval: 500ms
    }

    public Builder timeout(long timeout) {
      this.timeout = timeout;
      return this;
    }

    public Builder interval(long interval) {
      this.interval = interval;
      return this;
    }

    public Builder action(String action) {
      this.action = action;
      return this;
    }

    public TimeWait build() {
      Objects.requireNonNull(this.action);
      return new TimeWait(this.timeout, this.interval, this.action);
    }
  }
}

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperatorFactory;
import org.apache.hudi.sink.bootstrap.BootstrapFunction;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.compact.CompactFunction;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
@@ -45,6 +46,7 @@ import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
@@ -58,29 +60,57 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
private final Configuration conf;
private final TableSchema schema;
private boolean overwrite = false;
private boolean supportsGrouping = false;
public HoodieTableSink(Configuration conf, TableSchema schema) {
this.conf = conf;
this.schema = schema;
}
public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite, boolean supportsGrouping) {
this.conf = conf;
this.schema = schema;
this.overwrite = overwrite;
this.supportsGrouping = supportsGrouping;
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return (DataStreamSinkProvider) dataStream -> {
// Read from kafka source
RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
// setup configuration
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
int parallelism = dataStream.getExecutionConfig().getParallelism();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
// bulk_insert mode
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
this.conf.set(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED, this.supportsGrouping);
BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);
return dataStream.transform("hoodie_bulk_insert_write",
TypeInformation.of(Object.class),
operatorFactory)
// follow the parallelism of upstream operators to avoid shuffle
.setParallelism(dataStream.getParallelism())
.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits");
}
// stream write
int parallelism = dataStream.getExecutionConfig().getParallelism();
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
DataStream<HoodieRecord> hoodieDataStream = dataStream
DataStream<HoodieRecord> dataStream1 = dataStream
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class));
// bootstrap index
// TODO: this is a very time-consuming operation; optimize it later
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.rebalance()
dataStream1 = dataStream1.rebalance()
.transform(
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
@@ -89,7 +119,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
DataStream<Object> pipeline = hoodieDataStream
DataStream<Object> pipeline = dataStream1
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
@@ -103,6 +133,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
// compaction
if (StreamerUtil.needsAsyncCompaction(conf)) {
return pipeline.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
@@ -141,7 +172,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
@Override
public DynamicTableSink copy() {
return new HoodieTableSink(this.conf, this.schema);
return new HoodieTableSink(this.conf, this.schema, this.overwrite, this.supportsGrouping);
}
@Override
@@ -167,4 +198,10 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
public void applyOverwrite(boolean b) {
this.overwrite = b;
}
@Override
public boolean requiresPartitionGrouping(boolean supportsGrouping) {
// Record whether the planner can group the input by dynamic partition; the
// sink later publishes this flag (WRITE_BULK_INSERT_PARTITION_SORTED) so the
// bulk-insert write task knows the records arrive sorted by partition path.
this.supportsGrouping = supportsGrouping;
// Accept the grouping whenever the planner offers it.
return supportsGrouping;
}
}

View File

@@ -47,6 +47,10 @@ public class RowDataProjection {
return new RowDataProjection(types, positions);
}
/**
 * Creates a projection instance for the given field types and their positions
 * within the source row.
 *
 * @param types     The logical types of the projected fields
 * @param positions The positions of the projected fields in the source row
 * @return a new {@code RowDataProjection}
 */
public static RowDataProjection instance(LogicalType[] types, int[] positions) {
return new RowDataProjection(types, positions);
}
/**
* Returns the projected row data.
*/
@@ -58,4 +62,16 @@ public class RowDataProjection {
}
return genericRowData;
}
/**
 * Returns the projected field values of the given row as a plain array,
 * preserving the projection order. Null fields stay null.
 *
 * @param rowData The source row to project
 * @return the projected values array
 */
public Object[] projectAsValues(RowData rowData) {
  final int numFields = this.fieldGetters.length;
  final Object[] projected = new Object[numFields];
  for (int i = 0; i < numFields; i++) {
    projected[i] = this.fieldGetters[i].getFieldOrNull(rowData);
  }
  return projected;
}
}