[HUDI-2045] Support Read Hoodie As DataSource Table For Flink And DeltaStreamer
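The tests below exercise the new HiveSyncConfig flag syncAsSparkDataSourceTable, which makes Hive sync register the table with Spark datasource table properties (spark.sql.sources.provider, spark.sql.sources.schema.*) so the Hudi table can be read as a datasource table. As a hedged sketch of how such a config might be assembled (field names follow the test code in this diff; database, table, and path values are placeholders, and Hudi imports are omitted):

// Sketch only, not part of this commit: HiveSyncConfig and ConfigUtils are the
// classes referenced in the test diff below; values marked "placeholder" are invented.
import java.util.HashMap;
import java.util.Map;

class SyncAsDataSourceTableSketch {
  static HiveSyncConfig buildConfig() {
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.databaseName = "default";             // placeholder
    cfg.tableName = "hudi_table";             // placeholder
    cfg.basePath = "/tmp/hudi/hudi_table";    // placeholder
    cfg.useJdbc = false;
    cfg.syncAsSparkDataSourceTable = true;    // the new flag exercised by the tests below
    Map<String, String> serdeProperties = new HashMap<>();
    serdeProperties.put("path", cfg.basePath);
    cfg.serdeProperties = ConfigUtils.configToString(serdeProperties);
    return cfg;
  }
}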
@@ -42,7 +42,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -70,6 +69,11 @@ public class TestHiveSyncTool {
    return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
  }

  // (useJdbc, useSchemaFromCommitMetadata, syncAsDataSource)
  private static Iterable<Object[]> syncDataSourceTableParams() {
    return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}});
  }

  @BeforeEach
  public void setUp() throws Exception {
    HiveTestUtil.setUp();
@@ -157,17 +161,15 @@ public class TestHiveSyncTool {
  }

  @ParameterizedTest
  @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
  @MethodSource({"syncDataSourceTableParams"})
  public void testSyncCOWTableWithProperties(boolean useJdbc,
                                             boolean useSchemaFromCommitMetadata) throws Exception {
                                             boolean useSchemaFromCommitMetadata,
                                             boolean syncAsDataSourceTable) throws Exception {
    HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
    HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
    Map<String, String> serdeProperties = new HashMap<String, String>() {
      {
        put("path", hiveSyncConfig.basePath);
        put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type");
        put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized");
        put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot");
      }
    };
@@ -177,6 +179,7 @@ public class TestHiveSyncTool {
        put("tp_1", "p1");
      }
    };
    hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable;
    hiveSyncConfig.useJdbc = useJdbc;
    hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties);
    hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties);
@@ -195,9 +198,12 @@ public class TestHiveSyncTool {

    String tblPropertiesWithoutDdlTime = String.join("\n",
        results.subList(0, results.size() - 1));

    String sparkTableProperties = getSparkTableProperties(syncAsDataSourceTable, useSchemaFromCommitMetadata);
    assertEquals(
        "EXTERNAL\tTRUE\n"
            + "last_commit_time_sync\t100\n"
            + sparkTableProperties
            + "tp_0\tp0\n"
            + "tp_1\tp1", tblPropertiesWithoutDdlTime);
    assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime"));
@@ -208,21 +214,54 @@ public class TestHiveSyncTool {
    hiveDriver.getResults(results);
    String ddl = String.join("\n", results);
    assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
    assertTrue(ddl.contains("'hoodie.datasource.query.type'='snapshot'"));
    if (syncAsDataSourceTable) {
      assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='false'"));
    }
  }

  private String getSparkTableProperties(boolean syncAsDataSourceTable, boolean useSchemaFromCommitMetadata) {
    if (syncAsDataSourceTable) {
      if (useSchemaFromCommitMetadata) {
        return "spark.sql.sources.provider\thudi\n"
            + "spark.sql.sources.schema.numPartCols\t1\n"
            + "spark.sql.sources.schema.numParts\t1\n"
            + "spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":"
            + "[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
            + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
            + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
            + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
            + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
            + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},"
            + "{\"name\":\"favorite_number\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
            + "{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},"
            + "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n"
            + "spark.sql.sources.schema.partCol.0\tdatestr\n";
      } else {
        return "spark.sql.sources.provider\thudi\n"
            + "spark.sql.sources.schema.numPartCols\t1\n"
            + "spark.sql.sources.schema.numParts\t1\n"
            + "spark.sql.sources.schema.part.0\t{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":"
            + "\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_number\",\"type\":\"integer\","
            + "\"nullable\":false,\"metadata\":{}},{\"name\":\"favorite_color\",\"type\":\"string\",\"nullable\":false,"
+ "\"metadata\":{}}]}\n"
|
||||
+ "{\"name\":\"datestr\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}\n"
|
||||
+ "spark.sql.sources.schema.partCol.0\tdatestr\n";
|
||||
}
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
  @ParameterizedTest
  @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
  @MethodSource({"syncDataSourceTableParams"})
  public void testSyncMORTableWithProperties(boolean useJdbc,
                                             boolean useSchemaFromCommitMetadata) throws Exception {
                                             boolean useSchemaFromCommitMetadata,
                                             boolean syncAsDataSourceTable) throws Exception {
    HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
    HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
    Map<String, String> serdeProperties = new HashMap<String, String>() {
      {
        put("path", hiveSyncConfig.basePath);
        put(ConfigUtils.SPARK_QUERY_TYPE_KEY, "hoodie.datasource.query.type");
        put(ConfigUtils.SPARK_QUERY_AS_RO_KEY, "read_optimized");
        put(ConfigUtils.SPARK_QUERY_AS_RT_KEY, "snapshot");
      }
    };
@@ -232,6 +271,7 @@ public class TestHiveSyncTool {
        put("tp_1", "p1");
      }
    };
    hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable;
    hiveSyncConfig.useJdbc = useJdbc;
    hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties);
    hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties);
@@ -247,14 +287,15 @@ public class TestHiveSyncTool {
    String rtTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;

    String[] tableNames = new String[] {roTableName, rtTableName};
    String[] expectQueryTypes = new String[] {"read_optimized", "snapshot"};
    String[] readAsOptimizedResults = new String[] {"true", "false"};

    SessionState.start(HiveTestUtil.getHiveConf());
    Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf());

    String sparkTableProperties = getSparkTableProperties(syncAsDataSourceTable, useSchemaFromCommitMetadata);
    for (int i = 0; i < 2; i++) {
      String dbTableName = hiveSyncConfig.databaseName + "." + tableNames[i];
      String expectQueryType = expectQueryTypes[i];
      String readAsOptimized = readAsOptimizedResults[i];

      hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName);
      List<String> results = new ArrayList<>();
@@ -265,6 +306,7 @@ public class TestHiveSyncTool {
      assertEquals(
          "EXTERNAL\tTRUE\n"
              + "last_commit_time_sync\t101\n"
              + sparkTableProperties
              + "tp_0\tp0\n"
              + "tp_1\tp1", tblPropertiesWithoutDdlTime);
      assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime"));
@@ -275,8 +317,10 @@ public class TestHiveSyncTool {
      hiveDriver.getResults(results);
      String ddl = String.join("\n", results);
      assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'"));
      assertTrue(ddl.contains("'hoodie.datasource.query.type'='" + expectQueryType + "'"));
      assertTrue(ddl.toLowerCase().contains("create external table"));
      if (syncAsDataSourceTable) {
        assertTrue(ddl.contains("'" + ConfigUtils.IS_QUERY_AS_RO_TABLE + "'='" + readAsOptimized + "'"));
      }
    }
  }
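Once spark.sql.sources.provider=hudi and the spark.sql.sources.schema.* properties asserted above are present on the synced Hive table, Spark resolves it as a datasource table rather than a plain Hive SerDe table. A hedged read-side sketch (database and table names are placeholders; the column names come from the schema asserted above):

// Hypothetical usage sketch, not part of this commit.
import org.apache.spark.sql.SparkSession;

class ReadSyncedHudiTableSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("read-hudi-as-datasource-table")  // placeholder app name
        .master("local[1]")                        // placeholder master
        .enableHiveSupport()
        .getOrCreate();
    // default.hudi_table_rt is a placeholder for databaseName.tableName plus the snapshot table suffix.
    spark.sql("SELECT _hoodie_commit_time, name, favorite_number FROM default.hudi_table_rt").show();
    spark.stop();
  }
}

The commit also adds a new test class, TestParquet2SparkSchemaUtils, shown below; it covers the Parquet-to-Spark schema conversion that backs those schema table properties.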
@@ -0,0 +1,84 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hive;

import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
import org.apache.spark.sql.execution.SparkSqlParser;
import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestParquet2SparkSchemaUtils {
  private final SparkToParquetSchemaConverter spark2ParquetConverter =
      new SparkToParquetSchemaConverter(
          (Boolean) SQLConf.PARQUET_WRITE_LEGACY_FORMAT().defaultValue().get(),
          SQLConf.ParquetOutputTimestampType$.MODULE$.INT96());
  private final SparkSqlParser parser = new SparkSqlParser(new SQLConf());

  @Test
  public void testConvertPrimitiveType() {
    StructType sparkSchema = parser.parseTableSchema(
        "f0 int, f1 string, f3 bigint,"
            + " f4 decimal(5,2), f5 timestamp, f6 date,"
            + " f7 short, f8 float, f9 double, f10 byte,"
            + " f11 tinyint, f12 smallint, f13 binary, f14 boolean");

    String sparkSchemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(
        spark2ParquetConverter.convert(sparkSchema).asGroupType());
    StructType convertedSparkSchema = (StructType) StructType.fromJson(sparkSchemaJson);
    assertEquals(sparkSchema.json(), convertedSparkSchema.json());
    // Test type with nullable
    StructField field0 = new StructField("f0", StringType$.MODULE$, false, Metadata.empty());
    StructField field1 = new StructField("f1", StringType$.MODULE$, true, Metadata.empty());
    StructType sparkSchemaWithNullable = new StructType(new StructField[]{field0, field1});
    String sparkSchemaWithNullableJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(
        spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType());
    StructType convertedSparkSchemaWithNullable = (StructType) StructType.fromJson(sparkSchemaWithNullableJson);
    assertEquals(sparkSchemaWithNullable.json(), convertedSparkSchemaWithNullable.json());
  }

  @Test
  public void testConvertComplexType() {
    StructType sparkSchema = parser.parseTableSchema(
        "f0 int, f1 map<string, int>, f2 array<decimal(10,2)>"
            + ",f3 map<array<date>, bigint>, f4 array<array<double>>"
            + ",f5 struct<id:int, name:string>");
    String sparkSchemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(
        spark2ParquetConverter.convert(sparkSchema).asGroupType());
    StructType convertedSparkSchema = (StructType) StructType.fromJson(sparkSchemaJson);
    assertEquals(sparkSchema.json(), convertedSparkSchema.json());
    // Test complex type with nullable
    StructField field0 = new StructField("f0", new ArrayType(StringType$.MODULE$, true), false, Metadata.empty());
    StructField field1 = new StructField("f1", new MapType(StringType$.MODULE$, IntegerType$.MODULE$, true), false, Metadata.empty());
    StructType sparkSchemaWithNullable = new StructType(new StructField[]{field0, field1});
    String sparkSchemaWithNullableJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(
        spark2ParquetConverter.convert(sparkSchemaWithNullable).asGroupType());
    StructType convertedSparkSchemaWithNullable = (StructType) StructType.fromJson(sparkSchemaWithNullableJson);
    assertEquals(sparkSchemaWithNullable.json(), convertedSparkSchemaWithNullable.json());
  }
}
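A hedged sketch of how the converted schema JSON could be laid out as the spark.sql.sources.* table properties asserted in TestHiveSyncTool above (the helper name, its Map return type, and the single-part/single-partition-column layout are assumptions for illustration; only Parquet2SparkSchemaUtils.convertToSparkSchemaJson and the property keys come from this diff):

// Illustrative sketch, not part of this commit.
import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
import org.apache.parquet.schema.GroupType;

import java.util.LinkedHashMap;
import java.util.Map;

class SparkSchemaTablePropertiesSketch {
  // Builds the table properties that let the Hive-synced Hudi table be read as a Spark datasource table.
  static Map<String, String> toSparkTableProperties(GroupType parquetSchema, String partitionColumn) {
    String schemaJson = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(parquetSchema);
    Map<String, String> props = new LinkedHashMap<>();
    props.put("spark.sql.sources.provider", "hudi");
    props.put("spark.sql.sources.schema.numParts", "1");
    props.put("spark.sql.sources.schema.part.0", schemaJson);
    props.put("spark.sql.sources.schema.numPartCols", "1");
    props.put("spark.sql.sources.schema.partCol.0", partitionColumn);
    return props;
  }
}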