diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml
index 8be65626d..a22848d8f 100644
--- a/hudi-client/hudi-flink-client/pom.xml
+++ b/hudi-client/hudi-flink-client/pom.xml
@@ -52,6 +52,30 @@
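       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-common</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-parquet_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.parquet</groupId>
+          <artifactId>parquet-hadoop</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-avro</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>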
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
new file mode 100644
index 000000000..5ecc7e57f
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+/**
+ * {@link RowData} view that puts the Hoodie meta columns in front of a wrapped data row. For ordinals
+ * below {@code metaColumnsNum}, {@link #isNullAt} and {@link #getString} answer from the locally held meta
+ * values; every other access shifts the ordinal down by {@code metaColumnsNum} and delegates to the row.
+ */
+public class HoodieRowData implements RowData {
+
+  private final String commitTime;
+  private final String commitSeqNumber;
+  private final String recordKey;
+  private final String partitionPath;
+  private final String fileName;
+  private final RowData row;
+  private final int metaColumnsNum;
+
+  /** Creates a view exposing the given meta column values in front of the fields of {@code row}. */
+  public HoodieRowData(String commitTime, String commitSeqNumber, String recordKey,
+                       String partitionPath, String fileName, RowData row) {
+    this.commitTime = commitTime;
+    this.commitSeqNumber = commitSeqNumber;
+    this.recordKey = recordKey;
+    this.partitionPath = partitionPath;
+    this.fileName = fileName;
+    this.row = row;
+    this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size();
+  }
+
+  @Override
+  public int getArity() {
+    return metaColumnsNum + row.getArity();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+    return row.getRowKind();
+  }
+
+  @Override
+  public void setRowKind(RowKind kind) {
+    this.row.setRowKind(kind);
+  }
+
+  /**
+   * Returns the locally held value of the meta column at {@code ordinal}.
+   *
+   * @throws IllegalArgumentException if {@code ordinal} is not one of the positions 0 to 4
+   */
+  private String getMetaColumnVal(int ordinal) {
+    switch (ordinal) {
+      case 0:
+        return commitTime;
+      case 1:
+        return commitSeqNumber;
+      case 2:
+        return recordKey;
+      case 3:
+        return partitionPath;
+      case 4:
+        return fileName;
+      default:
+        throw new IllegalArgumentException("Not expected");
+    }
+  }
+
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (ordinal < metaColumnsNum) {
+      return null == getMetaColumnVal(ordinal);
+    }
+    return row.isNullAt(ordinal - metaColumnsNum);
+  }
+
+  // The typed accessors below do no meta column check: the ordinal is shifted by metaColumnsNum
+  // and handed to the wrapped row as-is, so they are only meaningful for data columns.
+  @Override
+  public boolean getBoolean(int ordinal) {
+    return row.getBoolean(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public byte getByte(int ordinal) {
+    return row.getByte(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public short getShort(int ordinal) {
+    return row.getShort(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public int getInt(int ordinal) {
+    return row.getInt(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public long getLong(int ordinal) {
+    return row.getLong(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public float getFloat(int ordinal) {
+    return row.getFloat(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public double getDouble(int ordinal) {
+    return row.getDouble(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public DecimalData getDecimal(int ordinal, int precision, int scale) {
+    return row.getDecimal(ordinal - metaColumnsNum, precision, scale);
+  }
+
+  @Override
+  public TimestampData getTimestamp(int pos, int precision) {
+    return row.getTimestamp(pos - metaColumnsNum, precision);
+  }
+
+  @Override
+  public RawValueData getRawValue(int pos) {
+    return row.getRawValue(pos - metaColumnsNum);
+  }
+
+  /** Answers meta column ordinals from the local copies, all others from the wrapped row. */
+  @Override
+  public StringData getString(int ordinal) {
+    if (ordinal < metaColumnsNum) {
+      return StringData.fromString(getMetaColumnVal(ordinal));
+    }
+    return row.getString(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public byte[] getBinary(int ordinal) {
+    return row.getBinary(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public RowData getRow(int ordinal, int numFields) {
+    return row.getRow(ordinal - metaColumnsNum, numFields);
+  }
+
+  @Override
+  public ArrayData getArray(int ordinal) {
+    return row.getArray(ordinal - metaColumnsNum);
+  }
+
+  @Override
+  public MapData getMap(int ordinal) {
+    return row.getMap(ordinal - metaColumnsNum);
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
new file mode 100644
index 000000000..3ceee8ba0
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieRowData;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle that writes Flink {@link RowData} records of a bulk insert into one new base file.
+ */
+public class HoodieRowDataCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowDataCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  protected final HoodieRowDataFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  protected final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
+                                   String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+                                   RowType rowType) {
+    // Plain state first: makeNewPath() and getWriteToken() below read these fields.
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.partitionPath = partitionPath;
+    this.fileId = fileId;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    boolean indexNotImplicit = !table.getIndex().isImplicitWithStorage();
+    this.writeStatus =
+        new HoodieInternalWriteStatus(indexNotImplicit, writeConfig.getWriteStatusFailureFraction());
+    this.writeStatus.setPartitionPath(partitionPath);
+    this.writeStatus.setFileId(fileId);
+    try {
+      Path basePath = new Path(writeConfig.getBasePath());
+      Path partitionDir = FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath);
+      new HoodiePartitionMetadata(fs, instantTime, basePath, partitionDir).trySave(taskPartitionId);
+      String dataFileName =
+          FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension());
+      createMarkerFile(partitionPath, dataFileName);
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Wraps {@code record} in a {@link HoodieRowData} that carries the meta column values (instant time, a newly
+   * generated sequence id, record key, partition path and file name) and hands it to the file writer.
+   *
+   * @param recordKey     The record key
+   * @param partitionPath The partition path
+   * @param record        instance of {@link RowData} that needs to be written to the fileWriter.
+   * @throws IOException declared by the signature; a failing row is recorded on the write status instead
+   */
+  public void write(String recordKey, String partitionPath, RowData record) throws IOException {
+    try {
+      final String sequenceId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+      final HoodieRowData hoodieRow =
+          new HoodieRowData(instantTime, sequenceId, recordKey, partitionPath, path.getName(), record);
+      try {
+        fileWriter.writeRow(recordKey, hoodieRow);
+        writeStatus.markSuccess(recordKey);
+      } catch (Throwable rowError) {
+        // A failing row is only recorded on the write status; it does not abort the handle.
+        writeStatus.markFailure(recordKey, rowError);
+      }
+    } catch (Throwable globalError) {
+      writeStatus.setGlobalError(globalError);
+      throw globalError;
+    }
+  }
+
+  /**
+   * @return {@code true} if this handle can take in more writes, {@code false} otherwise.
+   */
+  public boolean canWrite() {
+    return fileWriter.canWrite();
+  }
+
+  /**
+   * Closes the underlying file writer and assembles the {@link HoodieWriteStat} for the written file: record and
+   * error counts from the write status, the file size looked up through the file system and the elapsed timer value.
+   *
+   * @return the {@link HoodieInternalWriteStatus} of this handle with the write stat attached.
+   * @throws IOException on failure to close the file writer.
+   */
+  public HoodieInternalWriteStatus close() throws IOException {
+    fileWriter.close();
+    final HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(fileId);
+    writeStat.setPartitionPath(partitionPath);
+    writeStat.setPath(new Path(writeConfig.getBasePath()), path);
+    writeStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+    writeStat.setNumWrites(writeStatus.getTotalRecords());
+    writeStat.setNumInserts(writeStatus.getTotalRecords());
+    writeStat.setNumDeletes(0);
+    writeStat.setTotalWriteErrors(writeStatus.getFailedRowsSize());
+    final long writtenBytes = FSUtils.getFileSize(table.getMetaClient().getFs(), path);
+    writeStat.setTotalWriteBytes(writtenBytes);
+    writeStat.setFileSizeInBytes(writtenBytes);
+    final HoodieWriteStat.RuntimeStats timings = new HoodieWriteStat.RuntimeStats();
+    timings.setTotalCreateTime(currTimer.endTimer());
+    writeStat.setRuntimeStats(timings);
+    writeStatus.setStat(writeStat);
+    return writeStatus;
+  }
+
+  public String getFileName() {
+    return path.getName();
+  }
+
+  private Path makeNewPath(String partitionPath) {
+    final Path partitionDir = FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath);
+    try {
+      if (!fs.exists(partitionDir)) {
+        fs.mkdirs(partitionDir); // create a new partition as needed.
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to make dir " + partitionDir, e);
+    }
+    final HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    final String dataFileName = FSUtils.makeDataFileName(instantTime, getWriteToken(), fileId, tableConfig.getBaseFileFormat().getFileExtension());
+    return new Path(partitionDir.toString(), dataFileName);
+  }
+
+  /**
+   * Creates the {@link IOType#CREATE} marker for the given data file under this handle's instant.
+   *
+   * @param partitionPath Partition path the data file belongs to
+   * @param dataFileName  Name of the data file the marker stands for
+   */
+  private void createMarkerFile(String partitionPath, String dataFileName) {
+    new MarkerFiles(table, instantTime).create(partitionPath, dataFileName, IOType.CREATE);
+  }
+
+  private String getWriteToken() {
+    return taskPartitionId + "-" + taskId + "-" + taskEpochId;
+  }
+
+  /** Overridable factory step for the file writer of {@code path}; invoked from the constructor. */
+  protected HoodieRowDataFileWriter createNewFileWriter(
+      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, RowType rowType)
+      throws IOException {
+    return HoodieRowDataFileWriterFactory.getRowDataFileWriter(path, hoodieTable, config, rowType);
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriter.java
new file mode 100644
index 000000000..5a03b43ad
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+
+/**
+ * Abstraction to assist in writing {@link RowData}s to be used in datasource implementation.
+ */
+public interface HoodieRowDataFileWriter {
+
+ /**
+ * Returns {@code true} if this writer can take in more writes, {@code false} otherwise.
+ */
+ boolean canWrite();
+
+ /**
+ * Writes a {@link RowData} to the {@link HoodieRowDataFileWriter}. Also takes in the associated record key to be added to the bloom filter if required.
+ *
+ * @throws IOException on any exception while writing.
+ */
+ void writeRow(String key, RowData row) throws IOException;
+
+ /**
+ * Writes a {@link RowData} to the {@link HoodieRowDataFileWriter} without an associated record key.
+ *
+ * @throws IOException on any exception while writing.
+ */
+ void writeRow(RowData row) throws IOException;
+
+ /**
+ * Closes the {@link HoodieRowDataFileWriter}; it may not take in any more writes afterwards.
+ */
+ void close() throws IOException;
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
new file mode 100644
index 000000000..8cd8bc89a
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+
+/**
+ * Factory to assist in instantiating a new {@link HoodieRowDataFileWriter}.
+ */
+public class HoodieRowDataFileWriterFactory {
+
+  /**
+   * Picks the {@link HoodieRowDataFileWriter} implementation from the file extension of {@code path}.
+   * Only the parquet extension is handled here.
+   *
+   * @param path        path of the file to write.
+   * @param hoodieTable instance of {@link HoodieTable} in use.
+   * @param config      instance of {@link HoodieWriteConfig} to use.
+   * @param schema      row type of the dataset in use.
+   * @return the instantiated {@link HoodieRowDataFileWriter}.
+   * @throws IOException                   on any exception while instantiating the writer.
+   * @throws UnsupportedOperationException if the file extension is not supported.
+   */
+  public static HoodieRowDataFileWriter getRowDataFileWriter(
+      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, RowType schema) throws IOException {
+    final String extension = FSUtils.getFileExtension(path.getName());
+    if (!PARQUET.getFileExtension().equals(extension)) {
+      throw new UnsupportedOperationException(extension + " format not supported yet.");
+    }
+    return newParquetRowDataFileWriter(path, config, schema, hoodieTable);
+  }
+
+  private static HoodieRowDataFileWriter newParquetRowDataFileWriter(
+      Path path, HoodieWriteConfig writeConfig, RowType rowType, HoodieTable table) throws IOException {
+    final BloomFilter bloomFilter = BloomFilterFactory.createBloomFilter(
+        writeConfig.getBloomFilterNumEntries(),
+        writeConfig.getBloomFilterFPP(),
+        writeConfig.getDynamicBloomFilterMaxNumEntries(),
+        writeConfig.getBloomFilterType());
+    final HoodieRowDataParquetWriteSupport writeSupport =
+        new HoodieRowDataParquetWriteSupport(table.getHadoopConf(), rowType, bloomFilter);
+    final HoodieRowDataParquetConfig parquetConfig = new HoodieRowDataParquetConfig(
+        writeSupport,
+        writeConfig.getParquetCompressionCodec(),
+        writeConfig.getParquetBlockSize(),
+        writeConfig.getParquetPageSize(),
+        writeConfig.getParquetMaxFileSize(),
+        writeSupport.getHadoopConf(),
+        writeConfig.getParquetCompressionRatio());
+    return new HoodieRowDataParquetWriter(path, parquetConfig);
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java
new file mode 100644
index 000000000..99b72da22
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.io.storage.HoodieBaseParquetConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * Parquet writer settings for {@link org.apache.flink.table.data.RowData}, typed to {@link HoodieRowDataParquetWriteSupport}.
+ */
+public class HoodieRowDataParquetConfig extends HoodieBaseParquetConfig<HoodieRowDataParquetWriteSupport> {
+
+  /** Passes all settings through to {@link HoodieBaseParquetConfig} unchanged. */
+  public HoodieRowDataParquetConfig(HoodieRowDataParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName,
+                                    int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio) {
+    super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio);
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
new file mode 100644
index 000000000..035cb2eab
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+
+import java.util.HashMap;
+
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
+import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;
+
+/**
+ * Hoodie Write Support for directly writing {@link RowData} to Parquet. Record keys passed to
+ * {@link #add(String)} feed the bloom filter and the min/max key range written to the file footer.
+ */
+public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport {
+
+  private final Configuration hadoopConf;
+  private final BloomFilter bloomFilter;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, BloomFilter bloomFilter) {
+    super(rowType);
+    this.hadoopConf = new Configuration(conf);
+    this.bloomFilter = bloomFilter;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  /** Emits the serialized bloom filter, the min/max record keys and the filter type code as footer metadata. */
+  @Override
+  public WriteSupport.FinalizedWriteContext finalizeWrite() {
+    HashMap<String, String> extraMetaData = new HashMap<>();
+    if (bloomFilter != null) {
+      extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
+      if (minRecordKey != null && maxRecordKey != null) {
+        extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey);
+        extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey);
+      }
+      if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
+        extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
+      }
+    }
+    return new WriteSupport.FinalizedWriteContext(extraMetaData);
+  }
+
+  /** Adds {@code recordKey} to the bloom filter (when one is set) and widens the min/max key range. */
+  public void add(String recordKey) {
+    if (bloomFilter != null) {
+      bloomFilter.add(recordKey);
+    }
+    if (minRecordKey == null || minRecordKey.compareTo(recordKey) > 0) {
+      minRecordKey = recordKey;
+    }
+    if (maxRecordKey == null || maxRecordKey.compareTo(recordKey) < 0) {
+      maxRecordKey = recordKey;
+    }
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
new file mode 100644
index 000000000..373e6b1f5
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+
+import java.io.IOException;
+
+/**
+ * Parquet's impl of {@link HoodieRowDataFileWriter} to write {@link RowData}s.
+ */
+public class HoodieRowDataParquetWriter extends ParquetWriter<RowData> implements HoodieRowDataFileWriter {
+
+  private final Path file;
+  private final HoodieWrapperFileSystem fs;
+  private final long maxFileSize;
+  private final HoodieRowDataParquetWriteSupport writeSupport;
+
+  public HoodieRowDataParquetWriter(Path file, HoodieRowDataParquetConfig parquetConfig) throws IOException {
+    // ParquetWriter takes a page size and a dictionary page size; the configured page size is used for both.
+    super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
+        ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(),
+        parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(),
+        DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED,
+        DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
+    this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
+    this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
+    // Size budget checked by canWrite(): max file size plus max file size scaled by the compression ratio.
+    this.maxFileSize = parquetConfig.getMaxFileSize()
+        + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio());
+    this.writeSupport = parquetConfig.getWriteSupport();
+  }
+
+  /** Compares the bytes written to the file, as reported by the wrapper file system, against the size budget. */
+  @Override
+  public boolean canWrite() {
+    return fs.getBytesWritten(file) < maxFileSize;
+  }
+
+  @Override
+  public void writeRow(String key, RowData row) throws IOException {
+    super.write(row);
+    writeSupport.add(key);
+  }
+
+  @Override
+  public void writeRow(RowData row) throws IOException {
+    super.write(row);
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
new file mode 100644
index 000000000..8ba526ef3
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.flink.formats.parquet.row.ParquetRowDataWriter;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.HashMap;
+
+/**
+ * Parquet {@link WriteSupport} for Flink {@link RowData}: derives the parquet schema from the
+ * {@link RowType} and writes each row through a {@link ParquetRowDataWriter}.
+ */
+public class RowDataParquetWriteSupport extends WriteSupport<RowData> {
+
+  private final RowType rowType;
+  private final MessageType schema;
+  private ParquetRowDataWriter writer;
+
+  public RowDataParquetWriteSupport(RowType rowType) {
+    this.rowType = rowType;
+    this.schema = ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType);
+  }
+
+  @Override
+  public WriteContext init(Configuration configuration) {
+    return new WriteContext(schema, new HashMap<>());
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    // TODO: make the utc timestamp flag (the last argument) configurable instead of always passing true
+    this.writer = new ParquetRowDataWriter(recordConsumer, rowType, schema, true);
+  }
+
+  @Override
+  public void write(RowData record) {
+    try {
+      this.writer.write(record);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml
index 0629dbc63..b3c485f0c 100644
--- a/hudi-flink/pom.xml
+++ b/hudi-flink/pom.xml
@@ -339,5 +339,11 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
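+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-csv</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>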
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index e6f094138..95e21812a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -344,6 +344,22 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+ "waits for the instant commit success, only for internal use");
+  /**
+   * Boolean switch (key {@code sink.shuffle-by-partition.enable}, disabled by default) for shuffling
+   * the data by the dynamic partition fields in the sink phase.
+   */
+  public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION = ConfigOptions
+      .key("sink.shuffle-by-partition.enable")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription(
+          "The option to enable shuffle data by dynamic partition fields in sink"
+              + " phase, this can greatly reduce the number of file for filesystem sink but may"
+              + " lead data skew.");
+
+  /**
+   * Boolean flag (key {@code write.bulk_insert.partition.sorted}, false by default) telling whether the
+   * input records of the bulk insert write task are already sorted by the partition path.
+   *
+   * <p>This is only for internal use.
+   */
+  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_PARTITION_SORTED = ConfigOptions
+      .key("write.bulk_insert.partition.sorted")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether the bulk insert write task input records are already sorted by the partition path");
+
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------
@@ -581,7 +597,9 @@ public class FlinkOptions extends HoodieConfig {
return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
}
- /** Creates a new configuration that is initialized with the options of the given map. */
+ /**
+ * Creates a new configuration that is initialized with the options of the given map.
+ */
public static Configuration fromMap(Map map) {
final Configuration configuration = new Configuration();
map.forEach(configuration::setString);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 5c9207a2f..891193b73 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;
@@ -61,7 +62,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -568,24 +568,17 @@ public class StreamWriteFunction
String instant = this.writeClient.getLastPendingInstant(this.actionType);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
- long waitingTime = 0L;
- long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
- long interval = 500L;
+ TimeWait timeWait = TimeWait.builder()
+ .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
+ .action("instant initialize")
+ .build();
while (confirming) {
// wait condition:
// 1. there is no inflight instant
// 2. the inflight instant does not change and the checkpoint has buffering data
if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
// sleep for a while
- try {
- if (waitingTime > ckpTimeout) {
- throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for instant " + instant + " to commit");
- }
- TimeUnit.MILLISECONDS.sleep(interval);
- waitingTime += interval;
- } catch (InterruptedException e) {
- throw new HoodieException("Error while waiting for instant " + instant + " to commit", e);
- }
+ timeWait.waitFor();
// refresh the inflight instant
instant = this.writeClient.getLastPendingInstant(this.actionType);
} else {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java
index 7e78d581d..ce898866b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
/**
* Factory class for {@link StreamWriteOperator}.
@@ -63,9 +62,4 @@ public class StreamWriteOperatorFactory
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
}
-
- @Override
- public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
- super.setProcessingTimeService(processingTimeService);
- }
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
new file mode 100644
index 000000000..dd0f7bce2
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underneath filesystem.
+ *
+ *
The function should only be used in operation type {@link WriteOperationType#BULK_INSERT}.
+ *
+ *
Note: The function task requires the input stream be shuffled by partition path.
+ *
+ * @param Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ */
+public class BulkInsertWriteFunction
+ extends ProcessFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+ /**
+ * Helper class for bulk insert mode.
+ */
+ private transient BulkInsertWriterHelper writerHelper;
+
+ /**
+ * Config options.
+ */
+ private final Configuration config;
+
+ /**
+ * Table row type.
+ */
+ private final RowType rowType;
+
+ /**
+ * Id of current subtask.
+ */
+ private int taskID;
+
+ /**
+ * Write Client.
+ */
+ private transient HoodieFlinkWriteClient writeClient;
+
+ /**
+ * The initial inflight instant when start up.
+ */
+ private volatile String initInstant;
+
+ /**
+ * Gateway to send operator events to the operator coordinator.
+ */
+ private transient OperatorEventGateway eventGateway;
+
+ /**
+ * Commit action type.
+ */
+ private transient String actionType;
+
+ /**
+ * Constructs a StreamingSinkFunction.
+ *
+ * @param config The config options
+ */
+ public BulkInsertWriteFunction(Configuration config, RowType rowType) {
+ this.config = config;
+ this.rowType = rowType;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws IOException {
+ this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+ this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+ this.actionType = CommitUtils.getCommitActionType(
+ WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+ HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
+
+ this.initInstant = this.writeClient.getLastPendingInstant(this.actionType);
+ sendBootstrapEvent();
+ initWriterHelper();
+ }
+
+ @Override
+ public void processElement(I value, Context ctx, Collector out) throws IOException {
+ this.writerHelper.write((RowData) value);
+ }
+
+ @Override
+ public void close() {
+ if (this.writeClient != null) {
+ this.writeClient.cleanHandlesGracefully();
+ this.writeClient.close();
+ }
+ }
+
+ /**
+ * End input action for batch source.
+ */
+ public void endInput() {
+ final List writeStatus;
+ try {
+ this.writerHelper.close();
+ writeStatus = this.writerHelper.getWriteStatuses().stream()
+ .map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new HoodieException("Error collect the write status for task [" + this.taskID + "]");
+ }
+ final WriteMetadataEvent event = WriteMetadataEvent.builder()
+ .taskID(taskID)
+ .instantTime(this.writerHelper.getInstantTime())
+ .writeStatus(writeStatus)
+ .lastBatch(true)
+ .endInput(true)
+ .build();
+ this.eventGateway.sendEventToCoordinator(event);
+ }
+
+ /**
+ * Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}.
+ */
+ private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) {
+ WriteStatus writeStatus = new WriteStatus(false, 0.1);
+ writeStatus.setStat(internalWriteStatus.getStat());
+ writeStatus.setFileId(internalWriteStatus.getFileId());
+ writeStatus.setGlobalError(internalWriteStatus.getGlobalError());
+ writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords());
+ writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords());
+ return writeStatus;
+ }
+
+ // -------------------------------------------------------------------------
+ // Getter/Setter
+ // -------------------------------------------------------------------------
+
+ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+ this.eventGateway = operatorEventGateway;
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private void initWriterHelper() {
+ String instant = instantToWrite();
+ this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+ instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
+ this.rowType);
+ }
+
+ private void sendBootstrapEvent() {
+ WriteMetadataEvent event = WriteMetadataEvent.builder()
+ .taskID(taskID)
+ .writeStatus(Collections.emptyList())
+ .instantTime("")
+ .bootstrap(true)
+ .build();
+ this.eventGateway.sendEventToCoordinator(event);
+ LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
+ }
+
+ private String instantToWrite() {
+ String instant = this.writeClient.getLastPendingInstant(this.actionType);
+ // if exactly-once semantics turns on,
+ // waits for the checkpoint notification until the checkpoint timeout threshold hits.
+ TimeWait timeWait = TimeWait.builder()
+ .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
+ .action("instant initialize")
+ .build();
+ while (instant == null || instant.equals(this.initInstant)) {
+ // wait condition:
+ // 1. there is no inflight instant
+ // 2. the inflight instant does not change
+ // sleep for a while
+ timeWait.waitFor();
+ // refresh the inflight instant
+ instant = this.writeClient.getLastPendingInstant(this.actionType);
+ }
+ return instant;
+ }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java
new file mode 100644
index 000000000..bc01622c8
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bulk;
+
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Operator for bulk insert mode sink.
+ *
+ * @param The input type
+ */
+public class BulkInsertWriteOperator
+ extends ProcessOperator
+ implements OperatorEventHandler, BoundedOneInput {
+ private final BulkInsertWriteFunction sinkFunction;
+
+ /**
+ * Creates the operator around a new {@link BulkInsertWriteFunction} built from the given arguments.
+ *
+ * @param conf The configuration handed to the write function
+ * @param rowType The row type handed to the write function
+ */
+ public BulkInsertWriteOperator(Configuration conf, RowType rowType) {
+ super(new BulkInsertWriteFunction<>(conf, rowType));
+ // keep a typed reference to the user function so that the gateway and end-input calls can be forwarded
+ this.sinkFunction = (BulkInsertWriteFunction) getUserFunction();
+ }
+
+ /**
+ * Handles an operator event; intentionally a no-op, the given event is ignored.
+ *
+ * @param event The received event (unused)
+ */
+ @Override
+ public void handleOperatorEvent(OperatorEvent event) {
+ // no operation
+ }
+
+  /**
+   * Hands the given gateway over to the wrapped sink function.
+   *
+   * @param operatorEventGateway The gateway to forward
+   */
+  void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+    this.sinkFunction.setOperatorEventGateway(operatorEventGateway);
+  }
+
+  /**
+   * Forwards the end-of-input signal to the wrapped sink function.
+   */
+  @Override
+  public void endInput() {
+    this.sinkFunction.endInput();
+  }
+
+  /**
+   * Creates a new {@link OperatorFactory} from the given arguments.
+   *
+   * @param conf    The configuration handed to the factory
+   * @param rowType The row type handed to the factory
+   * @param <I>     The input type
+   * @return a new factory instance
+   */
+  public static <I> OperatorFactory<I> getFactory(Configuration conf, RowType rowType) {
+    return new OperatorFactory<>(conf, rowType);
+  }
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ public static class OperatorFactory
+ extends SimpleUdfStreamOperatorFactory