1
0

[HUDI-1919] Type mismatch when streaming read copy_on_write table using flink (#2986)

* [HUDI-1919] Type mismatch when streaming read copy_on_write table using flink #2976

* Update ParquetSplitReaderUtil.java
This commit is contained in:
Town
2021-05-25 11:36:43 +08:00
committed by GitHub
parent 369a849337
commit aba1eadbfc
3 changed files with 108 additions and 9 deletions

View File

@@ -95,16 +95,13 @@ public class ParquetSplitReaderUtil {
Path path,
long splitStart,
long splitLength) throws IOException {
List<String> nonPartNames = Arrays.stream(fullFieldNames)
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
.filter(n -> !partitionSpec.containsKey(n))
.collect(Collectors.toList());
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
.filter(nonPartNames::contains).collect(Collectors.toList());
int[] selParquetFields = selNonPartNames.stream()
.mapToInt(nonPartNames::indexOf)
int[] selParquetFields = Arrays.stream(selectedFields)
.filter(i -> !partitionSpec.containsKey(fullFieldNames[i]))
.toArray();
ParquetColumnarRowSplitReader.ColumnBatchGenerator gen = readVectors -> {

View File

@@ -33,8 +33,11 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
@@ -43,6 +46,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.Collection;
@@ -281,6 +285,55 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
+ "id8,Han,56,1970-01-01T00:00:08,par4]");
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWriteAndReadParMiddle(boolean streaming) throws Exception {
// The partition column is deliberately declared in the MIDDLE of the schema
// (not last) to exercise the reader's field re-ordering for both batch and
// streaming reads (HUDI-1919).
String createTableDdl = "create table t1(\n"
+ " uuid varchar(20),\n"
+ " name varchar(10),\n"
+ " age int,\n"
+ " `partition` varchar(20),\n"
+ " ts timestamp(3),\n"
+ " PRIMARY KEY(uuid) NOT ENFORCED\n"
+ ")\n"
+ "PARTITIONED BY (`partition`)\n"
+ "with (\n"
+ " 'connector' = 'hudi',\n"
+ " 'path' = '" + tempFile.getAbsolutePath() + "',\n"
+ " 'read.streaming.enabled' = '" + streaming + "'\n"
+ ")";
streamTableEnv.executeSql(createTableDdl);
String insertSql = "insert into t1 values\n"
+ "('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n"
+ "('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n"
+ "('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n"
+ "('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n"
+ "('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n"
+ "('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n"
+ "('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n"
+ "('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')";
execInsertSql(streamTableEnv, insertSql);
// Expected rows in the rendering assertRowsEquals compares against.
final String expected = "[" + String.join(", ",
"id1,Danny,23,par1,1970-01-01T00:00:01",
"id2,Stephen,33,par1,1970-01-01T00:00:02",
"id3,Julian,53,par2,1970-01-01T00:00:03",
"id4,Fabian,31,par2,1970-01-01T00:00:04",
"id5,Sophia,18,par3,1970-01-01T00:00:05",
"id6,Emma,20,par3,1970-01-01T00:00:06",
"id7,Bob,44,par4,1970-01-01T00:00:07",
"id8,Han,56,par4,1970-01-01T00:00:08") + "]";
List<Row> firstRead = execSelectSql(streamTableEnv, "select * from t1", streaming);
assertRowsEquals(firstRead, expected);
// Upsert the identical batch again: the primary-keyed table must dedupe,
// so a second read yields the same rows.
execInsertSql(streamTableEnv, insertSql);
List<Row> secondRead = execSelectSql(streamTableEnv, "select * from t1", streaming);
assertRowsEquals(secondRead, expected);
}
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testInsertOverwrite(ExecMode execMode) {
@@ -480,8 +533,35 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
}
private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException {
tEnv.executeSql(TestConfigurations.getCollectSinkDDL("sink"));
/**
 * Executes the given select statement and collects its rows.
 *
 * <p>For a streaming read the job never terminates on its own, so the query is
 * routed into a collect sink and cancelled after a fixed 10 second timeout; the
 * source table name is taken to be the last whitespace-separated token of the
 * query (works for queries shaped like {@code "select * from t1"}).
 *
 * <p>For a batch read the result iterator is finite and is drained directly.
 */
private List<Row> execSelectSql(TableEnvironment tEnv, String select, boolean streaming)
throws TableNotExistException, InterruptedException {
if (streaming) {
final String[] splits = select.split(" ");
final String tableName = splits[splits.length - 1];
return execSelectSql(tEnv, select, 10, tableName);
} else {
// Fix: run the caller-supplied query. The previous code hard-coded
// "select * from t1" here, silently ignoring the 'select' argument.
return CollectionUtil.iterableToList(
() -> tEnv.sqlQuery(select).execute().collect());
}
}
// Convenience overload without an explicit source table: passes null so the
// four-arg variant falls back to the default collect-sink schema.
private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout)
throws InterruptedException, TableNotExistException {
return execSelectSql(tEnv, select, timeout, null);
}
private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable)
throws InterruptedException, TableNotExistException {
final String sinkDDL;
if (sourceTable != null) {
// use the source table schema as the sink schema if the source table was specified.
ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema();
sinkDDL = TestConfigurations.getCollectSinkDDL("sink", schema);
} else {
sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
}
tEnv.executeSql(sinkDDL);
TableResult tableResult = tEnv.executeSql("insert into sink " + select);
// wait for the timeout then cancels the job
TimeUnit.SECONDS.sleep(timeout);

View File

@@ -127,6 +127,28 @@ public class TestConfigurations {
+ ")";
}
/**
 * Generates the {@code CREATE TABLE} DDL for a collect sink named
 * {@code tableName} whose columns mirror the given {@link TableSchema}:
 * field names are back-quoted and field types are rendered via
 * {@code DataType#toString()}.
 */
public static String getCollectSinkDDL(String tableName, TableSchema tableSchema) {
String[] names = tableSchema.getFieldNames();
DataType[] types = tableSchema.getFieldDataTypes();
StringBuilder ddl = new StringBuilder("create table " + tableName + "(\n");
for (int i = 0; i < names.length; i++) {
ddl.append(" `").append(names[i]).append("` ").append(types[i].toString());
// every column except the last is followed by a comma
if (i < names.length - 1) {
ddl.append(",");
}
ddl.append("\n");
}
ddl.append(""
+ ") with (\n"
+ " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
+ ")");
return ddl.toString();
}
public static final RowDataSerializer SERIALIZER = new RowDataSerializer(ROW_TYPE);
public static Configuration getDefaultConf(String tablePath) {