1
0

[HUDI-3315] RFC-35 Part-1 Support bucket index in Flink writer (#4679)

* Support bucket index in Flink writer
* Use record key as default index key
This commit is contained in:
Gary Li
2022-03-02 15:14:44 +08:00
committed by GitHub
parent 85f47b53df
commit 10d866f083
14 changed files with 321 additions and 11 deletions

View File

@@ -141,6 +141,6 @@ public abstract class HoodieIndex<I, O> implements Serializable {
}
public enum IndexType {
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET
HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, FLINK_STATE
}
}

View File

@@ -56,6 +56,10 @@ public class BucketIdentifier {
return hashKeyFields.hashCode() % numBuckets;
}
/**
 * Builds the composite key identifying a bucket within a partition,
 * i.e. {@code "{partition}_{bucketIdStr(bucketId)}"}.
 */
public static String partitionBucketIdStr(String partition, int bucketId) {
return String.format("%s_%s", partition, bucketIdStr(bucketId));
}
/**
 * Recovers the bucket id encoded in the first 8 characters of a bucket file id.
 *
 * @param fileId the file id whose leading 8 characters hold the zero-padded bucket id
 * @return the parsed bucket id
 */
public static int bucketIdFromFileId(String fileId) {
  final String bucketPrefix = fileId.substring(0, 8);
  return Integer.parseInt(bucketPrefix);
}
@@ -64,6 +68,10 @@ public class BucketIdentifier {
return String.format("%08d", n);
}
// Creates a new file id prefix for the given bucket id; delegates to the
// String overload after zero-padding the id via bucketIdStr.
public static String newBucketFileIdPrefix(int bucketId) {
return newBucketFileIdPrefix(bucketIdStr(bucketId));
}
// Creates a fresh file id prefix and overwrites its first 8 characters with the
// given bucket id, so the bucket id can later be recovered from the file id.
// NOTE(review): bucketId is used as a regex replacement string; safe for the
// digit-only ids produced by bucketIdStr, but a '$' or '\' in it would need
// Matcher.quoteReplacement — confirm inputs are always padded digits.
public static String newBucketFileIdPrefix(String bucketId) {
return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
}
@@ -71,4 +79,8 @@ public class BucketIdentifier {
// Returns whether the given file name fully matches the bucket file name
// pattern BUCKET_NAME (declared elsewhere in this class).
public static boolean isBucketFileName(String name) {
return BUCKET_NAME.matcher(name).matches();
}
/**
 * Remainder of {@code x} divided by {@code y}, with Java's {@code %} semantics:
 * the result carries the sign of the dividend {@code x}.
 */
public static int mod(int x, int y) {
  // x - (x / y) * y is, by the JLS, exactly the value of x % y for ints.
  return x - (x / y) * y;
}
}

View File

@@ -145,6 +145,10 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
return viewManager;
}
/**
 * Returns the {@link HoodieTableMetadata} held by this table.
 */
public HoodieTableMetadata getMetadata() {
return metadata;
}
/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
* @param context HoodieEngineContext

View File

@@ -23,9 +23,11 @@ import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -106,6 +108,12 @@ public class FlinkOptions extends HoodieConfig {
// ------------------------------------------------------------------------
// Index Options
// ------------------------------------------------------------------------
public static final ConfigOption<String> INDEX_TYPE = ConfigOptions
.key("index.type")
.stringType()
.defaultValue(HoodieIndex.IndexType.FLINK_STATE.name())
.withDescription("Index type of Flink write job, default is using state backed index.");
public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
.key("index.bootstrap.enabled")
.booleanType()
@@ -310,6 +318,20 @@ public class FlinkOptions extends HoodieConfig {
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");
public static final ConfigOption<String> INDEX_KEY_FIELD = ConfigOptions
.key(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key())
.stringType()
.defaultValue("")
.withDescription("Index key field. Value to be used as hashing to find the bucket ID. Should be a subset of or equal to the recordKey fields.\n"
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");
public static final ConfigOption<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigOptions
.key(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())
.intType()
.defaultValue(4) // default 4 buckets per partition
.withDescription("Hudi bucket number per partition. Only affected if using Hudi bucket index.");
public static final ConfigOption<String> PARTITION_PATH_FIELD = ConfigOptions
.key(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.stringType()

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.configuration;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.flink.configuration.Configuration;
@@ -101,6 +102,10 @@ public class OptionsResolver {
return FilePathUtils.extractPartitionKeys(conf).length > 0;
}
/**
 * Returns whether the configured {@code index.type} is the BUCKET index.
 */
public static boolean isBucketIndexType(Configuration conf) {
  final String indexType = conf.getString(FlinkOptions.INDEX_TYPE);
  return indexType.equals(HoodieIndex.IndexType.BUCKET.name());
}
/**
* Returns whether the source should emit changelog.
*

View File

@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.table.HoodieFlinkTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.toList;
/**
 * A stream write function that routes each incoming record to a fixed file group
 * chosen by the bucket index: the bucket id is the hash of the index key fields
 * modulo the configured bucket number, and every bucket maps to exactly one file id.
 *
 * @param <I> the input type
 */
public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {

  private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);

  private int maxParallelism;

  private int parallelism;

  private int bucketNum;

  protected transient HoodieFlinkTable table;

  private String indexKeyFields;

  // Mapping from "{partition}_{bucketId}" to the fileID that owns the bucket.
  // Declared as Map (programming to the interface) instead of HashMap.
  private Map<String, String> bucketToFileIDMap;

  /**
   * Constructs a BucketStreamWriteFunction.
   *
   * @param config The config options
   */
  public BucketStreamWriteFunction(Configuration config) {
    super(config);
    this.bucketToFileIDMap = new HashMap<>();
  }

  @Override
  public void open(Configuration parameters) throws IOException {
    super.open(parameters);
    this.bucketNum = config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
    this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
    this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    bootstrapIndex();
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    super.initializeState(context);
    // initializeState runs before open, so the table is available to bootstrapIndex.
    this.table = this.writeClient.getHoodieTable();
  }

  @Override
  public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
    HoodieRecord<?> record = (HoodieRecord<?>) i;
    final HoodieKey hoodieKey = record.getKey();
    final HoodieRecordLocation location;

    // Renamed from "bucketNum" to avoid shadowing the field of the same name.
    final int bucketId = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
    final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketId);

    if (bucketToFileIDMap.containsKey(partitionBucketId)) {
      // The bucket already owns a file group: route the record there as an update.
      location = new HoodieRecordLocation("U", bucketToFileIDMap.get(partitionBucketId));
    } else {
      // First record for this bucket: mint a new file id and remember the mapping.
      String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketId);
      location = new HoodieRecordLocation("I", newFileId);
      bucketToFileIDMap.put(partitionBucketId, newFileId);
    }
    record.unseal();
    record.setCurrentLocation(location);
    record.seal();
    bufferRecord(record);
  }

  /**
   * Get partition_bucket -> fileID mapping from the existing hudi table.
   * This is a required operation for each restart to avoid having duplicate file ids for one bucket.
   */
  private void bootstrapIndex() throws IOException {
    Option<HoodieInstant> latestCommitTime = table.getFileSystemView().getTimeline().filterCompletedInstants().lastInstant();
    if (!latestCommitTime.isPresent()) {
      // Fresh table: nothing to bootstrap.
      return;
    }
    // A bucket belongs to this task when bucketId % parallelism == taskID.
    HashSet<Integer> bucketToLoad = new HashSet<>();
    for (int i = 0; i < bucketNum; i++) {
      if (BucketIdentifier.mod(i, parallelism) == taskID) {
        // Parameterized SLF4J logging instead of eager String.format.
        LOG.info("Bootstrapping index. Adding bucket {} , Current parallelism: {} , Max parallelism: {} , Current task id: {}",
            i, parallelism, maxParallelism, taskID);
        bucketToLoad.add(i);
      }
    }
    bucketToLoad.forEach(bucket -> LOG.info("bucketToLoad contains {}", bucket));

    LOG.info("Loading Hoodie Table {}, with path {}", table.getMetaClient().getTableConfig().getTableName(),
        table.getMetaClient().getBasePath());

    // Iterate through all existing partitions to load the existing fileIDs that belong to this task.
    List<String> partitions = table.getMetadata().getAllPartitionPaths();
    for (String partitionPath : partitions) {
      List<FileSlice> latestFileSlices = table.getSliceView()
          .getLatestFileSlices(partitionPath)
          .collect(toList());
      for (FileSlice fileSlice : latestFileSlices) {
        String fileId = fileSlice.getFileId();
        int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
        if (bucketToLoad.contains(bucketNumber)) {
          String partitionBucketId = BucketIdentifier.partitionBucketIdStr(partitionPath, bucketNumber);
          LOG.info("Should load this partition bucket {} with fileID {}", partitionBucketId, fileId);
          if (bucketToFileIDMap.containsKey(partitionBucketId)) {
            // Two file groups mapping to the same bucket would corrupt the index.
            throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found "
                + "during the fileGroupPerPartitionedBucketState initialization.", fileId, partitionBucketId));
          } else {
            LOG.info("Adding fileID {} to the partition bucket {}.", fileId, partitionBucketId);
            bucketToFileIDMap.put(partitionBucketId, fileId);
          }
        }
      }
    }
  }
}

View File

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
/**
 * Write operator that wraps a {@link BucketStreamWriteFunction}, so records are
 * written through the bucket-index write path.
 *
 * @param <I> the input type
 */
public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {

  public BucketStreamWriteOperator(Configuration conf) {
    super(new BucketStreamWriteFunction<>(conf));
  }

  /**
   * Creates a {@link WriteOperatorFactory} producing bucket stream write operators.
   */
  public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
    BucketStreamWriteOperator<I> operator = new BucketStreamWriteOperator<>(conf);
    return WriteOperatorFactory.instance(conf, operator);
  }
}

View File

@@ -371,7 +371,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
*
* @param value HoodieRecord
*/
private void bufferRecord(HoodieRecord<?> value) {
protected void bufferRecord(HoodieRecord<?> value) {
final String bucketID = getBucketID(value);
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,

View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.partitioner;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.index.bucket.BucketIdentifier;
/**
 * Flink partitioner that routes a record to a downstream subtask by its bucket id:
 * the bucket id is computed from the hoodie key's index key fields and then mapped
 * onto the available partitions, so records of the same bucket always reach the
 * same subtask.
 *
 * <p>Implements the generic {@code Partitioner<HoodieKey>} instead of the raw type,
 * which removes the unchecked cast in {@link #partition}.
 */
public class BucketIndexPartitioner implements Partitioner<HoodieKey> {

  private final int bucketNum;
  private final String indexKeyFields;

  public BucketIndexPartitioner(int bucketNum, String indexKeyFields) {
    this.bucketNum = bucketNum;
    this.indexKeyFields = indexKeyFields;
  }

  @Override
  public int partition(HoodieKey key, int numPartitions) {
    int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
    return BucketIdentifier.mod(curBucket, numPartitions);
  }
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.BucketStreamWriteOperator;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteOperator;
@@ -36,6 +37,7 @@ import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
import org.apache.hudi.table.format.FilePathUtils;
@@ -185,7 +187,7 @@ public class Pipelines {
boolean bounded,
boolean overwrite) {
final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
if (overwrite) {
if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
return rowDataToHoodieRecord(conf, rowType, dataStream);
} else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) {
return boundedBootstrap(conf, rowType, defaultParallelism, dataStream);
@@ -270,14 +272,24 @@ public class Pipelines {
* @return the stream write data stream pipeline
*/
public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
if (OptionsResolver.isBucketIndexType(conf)) {
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
BucketIndexPartitioner partitioner = new BucketIndexPartitioner(bucketNum, indexKeyFields);
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
.transform("hoodie_bucket_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_bucket_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} else {
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
// shuffle by fileId(bucket id)
@@ -285,6 +297,7 @@ public class Pipelines {
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
}
/**

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
@@ -199,6 +200,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
// the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys));
}
// set index key for bucket index if not defined
if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())
&& conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD));
}
// tweak the key gen class if possible
final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");

View File

@@ -84,7 +84,6 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// default parallelism
int parallelism = dataStream.getExecutionConfig().getParallelism();
DataStream<Object> pipeline;
// bootstrap
final DataStream<HoodieRecord> hoodieRecordDataStream =
Pipelines.bootstrap(conf, rowType, parallelism, dataStream, context.isBounded(), overwrite);

View File

@@ -51,6 +51,8 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.nio.charset.StandardCharsets;
@@ -129,10 +131,14 @@ public class StreamWriteITCase extends TestLogger {
testWriteToHoodie(null, EXPECTED);
}
@Test
public void testMergeOnReadWriteWithCompaction() throws Exception {
@ParameterizedTest
@ValueSource(strings = {"BUCKET", "FLINK_STATE"})
public void testMergeOnReadWriteWithCompaction(String indexType) throws Exception {
int parallelism = 4;
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.INDEX_TYPE, indexType);
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
@@ -347,6 +348,16 @@ public class TestHoodieTableFactory {
final Configuration conf3 = tableSink3.getConf();
assertThat(conf3.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS_NAME), is(NonpartitionedAvroKeyGenerator.class.getName()));
// definition of bucket index
this.conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
final MockContext sinkContext4 = MockContext.getInstance(this.conf, schema2, "");
final HoodieTableSink tableSink4 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext4);
final Configuration conf4 = tableSink4.getConf();
assertThat(conf4.get(FlinkOptions.RECORD_KEY_FIELD), is("f0,f1"));
assertThat(conf4.get(FlinkOptions.INDEX_KEY_FIELD), is("f0,f1"));
assertThat(conf4.get(FlinkOptions.INDEX_TYPE), is(HoodieIndex.IndexType.BUCKET.name()));
assertThat(conf4.get(FlinkOptions.KEYGEN_CLASS_NAME), is(NonpartitionedAvroKeyGenerator.class.getName()));
}
@Test