diff --git a/hudi-flink/pom.xml b/hudi-flink/pom.xml
index a95404cba..0629dbc63 100644
--- a/hudi-flink/pom.xml
+++ b/hudi-flink/pom.xml
@@ -332,5 +332,12 @@
test
test-jar
+
+ org.apache.flink
+ flink-json
+ ${flink.version}
+ test
+ test-jar
+
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
index 0623eb942..e35a69371 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
@@ -144,6 +144,9 @@ public class FilePathUtils {
boolean hivePartition,
String[] partitionKeys) {
LinkedHashMap fullPartSpec = new LinkedHashMap<>();
+ if (partitionKeys.length == 0) {
+ return fullPartSpec;
+ }
List kvs = new ArrayList<>();
int curDepth = 0;
do {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index f0f4f41b0..7375c3018 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -273,7 +273,7 @@ public class MergeOnReadInputFormat
LinkedHashMap partSpec = FilePathUtils.extractPartitionKeyValues(
new org.apache.hadoop.fs.Path(path).getParent(),
this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
- this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","));
+ extractPartitionKeys());
LinkedHashMap partObjects = new LinkedHashMap<>();
partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
defaultPartName.equals(v) ? null : v,
@@ -293,6 +293,13 @@ public class MergeOnReadInputFormat
Long.MAX_VALUE); // read the whole file
}
+ private String[] extractPartitionKeys() {
+ if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PARTITION_PATH_FIELD)) {
+ return new String[0];
+ }
+ return this.conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
+ }
+
private Iterator getLogFileIterator(MergeOnReadInputSplit split) {
final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index f475d56d5..c34fd5909 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -46,13 +46,13 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
-import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -286,8 +286,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
- void testWriteAndReadParMiddle(boolean streaming) throws Exception {
+ @EnumSource(value = ExecMode.class)
+ void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
+ boolean streaming = execMode == ExecMode.STREAM;
String hoodieTableDDL = "create table t1(\n"
+ " uuid varchar(20),\n"
+ " name varchar(10),\n"
@@ -324,13 +325,13 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
+ "id7,Bob,44,par4,1970-01-01T00:00:07, "
+ "id8,Han,56,par4,1970-01-01T00:00:08]";
- List result = execSelectSql(streamTableEnv, "select * from t1", streaming);
+ List result = execSelectSql(streamTableEnv, "select * from t1", execMode);
assertRowsEquals(result, expected);
// insert another batch of data
execInsertSql(streamTableEnv, insertInto);
- List result2 = execSelectSql(streamTableEnv, "select * from t1", streaming);
+ List result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
assertRowsEquals(result2, expected);
}
@@ -516,6 +517,58 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
+ @ParameterizedTest
+ @EnumSource(value = ExecMode.class)
+ void testWriteAndReadDebeziumJson(ExecMode execMode) throws Exception {
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("debezium_json.data")).toString();
+ String sourceDDL = ""
+ + "CREATE TABLE debezium_source(\n"
+ + " id INT NOT NULL,\n"
+ + " ts BIGINT,\n"
+ + " name STRING,\n"
+ + " description STRING,\n"
+ + " weight DOUBLE\n"
+ + ") WITH (\n"
+ + " 'connector' = 'filesystem',\n"
+ + " 'path' = '" + sourcePath + "',\n"
+ + " 'format' = 'debezium-json'\n"
+ + ")";
+ streamTableEnv.executeSql(sourceDDL);
+ String hoodieTableDDL = ""
+ + "CREATE TABLE hoodie_sink(\n"
+ + " id INT NOT NULL,\n"
+ + " ts BIGINT,\n"
+ + " name STRING,"
+ + " weight DOUBLE,"
+ + " PRIMARY KEY (id) NOT ENFORCED"
+ + ") with (\n"
+ + " 'connector' = 'hudi',\n"
+ + " 'path' = '" + tempFile.getAbsolutePath() + "',\n"
+ + " 'read.streaming.enabled' = '" + (execMode == ExecMode.STREAM) + "',\n"
+ + " 'write.insert.drop.duplicates' = 'true'"
+ + ")";
+ streamTableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into hoodie_sink select id, ts, name, weight from debezium_source";
+ execInsertSql(streamTableEnv, insertInto);
+
+ final String expected = "["
+ + "101,1000,scooter,3.140000104904175, "
+ + "102,2000,car battery,8.100000381469727, "
+ + "103,3000,12-pack drill bits,0.800000011920929, "
+ + "104,4000,hammer,0.75, "
+ + "105,5000,hammer,0.875, "
+ + "106,10000,hammer,1.0, "
+ + "107,11000,rocks,5.099999904632568, "
+ + "108,8000,jacket,0.10000000149011612, "
+ + "109,9000,spare tire,22.200000762939453, "
+ + "110,14000,jacket,0.5]";
+
+ List result = execSelectSql(streamTableEnv, "select * from hoodie_sink", execMode);
+
+ assertRowsEquals(result, expected);
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -533,15 +586,18 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
}
- private List execSelectSql(TableEnvironment tEnv, String select, boolean streaming)
+ private List execSelectSql(TableEnvironment tEnv, String select, ExecMode execMode)
throws TableNotExistException, InterruptedException {
- if (streaming) {
- final String[] splits = select.split(" ");
- final String tableName = splits[splits.length - 1];
- return execSelectSql(tEnv, select, 10, tableName);
- } else {
- return CollectionUtil.iterableToList(
- () -> tEnv.sqlQuery("select * from t1").execute().collect());
+ final String[] splits = select.split(" ");
+ final String tableName = splits[splits.length - 1];
+ switch (execMode) {
+ case STREAM:
+ return execSelectSql(tEnv, select, 10, tableName);
+ case BATCH:
+ return CollectionUtil.iterableToList(
+ () -> tEnv.sqlQuery("select * from " + tableName).execute().collect());
+ default:
+ throw new AssertionError();
}
}
diff --git a/hudi-flink/src/test/resources/debezium_json.data b/hudi-flink/src/test/resources/debezium_json.data
new file mode 100644
index 000000000..d4c02e9e9
--- /dev/null
+++ b/hudi-flink/src/test/resources/debezium_json.data
@@ -0,0 +1,16 @@
+{"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
+{"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
+{"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
+{"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
+{"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
+{"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
+{"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
+{"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
+{"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}