diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index b626e8833..778598fa6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -95,16 +95,13 @@ public class ParquetSplitReaderUtil {
       Path path,
       long splitStart,
       long splitLength) throws IOException {
-    List<String> nonPartNames = Arrays.stream(fullFieldNames)
+    List<String> selNonPartNames = Arrays.stream(selectedFields)
+        .mapToObj(i -> fullFieldNames[i])
         .filter(n -> !partitionSpec.containsKey(n))
         .collect(Collectors.toList());
 
-    List<String> selNonPartNames = Arrays.stream(selectedFields)
-        .mapToObj(i -> fullFieldNames[i])
-        .filter(nonPartNames::contains).collect(Collectors.toList());
-
-    int[] selParquetFields = selNonPartNames.stream()
-        .mapToInt(nonPartNames::indexOf)
+    int[] selParquetFields = Arrays.stream(selectedFields)
+        .filter(i -> !partitionSpec.containsKey(fullFieldNames[i]))
         .toArray();
 
     ParquetColumnarRowSplitReader.ColumnBatchGenerator gen = readVectors -> {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index bb80cf156..f475d56d5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -33,8 +33,11 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
@@ -43,6 +46,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.util.Collection;
@@ -281,6 +285,55 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
         + "id8,Han,56,1970-01-01T00:00:08,par4]");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testWriteAndReadParMiddle(boolean streaming) throws Exception {
+    String hoodieTableDDL = "create table t1(\n"
+        + "  uuid varchar(20),\n"
+        + "  name varchar(10),\n"
+        + "  age int,\n"
+        + "  `partition` varchar(20),\n" // test streaming read with partition field in the middle
+        + "  ts timestamp(3),\n"
+        + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
+        + ")\n"
+        + "PARTITIONED BY (`partition`)\n"
+        + "with (\n"
+        + "  'connector' = 'hudi',\n"
+        + "  'path' = '" + tempFile.getAbsolutePath() + "',\n"
+        + "  'read.streaming.enabled' = '" + streaming + "'\n"
+        + ")";
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+        + "('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n"
+        + "('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n"
+        + "('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n"
+        + "('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n"
+        + "('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n"
+        + "('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n"
+        + "('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n"
+        + "('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')";
+    execInsertSql(streamTableEnv, insertInto);
+
+    final String expected = "["
+        + "id1,Danny,23,par1,1970-01-01T00:00:01, "
+        + "id2,Stephen,33,par1,1970-01-01T00:00:02, "
+        + "id3,Julian,53,par2,1970-01-01T00:00:03, "
+        + "id4,Fabian,31,par2,1970-01-01T00:00:04, "
+        + "id5,Sophia,18,par3,1970-01-01T00:00:05, "
+        + "id6,Emma,20,par3,1970-01-01T00:00:06, "
+        + "id7,Bob,44,par4,1970-01-01T00:00:07, "
+        + "id8,Han,56,par4,1970-01-01T00:00:08]";
+
+    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", streaming);
+
+    assertRowsEquals(result, expected);
+
+    // insert another batch of data
+    execInsertSql(streamTableEnv, insertInto);
+    List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", streaming);
+    assertRowsEquals(result2, expected);
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testInsertOverwrite(ExecMode execMode) {
@@ -480,8 +533,35 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     }
   }
 
-  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException {
-    tEnv.executeSql(TestConfigurations.getCollectSinkDDL("sink"));
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, boolean streaming)
+      throws TableNotExistException, InterruptedException {
+    if (streaming) {
+      final String[] splits = select.split(" ");
+      final String tableName = splits[splits.length - 1];
+      return execSelectSql(tEnv, select, 10, tableName);
+    } else {
+      return CollectionUtil.iterableToList(
+          () -> tEnv.sqlQuery("select * from t1").execute().collect());
+    }
+  }
+
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout)
+      throws InterruptedException, TableNotExistException {
+    return execSelectSql(tEnv, select, timeout, null);
+  }
+
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable)
+      throws InterruptedException, TableNotExistException {
+    final String sinkDDL;
+    if (sourceTable != null) {
+      // use the source table schema as the sink schema if the source table was specified
+      ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
+      TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema();
+      sinkDDL = TestConfigurations.getCollectSinkDDL("sink", schema);
+    } else {
+      sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
+    }
+    tEnv.executeSql(sinkDDL);
     TableResult tableResult = tEnv.executeSql("insert into sink " + select);
     // wait for the timeout then cancels the job
     TimeUnit.SECONDS.sleep(timeout);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index f5b9fb4b3..c0dbfce62 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -127,6 +127,28 @@ public class TestConfigurations {
         + ")";
   }
 
+  public static String getCollectSinkDDL(String tableName, TableSchema tableSchema) {
+    final StringBuilder builder = new StringBuilder("create table " + tableName + "(\n");
+    String[] fieldNames = tableSchema.getFieldNames();
+    DataType[] fieldTypes = tableSchema.getFieldDataTypes();
+    for (int i = 0; i < fieldNames.length; i++) {
+      builder.append("  `")
+          .append(fieldNames[i])
+          .append("` ")
+          .append(fieldTypes[i].toString());
+      if (i != fieldNames.length - 1) {
+        builder.append(",");
+      }
+      builder.append("\n");
+    }
+    final String withProps = ""
+        + ") with (\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
+        + ")";
+    builder.append(withProps);
+    return builder.toString();
+  }
+
   public static final RowDataSerializer SERIALIZER = new RowDataSerializer(ROW_TYPE);
 
   public static Configuration getDefaultConf(String tablePath) {
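For reference, a minimal sketch (not part of the patch) of how the new TestConfigurations.getCollectSinkDDL(String, TableSchema) overload could be exercised against the t1 schema used in testWriteAndReadParMiddle. The CollectSinkDDLExample class, its main method, and the TableSchema.builder() construction below are illustrative assumptions, not code from this change; only the helper itself comes from the diff.

// Illustrative sketch only: builds a TableSchema mirroring t1 (partition field in the
// middle) and prints the collect-sink DDL generated by the new overload.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

import org.apache.hudi.utils.TestConfigurations;

public class CollectSinkDDLExample {
  public static void main(String[] args) {
    TableSchema schema = TableSchema.builder()
        .field("uuid", DataTypes.VARCHAR(20))
        .field("name", DataTypes.VARCHAR(10))
        .field("age", DataTypes.INT())
        .field("partition", DataTypes.VARCHAR(20))
        .field("ts", DataTypes.TIMESTAMP(3))
        .build();

    // Expected output, roughly (the connector id comes from CollectSinkTableFactory.FACTORY_ID):
    // create table sink(
    //   `uuid` VARCHAR(20),
    //   `name` VARCHAR(10),
    //   `age` INT,
    //   `partition` VARCHAR(20),
    //   `ts` TIMESTAMP(3)
    // ) with (
    //   'connector' = '...'
    // )
    System.out.println(TestConfigurations.getCollectSinkDDL("sink", schema));
  }
}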