1
0

[HUDI-514] A schema provider to get metadata through Jdbc (#1200)

This commit is contained in:
openopen2
2020-02-14 10:06:06 +08:00
committed by GitHub
parent 175de0db7b
commit dfbee673ef
9 changed files with 378 additions and 4 deletions

View File

@@ -69,6 +69,13 @@
</build>
<dependencies>
<!-- H2 database for JdbcbasedSchemaProvider -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
<scope>test</scope>
</dependency>
<!-- Jetty -->
<dependency>

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.utilities;
import org.apache.avro.Schema;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.util.DFSPropertiesConfiguration;
@@ -27,6 +29,7 @@ import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -45,16 +48,31 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry;
import org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper;
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;
import org.apache.spark.sql.types.StructType;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.DriverManager;
import java.sql.Driver;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Enumeration;
/**
* Bunch of helper methods.
@@ -235,4 +253,86 @@ public class UtilHelpers {
defaults.load(in);
return defaults;
}
/**
 * Opens a JDBC {@link Connection} using the driver class and URL in the given options.
 * (Despite the historical name, this returns an open connection, not a factory.)
 *
 * @param options JDBC options containing at least the driver class and URL.
 * @return an open JDBC connection; never null.
 * @throws SQLException if the driver could not open a JDBC connection.
 */
private static Connection createConnectionFactory(Map<String, String> options) throws SQLException {
  String driverClass = options.get(JDBCOptions.JDBC_DRIVER_CLASS());
  DriverRegistry.register(driverClass);
  // Locate the registered driver matching driverClass. Spark may register drivers
  // wrapped in DriverWrapper, so unwrap before comparing class names.
  Driver driver = null;
  Enumeration<Driver> drivers = DriverManager.getDrivers();
  while (driver == null && drivers.hasMoreElements()) {
    Driver d = drivers.nextElement();
    if (d instanceof DriverWrapper) {
      if (((DriverWrapper) d).wrapped().getClass().getCanonicalName().equals(driverClass)) {
        driver = d;
      }
    } else if (d.getClass().getCanonicalName().equals(driverClass)) {
      driver = d;
    }
  }
  Preconditions.checkNotNull(driver, String.format("Did not find registered driver with class %s", driverClass));
  // Pass all options (user, password, ...) through as driver properties.
  Properties properties = new Properties();
  properties.putAll(options);
  String url = options.get(JDBCOptions.JDBC_URL());
  Connection connect = driver.connect(url, properties);
  Preconditions.checkNotNull(connect, String.format("The driver could not open a JDBC connection. Check the URL: %s", url));
  return connect;
}
/**
 * Returns true if the table already exists in the JDBC database.
 * Mirrors Spark's JdbcUtils: the dialect's existence query is executed and any
 * SQLException is deliberately interpreted as "table does not exist".
 *
 * @param conn    an open JDBC connection (not closed by this method).
 * @param options JDBC options containing the url, table name and query timeout.
 * @return true if the existence query succeeds, false otherwise.
 */
private static boolean tableExists(Connection conn, Map<String, String> options) {
  JdbcDialect dialect = JdbcDialects.get(options.get(JDBCOptions.JDBC_URL()));
  try (PreparedStatement statement = conn.prepareStatement(dialect.getTableExistsQuery(options.get(JDBCOptions.JDBC_TABLE_NAME())))) {
    statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT())));
    statement.executeQuery();
    return true;
  } catch (SQLException e) {
    // By design: a failing existence query means the table is absent (or unreachable).
    return false;
  }
}
/**
 * Fetches the table schema through JDBC and converts it to an Avro schema.
 * The code logic implementation refers to Spark 2.4.x and Spark 3.x JDBC schema resolution.
 *
 * @param options JDBC options: url, dbtable, driver class, queryTimeout, nullable, etc.
 * @return the Avro schema derived from the JDBC table metadata.
 * @throws Exception if the connection fails or the table does not exist.
 */
public static Schema getJDBCSchema(Map<String, String> options) throws Exception {
  String url = options.get(JDBCOptions.JDBC_URL());
  String table = options.get(JDBCOptions.JDBC_TABLE_NAME());
  Connection conn = createConnectionFactory(options);
  try {
    if (!tableExists(conn, options)) {
      throw new HoodieException(String.format("%s table does not exist!", table));
    }
    JdbcDialect dialect = JdbcDialects.get(url);
    try (PreparedStatement statement = conn.prepareStatement(dialect.getSchemaQuery(table))) {
      statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT())));
      try (ResultSet rs = statement.executeQuery()) {
        boolean nullable = Boolean.parseBoolean(options.get("nullable"));
        StructType structType = JdbcUtils.getSchema(rs, dialect, nullable);
        return AvroConversionUtils.convertStructTypeToAvroSchema(structType, table, "hoodie." + table);
      }
    }
  } finally {
    // Always release the JDBC connection; previously it was leaked on every call.
    conn.close();
  }
}
}

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.schema;
import org.apache.avro.Schema;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.HashMap;
import java.util.Map;
/**
 * A {@link SchemaProvider} that derives the source schema from a table's metadata over JDBC.
 */
public class JdbcbasedSchemaProvider extends SchemaProvider {

  // Lazily resolved Avro schema; cached after the first successful lookup.
  private Schema sourceSchema;

  // JDBC options (url, driver, user, password, dbtable, queryTimeout, nullable)
  // consumed by UtilHelpers.getJDBCSchema.
  private final Map<String, String> options;

  /**
   * Configs supported.
   */
  public static class Config {
    // The JDBC URL to connect to. Source-specific connection properties may be
    // embedded in the URL, e.g. jdbc:postgresql://localhost/test?user=fred&password=secret
    private static final String SOURCE_SCHEMA_JDBC_CONNECTION_URL = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url";
    // Fully-qualified class name of the JDBC driver for this URL, e.g. org.h2.Driver
    private static final String SOURCE_SCHEMA_JDBC_DRIVER_TYPE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type";
    private static final String SOURCE_SCHEMA_JDBC_USERNAME = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username";
    private static final String SOURCE_SCHEMA_JDBC_PASSWORD = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password";
    // Table to read the schema from, e.g. test_database.test1_table or test1_table
    private static final String SOURCE_SCHEMA_JDBC_DBTABLE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable";
    // Seconds the driver waits for a Statement to execute; zero means no limit.
    // Behavior depends on how the JDBC driver implements setQueryTimeout, e.g. the
    // h2 driver checks the timeout of each query rather than an entire batch. Defaults to 0.
    private static final String SOURCE_SCHEMA_JDBC_TIMEOUT = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout";
    // If true, all the columns are nullable.
    private static final String SOURCE_SCHEMA_JDBC_NULLABLE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable";
  }

  public JdbcbasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
    super(props, jssc);
    Map<String, String> jdbcOptions = new HashMap<>();
    jdbcOptions.put("url", props.getString(Config.SOURCE_SCHEMA_JDBC_CONNECTION_URL));
    jdbcOptions.put("driver", props.getString(Config.SOURCE_SCHEMA_JDBC_DRIVER_TYPE));
    jdbcOptions.put("user", props.getString(Config.SOURCE_SCHEMA_JDBC_USERNAME));
    jdbcOptions.put("password", props.getString(Config.SOURCE_SCHEMA_JDBC_PASSWORD));
    jdbcOptions.put("dbtable", props.getString(Config.SOURCE_SCHEMA_JDBC_DBTABLE));
    // Statement timeout in seconds; zero disables the limit.
    jdbcOptions.put("queryTimeout", props.getString(Config.SOURCE_SCHEMA_JDBC_TIMEOUT, "0"));
    jdbcOptions.put("nullable", props.getString(Config.SOURCE_SCHEMA_JDBC_NULLABLE, "true"));
    this.options = jdbcOptions;
  }

  @Override
  public Schema getSourceSchema() {
    if (sourceSchema == null) {
      try {
        sourceSchema = UtilHelpers.getJDBCSchema(options);
      } catch (Exception e) {
        throw new HoodieException("Failed to get Schema through jdbc. ", e);
      }
    }
    return sourceSchema;
  }
}

View File

@@ -123,6 +123,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
// Hive Configs
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb1");
@@ -526,7 +527,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
assertTrue(e.getMessage().contains("Please provide a valid schema provider class!"));
}
}
@Test
public void testPayloadClassUpdate() throws Exception {
String dataSetBasePath = dfsBasePath + "/test_dataset_mor";

View File

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import static org.junit.Assert.assertEquals;
public class TestJdbcbasedSchemaProvider {
  private static final Logger LOG = LogManager.getLogger(TestJdbcbasedSchemaProvider.class);
  private static final TypedProperties PROPS = new TypedProperties();
  protected transient JavaSparkContext jsc = null;

  @Before
  public void init() {
    jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url", "jdbc:h2:mem:test_mem");
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type", "org.h2.Driver");
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username", "sa");
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password", "");
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable", "triprec");
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout", "0");
    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable", "false");
  }

  @After
  public void teardown() throws Exception {
    if (jsc != null) {
      jsc.stop();
    }
  }

  @Test
  public void testJdbcbasedSchemaProvider() throws Exception {
    try {
      initH2Database();
      Schema sourceSchema = UtilHelpers.createSchemaProvider(JdbcbasedSchemaProvider.class.getName(), PROPS, jsc).getSourceSchema();
      assertEquals(sourceSchema.toString().toUpperCase(), new Schema.Parser().parse(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/source-jdbc.avsc")).toString().toUpperCase());
    } catch (HoodieException e) {
      LOG.error("Failed to get connection through jdbc. ", e);
      // Re-throw so the test actually fails; previously the exception was only
      // logged and the test would pass silently on failure.
      throw e;
    }
  }

  /**
   * Initialize the H2 database and create the test table.
   * DB_CLOSE_DELAY=-1 keeps the named in-memory database alive after this
   * connection is closed, so the schema provider (which connects to the same
   * "test_mem" database) can still see the table. This lets us close the
   * connection and statement properly instead of leaking them.
   *
   * @throws SQLException if the DDL fails.
   * @throws IOException if the DDL resource cannot be read.
   */
  private void initH2Database() throws SQLException, IOException {
    try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:test_mem;DB_CLOSE_DELAY=-1", "sa", "");
        PreparedStatement ps = conn.prepareStatement(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/triprec.sql"))) {
      ps.executeUpdate();
    }
  }
}

View File

@@ -161,14 +161,20 @@ public class UtilitiesTestBase {
// to get hold of resources bundled with jar
private static ClassLoader classLoader = Helpers.class.getClassLoader();
public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException {
public static String readFile(String testResourcePath) throws IOException {
BufferedReader reader =
new BufferedReader(new InputStreamReader(classLoader.getResourceAsStream(testResourcePath)));
PrintStream os = new PrintStream(fs.create(new Path(targetPath), true));
StringBuffer sb = new StringBuffer();
String line;
while ((line = reader.readLine()) != null) {
os.println(line);
sb.append(line + "\n");
}
return sb.toString();
}
/**
 * Copies a bundled classpath test resource to the given DFS path, overwriting
 * any existing file.
 *
 * @param testResourcePath resource path on the classpath.
 * @param fs               target file system.
 * @param targetPath       destination path on the file system.
 * @throws IOException if the resource cannot be read or the target cannot be written.
 */
public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException {
  // try-with-resources guarantees the stream is closed even if readFile throws.
  try (PrintStream os = new PrintStream(fs.create(new Path(targetPath), true))) {
    os.print(readFile(testResourcePath));
    os.flush();
  }
}

View File

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"type": "record",
"name": "triprec",
"namespace": "hoodie.triprec",
"fields": [
{
"name":"ID",
"type": "int"
},
{
"name": "TIMESTAMP",
"type": ["double", "null"]
},
{
"name": "RIDER",
"type": ["string", "null"]
},
{
"name": "DRIVER",
"type": ["string", "null"]
},
{
"name": "BEGIN_LAT",
"type": ["double", "null"]
},
{
"name": "BEGIN_LON",
"type": ["double", "null"]
},
{
"name": "END_LAT",
"type": ["double", "null"]
},
{
"name": "END_LON",
"type": ["double", "null"]
},
{
"name": "FARE",
"type": ["double", "null"]
} ]
}

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Test table loaded into the in-memory H2 database by TestJdbcbasedSchemaProvider.
-- NOTE(review): column set appears to mirror delta-streamer-config/source-jdbc.avsc — verify if either changes.
CREATE TABLE triprec (
id INT NOT NULL,
timestamp DOUBLE,
rider VARCHAR(200),
driver VARCHAR(200),
begin_lat DOUBLE,
begin_lon DOUBLE,
end_lat DOUBLE,
end_lon DOUBLE,
fare DOUBLE
);

View File

@@ -104,6 +104,7 @@
<jetty.version>7.6.0.v20120127</jetty.version>
<hbase.version>1.2.3</hbase.version>
<codehaus-jackson.version>1.9.13</codehaus-jackson.version>
<h2.version>1.4.199</h2.version>
<skipTests>false</skipTests>
<skipITs>${skipTests}</skipITs>
<skipUTs>${skipTests}</skipUTs>