[HUDI-1647] Supports snapshot read for Flink (#2613)
This commit is contained in:
@@ -34,6 +34,7 @@ import org.apache.hudi.sink.CommitSink;
|
||||
import org.apache.hudi.streamer.FlinkStreamerConfig;
|
||||
import org.apache.hudi.util.AvroSchemaConverter;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
import org.apache.hudi.utils.source.ContinuousFileSource;
|
||||
|
||||
import org.apache.flink.api.common.JobStatus;
|
||||
import org.apache.flink.api.common.io.FilePathFilter;
|
||||
@@ -111,14 +112,11 @@ public class StreamWriteITCase extends TestLogger {
|
||||
String sourcePath = Objects.requireNonNull(Thread.currentThread()
|
||||
.getContextClassLoader().getResource("test_source.data")).toString();
|
||||
|
||||
TextInputFormat format = new TextInputFormat(new Path(sourcePath));
|
||||
format.setFilesFilter(FilePathFilter.createDefaultFilter());
|
||||
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
|
||||
format.setCharsetName("UTF-8");
|
||||
|
||||
DataStream<Object> dataStream = execEnv
|
||||
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
|
||||
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
|
||||
// use continuous file source to trigger checkpoint
|
||||
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
|
||||
.name("continuous_file_source")
|
||||
.setParallelism(1)
|
||||
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
|
||||
.setParallelism(4)
|
||||
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
|
||||
@@ -136,14 +134,8 @@ public class StreamWriteITCase extends TestLogger {
|
||||
execEnv.addOperator(dataStream.getTransformation());
|
||||
|
||||
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
|
||||
if (client.getJobStatus().get() != JobStatus.FAILED) {
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(8);
|
||||
client.cancel();
|
||||
} catch (Throwable var1) {
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
// wait for the streaming job to finish
|
||||
client.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
|
||||
|
||||
TestData.checkWrittenFullData(tempFile, EXPECTED);
|
||||
}
|
||||
@@ -175,14 +167,11 @@ public class StreamWriteITCase extends TestLogger {
|
||||
String sourcePath = Objects.requireNonNull(Thread.currentThread()
|
||||
.getContextClassLoader().getResource("test_source.data")).toString();
|
||||
|
||||
TextInputFormat format = new TextInputFormat(new Path(sourcePath));
|
||||
format.setFilesFilter(FilePathFilter.createDefaultFilter());
|
||||
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
|
||||
format.setCharsetName("UTF-8");
|
||||
|
||||
execEnv
|
||||
// use PROCESS_CONTINUOUSLY mode to trigger checkpoint
|
||||
.readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
|
||||
// use continuous file source to trigger checkpoint
|
||||
.addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), 2))
|
||||
.name("continuous_file_source")
|
||||
.setParallelism(1)
|
||||
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
|
||||
.setParallelism(4)
|
||||
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
|
||||
@@ -214,14 +203,8 @@ public class StreamWriteITCase extends TestLogger {
|
||||
.setParallelism(1);
|
||||
|
||||
JobClient client = execEnv.executeAsync(execEnv.getStreamGraph(conf.getString(FlinkOptions.TABLE_NAME)));
|
||||
if (client.getJobStatus().get() != JobStatus.FAILED) {
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(8);
|
||||
client.cancel();
|
||||
} catch (Throwable var1) {
|
||||
// ignored
|
||||
}
|
||||
}
|
||||
// wait for the streaming job to finish
|
||||
client.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
|
||||
|
||||
TestData.checkWrittenFullData(tempFile, EXPECTED);
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ public class StreamWriteOperatorCoordinatorTest {
|
||||
@BeforeEach
|
||||
public void before() throws Exception {
|
||||
coordinator = new StreamWriteOperatorCoordinator(
|
||||
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2);
|
||||
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2, false);
|
||||
coordinator.start();
|
||||
}
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ public class StreamWriteFunctionWrapper<I> {
|
||||
this.gateway = new MockOperatorEventGateway();
|
||||
this.conf = conf;
|
||||
// one function
|
||||
this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
|
||||
this.coordinator = new StreamWriteOperatorCoordinator(conf, 1, false);
|
||||
this.functionInitializationContext = new MockFunctionInitializationContext();
|
||||
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
|
||||
}
|
||||
|
||||
@@ -20,13 +20,17 @@ package org.apache.hudi.operator.utils;
|
||||
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.streamer.FlinkStreamerConfig;
|
||||
import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
|
||||
|
||||
import org.apache.flink.api.common.ExecutionConfig;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.table.api.DataTypes;
|
||||
import org.apache.flink.table.api.TableSchema;
|
||||
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
|
||||
import org.apache.flink.table.types.DataType;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
@@ -36,14 +40,56 @@ public class TestConfigurations {
|
||||
private TestConfigurations() {
|
||||
}
|
||||
|
||||
public static final RowType ROW_TYPE = (RowType) DataTypes.ROW(
|
||||
public static final DataType ROW_DATA_TYPE = DataTypes.ROW(
|
||||
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
|
||||
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
|
||||
DataTypes.FIELD("age", DataTypes.INT()),
|
||||
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
|
||||
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
|
||||
.notNull()
|
||||
.getLogicalType();
|
||||
.notNull();
|
||||
|
||||
public static final RowType ROW_TYPE = (RowType) ROW_DATA_TYPE.getLogicalType();
|
||||
|
||||
public static final TableSchema TABLE_SCHEMA = TableSchema.builder()
|
||||
.fields(
|
||||
ROW_TYPE.getFieldNames().toArray(new String[0]),
|
||||
ROW_DATA_TYPE.getChildren().toArray(new DataType[0]))
|
||||
.build();
|
||||
|
||||
public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
|
||||
String createTable = "create table " + tableName + "(\n"
|
||||
+ " uuid varchar(20),\n"
|
||||
+ " name varchar(10),\n"
|
||||
+ " age int,\n"
|
||||
+ " ts timestamp(3),\n"
|
||||
+ " `partition` varchar(20)\n"
|
||||
+ ")\n"
|
||||
+ "PARTITIONED BY (`partition`)\n"
|
||||
+ "with (\n"
|
||||
+ " 'connector' = 'hudi'";
|
||||
StringBuilder builder = new StringBuilder(createTable);
|
||||
if (options.size() != 0) {
|
||||
options.forEach((k, v) -> builder.append(",\n")
|
||||
.append(" '").append(k).append("' = '").append(v).append("'"));
|
||||
}
|
||||
builder.append("\n)");
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
public static String getFileSourceDDL(String tableName) {
|
||||
String sourcePath = Objects.requireNonNull(Thread.currentThread()
|
||||
.getContextClassLoader().getResource("test_source.data")).toString();
|
||||
return "create table " + tableName + "(\n"
|
||||
+ " uuid varchar(20),\n"
|
||||
+ " name varchar(10),\n"
|
||||
+ " age int,\n"
|
||||
+ " ts timestamp(3),\n"
|
||||
+ " `partition` varchar(20)\n"
|
||||
+ ") with (\n"
|
||||
+ " 'connector' = '" + ContinuousFileSourceFactory.FACTORY_ID + "',\n"
|
||||
+ " 'path' = '" + sourcePath + "'\n"
|
||||
+ ")";
|
||||
}
|
||||
|
||||
public static final RowDataSerializer SERIALIZER = new RowDataSerializer(new ExecutionConfig(), ROW_TYPE);
|
||||
|
||||
|
||||
@@ -25,14 +25,19 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.table.HoodieFlinkTable;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.data.StringData;
|
||||
import org.apache.flink.table.data.TimestampData;
|
||||
import org.apache.flink.table.data.binary.BinaryRowData;
|
||||
import org.apache.flink.table.data.conversion.DataStructureConverter;
|
||||
import org.apache.flink.table.data.conversion.DataStructureConverters;
|
||||
import org.apache.flink.table.data.writer.BinaryRowWriter;
|
||||
import org.apache.flink.table.data.writer.BinaryWriter;
|
||||
import org.apache.flink.table.runtime.types.InternalSerializers;
|
||||
@@ -112,27 +117,46 @@ public class TestData {
|
||||
TimestampData.fromEpochMillis(1), StringData.fromString("par1"))));
|
||||
}
|
||||
|
||||
public static List<RowData> DATA_SET_FOUR = Arrays.asList(
|
||||
// update: advance the age by 1
|
||||
binaryRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,
|
||||
TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
|
||||
binaryRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34,
|
||||
TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
|
||||
binaryRow(StringData.fromString("id3"), StringData.fromString("Julian"), 54,
|
||||
TimestampData.fromEpochMillis(4), StringData.fromString("par2")),
|
||||
binaryRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 32,
|
||||
TimestampData.fromEpochMillis(5), StringData.fromString("par2")),
|
||||
// same with before
|
||||
binaryRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18,
|
||||
TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
|
||||
// new data
|
||||
binaryRow(StringData.fromString("id9"), StringData.fromString("Jane"), 19,
|
||||
TimestampData.fromEpochMillis(6), StringData.fromString("par3")),
|
||||
binaryRow(StringData.fromString("id10"), StringData.fromString("Ella"), 38,
|
||||
TimestampData.fromEpochMillis(7), StringData.fromString("par4")),
|
||||
binaryRow(StringData.fromString("id11"), StringData.fromString("Phoebe"), 52,
|
||||
TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
|
||||
);
|
||||
/**
|
||||
* Returns string format of a list of RowData.
|
||||
*/
|
||||
public static String rowDataToString(List<RowData> rows) {
|
||||
DataStructureConverter<Object, Object> converter =
|
||||
DataStructureConverters.getConverter(TestConfigurations.ROW_DATA_TYPE);
|
||||
return rows.stream()
|
||||
.map(row -> converter.toExternal(row).toString())
|
||||
.sorted(Comparator.naturalOrder())
|
||||
.collect(Collectors.toList()).toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a list of row data with Hoodie format base on the given configuration.
|
||||
*
|
||||
* @param dataBuffer The data buffer to write
|
||||
* @param conf The flink configuration
|
||||
* @throws Exception if error occurs
|
||||
*/
|
||||
public static void writeData(
|
||||
List<RowData> dataBuffer,
|
||||
Configuration conf) throws Exception {
|
||||
StreamWriteFunctionWrapper<RowData> funcWrapper = new StreamWriteFunctionWrapper<>(
|
||||
conf.getString(FlinkOptions.PATH),
|
||||
conf);
|
||||
funcWrapper.openFunction();
|
||||
|
||||
for (RowData rowData : dataBuffer) {
|
||||
funcWrapper.invoke(rowData);
|
||||
}
|
||||
|
||||
// this triggers the data write and event send
|
||||
funcWrapper.checkpointFunction(1);
|
||||
|
||||
final OperatorEvent nextEvent = funcWrapper.getNextEvent();
|
||||
funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
|
||||
funcWrapper.checkpointComplete(1);
|
||||
|
||||
funcWrapper.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks the source data TestConfigurations.DATA_SET_ONE are written as expected.
|
||||
|
||||
@@ -0,0 +1,162 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.source;
|
||||
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.operator.utils.TestConfigurations;
|
||||
|
||||
import org.apache.flink.table.api.EnvironmentSettings;
|
||||
import org.apache.flink.table.api.TableEnvironment;
|
||||
import org.apache.flink.table.api.TableResult;
|
||||
import org.apache.flink.table.api.config.ExecutionConfigOptions;
|
||||
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
|
||||
import org.apache.flink.test.util.AbstractTestBase;
|
||||
import org.apache.flink.types.Row;
|
||||
import org.apache.flink.util.CollectionUtil;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
/**
|
||||
* IT cases for Hoodie table source and sink.
|
||||
*
|
||||
* Note: should add more SQL cases when batch write is supported.
|
||||
*/
|
||||
public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
private TableEnvironment streamTableEnv;
|
||||
private TableEnvironment batchTableEnv;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() {
|
||||
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
|
||||
streamTableEnv = TableEnvironmentImpl.create(settings);
|
||||
streamTableEnv.getConfig().getConfiguration()
|
||||
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
|
||||
streamTableEnv.getConfig().getConfiguration()
|
||||
.setString("execution.checkpointing.interval", "2s");
|
||||
|
||||
settings = EnvironmentSettings.newInstance().inBatchMode().build();
|
||||
batchTableEnv = TableEnvironmentImpl.create(settings);
|
||||
batchTableEnv.getConfig().getConfiguration()
|
||||
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
|
||||
}
|
||||
|
||||
@TempDir
|
||||
File tempFile;
|
||||
|
||||
@Test
|
||||
void testStreamWriteBatchRead() {
|
||||
// create filesystem table named source
|
||||
String createSource = TestConfigurations.getFileSourceDDL("source");
|
||||
streamTableEnv.executeSql(createSource);
|
||||
|
||||
Map<String, String> options = new HashMap<>();
|
||||
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
|
||||
options.put(FlinkOptions.READ_SCHEMA_FILE_PATH.key(),
|
||||
Objects.requireNonNull(Thread.currentThread()
|
||||
.getContextClassLoader().getResource("test_read_schema.avsc")).toString());
|
||||
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
||||
streamTableEnv.executeSql(hoodieTableDDL);
|
||||
String insertInto = "insert into t1 select * from source";
|
||||
execInsertSql(streamTableEnv, insertInto);
|
||||
|
||||
List<Row> rows = CollectionUtil.iterableToList(
|
||||
() -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
|
||||
final String expected = "[id1,Danny,23,1970-01-01T00:00:01,par1, "
|
||||
+ "id2,Stephen,33,1970-01-01T00:00:02,par1, "
|
||||
+ "id3,Julian,53,1970-01-01T00:00:03,par2, "
|
||||
+ "id4,Fabian,31,1970-01-01T00:00:04,par2, "
|
||||
+ "id5,Sophia,18,1970-01-01T00:00:05,par3, "
|
||||
+ "id6,Emma,20,1970-01-01T00:00:06,par3, "
|
||||
+ "id7,Bob,44,1970-01-01T00:00:07,par4, "
|
||||
+ "id8,Han,56,1970-01-01T00:00:08,par4]";
|
||||
assertRowsEquals(rows, expected);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testBatchWriteAndRead() {
|
||||
Map<String, String> options = new HashMap<>();
|
||||
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
|
||||
options.put(FlinkOptions.READ_SCHEMA_FILE_PATH.key(),
|
||||
Objects.requireNonNull(Thread.currentThread()
|
||||
.getContextClassLoader().getResource("test_read_schema.avsc")).toString());
|
||||
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
||||
batchTableEnv.executeSql(hoodieTableDDL);
|
||||
String insertInto = "insert into t1 values\n"
|
||||
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
|
||||
+ "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
|
||||
+ "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
|
||||
+ "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
|
||||
+ "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
|
||||
+ "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
|
||||
+ "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
|
||||
+ "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
|
||||
|
||||
execInsertSql(batchTableEnv, insertInto);
|
||||
|
||||
List<Row> rows = CollectionUtil.iterableToList(
|
||||
() -> batchTableEnv.sqlQuery("select * from t1").execute().collect());
|
||||
final String expected = "[id1,Danny,23,1970-01-01T00:00:01,par1, "
|
||||
+ "id2,Stephen,33,1970-01-01T00:00:02,par1, "
|
||||
+ "id3,Julian,53,1970-01-01T00:00:03,par2, "
|
||||
+ "id4,Fabian,31,1970-01-01T00:00:04,par2, "
|
||||
+ "id5,Sophia,18,1970-01-01T00:00:05,par3, "
|
||||
+ "id6,Emma,20,1970-01-01T00:00:06,par3, "
|
||||
+ "id7,Bob,44,1970-01-01T00:00:07,par4, "
|
||||
+ "id8,Han,56,1970-01-01T00:00:08,par4]";
|
||||
assertRowsEquals(rows, expected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sort the {@code rows} using field at index 0 and asserts
|
||||
* it equals with the expected string {@code expected}.
|
||||
*
|
||||
* @param rows Actual result rows
|
||||
* @param expected Expected string of the sorted rows
|
||||
*/
|
||||
private static void assertRowsEquals(List<Row> rows, String expected) {
|
||||
String rowsString = rows.stream()
|
||||
.sorted(Comparator.comparing(o -> o.getField(0).toString()))
|
||||
.collect(Collectors.toList()).toString();
|
||||
assertThat(rowsString, is(expected));
|
||||
}
|
||||
|
||||
private void execInsertSql(TableEnvironment tEnv, String insert) {
|
||||
TableResult tableResult = tEnv.executeSql(insert);
|
||||
// wait to finish
|
||||
try {
|
||||
tableResult.getJobClient().get()
|
||||
.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
|
||||
} catch (InterruptedException | ExecutionException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,122 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.source;
|
||||
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.operator.utils.TestConfigurations;
|
||||
import org.apache.hudi.operator.utils.TestData;
|
||||
import org.apache.hudi.source.format.mor.MergeOnReadInputFormat;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.api.common.io.FileInputFormat;
|
||||
import org.apache.flink.api.common.io.InputFormat;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.core.fs.Path;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
/**
|
||||
* Test cases for HoodieTableSource.
|
||||
*/
|
||||
public class HoodieTableSourceTest {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HoodieTableSourceTest.class);
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
@TempDir
|
||||
File tempFile;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws IOException {
|
||||
final String path = tempFile.getAbsolutePath();
|
||||
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
StreamerUtil.initTableIfNotExists(conf);
|
||||
IntStream.range(1, 5)
|
||||
.forEach(i -> new File(path + File.separator + "par" + i).mkdirs());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetReadPaths() {
|
||||
HoodieTableSource tableSource = new HoodieTableSource(
|
||||
TestConfigurations.TABLE_SCHEMA,
|
||||
new Path(tempFile.getPath()),
|
||||
Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
|
||||
"default-par",
|
||||
conf);
|
||||
Path[] paths = tableSource.getReadPaths();
|
||||
assertNotNull(paths);
|
||||
String[] names = Arrays.stream(paths).map(Path::getName)
|
||||
.sorted(Comparator.naturalOrder()).toArray(String[]::new);
|
||||
assertThat(Arrays.toString(names), is("[par1, par2, par3, par4]"));
|
||||
// apply partition pruning
|
||||
Map<String, String> partitions = new HashMap<>();
|
||||
partitions.put("partition", "par1");
|
||||
|
||||
tableSource = (HoodieTableSource) tableSource
|
||||
.applyPartitionPruning(Collections.singletonList(partitions));
|
||||
|
||||
Path[] paths2 = tableSource.getReadPaths();
|
||||
assertNotNull(paths2);
|
||||
String[] names2 = Arrays.stream(paths2).map(Path::getName)
|
||||
.sorted(Comparator.naturalOrder()).toArray(String[]::new);
|
||||
assertThat(Arrays.toString(names2), is("[par1]"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetInputFormat() throws Exception {
|
||||
// write some data to let the TableSchemaResolver get the right instant
|
||||
TestData.writeData(TestData.DATA_SET_ONE, conf);
|
||||
|
||||
HoodieTableSource tableSource = new HoodieTableSource(
|
||||
TestConfigurations.TABLE_SCHEMA,
|
||||
new Path(tempFile.getPath()),
|
||||
Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")),
|
||||
"default-par",
|
||||
conf);
|
||||
InputFormat<RowData, ?> inputFormat = tableSource.getInputFormat();
|
||||
assertThat(inputFormat, is(instanceOf(FileInputFormat.class)));
|
||||
conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
|
||||
inputFormat = tableSource.getInputFormat();
|
||||
assertThat(inputFormat, is(instanceOf(MergeOnReadInputFormat.class)));
|
||||
conf.setString(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_INCREMENTAL);
|
||||
assertThrows(HoodieException.class,
|
||||
() -> tableSource.getInputFormat(),
|
||||
"Invalid query type : 'incremental'. Only 'snapshot' is supported now");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,197 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.source.format;
|
||||
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.operator.utils.TestConfigurations;
|
||||
import org.apache.hudi.operator.utils.TestData;
|
||||
import org.apache.hudi.source.HoodieTableSource;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.api.common.io.InputFormat;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.core.fs.Path;
|
||||
import org.apache.flink.core.io.InputSplit;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
|
||||
/**
|
||||
* Test cases for MergeOnReadInputFormat and ParquetInputFormat.
|
||||
*/
|
||||
public class InputFormatTest {
|
||||
|
||||
private HoodieTableSource tableSource;
|
||||
private Configuration conf;
|
||||
|
||||
@TempDir
|
||||
File tempFile;
|
||||
|
||||
void beforeEach(String tableType) throws IOException {
|
||||
conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
conf.setString(FlinkOptions.TABLE_TYPE, tableType);
|
||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // close the async compaction
|
||||
|
||||
StreamerUtil.initTableIfNotExists(conf);
|
||||
this.tableSource = new HoodieTableSource(
|
||||
TestConfigurations.TABLE_SCHEMA,
|
||||
new Path(tempFile.getAbsolutePath()),
|
||||
Collections.singletonList("partition"),
|
||||
"default",
|
||||
conf);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {
|
||||
FlinkOptions.TABLE_TYPE_COPY_ON_WRITE,
|
||||
FlinkOptions.TABLE_TYPE_MERGE_ON_READ})
|
||||
void testRead(String tableType) throws Exception {
|
||||
beforeEach(tableType);
|
||||
|
||||
TestData.writeData(TestData.DATA_SET_ONE, conf);
|
||||
|
||||
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
|
||||
|
||||
List<RowData> result = readData(inputFormat);
|
||||
|
||||
String actual = TestData.rowDataToString(result);
|
||||
String expected = TestData.rowDataToString(TestData.DATA_SET_ONE);
|
||||
assertThat(actual, is(expected));
|
||||
|
||||
// write another commit to read again
|
||||
TestData.writeData(TestData.DATA_SET_TWO, conf);
|
||||
|
||||
// refresh the input format
|
||||
this.tableSource.reloadActiveTimeline();
|
||||
inputFormat = this.tableSource.getInputFormat();
|
||||
|
||||
result = readData(inputFormat);
|
||||
|
||||
actual = TestData.rowDataToString(result);
|
||||
expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
|
||||
+ "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
|
||||
+ "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
|
||||
+ "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
|
||||
+ "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
|
||||
+ "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
|
||||
+ "id5,Sophia,18,1970-01-01T00:00:00.005,par3, "
|
||||
+ "id6,Emma,20,1970-01-01T00:00:00.006,par3, "
|
||||
+ "id7,Bob,44,1970-01-01T00:00:00.007,par4, "
|
||||
+ "id8,Han,56,1970-01-01T00:00:00.008,par4, "
|
||||
+ "id9,Jane,19,1970-01-01T00:00:00.006,par3]";
|
||||
assertThat(actual, is(expected));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReadBaseAndLogFiles() throws Exception {
|
||||
beforeEach(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
|
||||
|
||||
// write parquet first with compaction
|
||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
|
||||
TestData.writeData(TestData.DATA_SET_ONE, conf);
|
||||
|
||||
InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
|
||||
|
||||
List<RowData> result = readData(inputFormat);
|
||||
|
||||
String actual = TestData.rowDataToString(result);
|
||||
String expected = TestData.rowDataToString(TestData.DATA_SET_ONE);
|
||||
assertThat(actual, is(expected));
|
||||
|
||||
// write another commit using logs and read again
|
||||
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
|
||||
TestData.writeData(TestData.DATA_SET_TWO, conf);
|
||||
|
||||
// refresh the input format
|
||||
this.tableSource.reloadActiveTimeline();
|
||||
inputFormat = this.tableSource.getInputFormat();
|
||||
|
||||
result = readData(inputFormat);
|
||||
|
||||
actual = TestData.rowDataToString(result);
|
||||
expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
|
||||
+ "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
|
||||
+ "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
|
||||
+ "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
|
||||
+ "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
|
||||
+ "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
|
||||
+ "id5,Sophia,18,1970-01-01T00:00:00.005,par3, "
|
||||
+ "id6,Emma,20,1970-01-01T00:00:00.006,par3, "
|
||||
+ "id7,Bob,44,1970-01-01T00:00:00.007,par4, "
|
||||
+ "id8,Han,56,1970-01-01T00:00:00.008,par4, "
|
||||
+ "id9,Jane,19,1970-01-01T00:00:00.006,par3]";
|
||||
assertThat(actual, is(expected));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {
|
||||
FlinkOptions.TABLE_TYPE_COPY_ON_WRITE,
|
||||
FlinkOptions.TABLE_TYPE_MERGE_ON_READ})
|
||||
void testReadWithPartitionPrune(String tableType) throws Exception {
|
||||
beforeEach(tableType);
|
||||
|
||||
TestData.writeData(TestData.DATA_SET_ONE, conf);
|
||||
|
||||
Map<String, String> prunedPartitions = new HashMap<>();
|
||||
prunedPartitions.put("partition", "par1");
|
||||
// prune to only be with partition 'par1'
|
||||
HoodieTableSource newSource = (HoodieTableSource) tableSource
|
||||
.applyPartitionPruning(Collections.singletonList(prunedPartitions));
|
||||
InputFormat<RowData, ?> inputFormat = newSource.getInputFormat();
|
||||
|
||||
List<RowData> result = readData(inputFormat);
|
||||
|
||||
String actual = TestData.rowDataToString(result);
|
||||
String expected = "[id1,Danny,23,1970-01-01T00:00:00.001,par1, id2,Stephen,33,1970-01-01T00:00:00.002,par1]";
|
||||
assertThat(actual, is(expected));
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Utilities
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
@SuppressWarnings("unchecked, rawtypes")
|
||||
private static List<RowData> readData(InputFormat inputFormat) throws IOException {
|
||||
InputSplit[] inputSplits = inputFormat.createInputSplits(1);
|
||||
|
||||
List<RowData> result = new ArrayList<>();
|
||||
|
||||
for (InputSplit inputSplit : inputSplits) {
|
||||
inputFormat.open(inputSplit);
|
||||
while (!inputFormat.reachedEnd()) {
|
||||
result.add(TestConfigurations.SERIALIZER.copy((RowData) inputFormat.nextRecord(null))); // no reuse
|
||||
}
|
||||
inputFormat.close();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.utils.factory;
|
||||
|
||||
import org.apache.hudi.operator.FlinkOptions;
|
||||
import org.apache.hudi.utils.source.ContinuousFileSource;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.core.fs.Path;
|
||||
import org.apache.flink.table.api.ValidationException;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.factories.FactoryUtil;
|
||||
import org.apache.flink.table.factories.TableSourceFactory;
|
||||
import org.apache.flink.table.sources.TableSource;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Factory for ContinuousFileSource.
|
||||
*/
|
||||
public class ContinuousFileSourceFactory implements TableSourceFactory<RowData> {
|
||||
public static final String FACTORY_ID = "continuous-file-source";
|
||||
|
||||
@Override
|
||||
public TableSource<RowData> createTableSource(Context context) {
|
||||
Configuration conf = FlinkOptions.fromMap(context.getTable().getOptions());
|
||||
Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
|
||||
new ValidationException("Option [path] should be not empty.")));
|
||||
return new ContinuousFileSource(context.getTable().getSchema(), path, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> requiredContext() {
|
||||
Map<String, String> context = new HashMap<>();
|
||||
context.put(FactoryUtil.CONNECTOR.key(), FACTORY_ID);
|
||||
return context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> supportedProperties() {
|
||||
return Collections.singletonList("*");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,173 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.utils.source;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.core.fs.Path;
|
||||
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
|
||||
import org.apache.flink.formats.json.TimestampFormat;
|
||||
import org.apache.flink.runtime.state.CheckpointListener;
|
||||
import org.apache.flink.streaming.api.datastream.DataStream;
|
||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
||||
import org.apache.flink.streaming.api.functions.source.SourceFunction;
|
||||
import org.apache.flink.table.api.TableSchema;
|
||||
import org.apache.flink.table.data.RowData;
|
||||
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
|
||||
import org.apache.flink.table.sources.StreamTableSource;
|
||||
import org.apache.flink.table.types.DataType;
|
||||
import org.apache.flink.table.types.logical.RowType;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* A continuous file source that can trigger checkpoints continuously.
|
||||
*
|
||||
* <p>It loads the data in the specified file and split the data into number of checkpoints batches.
|
||||
* Say, if you want 4 checkpoints and there are 8 records in the file, the emit strategy is:
|
||||
*
|
||||
* <pre>
|
||||
* | 2 records | 2 records | 2 records | 2 records |
|
||||
* | cp1 | cp2 |cp3 | cp4 |
|
||||
* </pre>
|
||||
*
|
||||
* <p>If all the data are flushed out, it waits for the next checkpoint to finish and tear down the source.
|
||||
*/
|
||||
public class ContinuousFileSource implements StreamTableSource<RowData> {
|
||||
|
||||
private final TableSchema tableSchema;
|
||||
private final Path path;
|
||||
private final Configuration conf;
|
||||
|
||||
public ContinuousFileSource(
|
||||
TableSchema tableSchema,
|
||||
Path path,
|
||||
Configuration conf) {
|
||||
this.tableSchema = tableSchema;
|
||||
this.path = path;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
|
||||
final RowType rowType = (RowType) this.tableSchema.toRowDataType().getLogicalType();
|
||||
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
|
||||
rowType,
|
||||
new RowDataTypeInfo(rowType),
|
||||
false,
|
||||
true,
|
||||
TimestampFormat.ISO_8601);
|
||||
|
||||
return execEnv.addSource(new BoundedSourceFunction(this.path, 2))
|
||||
.name("continuous_file_source")
|
||||
.setParallelism(1)
|
||||
.map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)),
|
||||
new RowDataTypeInfo(rowType));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableSchema getTableSchema() {
|
||||
return this.tableSchema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataType getProducedDataType() {
|
||||
return this.tableSchema.toRowDataType().bridgedTo(RowData.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Source function that partition the data into given number checkpoints batches.
|
||||
*/
|
||||
public static class BoundedSourceFunction implements SourceFunction<String>, CheckpointListener {
|
||||
private final Path path;
|
||||
private List<String> dataBuffer;
|
||||
|
||||
private final int checkpoints;
|
||||
private final AtomicInteger currentCP = new AtomicInteger(0);
|
||||
|
||||
private volatile boolean isRunning = true;
|
||||
|
||||
public BoundedSourceFunction(Path path, int checkpoints) {
|
||||
this.path = path;
|
||||
this.checkpoints = checkpoints;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(SourceContext<String> context) throws Exception {
|
||||
if (this.dataBuffer == null) {
|
||||
loadDataBuffer();
|
||||
}
|
||||
int oldCP = this.currentCP.get();
|
||||
boolean finish = false;
|
||||
while (isRunning) {
|
||||
int batchSize = this.dataBuffer.size() / this.checkpoints;
|
||||
int start = batchSize * oldCP;
|
||||
synchronized (context.getCheckpointLock()) {
|
||||
for (int i = start; i < start + batchSize; i++) {
|
||||
if (i >= this.dataBuffer.size()) {
|
||||
finish = true;
|
||||
break;
|
||||
// wait for the next checkpoint and exit
|
||||
}
|
||||
context.collect(this.dataBuffer.get(i));
|
||||
}
|
||||
}
|
||||
oldCP++;
|
||||
while (this.currentCP.get() < oldCP) {
|
||||
synchronized (context.getCheckpointLock()) {
|
||||
context.getCheckpointLock().wait(10);
|
||||
}
|
||||
}
|
||||
if (finish || !isRunning) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
this.isRunning = false;
|
||||
}
|
||||
|
||||
private void loadDataBuffer() {
|
||||
this.dataBuffer = new ArrayList<>();
|
||||
try (BufferedReader reader =
|
||||
new BufferedReader(new FileReader(this.path.getPath()))) {
|
||||
String line = reader.readLine();
|
||||
while (line != null) {
|
||||
this.dataBuffer.add(line);
|
||||
// read next line
|
||||
line = reader.readLine();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Read file " + this.path + " error", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyCheckpointComplete(long l) throws Exception {
|
||||
this.currentCP.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.hudi.factory.HoodieTableFactory
|
||||
org.apache.hudi.utils.factory.ContinuousFileSourceFactory
|
||||
Reference in New Issue
Block a user