[HUDI-1961] Add a debezium json integration test case for flink (#3030)
This commit is contained in:
@@ -332,5 +332,12 @@
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.flink</groupId>
|
||||
<artifactId>flink-json</artifactId>
|
||||
<version>${flink.version}</version>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
@@ -144,6 +144,9 @@ public class FilePathUtils {
|
||||
boolean hivePartition,
|
||||
String[] partitionKeys) {
|
||||
LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>();
|
||||
if (partitionKeys.length == 0) {
|
||||
return fullPartSpec;
|
||||
}
|
||||
List<String[]> kvs = new ArrayList<>();
|
||||
int curDepth = 0;
|
||||
do {
|
||||
|
||||
@@ -273,7 +273,7 @@ public class MergeOnReadInputFormat
|
||||
LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues(
|
||||
new org.apache.hadoop.fs.Path(path).getParent(),
|
||||
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
|
||||
this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","));
|
||||
extractPartitionKeys());
|
||||
LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
|
||||
partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
|
||||
defaultPartName.equals(v) ? null : v,
|
||||
@@ -293,6 +293,13 @@ public class MergeOnReadInputFormat
|
||||
Long.MAX_VALUE); // read the whole file
|
||||
}
|
||||
|
||||
private String[] extractPartitionKeys() {
|
||||
if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PARTITION_PATH_FIELD)) {
|
||||
return new String[0];
|
||||
}
|
||||
return this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
|
||||
}
|
||||
|
||||
private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
|
||||
final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
|
||||
final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
|
||||
|
||||
@@ -46,13 +46,13 @@ import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -286,8 +286,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
void testWriteAndReadParMiddle(boolean streaming) throws Exception {
|
||||
@EnumSource(value = ExecMode.class)
|
||||
void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
|
||||
boolean streaming = execMode == ExecMode.STREAM;
|
||||
String hoodieTableDDL = "create table t1(\n"
|
||||
+ " uuid varchar(20),\n"
|
||||
+ " name varchar(10),\n"
|
||||
@@ -324,13 +325,13 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
+ "id7,Bob,44,par4,1970-01-01T00:00:07, "
|
||||
+ "id8,Han,56,par4,1970-01-01T00:00:08]";
|
||||
|
||||
List<Row> result = execSelectSql(streamTableEnv, "select * from t1", streaming);
|
||||
List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
|
||||
|
||||
assertRowsEquals(result, expected);
|
||||
|
||||
// insert another batch of data
|
||||
execInsertSql(streamTableEnv, insertInto);
|
||||
List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", streaming);
|
||||
List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
|
||||
assertRowsEquals(result2, expected);
|
||||
}
|
||||
|
||||
@@ -516,6 +517,58 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@EnumSource(value = ExecMode.class)
|
||||
void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
|
||||
String sourcePath = Objects.requireNonNull(Thread.currentThread()
|
||||
.getContextClassLoader().getResource("debezium_json.data")).toString();
|
||||
String sourceDDL = ""
|
||||
+ "CREATE TABLE debezium_source(\n"
|
||||
+ " id INT NOT NULL,\n"
|
||||
+ " ts BIGINT,\n"
|
||||
+ " name STRING,\n"
|
||||
+ " description STRING,\n"
|
||||
+ " weight DOUBLE\n"
|
||||
+ ") WITH (\n"
|
||||
+ " 'connector' = 'filesystem',\n"
|
||||
+ " 'path' = '" + sourcePath + "',\n"
|
||||
+ " 'format' = 'debezium-json'\n"
|
||||
+ ")";
|
||||
streamTableEnv.executeSql(sourceDDL);
|
||||
String hoodieTableDDL = ""
|
||||
+ "CREATE TABLE hoodie_sink(\n"
|
||||
+ " id INT NOT NULL,\n"
|
||||
+ " ts BIGINT,\n"
|
||||
+ " name STRING,"
|
||||
+ " weight DOUBLE,"
|
||||
+ " PRIMARY KEY (id) NOT ENFORCED"
|
||||
+ ") with (\n"
|
||||
+ " 'connector' = 'hudi',\n"
|
||||
+ " 'path' = '" + tempFile.getAbsolutePath() + "',\n"
|
||||
+ " 'read.streaming.enabled' = '" + (execMode == ExecMode.STREAM) + "',\n"
|
||||
+ " 'write.insert.drop.duplicates' = 'true'"
|
||||
+ ")";
|
||||
streamTableEnv.executeSql(hoodieTableDDL);
|
||||
String insertInto = "insert into hoodie_sink select id, ts, name, weight from debezium_source";
|
||||
execInsertSql(streamTableEnv, insertInto);
|
||||
|
||||
final String expected = "["
|
||||
+ "101,1000,scooter,3.140000104904175, "
|
||||
+ "102,2000,car battery,8.100000381469727, "
|
||||
+ "103,3000,12-pack drill bits,0.800000011920929, "
|
||||
+ "104,4000,hammer,0.75, "
|
||||
+ "105,5000,hammer,0.875, "
|
||||
+ "106,10000,hammer,1.0, "
|
||||
+ "107,11000,rocks,5.099999904632568, "
|
||||
+ "108,8000,jacket,0.10000000149011612, "
|
||||
+ "109,9000,spare tire,22.200000762939453, "
|
||||
+ "110,14000,jacket,0.5]";
|
||||
|
||||
List<Row> result = execSelectSql(streamTableEnv, "select * from hoodie_sink", execMode);
|
||||
|
||||
assertRowsEquals(result, expected);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Utilities
|
||||
// -------------------------------------------------------------------------
|
||||
@@ -533,15 +586,18 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
private List<Row> execSelectSql(TableEnvironment tEnv, String select, boolean streaming)
|
||||
private List<Row> execSelectSql(TableEnvironment tEnv, String select, ExecMode execMode)
|
||||
throws TableNotExistException, InterruptedException {
|
||||
if (streaming) {
|
||||
final String[] splits = select.split(" ");
|
||||
final String tableName = splits[splits.length - 1];
|
||||
return execSelectSql(tEnv, select, 10, tableName);
|
||||
} else {
|
||||
return CollectionUtil.iterableToList(
|
||||
() -> tEnv.sqlQuery("select * from t1").execute().collect());
|
||||
final String[] splits = select.split(" ");
|
||||
final String tableName = splits[splits.length - 1];
|
||||
switch (execMode) {
|
||||
case STREAM:
|
||||
return execSelectSql(tEnv, select, 10, tableName);
|
||||
case BATCH:
|
||||
return CollectionUtil.iterableToList(
|
||||
() -> tEnv.sqlQuery("select * from " + tableName).execute().collect());
|
||||
default:
|
||||
throw new AssertionError();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
16
hudi-flink/src/test/resources/debezium_json.data
Normal file
16
hudi-flink/src/test/resources/debezium_json.data
Normal file
@@ -0,0 +1,16 @@
|
||||
{"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
|
||||
{"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
|
||||
{"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
|
||||
{"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
|
||||
{"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
|
||||
{"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
|
||||
{"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
|
||||
{"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
|
||||
{"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
|
||||
{"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
|
||||
{"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
|
||||
{"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
|
||||
{"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
|
||||
{"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
|
||||
{"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
|
||||
{"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
|
||||
Reference in New Issue
Block a user