1
0

[HUDI-2418] Support HiveSchemaProvider (#3671)

Co-authored-by: jian.feng <fengjian428@gmial.com>
This commit is contained in:
冯健
2021-12-05 16:10:13 +08:00
committed by GitHub
parent 63b15607ff
commit 734c9f5f2d
8 changed files with 500 additions and 0 deletions

View File

@@ -233,6 +233,13 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>

View File

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hudi.utilities.schema;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.types.StructType;
import java.util.Collections;
/**
 * A {@link SchemaProvider} that derives the source (and optionally target) Avro schema
 * from a Hive table registered in the Spark session catalog.
 *
 * <p>Required property: {@code hoodie.deltastreamer.schemaprovider.source.schema.hive.table}.
 * The database properties default to {@code "default"}; the target table property is optional —
 * when absent, {@link #getTargetSchema()} falls back to the superclass behavior.
 */
public class HiveSchemaProvider extends SchemaProvider {

  /**
   * Configs supported.
   */
  public static class Config {
    private static final String SOURCE_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.database";
    private static final String SOURCE_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.table";
    private static final String TARGET_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.database";
    private static final String TARGET_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.table";
  }

  private static final Logger LOG = LogManager.getLogger(HiveSchemaProvider.class);

  private final Schema sourceSchema;
  // Only populated when the target-table property is set; otherwise stays null.
  private Schema targetSchema;

  /**
   * Resolves the configured Hive table(s) through the Spark session catalog and converts
   * their StructType schemas to Avro.
   *
   * @param props deltastreamer properties; must contain the source-table property
   * @param jssc  spark context whose conf is used to (re)build a Hive-enabled SparkSession
   * @throws IllegalArgumentException if a configured Hive database/table cannot be found
   */
  public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_TABLE_PROP));
    String sourceSchemaDBName = props.getString(Config.SOURCE_SCHEMA_DATABASE_PROP, "default");
    String sourceSchemaTableName = props.getString(Config.SOURCE_SCHEMA_TABLE_PROP);
    // Hive support is required so the session catalog can resolve Hive tables.
    SparkSession spark = SparkSession.builder().config(jssc.getConf()).enableHiveSupport().getOrCreate();
    try {
      TableIdentifier sourceSchemaTable = new TableIdentifier(sourceSchemaTableName, scala.Option.apply(sourceSchemaDBName));
      StructType sourceStructType = spark.sessionState().catalog().getTableMetadata(sourceSchemaTable).schema();
      this.sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
          sourceStructType,
          sourceSchemaTableName,
          "hoodie." + sourceSchemaDBName);
      if (props.containsKey(Config.TARGET_SCHEMA_TABLE_PROP)) {
        String targetSchemaDBName = props.getString(Config.TARGET_SCHEMA_DATABASE_PROP, "default");
        String targetSchemaTableName = props.getString(Config.TARGET_SCHEMA_TABLE_PROP);
        TableIdentifier targetSchemaTable = new TableIdentifier(targetSchemaTableName, scala.Option.apply(targetSchemaDBName));
        StructType targetStructType = spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
        this.targetSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
            targetStructType,
            targetSchemaTableName,
            "hoodie." + targetSchemaDBName);
      }
    } catch (NoSuchTableException | NoSuchDatabaseException e) {
      // BUGFIX: read the optional target-table property with a default. The original
      // no-default getString(...) threw when the property was absent, masking `e`
      // with an unrelated "property not found" exception.
      String targetSchemaTableName = props.getString(Config.TARGET_SCHEMA_TABLE_PROP, "");
      String missingTables = targetSchemaTableName.isEmpty()
          ? sourceSchemaTableName
          : sourceSchemaTableName + "," + targetSchemaTableName;
      String message = String.format("Can't find Hive table(s): %s", missingTables);
      throw new IllegalArgumentException(message, e);
    }
  }

  @Override
  public Schema getSourceSchema() {
    return sourceSchema;
  }

  @Override
  public Schema getTargetSchema() {
    if (targetSchema != null) {
      return targetSchema;
    } else {
      // No explicit target table configured: defer to the provider's default behavior.
      return super.getTargetSchema();
    }
  }
}

View File

@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.functional;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.schema.HiveSchemaProvider;
import org.apache.hudi.utilities.testutils.SparkClientFunctionalTestHarnessWithHiveSupport;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Basic tests against {@link HiveSchemaProvider}.
*/
@Tag("functional")
@Tag("functional")
public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWithHiveSupport {
  private static final Logger LOG = LogManager.getLogger(TestHiveSchemaProvider.class);
  // NOTE: PROPS is shared, mutable state across all tests in this class; any test that
  // mutates it must restore the defaults to keep the suite order-independent.
  private static final TypedProperties PROPS = new TypedProperties();
  private static final String SOURCE_SCHEMA_TABLE_NAME = "schema_registry.source_schema_tab";
  private static final String TARGET_SCHEMA_TABLE_NAME = "schema_registry.target_schema_tab";

  @BeforeAll
  public static void init() {
    Pair<String, String> dbAndTableName = parseDBAndTableName(SOURCE_SCHEMA_TABLE_NAME);
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database", dbAndTableName.getLeft());
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", dbAndTableName.getRight());
  }

  /**
   * Verifies every field of the provider's source schema exists in the reference avsc file.
   */
  @Test
  public void testSourceSchema() throws Exception {
    try {
      createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
      Schema sourceSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
      Schema originalSchema = new Schema.Parser().parse(
          UtilitiesTestBase.Helpers.readFile("delta-streamer-config/hive_schema_provider_source.avsc")
      );
      for (Schema.Field field : sourceSchema.getFields()) {
        // assertNotNull gives a clearer failure message than assertTrue(x != null)
        Assertions.assertNotNull(originalSchema.getField(field.name()),
            "Field not found in reference schema: " + field.name());
      }
    } catch (HoodieException e) {
      LOG.error("Failed to get source schema. ", e);
      throw e;
    }
  }

  /**
   * Verifies every field of the provider's target schema exists in the reference avsc file.
   */
  @Test
  public void testTargetSchema() throws Exception {
    try {
      Pair<String, String> dbAndTableName = parseDBAndTableName(TARGET_SCHEMA_TABLE_NAME);
      PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.database", dbAndTableName.getLeft());
      PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.table", dbAndTableName.getRight());
      createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
      createSchemaTable(TARGET_SCHEMA_TABLE_NAME);
      Schema targetSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getTargetSchema();
      Schema originalSchema = new Schema.Parser().parse(
          UtilitiesTestBase.Helpers.readFile("delta-streamer-config/hive_schema_provider_target.avsc"));
      for (Schema.Field field : targetSchema.getFields()) {
        Assertions.assertNotNull(originalSchema.getField(field.name()),
            "Field not found in reference schema: " + field.name());
      }
    } catch (HoodieException e) {
      LOG.error("Failed to get source/target schema. ", e);
      throw e;
    }
  }

  /**
   * Requesting a non-existent table must surface a {@link NoSuchTableException} as root cause.
   */
  @Test
  public void testNotExistTable() {
    String wrongName = "wrong_schema_tab";
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", wrongName);
    try {
      Assertions.assertThrows(NoSuchTableException.class, () -> {
        try {
          UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
        } catch (Throwable exception) {
          // Unwrap to the root cause: the provider wraps catalog exceptions.
          while (exception.getCause() != null) {
            exception = exception.getCause();
          }
          throw exception;
        }
      });
    } finally {
      // BUGFIX: restore the shared PROPS so other tests don't depend on execution order.
      PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table",
          parseDBAndTableName(SOURCE_SCHEMA_TABLE_NAME).getRight());
    }
  }

  /**
   * Splits "db.table" into (db, table); a bare table name gets the "default" database.
   */
  private static Pair<String, String> parseDBAndTableName(String fullName) {
    String[] dbAndTableName = fullName.split("\\.");
    if (dbAndTableName.length > 1) {
      return new ImmutablePair<>(dbAndTableName[0], dbAndTableName[1]);
    } else {
      return new ImmutablePair<>("default", dbAndTableName[0]);
    }
  }

  /**
   * Creates the fixture database (if needed) and runs the DDL stored at
   * delta-streamer-config/&lt;fullName&gt;.sql.
   */
  private void createSchemaTable(String fullName) throws IOException {
    SparkSession spark = spark();
    String createTableSQL = UtilitiesTestBase.Helpers.readFile(String.format("delta-streamer-config/%s.sql", fullName));
    Pair<String, String> dbAndTableName = parseDBAndTableName(fullName);
    spark.sql(String.format("CREATE DATABASE IF NOT EXISTS %s", dbAndTableName.getLeft()));
    spark.sql(createTableSQL);
  }
}

View File

@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.testutils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.SparkConf;
import java.util.Collections;
public class SparkClientFunctionalTestHarnessWithHiveSupport extends SparkClientFunctionalTestHarness {
public SparkConf conf() {
return conf(Collections.singletonMap("spark.sql.catalogImplementation", "hive"));
}
}

View File

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type": "record",
"name": "source_schema_tab",
"namespace": "hoodie.schema_registry",
"fields": [
{
"name": "id",
"type": [
"long",
"null"
]
},
{
"name": "name",
"type": [
"string",
"null"
]
},
{
"name": "num1",
"type": [
"int",
"null"
]
},
{
"name": "num2",
"type": [
"long",
"null"
]
},
{
"name": "num3",
"type": [
{
"type": "fixed",
"name": "fixed",
"namespace": "hoodie.schema_registry.source_schema_tab.num3",
"size": 9,
"logicalType": "decimal",
"precision": 20,
"scale": 0
},
"null"
]
},
{
"name": "num4",
"type": [
"int",
"null"
]
},
{
"name": "num5",
"type": [
"float",
"null"
]
},
{
"name": "num6",
"type": [
"double",
"null"
]
},
{
"name": "bool",
"type": [
"boolean",
"null"
]
},
{
"name": "bin",
"type": [
"bytes",
"null"
]
}
]
}

View File

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type": "record",
"name": "target_schema_tab",
"namespace": "hoodie.schema_registry",
"fields": [
{
"name": "id",
"type": [
"long",
"null"
]
},
{
"name": "name",
"type": [
"string",
"null"
]
},
{
"name": "num1",
"type": [
"int",
"null"
]
},
{
"name": "num2",
"type": [
"long",
"null"
]
},
{
"name": "num3",
"type": [
{
"type": "fixed",
"name": "fixed",
"namespace": "hoodie.schema_registry.target_schema_tab.num3",
"size": 9,
"logicalType": "decimal",
"precision": 20,
"scale": 0
},
"null"
]
},
{
"name": "num4",
"type": [
"int",
"null"
]
},
{
"name": "num5",
"type": [
"float",
"null"
]
},
{
"name": "num6",
"type": [
"double",
"null"
]
},
{
"name": "bool",
"type": [
"boolean",
"null"
]
},
{
"name": "bin",
"type": [
"bytes",
"null"
]
}
]
}

View File

@@ -0,0 +1,12 @@
-- Test fixture for TestHiveSchemaProvider: a Hive table covering one column per common
-- primitive type; TestHiveSchemaProvider executes this DDL via spark.sql, and the
-- resulting catalog schema is compared against hive_schema_provider_source.avsc.
CREATE TABLE IF NOT EXISTS `schema_registry`.`source_schema_tab`(
`id` BIGINT,
`name` STRING,
`num1` INT,
`num2` BIGINT,
`num3` DECIMAL(20,0),
`num4` TINYINT,
`num5` FLOAT,
`num6` DOUBLE,
`bool` BOOLEAN,
`bin` BINARY
)

View File

@@ -0,0 +1,12 @@
-- Test fixture for TestHiveSchemaProvider: same column set as source_schema_tab but
-- registered as the target table; executed via spark.sql and compared against
-- hive_schema_provider_target.avsc.
CREATE TABLE IF NOT EXISTS `schema_registry`.`target_schema_tab`(
`id` BIGINT,
`name` STRING,
`num1` INT,
`num2` BIGINT,
`num3` DECIMAL(20,0),
`num4` TINYINT,
`num5` FLOAT,
`num6` DOUBLE,
`bool` BOOLEAN,
`bin` BINARY
)