From 9d2a65a6a6ff9add81411147f1cddd03f7c08e6c Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 27 Jul 2021 10:58:23 +0800 Subject: [PATCH] [HUDI-2209] Bulk insert for flink writer (#3334) --- hudi-client/hudi-flink-client/pom.xml | 24 ++ .../hudi/client/model/HoodieRowData.java | 184 ++++++++++++++ .../row/HoodieRowDataCreateHandle.java | 205 ++++++++++++++++ .../storage/row/HoodieRowDataFileWriter.java | 53 ++++ .../row/HoodieRowDataFileWriterFactory.java | 79 ++++++ .../row/HoodieRowDataParquetConfig.java | 36 +++ .../row/HoodieRowDataParquetWriteSupport.java | 86 +++++++ .../row/HoodieRowDataParquetWriter.java | 77 ++++++ .../row/RowDataParquetWriteSupport.java | 66 +++++ hudi-flink/pom.xml | 6 + .../hudi/configuration/FlinkOptions.java | 20 +- .../apache/hudi/sink/StreamWriteFunction.java | 19 +- .../hudi/sink/StreamWriteOperatorFactory.java | 6 - .../sink/bulk/BulkInsertWriteFunction.java | 225 +++++++++++++++++ .../sink/bulk/BulkInsertWriteOperator.java | 115 +++++++++ .../sink/bulk/BulkInsertWriterHelper.java | 169 +++++++++++++ .../apache/hudi/sink/bulk/RowDataKeyGen.java | 228 ++++++++++++++++++ .../org/apache/hudi/sink/utils/TimeWait.java | 93 +++++++ .../apache/hudi/table/HoodieTableSink.java | 51 +++- .../apache/hudi/util/RowDataProjection.java | 16 ++ .../hudi/sink/bulk/TestRowDataKeyGen.java | 96 ++++++++ .../hudi/table/HoodieDataSourceITCase.java | 167 ++++++++----- .../apache/hudi/utils/TestConfigurations.java | 16 ++ .../java/org/apache/hudi/utils/TestData.java | 2 +- .../java/org/apache/hudi/utils/TestSQL.java | 36 +++ .../src/test/resources/test_source_5.data | 8 + 26 files changed, 2000 insertions(+), 83 deletions(-) create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriter.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java create mode 100644 hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java create mode 100644 hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java create mode 100644 hudi-flink/src/test/resources/test_source_5.data diff --git a/hudi-client/hudi-flink-client/pom.xml b/hudi-client/hudi-flink-client/pom.xml index 8be65626d..a22848d8f 100644 --- a/hudi-client/hudi-flink-client/pom.xml +++ b/hudi-client/hudi-flink-client/pom.xml @@ -52,6 +52,30 @@ ${flink.version} provided + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + org.apache.flink + flink-parquet_${scala.binary.version} + ${flink.version} + provided + + + org.apache.parquet + parquet-hadoop + + + + + org.apache.flink + flink-avro + ${flink.version} + provided + diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java new file mode 100644 index 000000000..5ecc7e57f --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowData.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.model; + +import org.apache.hudi.common.model.HoodieRecord; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.types.RowKind; + +/** + * RowData implementation for Hoodie Row. It wraps an {@link RowData} and keeps meta columns locally. But the {@link RowData} + * does include the meta columns as well just that {@link HoodieRowData} will intercept queries for meta columns and serve from its + * copy rather than fetching from {@link RowData}. + */ +public class HoodieRowData implements RowData { + + private final String commitTime; + private final String commitSeqNumber; + private final String recordKey; + private final String partitionPath; + private final String fileName; + private final RowData row; + private final int metaColumnsNum; + + public HoodieRowData(String commitTime, + String commitSeqNumber, + String recordKey, + String partitionPath, + String fileName, + RowData row) { + this.commitTime = commitTime; + this.commitSeqNumber = commitSeqNumber; + this.recordKey = recordKey; + this.partitionPath = partitionPath; + this.fileName = fileName; + this.row = row; + this.metaColumnsNum = HoodieRecord.HOODIE_META_COLUMNS.size(); + } + + @Override + public int getArity() { + return metaColumnsNum + row.getArity(); + } + + @Override + public RowKind getRowKind() { + return row.getRowKind(); + } + + @Override + public void setRowKind(RowKind kind) { + this.row.setRowKind(kind); + } + + private String getMetaColumnVal(int ordinal) { + switch (ordinal) { + case 0: { + return commitTime; + } + case 1: { + return commitSeqNumber; + } + case 2: { + return recordKey; + } + case 3: { + return partitionPath; + } + case 4: { + return fileName; + } + default: + throw new IllegalArgumentException("Not expected"); + } + } + + @Override + public boolean isNullAt(int ordinal) { + if (ordinal < metaColumnsNum) { + return null == getMetaColumnVal(ordinal); + } + return row.isNullAt(ordinal - metaColumnsNum); + } + + @Override + public boolean getBoolean(int ordinal) { + return row.getBoolean(ordinal - metaColumnsNum); + } + + @Override + public byte getByte(int ordinal) { + return row.getByte(ordinal - metaColumnsNum); + } + + @Override + public short getShort(int ordinal) { + return row.getShort(ordinal - metaColumnsNum); + } + + @Override + public int getInt(int ordinal) { + return row.getInt(ordinal - metaColumnsNum); + } + + @Override + public long getLong(int ordinal) { + return row.getLong(ordinal - metaColumnsNum); + } + + @Override + public float getFloat(int ordinal) { + return row.getFloat(ordinal - metaColumnsNum); + } + + @Override + public double getDouble(int ordinal) { + return row.getDouble(ordinal - metaColumnsNum); + } + + @Override + public DecimalData getDecimal(int ordinal, int precision, int scale) { + return row.getDecimal(ordinal - metaColumnsNum, precision, scale); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + return row.getTimestamp(pos - metaColumnsNum, precision); + } + + @Override + public RawValueData getRawValue(int pos) { + return row.getRawValue(pos - metaColumnsNum); + } + + @Override + public StringData getString(int ordinal) { + if (ordinal < HoodieRecord.HOODIE_META_COLUMNS.size()) { + return StringData.fromString(getMetaColumnVal(ordinal)); + } + return row.getString(ordinal - metaColumnsNum); + } + + @Override + public byte[] getBinary(int ordinal) { + return row.getBinary(ordinal - metaColumnsNum); + } + + @Override + public RowData getRow(int ordinal, int numFields) { + return row.getRow(ordinal - metaColumnsNum, numFields); + } + + @Override + public ArrayData getArray(int ordinal) { + return row.getArray(ordinal - metaColumnsNum); + } + + @Override + public MapData getMap(int ordinal) { + return row.getMap(ordinal - metaColumnsNum); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java new file mode 100644 index 000000000..3ceee8ba0 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.client.HoodieInternalWriteStatus; +import org.apache.hudi.client.model.HoodieRowData; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Create handle with RowData for datasource implemention of bulk insert. + */ +public class HoodieRowDataCreateHandle implements Serializable { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LogManager.getLogger(HoodieRowDataCreateHandle.class); + private static final AtomicLong SEQGEN = new AtomicLong(1); + + private final String instantTime; + private final int taskPartitionId; + private final long taskId; + private final long taskEpochId; + private final HoodieTable table; + private final HoodieWriteConfig writeConfig; + protected final HoodieRowDataFileWriter fileWriter; + private final String partitionPath; + private final Path path; + private final String fileId; + private final FileSystem fs; + protected final HoodieInternalWriteStatus writeStatus; + private final HoodieTimer currTimer; + + public HoodieRowDataCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId, + String instantTime, int taskPartitionId, long taskId, long taskEpochId, + RowType rowType) { + this.partitionPath = partitionPath; + this.table = table; + this.writeConfig = writeConfig; + this.instantTime = instantTime; + this.taskPartitionId = taskPartitionId; + this.taskId = taskId; + this.taskEpochId = taskEpochId; + this.fileId = fileId; + this.currTimer = new HoodieTimer(); + this.currTimer.startTimer(); + this.fs = table.getMetaClient().getFs(); + this.path = makeNewPath(partitionPath); + this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(), + writeConfig.getWriteStatusFailureFraction()); + writeStatus.setPartitionPath(partitionPath); + writeStatus.setFileId(fileId); + try { + HoodiePartitionMetadata partitionMetadata = + new HoodiePartitionMetadata( + fs, + instantTime, + new Path(writeConfig.getBasePath()), + FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath)); + partitionMetadata.trySave(taskPartitionId); + createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension())); + this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType); + } catch (IOException e) { + throw new HoodieInsertException("Failed to initialize file writer for path " + path, e); + } + LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId); + } + + /** + * Writes an {@link RowData} to the underlying {@link HoodieRowDataFileWriter}. + * Before writing, value for meta columns are computed as required + * and wrapped in {@link HoodieRowData}. {@link HoodieRowData} is what gets written to HoodieRowDataFileWriter. + * + * @param recordKey The record key + * @param partitionPath The partition path + * @param record instance of {@link RowData} that needs to be written to the fileWriter. + * @throws IOException + */ + public void write(String recordKey, String partitionPath, RowData record) throws IOException { + try { + String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement()); + HoodieRowData rowData = new HoodieRowData(instantTime, seqId, recordKey, partitionPath, path.getName(), + record); + try { + fileWriter.writeRow(recordKey, rowData); + writeStatus.markSuccess(recordKey); + } catch (Throwable t) { + writeStatus.markFailure(recordKey, t); + } + } catch (Throwable ge) { + writeStatus.setGlobalError(ge); + throw ge; + } + } + + /** + * @returns {@code true} if this handle can take in more writes. else {@code false}. + */ + public boolean canWrite() { + return fileWriter.canWrite(); + } + + /** + * Closes the {@link HoodieRowDataCreateHandle} and returns an instance of {@link HoodieInternalWriteStatus} containing the stats and + * status of the writes to this handle. + * + * @return the {@link HoodieInternalWriteStatus} containing the stats and status of the writes to this handle. + * @throws IOException + */ + public HoodieInternalWriteStatus close() throws IOException { + fileWriter.close(); + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setPartitionPath(partitionPath); + stat.setNumWrites(writeStatus.getTotalRecords()); + stat.setNumDeletes(0); + stat.setNumInserts(writeStatus.getTotalRecords()); + stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); + stat.setFileId(fileId); + stat.setPath(new Path(writeConfig.getBasePath()), path); + long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getFs(), path); + stat.setTotalWriteBytes(fileSizeInBytes); + stat.setFileSizeInBytes(fileSizeInBytes); + stat.setTotalWriteErrors(writeStatus.getFailedRowsSize()); + HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); + runtimeStats.setTotalCreateTime(currTimer.endTimer()); + stat.setRuntimeStats(runtimeStats); + writeStatus.setStat(stat); + return writeStatus; + } + + public String getFileName() { + return path.getName(); + } + + private Path makeNewPath(String partitionPath) { + Path path = FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath); + try { + if (!fs.exists(path)) { + fs.mkdirs(path); // create a new partition as needed. + } + } catch (IOException e) { + throw new HoodieIOException("Failed to make dir " + path, e); + } + HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig(); + return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, getWriteToken(), fileId, + tableConfig.getBaseFileFormat().getFileExtension())); + } + + /** + * Creates an empty marker file corresponding to storage writer path. + * + * @param partitionPath Partition path + */ + private void createMarkerFile(String partitionPath, String dataFileName) { + MarkerFiles markerFiles = new MarkerFiles(table, instantTime); + markerFiles.create(partitionPath, dataFileName, IOType.CREATE); + } + + private String getWriteToken() { + return taskPartitionId + "-" + taskId + "-" + taskEpochId; + } + + protected HoodieRowDataFileWriter createNewFileWriter( + Path path, HoodieTable hoodieTable, HoodieWriteConfig config, RowType rowType) + throws IOException { + return HoodieRowDataFileWriterFactory.getRowDataFileWriter( + path, hoodieTable, config, rowType); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriter.java new file mode 100644 index 000000000..5a03b43ad --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.flink.table.data.RowData; + +import java.io.IOException; + +/** + * Abstraction to assist in writing {@link RowData}s to be used in datasource implementation. + */ +public interface HoodieRowDataFileWriter { + + /** + * Returns {@code true} if this RowFileWriter can take in more writes. else {@code false}. + */ + boolean canWrite(); + + /** + * Writes an {@link RowData} to the {@link HoodieRowDataFileWriter}. Also takes in associated record key to be added to bloom filter if required. + * + * @throws IOException on any exception while writing. + */ + void writeRow(String key, RowData row) throws IOException; + + /** + * Writes an {@link RowData} to the {@link HoodieRowDataFileWriter}. + * + * @throws IOException on any exception while writing. + */ + void writeRow(RowData row) throws IOException; + + /** + * Closes the {@link HoodieRowDataFileWriter} and may not take in any more writes. + */ + void close() throws IOException; +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java new file mode 100644 index 000000000..8cd8bc89a --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; + +/** + * Factory to assist in instantiating a new {@link HoodieRowDataFileWriter}. + */ +public class HoodieRowDataFileWriterFactory { + + /** + * Factory method to assist in instantiating an instance of {@link HoodieRowDataFileWriter}. + * + * @param path path of the RowFileWriter. + * @param hoodieTable instance of {@link HoodieTable} in use. + * @param config instance of {@link HoodieWriteConfig} to use. + * @param schema schema of the dataset in use. + * @return the instantiated {@link HoodieRowDataFileWriter}. + * @throws IOException if format is not supported or if any exception during instantiating the RowFileWriter. + */ + public static HoodieRowDataFileWriter getRowDataFileWriter( + Path path, HoodieTable hoodieTable, HoodieWriteConfig config, RowType schema) + throws IOException { + final String extension = FSUtils.getFileExtension(path.getName()); + if (PARQUET.getFileExtension().equals(extension)) { + return newParquetInternalRowFileWriter(path, config, schema, hoodieTable); + } + throw new UnsupportedOperationException(extension + " format not supported yet."); + } + + private static HoodieRowDataFileWriter newParquetInternalRowFileWriter( + Path path, HoodieWriteConfig writeConfig, RowType rowType, HoodieTable table) + throws IOException { + BloomFilter filter = BloomFilterFactory.createBloomFilter( + writeConfig.getBloomFilterNumEntries(), + writeConfig.getBloomFilterFPP(), + writeConfig.getDynamicBloomFilterMaxNumEntries(), + writeConfig.getBloomFilterType()); + HoodieRowDataParquetWriteSupport writeSupport = + new HoodieRowDataParquetWriteSupport(table.getHadoopConf(), rowType, filter); + return new HoodieRowDataParquetWriter( + path, new HoodieRowDataParquetConfig( + writeSupport, + writeConfig.getParquetCompressionCodec(), + writeConfig.getParquetBlockSize(), + writeConfig.getParquetPageSize(), + writeConfig.getParquetMaxFileSize(), + writeSupport.getHadoopConf(), + writeConfig.getParquetCompressionRatio())); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java new file mode 100644 index 000000000..99b72da22 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.io.storage.HoodieBaseParquetConfig; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +/** + * ParquetConfig for datasource implementation with {@link org.apache.flink.table.data.RowData}. + */ +public class HoodieRowDataParquetConfig extends HoodieBaseParquetConfig { + + public HoodieRowDataParquetConfig(HoodieRowDataParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName, + int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf, + double compressionRatio) { + super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio); + } +} \ No newline at end of file diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java new file mode 100644 index 000000000..035cb2eab --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriteSupport.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.api.WriteSupport; + +import java.util.HashMap; + +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY; +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE; +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER; +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER; + +/** + * Hoodie Write Support for directly writing {@link RowData} to Parquet. + */ +public class HoodieRowDataParquetWriteSupport extends RowDataParquetWriteSupport { + + private final Configuration hadoopConf; + private final BloomFilter bloomFilter; + private String minRecordKey; + private String maxRecordKey; + + public HoodieRowDataParquetWriteSupport(Configuration conf, RowType rowType, BloomFilter bloomFilter) { + super(rowType); + this.hadoopConf = new Configuration(conf); + this.bloomFilter = bloomFilter; + } + + public Configuration getHadoopConf() { + return hadoopConf; + } + + @Override + public WriteSupport.FinalizedWriteContext finalizeWrite() { + HashMap extraMetaData = new HashMap<>(); + if (bloomFilter != null) { + extraMetaData.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString()); + if (minRecordKey != null && maxRecordKey != null) { + extraMetaData.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey); + extraMetaData.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey); + } + if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { + extraMetaData.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name()); + } + } + return new WriteSupport.FinalizedWriteContext(extraMetaData); + } + + public void add(String recordKey) { + this.bloomFilter.add(recordKey); + if (minRecordKey != null) { + minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey; + } else { + minRecordKey = recordKey; + } + + if (maxRecordKey != null) { + maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey; + } else { + maxRecordKey = recordKey; + } + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java new file mode 100644 index 000000000..373e6b1f5 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; + +import org.apache.flink.table.data.RowData; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; + +import java.io.IOException; + +/** + * Parquet's impl of {@link HoodieRowDataFileWriter} to write {@link RowData}s. + */ +public class HoodieRowDataParquetWriter extends ParquetWriter + implements HoodieRowDataFileWriter { + + private final Path file; + private final HoodieWrapperFileSystem fs; + private final long maxFileSize; + private final HoodieRowDataParquetWriteSupport writeSupport; + + public HoodieRowDataParquetWriter(Path file, HoodieRowDataParquetConfig parquetConfig) + throws IOException { + super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()), + ParquetFileWriter.Mode.CREATE, parquetConfig.getWriteSupport(), parquetConfig.getCompressionCodecName(), + parquetConfig.getBlockSize(), parquetConfig.getPageSize(), parquetConfig.getPageSize(), + DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED, + DEFAULT_WRITER_VERSION, FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf())); + this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()); + this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(FSUtils.registerFileSystem(file, + parquetConfig.getHadoopConf())); + this.maxFileSize = parquetConfig.getMaxFileSize() + + Math.round(parquetConfig.getMaxFileSize() * parquetConfig.getCompressionRatio()); + this.writeSupport = parquetConfig.getWriteSupport(); + } + + @Override + public boolean canWrite() { + return fs.getBytesWritten(file) < maxFileSize; + } + + @Override + public void writeRow(String key, RowData row) throws IOException { + super.write(row); + writeSupport.add(key); + } + + @Override + public void writeRow(RowData row) throws IOException { + super.write(row); + } + + @Override + public void close() throws IOException { + super.close(); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java new file mode 100644 index 000000000..8ba526ef3 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/RowDataParquetWriteSupport.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.flink.formats.parquet.row.ParquetRowDataWriter; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.MessageType; + +import java.util.HashMap; + +/** + * Row data parquet write support. + */ +public class RowDataParquetWriteSupport extends WriteSupport { + + private final RowType rowType; + private final MessageType schema; + private ParquetRowDataWriter writer; + + public RowDataParquetWriteSupport(RowType rowType) { + super(); + this.rowType = rowType; + this.schema = ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType); + } + + @Override + public WriteContext init(Configuration configuration) { + return new WriteContext(schema, new HashMap<>()); + } + + @Override + public void prepareForWrite(RecordConsumer recordConsumer) { + // should make the utc timestamp configurable + this.writer = new ParquetRowDataWriter(recordConsumer, rowType, schema, true); + } + + @Override + public void write(RowData record) { + try { + this.writer.write(record); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml index 0629dbc63..b3c485f0c 100644 --- a/hudi-flink/pom.xml +++ b/hudi-flink/pom.xml @@ -339,5 +339,11 @@ test test-jar + + org.apache.flink + flink-csv + ${flink.version} + test + diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index e6f094138..95e21812a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -344,6 +344,22 @@ public class FlinkOptions extends HoodieConfig { .withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n" + "waits for the instant commit success, only for internal use"); + public static final ConfigOption SINK_SHUFFLE_BY_PARTITION = ConfigOptions + .key("sink.shuffle-by-partition.enable") + .booleanType() + .defaultValue(false) + .withDescription( + "The option to enable shuffle data by dynamic partition fields in sink" + + " phase, this can greatly reduce the number of file for filesystem sink but may" + + " lead data skew."); + + // this is only for internal use + public static final ConfigOption WRITE_BULK_INSERT_PARTITION_SORTED = ConfigOptions + .key("write.bulk_insert.partition.sorted") + .booleanType() + .defaultValue(false) + .withDescription("Whether the bulk insert write task input records are already sorted by the partition path"); + // ------------------------------------------------------------------------ // Compaction Options // ------------------------------------------------------------------------ @@ -581,7 +597,9 @@ public class FlinkOptions extends HoodieConfig { return options.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX)); } - /** Creates a new configuration that is initialized with the options of the given map. */ + /** + * Creates a new configuration that is initialized with the options of the given map. + */ public static Configuration fromMap(Map map) { final Configuration configuration = new Configuration(); map.forEach(configuration::setString); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 5c9207a2f..891193b73 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.sink.event.CommitAckEvent; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.utils.TimeWait; import org.apache.hudi.table.action.commit.FlinkWriteHelper; import org.apache.hudi.util.StreamerUtil; @@ -61,7 +62,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Random; -import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -568,24 +568,17 @@ public class StreamWriteFunction String instant = this.writeClient.getLastPendingInstant(this.actionType); // if exactly-once semantics turns on, // waits for the checkpoint notification until the checkpoint timeout threshold hits. - long waitingTime = 0L; - long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT); - long interval = 500L; + TimeWait timeWait = TimeWait.builder() + .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)) + .action("instant initialize") + .build(); while (confirming) { // wait condition: // 1. there is no inflight instant // 2. the inflight instant does not change and the checkpoint has buffering data if (instant == null || (instant.equals(this.currentInstant) && hasData)) { // sleep for a while - try { - if (waitingTime > ckpTimeout) { - throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for instant " + instant + " to commit"); - } - TimeUnit.MILLISECONDS.sleep(interval); - waitingTime += interval; - } catch (InterruptedException e) { - throw new HoodieException("Error while waiting for instant " + instant + " to commit", e); - } + timeWait.waitFor(); // refresh the inflight instant instant = this.writeClient.getLastPendingInstant(this.actionType); } else { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java index 7e78d581d..ce898866b 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorFactory.java @@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; /** * Factory class for {@link StreamWriteOperator}. @@ -63,9 +62,4 @@ public class StreamWriteOperatorFactory public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) { return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf); } - - @Override - public void setProcessingTimeService(ProcessingTimeService processingTimeService) { - super.setProcessingTimeService(processingTimeService); - } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java new file mode 100644 index 000000000..dd0f7bce2 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.bulk; + +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.HoodieInternalWriteStatus; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.utils.TimeWait; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Sink function to write the data to the underneath filesystem. + * + *

The function should only be used in operation type {@link WriteOperationType#BULK_INSERT}. + * + *

Note: The function task requires the input stream be shuffled by partition path. + * + * @param Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class BulkInsertWriteFunction + extends ProcessFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class); + + /** + * Helper class for bulk insert mode. + */ + private transient BulkInsertWriterHelper writerHelper; + + /** + * Config options. + */ + private final Configuration config; + + /** + * Table row type. + */ + private final RowType rowType; + + /** + * Id of current subtask. + */ + private int taskID; + + /** + * Write Client. + */ + private transient HoodieFlinkWriteClient writeClient; + + /** + * The initial inflight instant when start up. + */ + private volatile String initInstant; + + /** + * Gateway to send operator events to the operator coordinator. + */ + private transient OperatorEventGateway eventGateway; + + /** + * Commit action type. + */ + private transient String actionType; + + /** + * Constructs a StreamingSinkFunction. + * + * @param config The config options + */ + public BulkInsertWriteFunction(Configuration config, RowType rowType) { + this.config = config; + this.rowType = rowType; + } + + @Override + public void open(Configuration parameters) throws IOException { + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); + this.actionType = CommitUtils.getCommitActionType( + WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)), + HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE))); + + this.initInstant = this.writeClient.getLastPendingInstant(this.actionType); + sendBootstrapEvent(); + initWriterHelper(); + } + + @Override + public void processElement(I value, Context ctx, Collector out) throws IOException { + this.writerHelper.write((RowData) value); + } + + @Override + public void close() { + if (this.writeClient != null) { + this.writeClient.cleanHandlesGracefully(); + this.writeClient.close(); + } + } + + /** + * End input action for batch source. + */ + public void endInput() { + final List writeStatus; + try { + this.writerHelper.close(); + writeStatus = this.writerHelper.getWriteStatuses().stream() + .map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList()); + } catch (IOException e) { + throw new HoodieException("Error collect the write status for task [" + this.taskID + "]"); + } + final WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .instantTime(this.writerHelper.getInstantTime()) + .writeStatus(writeStatus) + .lastBatch(true) + .endInput(true) + .build(); + this.eventGateway.sendEventToCoordinator(event); + } + + /** + * Tool to convert {@link HoodieInternalWriteStatus} into {@link WriteStatus}. + */ + private static WriteStatus toWriteStatus(HoodieInternalWriteStatus internalWriteStatus) { + WriteStatus writeStatus = new WriteStatus(false, 0.1); + writeStatus.setStat(internalWriteStatus.getStat()); + writeStatus.setFileId(internalWriteStatus.getFileId()); + writeStatus.setGlobalError(internalWriteStatus.getGlobalError()); + writeStatus.setTotalRecords(internalWriteStatus.getTotalRecords()); + writeStatus.setTotalErrorRecords(internalWriteStatus.getTotalErrorRecords()); + return writeStatus; + } + + // ------------------------------------------------------------------------- + // Getter/Setter + // ------------------------------------------------------------------------- + + public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { + this.eventGateway = operatorEventGateway; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void initWriterHelper() { + String instant = instantToWrite(); + this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(), + instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), + this.rowType); + } + + private void sendBootstrapEvent() { + WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .writeStatus(Collections.emptyList()) + .instantTime("") + .bootstrap(true) + .build(); + this.eventGateway.sendEventToCoordinator(event); + LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); + } + + private String instantToWrite() { + String instant = this.writeClient.getLastPendingInstant(this.actionType); + // if exactly-once semantics turns on, + // waits for the checkpoint notification until the checkpoint timeout threshold hits. + TimeWait timeWait = TimeWait.builder() + .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)) + .action("instant initialize") + .build(); + while (instant == null || instant.equals(this.initInstant)) { + // wait condition: + // 1. there is no inflight instant + // 2. the inflight instant does not change + // sleep for a while + timeWait.waitFor(); + // refresh the inflight instant + instant = this.writeClient.getLastPendingInstant(this.actionType); + } + return instant; + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java new file mode 100644 index 000000000..bc01622c8 --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteOperator.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.bulk; + +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +/** + * Operator for bulk insert mode sink. + * + * @param The input type + */ +public class BulkInsertWriteOperator + extends ProcessOperator + implements OperatorEventHandler, BoundedOneInput { + private final BulkInsertWriteFunction sinkFunction; + + public BulkInsertWriteOperator(Configuration conf, RowType rowType) { + super(new BulkInsertWriteFunction<>(conf, rowType)); + this.sinkFunction = (BulkInsertWriteFunction) getUserFunction(); + } + + @Override + public void handleOperatorEvent(OperatorEvent event) { + // no operation + } + + void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { + sinkFunction.setOperatorEventGateway(operatorEventGateway); + } + + @Override + public void endInput() { + sinkFunction.endInput(); + } + + public static OperatorFactory getFactory(Configuration conf, RowType rowType) { + return new OperatorFactory<>(conf, rowType); + } + + // ------------------------------------------------------------------------- + // Inner Class + // ------------------------------------------------------------------------- + + public static class OperatorFactory + extends SimpleUdfStreamOperatorFactory + implements CoordinatedOperatorFactory, OneInputStreamOperatorFactory { + private static final long serialVersionUID = 1L; + + private final BulkInsertWriteOperator operator; + private final Configuration conf; + + public OperatorFactory(Configuration conf, RowType rowType) { + super(new BulkInsertWriteOperator<>(conf, rowType)); + this.operator = (BulkInsertWriteOperator) getOperator(); + this.conf = conf; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator(StreamOperatorParameters parameters) { + final OperatorID operatorID = parameters.getStreamConfig().getOperatorID(); + final OperatorEventDispatcher eventDispatcher = parameters.getOperatorEventDispatcher(); + + this.operator.setOperatorEventGateway(eventDispatcher.getOperatorEventGateway(operatorID)); + this.operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); + this.operator.setProcessingTimeService(this.processingTimeService); + eventDispatcher.registerEventHandler(operatorID, operator); + return (T) operator; + } + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) { + return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf); + } + + @Override + public void setProcessingTimeService(ProcessingTimeService processingTimeService) { + super.setProcessingTimeService(processingTimeService); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java new file mode 100644 index 000000000..50590c0ee --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.bulk; + +import org.apache.hudi.client.HoodieInternalWriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle; +import org.apache.hudi.table.HoodieTable; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Helper class for bulk insert used by Flink. + */ +public class BulkInsertWriterHelper { + + private static final Logger LOG = LogManager.getLogger(BulkInsertWriterHelper.class); + + private final String instantTime; + private final int taskPartitionId; + private final long taskId; + private final long taskEpochId; + private final HoodieTable hoodieTable; + private final HoodieWriteConfig writeConfig; + private final RowType rowType; + private final Boolean arePartitionRecordsSorted; + private final List writeStatusList = new ArrayList<>(); + private HoodieRowDataCreateHandle handle; + private String lastKnownPartitionPath = null; + private final String fileIdPrefix; + private int numFilesWritten = 0; + private final Map handles = new HashMap<>(); + private final RowDataKeyGen keyGen; + + public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig, + String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) { + this.hoodieTable = hoodieTable; + this.writeConfig = writeConfig; + this.instantTime = instantTime; + this.taskPartitionId = taskPartitionId; + this.taskId = taskId; + this.taskEpochId = taskEpochId; + this.rowType = addMetadataFields(rowType); // patch up with metadata fields + this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED); + this.fileIdPrefix = UUID.randomUUID().toString(); + this.keyGen = RowDataKeyGen.instance(conf, rowType); + } + + /** + * Returns the write instant time. + */ + public String getInstantTime() { + return this.instantTime; + } + + public void write(RowData record) throws IOException { + try { + String recordKey = keyGen.getRecordKey(record); + String partitionPath = keyGen.getPartitionPath(record); + + if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) { + LOG.info("Creating new file for partition path " + partitionPath); + handle = getRowCreateHandle(partitionPath); + lastKnownPartitionPath = partitionPath; + } + handle.write(recordKey, partitionPath, record); + } catch (Throwable t) { + LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t); + throw t; + } + } + + public List getWriteStatuses() throws IOException { + close(); + return writeStatusList; + } + + private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throws IOException { + if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path + // if records are sorted, we can close all existing handles + if (arePartitionRecordsSorted) { + close(); + } + HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), + instantTime, taskPartitionId, taskId, taskEpochId, rowType); + handles.put(partitionPath, rowCreateHandle); + } else if (!handles.get(partitionPath).canWrite()) { + // even if there is a handle to the partition path, it could have reached its max size threshold. So, we close the handle here and + // create a new one. + writeStatusList.add(handles.remove(partitionPath).close()); + HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), + instantTime, taskPartitionId, taskId, taskEpochId, rowType); + handles.put(partitionPath, rowCreateHandle); + } + return handles.get(partitionPath); + } + + public void close() throws IOException { + for (HoodieRowDataCreateHandle rowCreateHandle : handles.values()) { + writeStatusList.add(rowCreateHandle.close()); + } + handles.clear(); + handle = null; + } + + private String getNextFileId() { + return String.format("%s-%d", fileIdPrefix, numFilesWritten++); + } + + /** + * Adds the Hoodie metadata fields to the given row type. + */ + private static RowType addMetadataFields(RowType rowType) { + List mergedFields = new ArrayList<>(); + + LogicalType metadataFieldType = DataTypes.STRING().getLogicalType(); + RowType.RowField commitTimeField = + new RowType.RowField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, metadataFieldType, "commit time"); + RowType.RowField commitSeqnoField = + new RowType.RowField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, metadataFieldType, "commit seqno"); + RowType.RowField recordKeyField = + new RowType.RowField(HoodieRecord.RECORD_KEY_METADATA_FIELD, metadataFieldType, "record key"); + RowType.RowField partitionPathField = + new RowType.RowField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, metadataFieldType, "partition path"); + RowType.RowField fileNameField = + new RowType.RowField(HoodieRecord.FILENAME_METADATA_FIELD, metadataFieldType, "field name"); + + mergedFields.add(commitTimeField); + mergedFields.add(commitSeqnoField); + mergedFields.add(recordKeyField); + mergedFields.add(partitionPathField); + mergedFields.add(fileNameField); + mergedFields.addAll(rowType.getFields()); + + return new RowType(false, mergedFields); + } +} + diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java new file mode 100644 index 000000000..2c44c137c --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.bulk; + +import org.apache.hudi.common.util.PartitionPathEncodeUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.util.RowDataProjection; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Arrays; +import java.util.List; + +/** + * Key generator for {@link RowData}. + */ +public class RowDataKeyGen { + + // reference: NonpartitionedAvroKeyGenerator + private static final String EMPTY_PARTITION = ""; + + // reference: org.apache.hudi.keygen.KeyGenUtils + private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; + private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__"; + + private static final String DEFAULT_PARTITION_PATH = "default"; + private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + + private final String[] recordKeyFields; + private final String[] partitionPathFields; + + private final RowDataProjection recordKeyProjection; + private final RowDataProjection partitionPathProjection; + + private final boolean hiveStylePartitioning; + private final boolean encodePartitionPath; + + // efficient code path + private boolean simpleRecordKey = false; + private RowData.FieldGetter recordKeyFieldGetter; + + private boolean simplePartitionPath = false; + private RowData.FieldGetter partitionPathFieldGetter; + + private boolean nonPartitioned; + + private RowDataKeyGen( + String recordKeys, + String partitionFields, + RowType rowType, + boolean hiveStylePartitioning, + boolean encodePartitionPath) { + this.recordKeyFields = recordKeys.split(","); + this.partitionPathFields = partitionFields.split(","); + List fieldNames = rowType.getFieldNames(); + List fieldTypes = rowType.getChildren(); + + this.hiveStylePartitioning = hiveStylePartitioning; + this.encodePartitionPath = encodePartitionPath; + if (this.recordKeyFields.length == 1) { + // efficient code path + this.simpleRecordKey = true; + int recordKeyIdx = fieldNames.indexOf(this.recordKeyFields[0]); + this.recordKeyFieldGetter = RowData.createFieldGetter(fieldTypes.get(recordKeyIdx), recordKeyIdx); + this.recordKeyProjection = null; + } else { + this.recordKeyProjection = getProjection(this.recordKeyFields, fieldNames, fieldTypes); + } + if (this.partitionPathFields.length == 1) { + // efficient code path + if (this.partitionPathFields[0].equals("")) { + this.nonPartitioned = true; + } else { + this.simplePartitionPath = true; + int partitionPathIdx = fieldNames.indexOf(this.partitionPathFields[0]); + this.partitionPathFieldGetter = RowData.createFieldGetter(fieldTypes.get(partitionPathIdx), partitionPathIdx); + } + this.partitionPathProjection = null; + } else { + this.partitionPathProjection = getProjection(this.partitionPathFields, fieldNames, fieldTypes); + } + } + + public static RowDataKeyGen instance(Configuration conf, RowType rowType) { + return new RowDataKeyGen(conf.getString(FlinkOptions.RECORD_KEY_FIELD), conf.getString(FlinkOptions.PARTITION_PATH_FIELD), + rowType, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING), conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING)); + } + + public String getRecordKey(RowData rowData) { + if (this.simpleRecordKey) { + return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), this.recordKeyFields[0]); + } else { + Object[] keyValues = this.recordKeyProjection.projectAsValues(rowData); + return getRecordKey(keyValues, this.recordKeyFields); + } + } + + public String getPartitionPath(RowData rowData) { + if (this.simplePartitionPath) { + return getPartitionPath(partitionPathFieldGetter.getFieldOrNull(rowData), + this.partitionPathFields[0], this.hiveStylePartitioning, this.encodePartitionPath); + } else if (this.nonPartitioned) { + return EMPTY_PARTITION; + } else { + Object[] partValues = this.partitionPathProjection.projectAsValues(rowData); + return getRecordPartitionPath(partValues, this.partitionPathFields, this.hiveStylePartitioning, this.encodePartitionPath); + } + } + + // reference: org.apache.hudi.keygen.KeyGenUtils.getRecordPartitionPath + private static String getRecordKey(Object[] keyValues, String[] keyFields) { + boolean keyIsNullEmpty = true; + StringBuilder recordKey = new StringBuilder(); + for (int i = 0; i < keyValues.length; i++) { + String recordKeyField = keyFields[i]; + String recordKeyValue = StringUtils.objToString(keyValues[i]); + if (recordKeyValue == null) { + recordKey.append(recordKeyField).append(":").append(NULL_RECORDKEY_PLACEHOLDER).append(","); + } else if (recordKeyValue.isEmpty()) { + recordKey.append(recordKeyField).append(":").append(EMPTY_RECORDKEY_PLACEHOLDER).append(","); + } else { + recordKey.append(recordKeyField).append(":").append(recordKeyValue).append(","); + keyIsNullEmpty = false; + } + } + recordKey.deleteCharAt(recordKey.length() - 1); + if (keyIsNullEmpty) { + throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: " + + Arrays.toString(keyFields) + " cannot be entirely null or empty."); + } + return recordKey.toString(); + } + + // reference: org.apache.hudi.keygen.KeyGenUtils.getRecordPartitionPath + private static String getRecordPartitionPath( + Object[] partValues, + String[] partFields, + boolean hiveStylePartitioning, + boolean encodePartitionPath) { + StringBuilder partitionPath = new StringBuilder(); + for (int i = 0; i < partFields.length; i++) { + String partField = partFields[i]; + String partValue = StringUtils.objToString(partValues[i]); + if (partValue == null || partValue.isEmpty()) { + partitionPath.append(hiveStylePartitioning ? partField + "=" + DEFAULT_PARTITION_PATH + : DEFAULT_PARTITION_PATH); + } else { + if (encodePartitionPath) { + partValue = PartitionPathEncodeUtils.escapePathName(partValue); + } + partitionPath.append(hiveStylePartitioning ? partField + "=" + partValue : partValue); + } + partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); + } + partitionPath.deleteCharAt(partitionPath.length() - 1); + return partitionPath.toString(); + } + + // reference: org.apache.hudi.keygen.KeyGenUtils.getRecordKey + public static String getRecordKey(Object recordKeyValue, String recordKeyField) { + String recordKey = StringUtils.objToString(recordKeyValue); + if (recordKey == null || recordKey.isEmpty()) { + throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty."); + } + return recordKey; + } + + // reference: org.apache.hudi.keygen.KeyGenUtils.getPartitionPath + public static String getPartitionPath( + Object partValue, + String partField, + boolean hiveStylePartitioning, + boolean encodePartitionPath) { + String partitionPath = StringUtils.objToString(partValue); + if (partitionPath == null || partitionPath.isEmpty()) { + partitionPath = DEFAULT_PARTITION_PATH; + } + if (encodePartitionPath) { + partitionPath = PartitionPathEncodeUtils.escapePathName(partitionPath); + } + if (hiveStylePartitioning) { + partitionPath = partField + "=" + partitionPath; + } + return partitionPath; + } + + /** + * Returns the row data projection for the given field names and table schema. + * + * @param fields The projected field names + * @param schemaFields The table schema names + * @param schemaTypes The table schema types + * @return the row data projection for the fields + */ + private static RowDataProjection getProjection(String[] fields, List schemaFields, List schemaTypes) { + int[] positions = getFieldPositions(fields, schemaFields); + LogicalType[] types = Arrays.stream(positions).mapToObj(schemaTypes::get).toArray(LogicalType[]::new); + return RowDataProjection.instance(types, positions); + } + + /** + * Returns the field positions of the given fields {@code fields} among all the fields {@code allFields}. + */ + private static int[] getFieldPositions(String[] fields, List allFields) { + return Arrays.stream(fields).mapToInt(allFields::indexOf).toArray(); + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java new file mode 100644 index 000000000..2ab0819ab --- /dev/null +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/TimeWait.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.utils; + +import org.apache.hudi.exception.HoodieException; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +/** + * Tool used for time waiting. + */ +public class TimeWait { + private final long timeout; // timeout in SECONDS + private final long interval; // interval in MILLISECONDS + private final String action; // action to report error message + private long waitingTime = 0L; + + private TimeWait(long timeout, long interval, String action) { + this.timeout = timeout; + this.interval = interval; + this.action = action; + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Wait for an interval time. + */ + public void waitFor() { + try { + if (waitingTime > timeout) { + throw new HoodieException("Timeout(" + waitingTime + "ms) while waiting for " + action); + } + TimeUnit.MILLISECONDS.sleep(interval); + waitingTime += interval; + } catch (InterruptedException e) { + throw new HoodieException("Error while waiting for " + action, e); + } + } + + /** + * Builder. + */ + public static class Builder { + private long timeout; + private long interval; + private String action; + + public Builder() { + this.timeout = 3600; + this.interval = 500; + } + + public Builder timeout(long timeout) { + this.timeout = timeout; + return this; + } + + public Builder interval(long interval) { + this.interval = interval; + return this; + } + + public Builder action(String action) { + this.action = action; + return this; + } + + public TimeWait build() { + Objects.requireNonNull(this.action); + return new TimeWait(this.timeout, this.interval, this.action); + } + } +} diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index 8ad4127a8..2cdd27e8f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -24,6 +24,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperatorFactory; import org.apache.hudi.sink.bootstrap.BootstrapFunction; +import org.apache.hudi.sink.bulk.BulkInsertWriteOperator; import org.apache.hudi.sink.compact.CompactFunction; import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; @@ -45,6 +46,7 @@ import org.apache.flink.table.connector.sink.DataStreamSinkProvider; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; @@ -58,29 +60,57 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, private final Configuration conf; private final TableSchema schema; private boolean overwrite = false; + private boolean supportsGrouping = false; public HoodieTableSink(Configuration conf, TableSchema schema) { this.conf = conf; this.schema = schema; } + public HoodieTableSink(Configuration conf, TableSchema schema, boolean overwrite, boolean supportsGrouping) { + this.conf = conf; + this.schema = schema; + this.overwrite = overwrite; + this.supportsGrouping = supportsGrouping; + } + @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { return (DataStreamSinkProvider) dataStream -> { - // Read from kafka source - RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType(); + + // setup configuration long ckpTimeout = dataStream.getExecutionEnvironment() .getCheckpointConfig().getCheckpointTimeout(); - int parallelism = dataStream.getExecutionConfig().getParallelism(); conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); + + RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType(); + + // bulk_insert mode + final String writeOperation = this.conf.get(FlinkOptions.OPERATION); + if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) { + this.conf.set(FlinkOptions.WRITE_BULK_INSERT_PARTITION_SORTED, this.supportsGrouping); + BulkInsertWriteOperator.OperatorFactory operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType); + return dataStream.transform("hoodie_bulk_insert_write", + TypeInformation.of(Object.class), + operatorFactory) + // follow the parallelism of upstream operators to avoid shuffle + .setParallelism(dataStream.getParallelism()) + .addSink(new CleanFunction<>(conf)) + .setParallelism(1) + .name("clean_commits"); + } + + // stream write + int parallelism = dataStream.getExecutionConfig().getParallelism(); StreamWriteOperatorFactory operatorFactory = new StreamWriteOperatorFactory<>(conf); - DataStream hoodieDataStream = dataStream + DataStream dataStream1 = dataStream .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class)); + // bootstrap index // TODO: This is a very time-consuming operation, will optimization if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { - hoodieDataStream = hoodieDataStream.rebalance() + dataStream1 = dataStream1.rebalance() .transform( "index_bootstrap", TypeInformation.of(HoodieRecord.class), @@ -89,7 +119,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME)); } - DataStream pipeline = hoodieDataStream + DataStream pipeline = dataStream1 // Key-by record key, to avoid multiple subtasks write to a bucket at the same time .keyBy(HoodieRecord::getRecordKey) .transform( @@ -103,6 +133,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory) .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME)) .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + // compaction if (StreamerUtil.needsAsyncCompaction(conf)) { return pipeline.transform("compact_plan_generate", TypeInformation.of(CompactionPlanEvent.class), @@ -141,7 +172,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, @Override public DynamicTableSink copy() { - return new HoodieTableSink(this.conf, this.schema); + return new HoodieTableSink(this.conf, this.schema, this.overwrite, this.supportsGrouping); } @Override @@ -167,4 +198,10 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, public void applyOverwrite(boolean b) { this.overwrite = b; } + + @Override + public boolean requiresPartitionGrouping(boolean supportsGrouping) { + this.supportsGrouping = supportsGrouping; + return supportsGrouping; + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java index 67bb8cafa..52f2c58fc 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java @@ -47,6 +47,10 @@ public class RowDataProjection { return new RowDataProjection(types, positions); } + public static RowDataProjection instance(LogicalType[] types, int[] positions) { + return new RowDataProjection(types, positions); + } + /** * Returns the projected row data. */ @@ -58,4 +62,16 @@ public class RowDataProjection { } return genericRowData; } + + /** + * Returns the projected values array. + */ + public Object[] projectAsValues(RowData rowData) { + Object[] values = new Object[this.fieldGetters.length]; + for (int i = 0; i < this.fieldGetters.length; i++) { + final Object val = this.fieldGetters[i].getFieldOrNull(rowData); + values[i] = val; + } + return values; + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java new file mode 100644 index 000000000..0d445d60d --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.bulk; + +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.utils.TestConfigurations; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.junit.jupiter.api.Test; + +import static org.apache.hudi.utils.TestData.insertRow; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Test cases for {@link RowDataKeyGen}. + */ +public class TestRowDataKeyGen { + @Test + void testSimpleKeyAndPartition() { + Configuration conf = TestConfigurations.getDefaultConf("path1"); + final RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")); + final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE); + assertThat(keyGen1.getRecordKey(rowData1), is("id1")); + assertThat(keyGen1.getPartitionPath(rowData1), is("par1")); + + // null record key and partition path + final RowData rowData2 = insertRow(null, StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), null); + assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2)); + assertThat(keyGen1.getPartitionPath(rowData2), is("default")); + // empty record key and partition path + final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("")); + assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3)); + assertThat(keyGen1.getPartitionPath(rowData3), is("default")); + + // hive style partitioning + conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true); + final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE); + assertThat(keyGen2.getPartitionPath(rowData1), is("partition=par1")); + assertThat(keyGen2.getPartitionPath(rowData2), is("partition=default")); + assertThat(keyGen2.getPartitionPath(rowData3), is("partition=default")); + } + + @Test + void testComplexKeyAndPartition() { + Configuration conf = TestConfigurations.getDefaultConf("path1"); + conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid,name"); + conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition,ts"); + RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("par1")); + RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE); + assertThat(keyGen1.getRecordKey(rowData1), is("uuid:id1,name:Danny")); + assertThat(keyGen1.getPartitionPath(rowData1), is("par1/1970-01-01T00:00:00.001")); + + // null record key and partition path + final RowData rowData2 = insertRow(null, null, 23, null, null); + assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2)); + assertThat(keyGen1.getPartitionPath(rowData2), is("default/default")); + // empty record key and partition path + final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString(""), 23, + TimestampData.fromEpochMillis(1), StringData.fromString("")); + assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3)); + assertThat(keyGen1.getPartitionPath(rowData3), is("default/1970-01-01T00:00:00.001")); + + // hive style partitioning + conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true); + final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE); + assertThat(keyGen2.getPartitionPath(rowData1), is("partition=par1/ts=1970-01-01T00:00:00.001")); + assertThat(keyGen2.getPartitionPath(rowData2), is("partition=default/ts=default")); + assertThat(keyGen2.getPartitionPath(rowData3), is("partition=default/ts=1970-01-01T00:00:00.001")); + } +} diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index f9ff47143..1ddcb740b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -25,6 +25,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; +import org.apache.hudi.utils.TestSQL; import org.apache.hudi.utils.TestUtils; import org.apache.hudi.utils.factory.CollectSinkTableFactory; @@ -48,6 +49,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.util.Collection; @@ -66,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; /** * IT cases for Hoodie table source and sink. - * + *

* Note: should add more SQL cases when batch write is supported. */ public class HoodieDataSourceITCase extends AbstractTestBase { @@ -267,17 +269,8 @@ public class HoodieDataSourceITCase extends AbstractTestBase { } String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); tableEnv.executeSql(hoodieTableDDL); - String insertInto = "insert into t1 values\n" - + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n" - + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n" - + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n" - + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n" - + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n" - + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n" - + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n" - + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')"; - execInsertSql(tableEnv, insertInto); + execInsertSql(tableEnv, TestSQL.INSERT_T1); List result1 = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); @@ -296,40 +289,40 @@ public class HoodieDataSourceITCase extends AbstractTestBase { void testWriteAndReadParMiddle(ExecMode execMode) throws Exception { boolean streaming = execMode == ExecMode.STREAM; String hoodieTableDDL = "create table t1(\n" - + " uuid varchar(20),\n" - + " name varchar(10),\n" - + " age int,\n" - + " `partition` varchar(20),\n" // test streaming read with partition field in the middle - + " ts timestamp(3),\n" - + " PRIMARY KEY(uuid) NOT ENFORCED\n" - + ")\n" - + "PARTITIONED BY (`partition`)\n" - + "with (\n" - + " 'connector' = 'hudi',\n" - + " 'path' = '" + tempFile.getAbsolutePath() + "',\n" - + " 'read.streaming.enabled' = '" + streaming + "'\n" - + ")"; + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " `partition` varchar(20),\n" // test streaming read with partition field in the middle + + " ts timestamp(3),\n" + + " PRIMARY KEY(uuid) NOT ENFORCED\n" + + ")\n" + + "PARTITIONED BY (`partition`)\n" + + "with (\n" + + " 'connector' = 'hudi',\n" + + " 'path' = '" + tempFile.getAbsolutePath() + "',\n" + + " 'read.streaming.enabled' = '" + streaming + "'\n" + + ")"; streamTableEnv.executeSql(hoodieTableDDL); String insertInto = "insert into t1 values\n" - + "('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n" - + "('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n" - + "('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n" - + "('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n" - + "('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n" - + "('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n" - + "('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n" - + "('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')"; + + "('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n" + + "('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n" + + "('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n" + + "('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n" + + "('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n" + + "('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n" + + "('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n" + + "('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')"; execInsertSql(streamTableEnv, insertInto); final String expected = "[" - + "id1,Danny,23,par1,1970-01-01T00:00:01, " - + "id2,Stephen,33,par1,1970-01-01T00:00:02, " - + "id3,Julian,53,par2,1970-01-01T00:00:03, " - + "id4,Fabian,31,par2,1970-01-01T00:00:04, " - + "id5,Sophia,18,par3,1970-01-01T00:00:05, " - + "id6,Emma,20,par3,1970-01-01T00:00:06, " - + "id7,Bob,44,par4,1970-01-01T00:00:07, " - + "id8,Han,56,par4,1970-01-01T00:00:08]"; + + "id1,Danny,23,par1,1970-01-01T00:00:01, " + + "id2,Stephen,33,par1,1970-01-01T00:00:02, " + + "id3,Julian,53,par2,1970-01-01T00:00:03, " + + "id4,Fabian,31,par2,1970-01-01T00:00:04, " + + "id5,Sophia,18,par3,1970-01-01T00:00:05, " + + "id6,Emma,20,par3,1970-01-01T00:00:06, " + + "id7,Bob,44,par4,1970-01-01T00:00:07, " + + "id8,Han,56,par4,1970-01-01T00:00:08]"; List result = execSelectSql(streamTableEnv, "select * from t1", execMode); @@ -350,17 +343,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); tableEnv.executeSql(hoodieTableDDL); - final String insertInto1 = "insert into t1 values\n" - + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n" - + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n" - + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n" - + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n" - + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n" - + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n" - + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n" - + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')"; - - execInsertSql(tableEnv, insertInto1); + execInsertSql(tableEnv, TestSQL.INSERT_T1); // overwrite partition 'par1' and increase in age by 1 final String insertInto2 = "insert overwrite t1 partition(`partition`='par1') values\n" @@ -519,7 +502,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { // execute query and assert throws exception assertThrows(HoodieException.class, () -> execSelectSql(streamTableEnv, "select * from t1", 10), - "No successful commits under path " + tempFile.getAbsolutePath()); + "No successful commits under path " + tempFile.getAbsolutePath()); } @@ -575,6 +558,80 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(result, expected); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testBulkInsert(boolean hiveStylePartitioning) { + TableEnvironment tableEnv = batchTableEnv; + // csv source + String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data"); + tableEnv.executeSql(csvSourceDDL); + + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.OPERATION.key(), "bulk_insert"); + options.put(FlinkOptions.SINK_SHUFFLE_BY_PARTITION.key(), "true"); + if (hiveStylePartitioning) { + options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true"); + } + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("hoodie_sink", options); + tableEnv.executeSql(hoodieTableDDL); + + String insertInto = "insert into hoodie_sink select * from csv_source"; + execInsertSql(tableEnv, insertInto); + + List result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from hoodie_sink").execute().collect()); + assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT); + // apply filters + List result2 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from hoodie_sink where uuid > 'id5'").execute().collect()); + assertRowsEquals(result2, "[" + + "id6,Emma,20,1970-01-01T00:00:06,par3, " + + "id7,Bob,44,1970-01-01T00:00:07,par4, " + + "id8,Han,56,1970-01-01T00:00:08,par4]"); + } + + @Test + void testBulkInsertNonPartitionedTable() { + TableEnvironment tableEnv = batchTableEnv; + String hoodieTableDDL = "create table t1(\n" + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " ts timestamp(3),\n" + + " `partition` varchar(20),\n" + + " PRIMARY KEY(uuid) NOT ENFORCED\n" + + ")\n" + + "with (\n" + + " 'connector' = 'hudi',\n" + + " 'path' = '" + tempFile.getAbsolutePath() + "',\n" + + " 'write.operation' = 'bulk_insert'\n" + + ")"; + tableEnv.executeSql(hoodieTableDDL); + + final String insertInto1 = "insert into t1 values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')"; + + execInsertSql(tableEnv, insertInto1); + + final String insertInto2 = "insert into t1 values\n" + + "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par2'),\n" + + "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par3'),\n" + + "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par4'),\n" + + "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par5')"; + + execInsertSql(tableEnv, insertInto2); + + List result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result, "[" + + "id1,Danny,23,1970-01-01T00:00:01,par1, " + + "id1,Stephen,33,1970-01-01T00:00:02,par2, " + + "id1,Julian,53,1970-01-01T00:00:03,par3, " + + "id1,Fabian,31,1970-01-01T00:00:04,par4, " + + "id1,Sophia,18,1970-01-01T00:00:05,par5]", 3); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- @@ -606,7 +663,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase { } private List execSelectSql(TableEnvironment tEnv, String select, ExecMode execMode) - throws TableNotExistException, InterruptedException { + throws TableNotExistException, InterruptedException { final String[] splits = select.split(" "); final String tableName = splits[splits.length - 1]; switch (execMode) { @@ -621,12 +678,12 @@ public class HoodieDataSourceITCase extends AbstractTestBase { } private List execSelectSql(TableEnvironment tEnv, String select, long timeout) - throws InterruptedException, TableNotExistException { + throws InterruptedException, TableNotExistException { return execSelectSql(tEnv, select, timeout, null); } private List execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable) - throws InterruptedException, TableNotExistException { + throws InterruptedException, TableNotExistException { final String sinkDDL; if (sourceTable != null) { // use the source table schema as the sink schema if the source table was specified, . diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 6096b4f81..2be2acabd 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -137,6 +137,22 @@ public class TestConfigurations { return builder.toString(); } + public static String getCsvSourceDDL(String tableName, String fileName) { + String sourcePath = Objects.requireNonNull(Thread.currentThread() + .getContextClassLoader().getResource(fileName)).toString(); + return "create table " + tableName + "(\n" + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " ts timestamp(3),\n" + + " `partition` varchar(20)\n" + + ") with (\n" + + " 'connector' = 'filesystem',\n" + + " 'path' = '" + sourcePath + "',\n" + + " 'format' = 'csv'\n" + + ")"; + } + public static final RowDataSerializer SERIALIZER = new RowDataSerializer(ROW_TYPE); public static Configuration getDefaultConf(String tablePath) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 50ecf543e..045fad165 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -515,7 +515,7 @@ public class TestData { return Strings.join(fields, ","); } - private static BinaryRowData insertRow(Object... fields) { + public static BinaryRowData insertRow(Object... fields) { LogicalType[] types = TestConfigurations.ROW_TYPE.getFields().stream().map(RowType.RowField::getType) .toArray(LogicalType[]::new); assertEquals( diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java new file mode 100644 index 000000000..8fb15edf2 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +/** + * Test sql statements. + */ +public class TestSQL { + private TestSQL() {} + + public static final String INSERT_T1 = "insert into t1 values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n" + + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n" + + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n" + + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n" + + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n" + + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n" + + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n" + + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')"; +} diff --git a/hudi-flink/src/test/resources/test_source_5.data b/hudi-flink/src/test/resources/test_source_5.data new file mode 100644 index 000000000..19b6a25a7 --- /dev/null +++ b/hudi-flink/src/test/resources/test_source_5.data @@ -0,0 +1,8 @@ +id1,Danny,23,1970-01-01 00:00:01,par1 +id2,Stephen,33,1970-01-01 00:00:02,par1 +id3,Julian,53,1970-01-01 00:00:03,par2 +id4,Fabian,31,1970-01-01 00:00:04,par2 +id5,Sophia,18,1970-01-01 00:00:05,par3 +id6,Emma,20,1970-01-01 00:00:06,par3 +id7,Bob,44,1970-01-01 00:00:07,par4 +id8,Han,56,1970-01-01 00:00:08,par4