[HUDI-1557] Make Flink write pipeline write task scalable (#2506)
This is the #step 2 of RFC-24: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal This PR introduce a BucketAssigner that assigns bucket ID (partition path & fileID) for each stream record. There is no need to look up index and partition the records anymore in the following pipeline for these records, we actually decide the write target location before the write and each record computes its location when the BucketAssigner receives it, thus, the indexing is with streaming style. Computing locations for a batch of records all at a time is resource consuming so a pressure to the engine, we should avoid that in streaming system.
This commit is contained in:
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
|
||||
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
|
||||
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
|
||||
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@@ -115,7 +114,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
|
||||
writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
|
||||
|
||||
// init table, create it if not exists.
|
||||
initTable();
|
||||
StreamerUtil.initTableIfNotExists(FlinkOptions.fromStreamerConfig(cfg));
|
||||
|
||||
// create instant marker directory
|
||||
createInstantMarkerDir();
|
||||
@@ -189,6 +188,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
|
||||
*/
|
||||
private String startNewInstant(long checkpointId) {
|
||||
String newTime = writeClient.startCommit();
|
||||
this.writeClient.transitionRequestedToInflight(this.cfg.tableType, newTime);
|
||||
LOG.info("create instant [{}], at checkpoint [{}]", newTime, checkpointId);
|
||||
return newTime;
|
||||
}
|
||||
@@ -218,20 +218,6 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
|
||||
throw new InterruptedException(String.format("Last instant costs more than %s second, stop task now", retryTimes * retryInterval));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create table if not exists.
|
||||
*/
|
||||
private void initTable() throws IOException {
|
||||
if (!fs.exists(new Path(cfg.targetBasePath))) {
|
||||
HoodieTableMetaClient.initTableType(new Configuration(serializableHadoopConf.get()), cfg.targetBasePath,
|
||||
HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived", cfg.payloadClassName, 1);
|
||||
LOG.info("Table initialized");
|
||||
} else {
|
||||
LOG.info("Table already [{}/{}] exists, do nothing here", cfg.targetBasePath, cfg.targetTableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
if (writeClient != null) {
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.operator;
|
||||
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.streamer.FlinkStreamerConfig;
|
||||
import org.apache.hudi.client.FlinkTaskContextSupplier;
|
||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||
@@ -28,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.exception.HoodieFlinkStreamerException;
|
||||
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
@@ -40,8 +42,10 @@ import org.apache.flink.util.Collector;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A {@link KeyedProcessFunction} where the write operations really happens.
|
||||
@@ -52,7 +56,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
|
||||
/**
|
||||
* Records buffer, will be processed in snapshotState function.
|
||||
*/
|
||||
private List<HoodieRecord> bufferedRecords = new LinkedList<>();
|
||||
private Map<String, List<HoodieRecord>> bufferedRecords;
|
||||
|
||||
/**
|
||||
* Flink collector help s to send data downstream.
|
||||
@@ -88,6 +92,8 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
|
||||
this.bufferedRecords = new LinkedHashMap<>();
|
||||
|
||||
indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
|
||||
|
||||
cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
|
||||
@@ -112,17 +118,24 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
|
||||
String instantTimestamp = latestInstant;
|
||||
LOG.info("Write records, subtask id = [{}] checkpoint_id = [{}}] instant = [{}], record size = [{}]", indexOfThisSubtask, context.getCheckpointId(), instantTimestamp, bufferedRecords.size());
|
||||
|
||||
List<WriteStatus> writeStatus;
|
||||
switch (cfg.operation) {
|
||||
case INSERT:
|
||||
writeStatus = writeClient.insert(bufferedRecords, instantTimestamp);
|
||||
break;
|
||||
case UPSERT:
|
||||
writeStatus = writeClient.upsert(bufferedRecords, instantTimestamp);
|
||||
break;
|
||||
default:
|
||||
throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation);
|
||||
}
|
||||
final List<WriteStatus> writeStatus = new ArrayList<>();
|
||||
this.bufferedRecords.values().forEach(records -> {
|
||||
if (records.size() > 0) {
|
||||
if (cfg.filterDupes) {
|
||||
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
|
||||
}
|
||||
switch (cfg.operation) {
|
||||
case INSERT:
|
||||
writeStatus.addAll(writeClient.insert(records, instantTimestamp));
|
||||
break;
|
||||
case UPSERT:
|
||||
writeStatus.addAll(writeClient.upsert(records, instantTimestamp));
|
||||
break;
|
||||
default:
|
||||
throw new HoodieFlinkStreamerException("Unknown operation : " + cfg.operation);
|
||||
}
|
||||
}
|
||||
});
|
||||
output.collect(new Tuple3<>(instantTimestamp, writeStatus, indexOfThisSubtask));
|
||||
bufferedRecords.clear();
|
||||
}
|
||||
@@ -144,7 +157,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
|
||||
}
|
||||
|
||||
// buffer the records
|
||||
bufferedRecords.add(hoodieRecord);
|
||||
putDataIntoBuffer(hoodieRecord);
|
||||
}
|
||||
|
||||
public boolean hasRecordsIn() {
|
||||
@@ -155,6 +168,15 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
|
||||
return latestInstant;
|
||||
}
|
||||
|
||||
private void putDataIntoBuffer(HoodieRecord<?> record) {
|
||||
final String fileId = record.getCurrentLocation().getFileId();
|
||||
final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
|
||||
if (!this.bufferedRecords.containsKey(key)) {
|
||||
this.bufferedRecords.put(key, new ArrayList<>());
|
||||
}
|
||||
this.bufferedRecords.get(key).add(record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (writeClient != null) {
|
||||
|
||||
@@ -18,22 +18,18 @@
|
||||
|
||||
package org.apache.hudi.operator;
|
||||
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.client.FlinkTaskContextSupplier;
|
||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.keygen.KeyGenerator;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
|
||||
import org.apache.hudi.util.RowDataToAvroConverters;
|
||||
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.flink.annotation.VisibleForTesting;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
|
||||
@@ -41,7 +37,6 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||
import org.apache.flink.runtime.state.FunctionSnapshotContext;
|
||||
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
import org.apache.flink.util.Collector;
|
||||
import org.apache.flink.util.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
@@ -50,7 +45,9 @@ import org.slf4j.LoggerFactory;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.BiFunction;
|
||||
@@ -96,7 +93,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
/**
|
||||
* Write buffer for a checkpoint.
|
||||
*/
|
||||
private transient List<HoodieRecord> buffer;
|
||||
private transient Map<String, List<HoodieRecord>> buffer;
|
||||
|
||||
/**
|
||||
* The buffer lock to control data buffering/flushing.
|
||||
@@ -130,23 +127,6 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
|
||||
private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
|
||||
|
||||
/**
|
||||
* HoodieKey generator.
|
||||
*/
|
||||
private transient KeyGenerator keyGenerator;
|
||||
|
||||
/**
|
||||
* Row type of the input.
|
||||
*/
|
||||
private final RowType rowType;
|
||||
|
||||
/**
|
||||
* Avro schema of the input.
|
||||
*/
|
||||
private final Schema avroSchema;
|
||||
|
||||
private transient RowDataToAvroConverters.RowDataToAvroConverter converter;
|
||||
|
||||
/**
|
||||
* The REQUESTED instant we write the data.
|
||||
*/
|
||||
@@ -160,20 +140,15 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
/**
|
||||
* Constructs a StreamingSinkFunction.
|
||||
*
|
||||
* @param rowType The input row type
|
||||
* @param config The config options
|
||||
*/
|
||||
public StreamWriteFunction(RowType rowType, Configuration config) {
|
||||
this.rowType = rowType;
|
||||
this.avroSchema = StreamerUtil.getSourceSchema(config);
|
||||
public StreamWriteFunction(Configuration config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws IOException {
|
||||
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
|
||||
this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
|
||||
this.converter = RowDataToAvroConverters.createConverter(this.rowType);
|
||||
initBuffer();
|
||||
initWriteClient();
|
||||
initWriteFunction();
|
||||
@@ -211,7 +186,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
if (onCheckpointing) {
|
||||
addToBufferCondition.await();
|
||||
}
|
||||
this.buffer.add(toHoodieRecord(value));
|
||||
putDataIntoBuffer(value);
|
||||
} finally {
|
||||
bufferLock.unlock();
|
||||
}
|
||||
@@ -230,7 +205,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
|
||||
@VisibleForTesting
|
||||
@SuppressWarnings("rawtypes")
|
||||
public List<HoodieRecord> getBuffer() {
|
||||
public Map<String, List<HoodieRecord>> getBuffer() {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
@@ -249,7 +224,7 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private void initBuffer() {
|
||||
this.buffer = new ArrayList<>();
|
||||
this.buffer = new LinkedHashMap<>();
|
||||
this.bufferLock = new ReentrantLock();
|
||||
this.addToBufferCondition = this.bufferLock.newCondition();
|
||||
}
|
||||
@@ -277,32 +252,33 @@ public class StreamWriteFunction<K, I, O> extends KeyedProcessFunction<K, I, O>
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the give record to a {@link HoodieRecord}.
|
||||
*
|
||||
* @param record The input record
|
||||
* @return HoodieRecord based on the configuration
|
||||
* @throws IOException if error occurs
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
private HoodieRecord toHoodieRecord(I record) throws IOException {
|
||||
boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
|
||||
|| WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
|
||||
GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
|
||||
final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
|
||||
Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
|
||||
this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
|
||||
HoodieRecordPayload payload = shouldCombine
|
||||
? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
|
||||
: StreamerUtil.createPayload(payloadClazz, gr);
|
||||
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
|
||||
private void putDataIntoBuffer(I value) {
|
||||
HoodieRecord<?> record = (HoodieRecord<?>) value;
|
||||
final String fileId = record.getCurrentLocation().getFileId();
|
||||
final String key = StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
|
||||
if (!this.buffer.containsKey(key)) {
|
||||
this.buffer.put(key, new ArrayList<>());
|
||||
}
|
||||
this.buffer.get(key).add(record);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked, rawtypes")
|
||||
private void flushBuffer() {
|
||||
final List<WriteStatus> writeStatus;
|
||||
if (buffer.size() > 0) {
|
||||
writeStatus = writeFunction.apply(buffer, currentInstant);
|
||||
buffer.clear();
|
||||
writeStatus = new ArrayList<>();
|
||||
this.buffer.values()
|
||||
// The records are partitioned by the bucket ID and each batch sent to
|
||||
// the writer belongs to one bucket.
|
||||
.forEach(records -> {
|
||||
if (records.size() > 0) {
|
||||
if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
|
||||
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
|
||||
}
|
||||
writeStatus.addAll(writeFunction.apply(records, currentInstant));
|
||||
}
|
||||
});
|
||||
this.buffer.clear();
|
||||
} else {
|
||||
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
|
||||
writeStatus = Collections.emptyList();
|
||||
|
||||
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
|
||||
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
|
||||
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
|
||||
import org.apache.flink.streaming.api.operators.StreamSink;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
/**
|
||||
* Operator for {@link StreamSink}.
|
||||
@@ -36,8 +35,8 @@ public class StreamWriteOperator<I>
|
||||
implements OperatorEventHandler {
|
||||
private final StreamWriteFunction<Object, I, Object> sinkFunction;
|
||||
|
||||
public StreamWriteOperator(RowType rowType, Configuration conf) {
|
||||
super(new StreamWriteFunction<>(rowType, conf));
|
||||
public StreamWriteOperator(Configuration conf) {
|
||||
super(new StreamWriteFunction<>(conf));
|
||||
this.sinkFunction = (StreamWriteFunction<Object, I, Object>) getUserFunction();
|
||||
}
|
||||
|
||||
|
||||
@@ -22,9 +22,6 @@ import org.apache.hudi.client.FlinkTaskContextSupplier;
|
||||
import org.apache.hudi.client.HoodieFlinkWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;
|
||||
@@ -38,8 +35,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
|
||||
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
|
||||
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
|
||||
import org.apache.flink.util.Preconditions;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -59,6 +54,8 @@ import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
|
||||
|
||||
/**
|
||||
* {@link OperatorCoordinator} for {@link StreamWriteFunction}.
|
||||
*
|
||||
@@ -121,7 +118,7 @@ public class StreamWriteOperatorCoordinator
|
||||
// writeClient
|
||||
initWriteClient();
|
||||
// init table, create it if not exists.
|
||||
initTable();
|
||||
initTableIfNotExists(this.conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -139,6 +136,7 @@ public class StreamWriteOperatorCoordinator
|
||||
+ " data has not finish writing, roll back the last write and throw";
|
||||
checkAndForceCommit(errMsg);
|
||||
this.inFlightInstant = this.writeClient.startCommit();
|
||||
this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant);
|
||||
this.inFlightCheckpoint = checkpointId;
|
||||
LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId);
|
||||
result.complete(writeCheckpointBytes());
|
||||
@@ -200,28 +198,6 @@ public class StreamWriteOperatorCoordinator
|
||||
true);
|
||||
}
|
||||
|
||||
private void initTable() throws IOException {
|
||||
final String basePath = this.conf.getString(FlinkOptions.PATH);
|
||||
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
|
||||
// Hadoop FileSystem
|
||||
try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
|
||||
if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
|
||||
HoodieTableMetaClient.initTableType(
|
||||
hadoopConf,
|
||||
basePath,
|
||||
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)),
|
||||
this.conf.getString(FlinkOptions.TABLE_NAME),
|
||||
"archived",
|
||||
this.conf.getString(FlinkOptions.PAYLOAD_CLASS),
|
||||
1);
|
||||
LOG.info("Table initialized");
|
||||
} else {
|
||||
LOG.info("Table [{}/{}] already exists, no need to initialize the table",
|
||||
basePath, this.conf.getString(FlinkOptions.TABLE_NAME));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static byte[] readBytes(DataInputStream in, int size) throws IOException {
|
||||
byte[] bytes = new byte[size];
|
||||
in.readFully(bytes);
|
||||
|
||||
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
|
||||
import org.apache.flink.streaming.api.operators.StreamOperator;
|
||||
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
|
||||
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
/**
|
||||
* Factory class for {@link StreamWriteOperator}.
|
||||
@@ -43,10 +42,9 @@ public class StreamWriteOperatorFactory<I>
|
||||
private final int numTasks;
|
||||
|
||||
public StreamWriteOperatorFactory(
|
||||
RowType rowType,
|
||||
Configuration conf,
|
||||
int numTasks) {
|
||||
super(new StreamWriteOperator<>(rowType, conf));
|
||||
super(new StreamWriteOperator<>(conf));
|
||||
this.operator = (StreamWriteOperator<I>) getOperator();
|
||||
this.conf = conf;
|
||||
this.numTasks = numTasks;
|
||||
|
||||
@@ -0,0 +1,149 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.operator.partitioner;
|
||||
|
||||
import org.apache.hudi.client.FlinkTaskContextSupplier;
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.table.action.commit.BucketInfo;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.api.common.state.MapState;
|
||||
import org.apache.flink.api.common.state.MapStateDescriptor;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.state.CheckpointListener;
|
||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||
import org.apache.flink.runtime.state.FunctionSnapshotContext;
|
||||
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
|
||||
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
|
||||
import org.apache.flink.util.Collector;
|
||||
|
||||
/**
|
||||
* The function to build the write profile incrementally for records within a checkpoint,
|
||||
* it then assigns the bucket with ID using the {@link BucketAssigner}.
|
||||
*
|
||||
* <p>All the records are tagged with HoodieRecordLocation, instead of real instant time,
|
||||
* INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep
|
||||
* the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides
|
||||
* where the record should write to. The "I" and "U" tag is only used for downstream to decide whether
|
||||
* the data bucket is a INSERT or a UPSERT, we should factor the it out when the underneath writer
|
||||
* supports specifying the bucket type explicitly.
|
||||
*
|
||||
* <p>The output records should then shuffle by the bucket ID and thus do scalable write.
|
||||
*
|
||||
* @see BucketAssigner
|
||||
*/
|
||||
public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
||||
extends KeyedProcessFunction<K, I, O>
|
||||
implements CheckpointedFunction, CheckpointListener {
|
||||
|
||||
private MapState<HoodieKey, HoodieRecordLocation> indexState;
|
||||
|
||||
private BucketAssigner bucketAssigner;
|
||||
|
||||
private final Configuration conf;
|
||||
|
||||
private final boolean isChangingRecords;
|
||||
|
||||
public BucketAssignFunction(Configuration conf) {
|
||||
this.conf = conf;
|
||||
this.isChangingRecords = WriteOperationType.isChangingRecords(
|
||||
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
|
||||
HoodieFlinkEngineContext context =
|
||||
new HoodieFlinkEngineContext(
|
||||
new SerializableConfiguration(StreamerUtil.getHadoopConf()),
|
||||
new FlinkTaskContextSupplier(getRuntimeContext()));
|
||||
this.bucketAssigner = new BucketAssigner(
|
||||
context,
|
||||
writeConfig);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void snapshotState(FunctionSnapshotContext context) {
|
||||
this.bucketAssigner.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initializeState(FunctionInitializationContext context) {
|
||||
MapStateDescriptor<HoodieKey, HoodieRecordLocation> indexStateDesc =
|
||||
new MapStateDescriptor<>(
|
||||
"indexState",
|
||||
TypeInformation.of(HoodieKey.class),
|
||||
TypeInformation.of(HoodieRecordLocation.class));
|
||||
indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
|
||||
// 1. put the record into the BucketAssigner;
|
||||
// 2. look up the state for location, if the record has a location, just send it out;
|
||||
// 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
|
||||
HoodieRecord<?> record = (HoodieRecord<?>) value;
|
||||
final HoodieKey hoodieKey = record.getKey();
|
||||
final BucketInfo bucketInfo;
|
||||
final HoodieRecordLocation location;
|
||||
// Only changing records need looking up the index for the location,
|
||||
// append only records are always recognized as INSERT.
|
||||
if (isChangingRecords && this.indexState.contains(hoodieKey)) {
|
||||
// Set up the instant time as "U" to mark the bucket as an update bucket.
|
||||
location = new HoodieRecordLocation("U", this.indexState.get(hoodieKey).getFileId());
|
||||
this.bucketAssigner.addUpdate(record.getPartitionPath(), location.getFileId());
|
||||
} else {
|
||||
bucketInfo = this.bucketAssigner.addInsert(hoodieKey.getPartitionPath());
|
||||
switch (bucketInfo.getBucketType()) {
|
||||
case INSERT:
|
||||
// This is an insert bucket, use HoodieRecordLocation instant time as "I".
|
||||
// Downstream operators can then check the instant time to know whether
|
||||
// a record belongs to an insert bucket.
|
||||
location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix());
|
||||
break;
|
||||
case UPDATE:
|
||||
location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix());
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError();
|
||||
}
|
||||
this.indexState.put(hoodieKey, location);
|
||||
}
|
||||
record.unseal();
|
||||
record.setCurrentLocation(location);
|
||||
record.seal();
|
||||
out.collect((O) record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyCheckpointComplete(long l) {
|
||||
// Refresh the table state when there are new commits.
|
||||
this.bucketAssigner.refreshTable();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,326 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.operator.partitioner;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.commit.BucketInfo;
|
||||
import org.apache.hudi.table.action.commit.BucketType;
|
||||
import org.apache.hudi.table.action.commit.SmallFile;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.util.Preconditions;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Bucket assigner that assigns the data buffer of one checkpoint into buckets.
|
||||
*
|
||||
* <p>This assigner assigns the record one by one.
|
||||
* If the record is an update, checks and reuse existing UPDATE bucket or generates a new one;
|
||||
* If the record is an insert, checks the record partition for small files first, try to find a small file
|
||||
* that has space to append new records and reuse the small file's data bucket, if
|
||||
* there is no small file(or no left space for new records), generates an INSERT bucket.
|
||||
*
|
||||
* <p>Use {partition}_{fileId} as the bucket identifier, so that the bucket is unique
|
||||
* within and among partitions.
|
||||
*/
|
||||
public class BucketAssigner {
|
||||
private static final Logger LOG = LogManager.getLogger(BucketAssigner.class);
|
||||
|
||||
/**
|
||||
* Remembers what type each bucket is for later.
|
||||
*/
|
||||
private final HashMap<String, BucketInfo> bucketInfoMap;
|
||||
|
||||
private HoodieTable<?, ?, ?, ?> table;
|
||||
|
||||
/**
|
||||
* Fink engine context.
|
||||
*/
|
||||
private final HoodieFlinkEngineContext context;
|
||||
|
||||
/**
|
||||
* The write config.
|
||||
*/
|
||||
private final HoodieWriteConfig config;
|
||||
|
||||
/**
|
||||
* The average record size.
|
||||
*/
|
||||
private final long averageRecordSize;
|
||||
|
||||
/**
|
||||
* Total records to write for each bucket based on
|
||||
* the config option {@link org.apache.hudi.config.HoodieStorageConfig#PARQUET_FILE_MAX_BYTES}.
|
||||
*/
|
||||
private final long insertRecordsPerBucket;
|
||||
|
||||
/**
|
||||
* Partition path to small files mapping.
|
||||
*/
|
||||
private final Map<String, List<SmallFile>> partitionSmallFilesMap;
|
||||
|
||||
/**
|
||||
* Bucket ID(partition + fileId) -> small file assign state.
|
||||
*/
|
||||
private final Map<String, SmallFileAssignState> smallFileAssignStates;
|
||||
|
||||
/**
|
||||
* Bucket ID(partition + fileId) -> new file assign state.
|
||||
*/
|
||||
private final Map<String, NewFileAssignState> newFileAssignStates;
|
||||
|
||||
public BucketAssigner(
|
||||
HoodieFlinkEngineContext context,
|
||||
HoodieWriteConfig config) {
|
||||
bucketInfoMap = new HashMap<>();
|
||||
partitionSmallFilesMap = new HashMap<>();
|
||||
smallFileAssignStates = new HashMap<>();
|
||||
newFileAssignStates = new HashMap<>();
|
||||
this.context = context;
|
||||
this.config = config;
|
||||
this.table = HoodieFlinkTable.create(this.config, this.context);
|
||||
averageRecordSize = averageBytesPerRecord(
|
||||
table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
|
||||
config);
|
||||
LOG.info("AvgRecordSize => " + averageRecordSize);
|
||||
insertRecordsPerBucket = config.shouldAutoTuneInsertSplits()
|
||||
? config.getParquetMaxFileSize() / averageRecordSize
|
||||
: config.getCopyOnWriteInsertSplitSize();
|
||||
LOG.info("InsertRecordsPerBucket => " + insertRecordsPerBucket);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the states of this assigner, should do once for each checkpoint,
|
||||
* all the states are accumulated within one checkpoint interval.
|
||||
*/
|
||||
public void reset() {
|
||||
bucketInfoMap.clear();
|
||||
partitionSmallFilesMap.clear();
|
||||
smallFileAssignStates.clear();
|
||||
newFileAssignStates.clear();
|
||||
}
|
||||
|
||||
public BucketInfo addUpdate(String partitionPath, String fileIdHint) {
|
||||
final String key = StreamerUtil.generateBucketKey(partitionPath, fileIdHint);
|
||||
if (!bucketInfoMap.containsKey(key)) {
|
||||
BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, partitionPath);
|
||||
bucketInfoMap.put(key, bucketInfo);
|
||||
}
|
||||
// else do nothing because the bucket already exists.
|
||||
return bucketInfoMap.get(key);
|
||||
}
|
||||
|
||||
public BucketInfo addInsert(String partitionPath) {
|
||||
// for new inserts, compute buckets depending on how many records we have for each partition
|
||||
List<SmallFile> smallFiles = getSmallFilesForPartition(partitionPath);
|
||||
|
||||
// first try packing this into one of the smallFiles
|
||||
for (SmallFile smallFile : smallFiles) {
|
||||
final String key = StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId());
|
||||
SmallFileAssignState assignState = smallFileAssignStates.get(key);
|
||||
assert assignState != null;
|
||||
if (assignState.canAssign()) {
|
||||
assignState.assign();
|
||||
// create a new bucket or re-use an existing bucket
|
||||
BucketInfo bucketInfo;
|
||||
if (bucketInfoMap.containsKey(key)) {
|
||||
// Assigns an inserts to existing update bucket
|
||||
bucketInfo = bucketInfoMap.get(key);
|
||||
} else {
|
||||
bucketInfo = addUpdate(partitionPath, smallFile.location.getFileId());
|
||||
}
|
||||
return bucketInfo;
|
||||
}
|
||||
}
|
||||
|
||||
// if we have anything more, create new insert buckets, like normal
|
||||
if (newFileAssignStates.containsKey(partitionPath)) {
|
||||
NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath);
|
||||
if (newFileAssignState.canAssign()) {
|
||||
newFileAssignState.assign();
|
||||
}
|
||||
final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId);
|
||||
return bucketInfoMap.get(key);
|
||||
}
|
||||
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
|
||||
final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
|
||||
bucketInfoMap.put(key, bucketInfo);
|
||||
newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), insertRecordsPerBucket));
|
||||
return bucketInfo;
|
||||
}
|
||||
|
||||
private List<SmallFile> getSmallFilesForPartition(String partitionPath) {
|
||||
if (partitionSmallFilesMap.containsKey(partitionPath)) {
|
||||
return partitionSmallFilesMap.get(partitionPath);
|
||||
}
|
||||
List<SmallFile> smallFiles = getSmallFiles(partitionPath);
|
||||
if (smallFiles.size() > 0) {
|
||||
LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
|
||||
partitionSmallFilesMap.put(partitionPath, smallFiles);
|
||||
smallFiles.forEach(smallFile ->
|
||||
smallFileAssignStates.put(
|
||||
StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId()),
|
||||
new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, averageRecordSize)));
|
||||
return smallFiles;
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
/**
|
||||
* Refresh the table state like TableFileSystemView and HoodieTimeline.
|
||||
*/
|
||||
public void refreshTable() {
|
||||
this.table = HoodieFlinkTable.create(this.config, this.context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of small files in the given partition path.
|
||||
*/
|
||||
protected List<SmallFile> getSmallFiles(String partitionPath) {
|
||||
|
||||
// smallFiles only for partitionPath
|
||||
List<SmallFile> smallFileLocations = new ArrayList<>();
|
||||
|
||||
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
|
||||
|
||||
if (!commitTimeline.empty()) { // if we have some commits
|
||||
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
|
||||
List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
|
||||
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
|
||||
|
||||
for (HoodieBaseFile file : allFiles) {
|
||||
if (file.getFileSize() < config.getParquetSmallFileLimit()) {
|
||||
String filename = file.getFileName();
|
||||
SmallFile sf = new SmallFile();
|
||||
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
|
||||
sf.sizeBytes = file.getFileSize();
|
||||
smallFileLocations.add(sf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return smallFileLocations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtains the average record size based on records written during previous commits. Used for estimating how many
|
||||
* records pack into one file.
|
||||
*/
|
||||
protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
|
||||
long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
|
||||
long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
|
||||
try {
|
||||
if (!commitTimeline.empty()) {
|
||||
// Go over the reverse ordered commits to get a more recent estimate of average record size.
|
||||
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
|
||||
while (instants.hasNext()) {
|
||||
HoodieInstant instant = instants.next();
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
||||
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
|
||||
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
|
||||
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
|
||||
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// make this fail safe.
|
||||
LOG.error("Error trying to compute average bytes/record ", t);
|
||||
}
|
||||
return avgSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Candidate bucket state for small file. It records the total number of records
|
||||
* that the bucket can append and the current number of assigned records.
|
||||
*/
|
||||
private static class SmallFileAssignState {
|
||||
long assigned;
|
||||
long totalUnassigned;
|
||||
|
||||
SmallFileAssignState(long parquetMaxFileSize, SmallFile smallFile, long averageRecordSize) {
|
||||
this.assigned = 0;
|
||||
this.totalUnassigned = (parquetMaxFileSize - smallFile.sizeBytes) / averageRecordSize;
|
||||
}
|
||||
|
||||
public boolean canAssign() {
|
||||
return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remembers to invoke {@link #canAssign()} first.
|
||||
*/
|
||||
public void assign() {
|
||||
Preconditions.checkState(canAssign(),
|
||||
"Can not assign insert to small file: assigned => "
|
||||
+ this.assigned + " totalUnassigned => " + this.totalUnassigned);
|
||||
this.assigned++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Candidate bucket state for a new file. It records the total number of records
|
||||
* that the bucket can append and the current number of assigned records.
|
||||
*/
|
||||
private static class NewFileAssignState {
|
||||
long assigned;
|
||||
long totalUnassigned;
|
||||
final String fileId;
|
||||
|
||||
NewFileAssignState(String fileId, long insertRecordsPerBucket) {
|
||||
this.fileId = fileId;
|
||||
this.assigned = 0;
|
||||
this.totalUnassigned = insertRecordsPerBucket;
|
||||
}
|
||||
|
||||
public boolean canAssign() {
|
||||
return this.totalUnassigned > 0 && this.totalUnassigned > this.assigned;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remembers to invoke {@link #canAssign()} first.
|
||||
*/
|
||||
public void assign() {
|
||||
Preconditions.checkState(canAssign(),
|
||||
"Can not assign insert to new file: assigned => "
|
||||
+ this.assigned + " totalUnassigned => " + this.totalUnassigned);
|
||||
this.assigned++;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,108 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.operator.transform;
|
||||
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.keygen.KeyGenerator;
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.util.RowDataToAvroConverters;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.flink.api.common.functions.RichMapFunction;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Function that transforms RowData to HoodieRecord.
|
||||
*/
|
||||
public class RowDataToHoodieFunction<I extends RowData, O extends HoodieRecord<?>>
|
||||
extends RichMapFunction<I, O> {
|
||||
/**
|
||||
* Row type of the input.
|
||||
*/
|
||||
private final RowType rowType;
|
||||
|
||||
/**
|
||||
* Avro schema of the input.
|
||||
*/
|
||||
private transient Schema avroSchema;
|
||||
|
||||
/**
|
||||
* RowData to Avro record converter.
|
||||
*/
|
||||
private transient RowDataToAvroConverters.RowDataToAvroConverter converter;
|
||||
|
||||
/**
|
||||
* HoodieKey generator.
|
||||
*/
|
||||
private transient KeyGenerator keyGenerator;
|
||||
|
||||
/**
|
||||
* Config options.
|
||||
*/
|
||||
private final Configuration config;
|
||||
|
||||
public RowDataToHoodieFunction(RowType rowType, Configuration config) {
|
||||
this.rowType = rowType;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(Configuration parameters) throws Exception {
|
||||
super.open(parameters);
|
||||
this.avroSchema = StreamerUtil.getSourceSchema(this.config);
|
||||
this.converter = RowDataToAvroConverters.createConverter(this.rowType);
|
||||
this.keyGenerator = StreamerUtil.createKeyGenerator(FlinkOptions.flatOptions(this.config));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public O map(I i) throws Exception {
|
||||
return (O) toHoodieRecord(i);
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the give record to a {@link HoodieRecord}.
|
||||
*
|
||||
* @param record The input record
|
||||
* @return HoodieRecord based on the configuration
|
||||
* @throws IOException if error occurs
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
private HoodieRecord toHoodieRecord(I record) throws IOException {
|
||||
boolean shouldCombine = this.config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
|
||||
|| WriteOperationType.fromValue(this.config.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
|
||||
GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
|
||||
final String payloadClazz = this.config.getString(FlinkOptions.PAYLOAD_CLASS);
|
||||
Comparable orderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal(gr,
|
||||
this.config.getString(FlinkOptions.PRECOMBINE_FIELD), false);
|
||||
HoodieRecordPayload payload = shouldCombine
|
||||
? StreamerUtil.createPayload(payloadClazz, gr, orderingVal)
|
||||
: StreamerUtil.createPayload(payloadClazz, gr);
|
||||
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
|
||||
}
|
||||
}
|
||||
@@ -71,8 +71,7 @@ public class FlinkStreamerConfig extends Configuration {
|
||||
+ "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are "
|
||||
+ "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer"
|
||||
+ "to individual classes, for supported properties.")
|
||||
public String propsFilePath =
|
||||
"file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
|
||||
public String propsFilePath = "";
|
||||
|
||||
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
|
||||
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter.")
|
||||
|
||||
@@ -22,9 +22,11 @@ import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.operator.InstantGenerateOperator;
|
||||
import org.apache.hudi.operator.KeyedWriteProcessFunction;
|
||||
import org.apache.hudi.operator.KeyedWriteProcessOperator;
|
||||
import org.apache.hudi.operator.partitioner.BucketAssignFunction;
|
||||
import org.apache.hudi.sink.CommitSink;
|
||||
import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
@@ -34,9 +36,11 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema;
|
||||
import org.apache.flink.api.common.typeinfo.TypeHint;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.api.java.tuple.Tuple3;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
|
||||
|
||||
import java.util.List;
|
||||
@@ -66,12 +70,16 @@ public class HoodieFlinkStreamer {
|
||||
env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
|
||||
}
|
||||
|
||||
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
|
||||
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
|
||||
|
||||
TypedProperties props = StreamerUtil.appendKafkaProps(cfg);
|
||||
|
||||
// add data source config
|
||||
props.put(HoodieWriteConfig.WRITE_PAYLOAD_CLASS, cfg.payloadClassName);
|
||||
props.put(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, cfg.sourceOrderingField);
|
||||
|
||||
StreamerUtil.initTableIfNotExists(conf);
|
||||
// Read from kafka source
|
||||
DataStream<HoodieRecord> inputRecords =
|
||||
env.addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), props))
|
||||
@@ -86,13 +94,20 @@ public class HoodieFlinkStreamer {
|
||||
|
||||
// Keyby partition path, to avoid multiple subtasks writing to a partition at the same time
|
||||
.keyBy(HoodieRecord::getPartitionPath)
|
||||
|
||||
// use the bucket assigner to generate bucket IDs
|
||||
.transform(
|
||||
"bucket_assigner",
|
||||
TypeInformation.of(HoodieRecord.class),
|
||||
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
|
||||
.uid("uid_bucket_assigner")
|
||||
// shuffle by fileId(bucket id)
|
||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||
// write operator, where the write operation really happens
|
||||
.transform(KeyedWriteProcessOperator.NAME, TypeInformation.of(new TypeHint<Tuple3<String, List<WriteStatus>, Integer>>() {
|
||||
}), new KeyedWriteProcessOperator(new KeyedWriteProcessFunction()))
|
||||
.name("write_process")
|
||||
.uid("write_process_uid")
|
||||
.setParallelism(env.getParallelism())
|
||||
.setParallelism(numWriteTask)
|
||||
|
||||
// Commit can only be executed once, so make it one parallelism
|
||||
.addSink(new CommitSink())
|
||||
|
||||
@@ -18,22 +18,25 @@
|
||||
|
||||
package org.apache.hudi.streamer;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.operator.StreamWriteOperatorFactory;
|
||||
import org.apache.hudi.operator.partitioner.BucketAssignFunction;
|
||||
import org.apache.hudi.operator.transform.RowDataToHoodieFunction;
|
||||
import org.apache.hudi.util.AvroSchemaConverter;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import com.beust.jcommander.JCommander;
|
||||
import org.apache.flink.api.common.typeinfo.TypeInformation;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
|
||||
import org.apache.flink.formats.json.TimestampFormat;
|
||||
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
|
||||
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
|
||||
import org.apache.flink.table.types.logical.LogicalType;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
import java.util.Properties;
|
||||
@@ -70,13 +73,8 @@ public class HoodieFlinkStreamerV2 {
|
||||
.getLogicalType();
|
||||
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
|
||||
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
|
||||
StreamWriteOperatorFactory<RowData> operatorFactory =
|
||||
new StreamWriteOperatorFactory<>(rowType, conf, numWriteTask);
|
||||
|
||||
int partitionFieldIndex = rowType.getFieldIndex(conf.getString(FlinkOptions.PARTITION_PATH_FIELD));
|
||||
LogicalType partitionFieldType = rowType.getTypeAt(partitionFieldIndex);
|
||||
final RowData.FieldGetter partitionFieldGetter =
|
||||
RowData.createFieldGetter(partitionFieldType, partitionFieldIndex);
|
||||
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
|
||||
new StreamWriteOperatorFactory<>(conf, numWriteTask);
|
||||
|
||||
DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
|
||||
cfg.kafkaTopic,
|
||||
@@ -89,11 +87,19 @@ public class HoodieFlinkStreamerV2 {
|
||||
), kafkaProps))
|
||||
.name("kafka_source")
|
||||
.uid("uid_kafka_source")
|
||||
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
|
||||
// Key-by partition path, to avoid multiple subtasks write to a partition at the same time
|
||||
.keyBy(partitionFieldGetter::getFieldOrNull)
|
||||
.keyBy(HoodieRecord::getPartitionPath)
|
||||
.transform(
|
||||
"bucket_assigner",
|
||||
TypeInformation.of(HoodieRecord.class),
|
||||
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
|
||||
.uid("uid_bucket_assigner")
|
||||
// shuffle by fileId(bucket id)
|
||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||
.transform("hoodie_stream_write", null, operatorFactory)
|
||||
.uid("uid_hoodie_stream_write")
|
||||
.setParallelism(numWriteTask); // should make it configurable
|
||||
.setParallelism(numWriteTask);
|
||||
|
||||
env.addOperator(dataStream.getTransformation());
|
||||
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hudi.util;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
|
||||
import org.apache.hudi.streamer.FlinkStreamerConfig;
|
||||
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
|
||||
@@ -57,6 +59,7 @@ import java.util.Properties;
|
||||
* Utilities for Flink stream read and write.
|
||||
*/
|
||||
public class StreamerUtil {
|
||||
private static final String DEFAULT_ARCHIVE_LOG_FOLDER = "archived";
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
|
||||
|
||||
@@ -68,6 +71,9 @@ public class StreamerUtil {
|
||||
}
|
||||
|
||||
public static TypedProperties getProps(FlinkStreamerConfig cfg) {
|
||||
if (cfg.propsFilePath.isEmpty()) {
|
||||
return new TypedProperties();
|
||||
}
|
||||
return readConfig(
|
||||
FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
|
||||
new Path(cfg.propsFilePath), cfg.configs).getConfig();
|
||||
@@ -208,6 +214,10 @@ public class StreamerUtil {
|
||||
}
|
||||
}
|
||||
|
||||
public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) {
|
||||
return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
|
||||
}
|
||||
|
||||
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
|
||||
HoodieWriteConfig.Builder builder =
|
||||
HoodieWriteConfig.newBuilder()
|
||||
@@ -250,4 +260,37 @@ public class StreamerUtil {
|
||||
checkPropNames.forEach(prop ->
|
||||
Preconditions.checkState(props.containsKey(prop), "Required property " + prop + " is missing"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the table if it does not exist.
|
||||
*
|
||||
* @param conf the configuration
|
||||
* @throws IOException if errors happens when writing metadata
|
||||
*/
|
||||
public static void initTableIfNotExists(Configuration conf) throws IOException {
|
||||
final String basePath = conf.getString(FlinkOptions.PATH);
|
||||
final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf();
|
||||
// Hadoop FileSystem
|
||||
try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
|
||||
if (!fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))) {
|
||||
HoodieTableMetaClient.initTableType(
|
||||
hadoopConf,
|
||||
basePath,
|
||||
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
|
||||
conf.getString(FlinkOptions.TABLE_NAME),
|
||||
DEFAULT_ARCHIVE_LOG_FOLDER,
|
||||
conf.getString(FlinkOptions.PAYLOAD_CLASS),
|
||||
1);
|
||||
LOG.info("Table initialized under base path {}", basePath);
|
||||
} else {
|
||||
LOG.info("Table [{}/{}] already exists, no need to initialize the table",
|
||||
basePath, conf.getString(FlinkOptions.TABLE_NAME));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Generates the bucket ID using format {partition path}_{fileID}. */
|
||||
public static String generateBucketKey(String partitionPath, String fileId) {
|
||||
return String.format("%s_%s", partitionPath, fileId);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user