
[HUDI-3701] Flink bulk_insert support bucket hash index (#5118)

Danny Chan
2022-03-25 09:01:42 +08:00
committed by GitHub
parent 608d4bf32d
commit 5e86cdd1e9
12 changed files with 169 additions and 27 deletions
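For orientation, the new code path is driven entirely by table options. Below is a minimal, illustrative sketch of enabling bucket-index bulk insert from a Flink job; the table schema, path, and values are made up, and the option string keys ('write.operation', 'index.type', 'hoodie.bucket.index.num.buckets') are assumed to match the FlinkOptions constants referenced in this change.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BucketBulkInsertExample {
  public static void main(String[] args) throws Exception {
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    tableEnv.executeSql(
        "CREATE TABLE hudi_sink(\n"
            + "  uuid VARCHAR(20),\n"
            + "  name VARCHAR(10),\n"
            + "  age INT,\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` VARCHAR(10)\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi_sink',\n"
            + "  'write.operation' = 'bulk_insert',\n"      // FlinkOptions.OPERATION
            + "  'index.type' = 'BUCKET',\n"                // FlinkOptions.INDEX_TYPE
            + "  'hoodie.bucket.index.num.buckets' = '4'\n" // FlinkOptions.BUCKET_INDEX_NUM_BUCKETS
            + ")");
    // The bucket hash field defaults to the record key field; it can be set
    // explicitly via FlinkOptions.INDEX_KEY_FIELD if only part of a composite
    // key should drive the bucketing (option key assumed).
    tableEnv.executeSql(
        "INSERT INTO hudi_sink VALUES "
            + "('id1', 'Danny', 23, TIMESTAMP '1970-01-01 00:00:01', 'par1')").await();
  }
}
```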

View File

@@ -38,11 +38,15 @@ public class BucketIdentifier {
}
public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
+ return getBucketId(hoodieKey.getRecordKey(), indexKeyFields, numBuckets);
+ }
+ public static int getBucketId(String recordKey, String indexKeyFields, int numBuckets) {
List<String> hashKeyFields;
- if (!hoodieKey.getRecordKey().contains(":")) {
- hashKeyFields = Collections.singletonList(hoodieKey.getRecordKey());
+ if (!recordKey.contains(":")) {
+ hashKeyFields = Collections.singletonList(recordKey);
} else {
- Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(","))
+ Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
.map(p -> p.split(":"))
.collect(Collectors.toMap(p -> p[0], p -> p[1]));
hashKeyFields = Arrays.stream(indexKeyFields.split(","))
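A quick illustration of the two record-key shapes the refactored overload accepts. This is a standalone sketch: the key values, the index key field name ("uuid"), and the bucket count are invented for the example; only the BucketIdentifier calls come from the change above.

```java
import org.apache.hudi.index.bucket.BucketIdentifier;

public class BucketIdExample {
  public static void main(String[] args) {
    int numBuckets = 4; // illustrative bucket count
    // Simple key: no ':' in it, so the whole key string feeds the bucket hash.
    int simple = BucketIdentifier.getBucketId("id1", "uuid", numBuckets);
    // Composite key rendered as "field:value,field:value": it is split into
    // pairs and only the configured index key field(s) ("uuid") are hashed.
    int composite = BucketIdentifier.getBucketId("uuid:id1,name:n1", "uuid", numBuckets);
    System.out.println("simple key -> bucket " + simple);
    System.out.println("composite key -> bucket " + composite);
  }
}
```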

View File

@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bucket;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.table.HoodieTable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Helper class for bucket index bulk insert used by Flink.
*/
public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);
private final int bucketNum;
private final String indexKeyFields;
public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
}
public void write(RowData record) throws IOException {
try {
String recordKey = keyGen.getRecordKey(record);
String partitionPath = keyGen.getPartitionPath(record);
final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeyFields, this.bucketNum);
String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
} catch (Throwable throwable) {
LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
throw throwable;
}
}
private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
instantTime, taskPartitionId, taskId, taskEpochId, rowType);
handles.put(fileId, rowCreateHandle);
}
return handles.get(fileId);
}
}
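Compared with the base BulkInsertWriterHelper, which rolls over file IDs as it writes, the bucket variant derives the target file ID from the record key itself. A small sketch of that mapping using only the two BucketIdentifier calls seen in write() above; the record keys, index key field, and bucket count are illustrative.

```java
import org.apache.hudi.index.bucket.BucketIdentifier;

public class BucketFileIdExample {
  public static void main(String[] args) {
    int numBuckets = 4;             // illustrative bucket count
    String indexKeyFields = "uuid"; // illustrative index key field
    for (String recordKey : new String[] {"id1", "id2", "id1"}) {
      // Deterministic: the same record key always hashes to the same bucket,
      // regardless of which task or which run performs the computation.
      int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, numBuckets);
      // The bucket id is encoded into the file id prefix; write() above uses
      // this to pick (and reuse) one create handle per bucket file.
      String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketId);
      System.out.println(recordKey + " -> bucket " + bucketId + " -> " + fileId);
    }
  }
}
```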

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
- package org.apache.hudi.sink;
+ package org.apache.hudi.sink.bucket;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
+ import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.configuration.Configuration;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
- package org.apache.hudi.sink;
+ package org.apache.hudi.sink.bucket;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;

View File

@@ -167,7 +167,7 @@ public class BulkInsertWriteFunction<I>
private void initWriterHelper() {
String instant = instantToWrite();
- this.writerHelper = new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+ this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType);
}

View File

@@ -50,21 +50,21 @@ public class BulkInsertWriterHelper {
private static final Logger LOG = LogManager.getLogger(BulkInsertWriterHelper.class);
- private final String instantTime;
- private final int taskPartitionId;
- private final long taskId;
- private final long taskEpochId;
- private final HoodieTable hoodieTable;
- private final HoodieWriteConfig writeConfig;
- private final RowType rowType;
+ protected final String instantTime;
+ protected final int taskPartitionId;
+ protected final long taskId;
+ protected final long taskEpochId;
+ protected final HoodieTable hoodieTable;
+ protected final HoodieWriteConfig writeConfig;
+ protected final RowType rowType;
private final Boolean arePartitionRecordsSorted;
private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
private HoodieRowDataCreateHandle handle;
private String lastKnownPartitionPath = null;
private final String fileIdPrefix;
private int numFilesWritten = 0;
private final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
private final RowDataKeyGen keyGen;
protected final Map<String, HoodieRowDataCreateHandle> handles = new HashMap<>();
protected final RowDataKeyGen keyGen;
public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {

View File

@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.bulk;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.table.HoodieTable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
/**
* Factory class to generate bulk insert writer helpers.
*/
public class WriterHelpers {
public static BulkInsertWriterHelper getWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
return OptionsResolver.isBucketIndexType(conf)
? new BucketBulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType)
: new BulkInsertWriterHelper(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
}
}

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.sink.partitioner;
- import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.flink.api.common.functions.Partitioner;
@@ -29,7 +28,7 @@ import org.apache.flink.api.common.functions.Partitioner;
*
* @param <T> The type of obj to hash
*/
- public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
+ public class BucketIndexPartitioner<T extends String> implements Partitioner<T> {
private final int bucketNum;
private final String indexKeyFields;
@@ -40,7 +39,7 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<
}
@Override
- public int partition(HoodieKey key, int numPartitions) {
+ public int partition(String key, int numPartitions) {
int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
return BucketIdentifier.mod(curBucket, numPartitions);
}
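The partitioner hashes the record key into one of bucketNum buckets and then folds the bucket ID onto the actual operator parallelism, so every record of a given bucket is routed to the same write subtask even when the parallelism differs from the bucket count. A worked sketch with assumed numbers (8 buckets, 4 write tasks, made-up keys and index key field):

```java
import org.apache.hudi.index.bucket.BucketIdentifier;

public class BucketSubtaskExample {
  public static void main(String[] args) {
    int bucketNum = 8;       // hoodie.bucket.index.num.buckets (assumed value)
    int numPartitions = 4;   // parallelism of the write operator (assumed value)
    String indexKeyFields = "uuid";
    for (String recordKey : new String[] {"id1", "id2", "id3"}) {
      int bucket = BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum);
      // Same folding as partition() above: all records of one bucket end up
      // on the same write subtask, whatever the configured parallelism.
      int subtask = BucketIdentifier.mod(bucket, numPartitions);
      System.out.println(recordKey + " -> bucket " + bucket + " -> subtask " + subtask);
    }
  }
}
```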

View File

@@ -18,16 +18,15 @@
package org.apache.hudi.sink.utils;
- import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
- import org.apache.hudi.sink.BucketStreamWriteOperator;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+ import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -88,6 +87,18 @@ public class Pipelines {
*/
public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
+ if (OptionsResolver.isBucketIndexType(conf)) {
+ int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+ String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+ BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+ RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+ return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+ .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
+ .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+ .addSink(DummySink.INSTANCE)
+ .name("dummy");
+ }
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
if (partitionFields.length > 0) {
@@ -278,8 +289,8 @@ public class Pipelines {
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
- BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
- return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
+ BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+ return dataStream.partitionCustom(partitioner, HoodieRecord::getRecordKey)
.transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.streamer;
- import org.apache.flink.runtime.state.StateBackend;
- import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.hudi.client.utils.OperationConverter;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -32,6 +30,8 @@ import org.apache.hudi.util.StreamerUtil;
import com.beust.jcommander.Parameter;
import org.apache.flink.configuration.Configuration;
+ import org.apache.flink.runtime.state.StateBackend;
+ import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import java.util.ArrayList;
import java.util.HashMap;

View File

@@ -18,12 +18,13 @@
package org.apache.hudi.util;
+ import org.apache.hudi.exception.HoodieException;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.ParameterException;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
- import org.apache.hudi.exception.HoodieException;
/**
* Converter that converts a string into Flink StateBackend.

View File

@@ -906,8 +906,8 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testBulkInsert(boolean hiveStylePartitioning) {
+ @MethodSource("indexAndPartitioningParams")
+ void testBulkInsert(String indexType, boolean hiveStylePartitioning) {
TableEnvironment tableEnv = batchTableEnv;
// csv source
String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
@@ -917,6 +917,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "bulk_insert")
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
+ .option(FlinkOptions.INDEX_TYPE, indexType)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.end();
tableEnv.executeSql(hoodieTableDDL);
@@ -1262,6 +1263,19 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
return Stream.of(data).map(Arguments::of);
}
+ /**
+ * Return test params => (index type, hive style partitioning).
+ */
+ private static Stream<Arguments> indexAndPartitioningParams() {
+ Object[][] data =
+ new Object[][] {
+ {"FLINK_STATE", false},
+ {"FLINK_STATE", true},
+ {"BUCKET", false},
+ {"BUCKET", true}};
+ return Stream.of(data).map(Arguments::of);
+ }
private void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// wait to finish