1
0

[HUDI-3665] Support flink multiple versions (#5072)

This commit is contained in:
Danny Chan
2022-03-21 10:34:50 +08:00
committed by GitHub
parent 15d1c18625
commit 799c78e688
193 changed files with 2264 additions and 629 deletions

View File

@@ -57,7 +57,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Punit-tests -pl hudi-common,hudi-flink,hudi-client/hudi-spark-client
options: -Punit-tests -pl hudi-common,hudi-flink-datasource/hudi-flink,hudi-client/hudi-spark-client
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -66,7 +66,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl hudi-common,hudi-flink
options: -Pfunctional-tests -pl hudi-common,hudi-flink-datasource/hudi-flink
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -165,7 +165,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Punit-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
options: -Punit-tests -pl !hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -174,7 +174,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
options: -Pfunctional-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
options: -Pfunctional-tests -pl !hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'

View File

@@ -60,7 +60,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<artifactId>${flink.table.runtime.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -159,7 +159,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<artifactId>${flink.runtime.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>

View File

@@ -0,0 +1,364 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hudi-flink-datasource</artifactId>
<groupId>org.apache.hudi</groupId>
<version>0.11.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-flink</artifactId>
<version>0.11.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<main.basedir>${project.parent.parent.basedir}</main.basedir>
<parquet.version>1.11.1</parquet.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/test/resources</directory>
</resource>
</resources>
</build>
<dependencies>
<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hive-sync</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-sync-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>${hudi.flink.module}</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
<exclusion>
<groupId>com.esotericsoftware.minlog</groupId>
<artifactId>minlog</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.table.runtime.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.table.planner.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
<scope>test</scope>
</dependency>
<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<!-- Override the version to be same with Flink avro -->
<version>1.10.0</version>
<scope>compile</scope>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>bijection-avro_${scala.binary.version}</artifactId>
<version>0.9.7</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
</dependency>
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<classifier>${hive.exec.classifier}</classifier>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Test dependencies -->
<!-- Junit 5 dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<!-- Hoodie dependencies -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client-common</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-client</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>${hudi.flink.module}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.runtime.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${flink.table.runtime.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -323,8 +323,8 @@ public class FlinkOptions extends HoodieConfig {
.stringType()
.defaultValue("")
.withDescription("Index key field. Value to be used as hashing to find the bucket ID. Should be a subset of or equal to the recordKey fields.\n"
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");
public static final ConfigOption<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigOptions
.key(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())

View File

@@ -18,10 +18,6 @@
package org.apache.hudi.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,6 +27,11 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,7 +132,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
if (partitionOfBucket == taskID) {
LOG.info(String.format("Bootstrapping index. Adding bucket %s , "
+ "Current parallelism: %s , Max parallelism: %s , Current task id: %s",
+ "Current parallelism: %s , Max parallelism: %s , Current task id: %s",
i, parallelism, maxParallelism, taskID));
bucketToLoad.add(i);
}
@@ -155,7 +156,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID));
if (bucketToFileIDMap.containsKey(partitionBucketId)) {
throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found "
+ "during the BucketStreamWriteFunction index bootstrap.", fileID, partitionBucketId));
+ "during the BucketStreamWriteFunction index bootstrap.", fileID, partitionBucketId));
} else {
LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId));
bucketToFileIDMap.put(partitionBucketId, fileID);

View File

@@ -18,10 +18,11 @@
package org.apache.hudi.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
import org.apache.flink.configuration.Configuration;
/**
* Operator for {@link BucketStreamWriteFunction}.
*

View File

@@ -256,9 +256,9 @@ public class BootstrapOperator<I, O extends HoodieRecord<?>>
}
protected boolean shouldLoadFile(String fileId,
int maxParallelism,
int parallelism,
int taskID) {
int maxParallelism,
int parallelism,
int taskID) {
return KeyGroupRangeAssignment.assignKeyToParallelOperator(
fileId, maxParallelism, parallelism) == taskID;
}

View File

@@ -41,6 +41,7 @@ public abstract class AbstractWriteFunction<I> extends ProcessFunction<I, Object
/**
* Handles the operator event sent by the coordinator.
*
* @param event The event
*/
public abstract void handleOperatorEvent(OperatorEvent event);

View File

@@ -86,6 +86,7 @@ public class CkpMetadata implements Serializable {
// -------------------------------------------------------------------------
// WRITE METHODS
// -------------------------------------------------------------------------
/**
* Initialize the message bus, would clean all the messages and publish the last pending instant.
*
@@ -134,7 +135,7 @@ public class CkpMetadata implements Serializable {
/**
* Add a checkpoint commit message.
*
* @param instant The committed instant
* @param instant The committed instant
*/
public void commitInstant(String instant) {
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.COMPLETED));

View File

@@ -18,10 +18,11 @@
package org.apache.hudi.sink.partitioner;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.flink.api.common.functions.Partitioner;
/**
* Bucket index input partitioner.
* The fields to hash can be a subset of the primary key fields.

View File

@@ -18,10 +18,6 @@
package org.apache.hudi.sink.partitioner.profile;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -33,6 +29,11 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,7 +99,7 @@ public class WriteProfiles {
FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
Map<String, FileStatus> uniqueIdToFileStatus = new HashMap<>();
metadataList.forEach(metadata ->
uniqueIdToFileStatus.putAll(getFilesToReadOfInstant(basePath, metadata, fs, tableType)));
uniqueIdToFileStatus.putAll(getFilesToReadOfInstant(basePath, metadata, fs, tableType)));
return uniqueIdToFileStatus.values().toArray(new FileStatus[0]);
}

View File

@@ -18,11 +18,11 @@
package org.apache.hudi.sink.transform;
import org.apache.hudi.adapter.RateLimiterAdapter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -39,7 +39,7 @@ public class RowDataToHoodieFunctionWithRateLimit<I extends RowData, O extends H
/**
* Rate limit per second for per task.
*/
private transient RateLimiter rateLimiter;
private transient RateLimiterAdapter rateLimiter;
public RowDataToHoodieFunctionWithRateLimit(RowType rowType, Configuration config) {
super(rowType, config);
@@ -50,7 +50,7 @@ public class RowDataToHoodieFunctionWithRateLimit<I extends RowData, O extends H
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.rateLimiter =
RateLimiter.create(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
RateLimiterAdapter.create(totalLimit / getRuntimeContext().getNumberOfParallelSubtasks());
}
@Override

View File

@@ -173,12 +173,12 @@ public class Pipelines {
* The bootstrap operator loads the existing data index (primary key to file id mapping),
* then send the indexing data set to subsequent operator(usually the bucket assign operator).
*
* @param conf The configuration
* @param rowType The row type
* @param conf The configuration
* @param rowType The row type
* @param defaultParallelism The default parallelism
* @param dataStream The data stream
* @param bounded Whether the source is bounded
* @param overwrite Whether it is insert overwrite
* @param dataStream The data stream
* @param bounded Whether the source is bounded
* @param overwrite Whether it is insert overwrite
*/
public static DataStream<HoodieRecord> bootstrap(
Configuration conf,
@@ -268,9 +268,9 @@ public class Pipelines {
* <p>The bucket assigner assigns the inputs to suitable file groups, the write task caches
* and flushes the data set to disk.
*
* @param conf The configuration
* @param conf The configuration
* @param defaultParallelism The default parallelism
* @param dataStream The input data stream
* @param dataStream The input data stream
* @return the stream write data stream pipeline
*/
public static DataStream<Object> hoodieStreamWrite(Configuration conf, int defaultParallelism, DataStream<HoodieRecord> dataStream) {
@@ -280,25 +280,25 @@ public class Pipelines {
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
.transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
.transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} else {
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
}
@@ -324,8 +324,8 @@ public class Pipelines {
*/
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
return dataStream.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.rebalance()
.transform("compact_task",

View File

@@ -18,24 +18,23 @@
package org.apache.hudi.source;
import org.apache.hudi.adapter.AbstractStreamOperatorAdapter;
import org.apache.hudi.adapter.AbstractStreamOperatorFactoryAdapter;
import org.apache.hudi.adapter.MailboxExecutorAdapter;
import org.apache.hudi.adapter.Utils;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -55,11 +54,11 @@ import java.util.concurrent.LinkedBlockingDeque;
* this operator can have multiple parallelism.
*
* <p>As soon as an input split {@link MergeOnReadInputSplit} is received, it is put into a queue,
* the {@link MailboxExecutor} read the actual data of the split.
* the {@code MailboxExecutor} read the actual data of the split.
* This architecture allows the separation of split reading from processing the checkpoint barriers,
* thus removing any potential back-pressure.
*/
public class StreamReadOperator extends AbstractStreamOperator<RowData>
public class StreamReadOperator extends AbstractStreamOperatorAdapter<RowData>
implements OneInputStreamOperator<MergeOnReadInputSplit, RowData> {
private static final Logger LOG = LoggerFactory.getLogger(StreamReadOperator.class);
@@ -69,7 +68,7 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
// It's the same thread that runs this operator and checkpoint actions. Use this executor to schedule only
// splits for subsequent reading, so that a new checkpoint could be triggered without blocking a long time
// for exhausting all scheduled split reading tasks.
private final MailboxExecutor executor;
private final MailboxExecutorAdapter executor;
private MergeOnReadInputFormat format;
@@ -86,7 +85,7 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
private transient volatile SplitState currentSplitState;
private StreamReadOperator(MergeOnReadInputFormat format, ProcessingTimeService timeService,
MailboxExecutor mailboxExecutor) {
MailboxExecutorAdapter mailboxExecutor) {
this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
this.processingTimeService = timeService;
this.executor = Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
@@ -114,14 +113,12 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
}
}
this.sourceContext = StreamSourceContexts.getSourceContext(
this.sourceContext = Utils.getSourceContext(
getOperatorConfig().getTimeCharacteristic(),
getProcessingTimeService(),
new Object(), // no actual locking needed
getContainingTask(),
output,
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
-1,
true);
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
// Enqueue to process the recovered input splits.
enqueueProcessSplits();
@@ -236,26 +233,19 @@ public class StreamReadOperator extends AbstractStreamOperator<RowData>
IDLE, RUNNING
}
private static class OperatorFactory extends AbstractStreamOperatorFactory<RowData>
implements YieldingOperatorFactory<RowData>, OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
private static class OperatorFactory extends AbstractStreamOperatorFactoryAdapter<RowData>
implements OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> {
private final MergeOnReadInputFormat format;
private transient MailboxExecutor mailboxExecutor;
private OperatorFactory(MergeOnReadInputFormat format) {
this.format = format;
}
@Override
public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
this.mailboxExecutor = mailboxExecutor;
}
@SuppressWarnings("unchecked")
@Override
public <O extends StreamOperator<RowData>> O createStreamOperator(StreamOperatorParameters<RowData> parameters) {
StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, mailboxExecutor);
StreamReadOperator operator = new StreamReadOperator(format, processingTimeService, getMailboxExecutorAdapter());
operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
return (O) operator;
}

View File

@@ -79,14 +79,14 @@ public class HoodieFlinkStreamer {
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
DataStream<RowData> dataStream = env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
), kafkaProps))
cfg.kafkaTopic,
new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
false,
true,
TimestampFormat.ISO_8601
), kafkaProps))
.name("kafka_source")
.uid("uid_kafka_source");

View File

@@ -33,6 +33,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -42,7 +43,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.util.StreamerUtil;
import java.util.ArrayList;
import java.util.Arrays;

View File

@@ -334,7 +334,7 @@ public class ParquetSplitReaderUtil {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType)fieldType).getPrecision());
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:

View File

@@ -31,8 +31,8 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;

View File

@@ -76,8 +76,8 @@ public class AvroSchemaConverter {
return DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull();
case MAP:
return DataTypes.MAP(
DataTypes.STRING().notNull(),
convertToDataType(schema.getValueType()))
DataTypes.STRING().notNull(),
convertToDataType(schema.getValueType()))
.notNull();
case UNION:
final Schema actualSchema;

View File

@@ -51,10 +51,10 @@ public class CompactionUtil {
/**
* Schedules a new compaction instant.
*
* @param metaClient The metadata client
* @param writeClient The write client
* @param metaClient The metadata client
* @param writeClient The write client
* @param deltaTimeCompaction Whether the compaction is trigger by elapsed delta time
* @param committed Whether the last instant was committed successfully
* @param committed Whether the last instant was committed successfully
*/
public static void scheduleCompaction(
HoodieTableMetaClient metaClient,

Some files were not shown because too many files have changed in this diff Show More