[HUDI-3953] Flink Hudi module should support low-level source and sink API (#5445)
Co-authored-by: jerryyue <jerryyue@didiglobal.com>
org/apache/hudi/util/HoodiePipeline.java
@@ -0,0 +1,271 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.util;

import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTableFactory;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * A tool class to construct hoodie Flink pipelines.
 *
 * <p>How to use?</p>
 * Method {@link #builder(String)} returns a pipeline builder. The builder
 * can then be used to define the hudi table columns, primary keys and partitions.
 *
 * <p>A sink example:</p>
 * <pre>
 *   HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
 *   DataStreamSink<?> sinkStream = builder
 *       .column("f0 int")
 *       .column("f1 varchar(10)")
 *       .column("f2 varchar(20)")
 *       .pk("f0,f1")
 *       .partition("f2")
 *       .sink(input, false);
 * </pre>
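 *
 * <p>A symmetric source example (a sketch using the same builder; {@code env} is
 * assumed to be an existing StreamExecutionEnvironment):</p>
 * <pre>
 *   DataStream<RowData> sourceStream = HoodiePipeline.builder("myTable")
 *       .column("f0 int")
 *       .column("f1 varchar(10)")
 *       .column("f2 varchar(20)")
 *       .pk("f0,f1")
 *       .partition("f2")
 *       .source(env);
 * </pre>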
 */
public class HoodiePipeline {

  private static final Logger LOG = LogManager.getLogger(HoodiePipeline.class);

  /**
   * Returns the builder for hoodie pipeline construction.
   */
  public static Builder builder(String tableName) {
    return new Builder(tableName);
  }

  /**
   * Builder for hudi source/sink pipeline construction.
   */
  public static class Builder {
    private final String tableName;
    private final List<String> columns;
    private final Map<String, String> options;

    private String pk;
    private List<String> partitions;

    private Builder(String tableName) {
      this.tableName = tableName;
      this.columns = new ArrayList<>();
      this.options = new HashMap<>();
      this.partitions = new ArrayList<>();
    }

    /**
     * Add a table column definition.
     *
     * @param column the column definition in DDL form, e.g. 'f0 int'
     */
    public Builder column(String column) {
      this.columns.add(column);
      return this;
    }

    /**
     * Add primary keys.
     */
    public Builder pk(String... pks) {
      this.pk = String.join(",", pks);
      return this;
    }

    /**
     * Add partition fields.
     */
    public Builder partition(String... partitions) {
      this.partitions = new ArrayList<>(Arrays.asList(partitions));
      return this;
    }

    /**
     * Add a config option.
     */
    public Builder option(ConfigOption<?> option, Object val) {
      this.options.put(option.key(), val.toString());
      return this;
    }

    public Builder option(String key, Object val) {
      this.options.put(key, val.toString());
      return this;
    }

    public Builder options(Map<String, String> options) {
      this.options.putAll(options);
      return this;
    }

    public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
    }
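
    /**
     * Resolves the builder definition into a table id and a resolved catalog table:
     * a temporary table environment executes the generated CREATE TABLE DDL, and the
     * resulting table is then looked up in the default catalog and database.
     */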
    public TableDescriptor getTableDescriptor() {
      EnvironmentSettings environmentSettings = EnvironmentSettings
          .newInstance()
          .build();
      TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(environmentSettings);
      String sql = getCreateHoodieTableDDL(this.tableName, this.columns, this.options, this.pk, this.partitions);
      tableEnv.executeSql(sql);
      String currentCatalog = tableEnv.getCurrentCatalog();
      ResolvedCatalogTable catalogTable = null;
      String defaultDatabase = null;
      try {
        Catalog catalog = tableEnv.getCatalog(currentCatalog).get();
        defaultDatabase = catalog.getDefaultDatabase();
        catalogTable = (ResolvedCatalogTable) catalog.getTable(new ObjectPath(defaultDatabase, this.tableName));
      } catch (TableNotExistException e) {
        throw new HoodieException("Failed to create table " + this.tableName, e);
      }
      ObjectIdentifier tableId = ObjectIdentifier.of(currentCatalog, defaultDatabase, this.tableName);
      return new TableDescriptor(tableId, catalogTable);
    }

    public DataStream<RowData> source(StreamExecutionEnvironment execEnv) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.source(execEnv, tableDescriptor.tableId, tableDescriptor.getResolvedCatalogTable());
    }
  }
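
  /**
   * Generates the CREATE TABLE DDL for the hoodie table. For illustration, a builder
   * with columns 'f0 int' and 'f1 varchar(10)', primary key 'f0' and partition field
   * 'f1' would roughly produce:
   * <pre>
   *   create table myTable(
   *     f0 int,
   *     f1 varchar(10),
   *     PRIMARY KEY(f0) NOT ENFORCED
   *   )
   *   PARTITIONED BY (`f1`)
   *   with ('connector' = 'hudi')
   * </pre>
   */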
  private static String getCreateHoodieTableDDL(
      String tableName,
      List<String> fields,
      Map<String, String> options,
      String pkField,
      List<String> partitionField) {
    StringBuilder builder = new StringBuilder();
    builder.append("create table ")
        .append(tableName)
        .append("(\n");
    for (String field : fields) {
      builder.append("  ")
          .append(field)
          .append(",\n");
    }
    builder.append("  PRIMARY KEY(")
        .append(pkField)
        .append(") NOT ENFORCED\n")
        .append(")\n");
    if (!partitionField.isEmpty()) {
      String partitions = partitionField
          .stream()
          .map(partitionName -> "`" + partitionName + "`")
          .collect(Collectors.joining(","));
      builder.append("PARTITIONED BY (")
          .append(partitions)
          .append(")\n");
    }
    builder.append("with ('connector' = 'hudi'");
    options.forEach((k, v) -> builder
        .append(",\n")
        .append("  '")
        .append(k)
        .append("' = '")
        .append(v)
        .append("'"));
    builder.append("\n)");
    return builder.toString();
  }

  /**
   * Returns the data stream sink with given catalog table.
   *
   * @param input        The input datastream
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   * @param isBounded    A flag indicating whether the input data stream is bounded
   */
  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
    FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
        .consumeDataStream(input);
  }

  /**
   * Returns the data stream source with given catalog table.
   *
   * @param execEnv      The execution environment
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   */
  private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable) {
    FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
        .createDynamicTableSource(context))
        .getScanRuntimeProvider(new ScanRuntimeProviderContext());
    return dataStreamScanProvider.produceDataStream(execEnv);
  }

  /**
   * A POJO that contains tableId and resolvedCatalogTable.
   */
  public static class TableDescriptor {
    private ObjectIdentifier tableId;
    private ResolvedCatalogTable resolvedCatalogTable;

    public TableDescriptor(ObjectIdentifier tableId, ResolvedCatalogTable resolvedCatalogTable) {
      this.tableId = tableId;
      this.resolvedCatalogTable = resolvedCatalogTable;
    }

    public ObjectIdentifier getTableId() {
      return tableId;
    }

    public ResolvedCatalogTable getResolvedCatalogTable() {
      return resolvedCatalogTable;
    }
  }
}

ITTestDataStreamWrite.java
@@ -26,9 +26,11 @@ import org.apache.hudi.sink.transform.ChainedTransformer;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.apache.hudi.utils.source.ContinuousFileSource;

import org.apache.flink.api.common.JobStatus;
@@ -57,6 +59,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -326,4 +329,127 @@ public class ITTestDataStreamWrite extends TestLogger {

    TestData.checkWrittenFullData(tempFile, expected);
  }
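
  /**
   * Executes the pipeline asynchronously. For MOR tables the job runs continuously,
   * so it is given time for the async compaction to finish and is then cancelled;
   * otherwise the bounded job simply runs to completion.
   */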
  public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
    JobClient client = execEnv.executeAsync(jobName);
    if (isMor) {
      if (client.getJobStatus().get() != JobStatus.FAILED) {
        try {
          TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
          client.cancel();
        } catch (Throwable throwable) {
          // ignored
        }
      }
    } else {
      // wait for the streaming job to finish
      client.getJobExecutionResult().get();
    }
  }
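
  /**
   * End-to-end check of the low-level source API: three batches are written through
   * the SQL write path, then the table is streamed starting from the latest commit
   * and the collected rows are compared against the last batch.
   */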
  @Test
  public void testHoodiePipelineBuilderSource() throws Exception {
    // create a StreamExecutionEnvironment instance
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.getConfig().disableObjectReuse();
    execEnv.setParallelism(1);
    // set up checkpoint interval
    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
    conf.setString(FlinkOptions.TABLE_NAME, "t1");
    conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");

    // write 3 batches of data
    TestData.writeData(TestData.dataSetInsert(1, 2), conf);
    TestData.writeData(TestData.dataSetInsert(3, 4), conf);
    TestData.writeData(TestData.dataSetInsert(5, 6), conf);

    String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
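
    // streaming read starts from the latest complete instant, so only the third
    // batch (rows 5 and 6) is expected in the collected result below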
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
    options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);

    // read the hoodie table with the low-level source API
    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source")
        .column("uuid string not null")
        .column("name string")
        .column("age int")
        .column("`ts` timestamp(3)")
        .column("`partition` string")
        .pk("uuid")
        .partition("partition")
        .options(options);
    DataStream<RowData> rowDataDataStream = builder.source(execEnv);
    List<RowData> result = new ArrayList<>();
    rowDataDataStream.executeAndCollect().forEachRemaining(result::add);
    TimeUnit.SECONDS.sleep(2); // wait 2 seconds for the data to be collected
    TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
  }
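
  /**
   * End-to-end check of the low-level sink API: JSON rows are read continuously from
   * a file source, written into a MERGE_ON_READ table, and the written data is
   * validated against the expected result.
   */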
  @Test
  public void testHoodiePipelineBuilderSink() throws Exception {
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    Map<String, String> options = new HashMap<>();
    execEnv.getConfig().disableObjectReuse();
    execEnv.setParallelism(4);
    // set up checkpoint interval
    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

    options.put(FlinkOptions.INDEX_TYPE.key(), "FLINK_STATE");
    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
    options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "id");
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
    Configuration conf = Configuration.fromMap(options);
    // read from file source
    RowType rowType =
        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
            .getLogicalType();

    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
        rowType,
        InternalTypeInfo.of(rowType),
        false,
        true,
        TimestampFormat.ISO_8601
    );
    String sourcePath = Objects.requireNonNull(Thread.currentThread()
        .getContextClassLoader().getResource("test_source.data")).toString();

    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    format.setCharsetName("UTF-8");

    DataStream<RowData> dataStream = execEnv
        // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
        .setParallelism(1);

    // sink to the hoodie table with the low-level sink API
    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
        .column("uuid string not null")
        .column("name string")
        .column("age int")
        .column("`ts` timestamp(3)")
        .column("`partition` string")
        .pk("uuid")
        .partition("partition")
        .options(options);

    builder.sink(dataStream, false);

    execute(execEnv, true, "Api_Sink_Test");
    TestData.checkWrittenFullData(tempFile, EXPECTED);
  }

}