From dfbee673ef63c7305e131d5dfa17b304b128917f Mon Sep 17 00:00:00 2001 From: openopen2 Date: Fri, 14 Feb 2020 10:06:06 +0800 Subject: [PATCH] [HUDI-514] A schema provider to get metadata through Jdbc (#1200) --- hudi-utilities/pom.xml | 7 ++ .../apache/hudi/utilities/UtilHelpers.java | 100 ++++++++++++++++++ .../schema/JdbcbasedSchemaProvider.java | 84 +++++++++++++++ .../utilities/TestHoodieDeltaStreamer.java | 3 +- .../TestJdbcbasedSchemaProvider.java | 88 +++++++++++++++ .../hudi/utilities/UtilitiesTestBase.java | 12 ++- .../delta-streamer-config/source-jdbc.avsc | 59 +++++++++++ .../delta-streamer-config/triprec.sql | 28 +++++ pom.xml | 1 + 9 files changed, 378 insertions(+), 4 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/TestJdbcbasedSchemaProvider.java create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/triprec.sql diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 3c1e0fc8a..df0b91496 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -69,6 +69,13 @@ + + + com.h2database + h2 + ${h2.version} + test + diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 3fe00ca0c..7812925d2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -18,6 +18,8 @@ package org.apache.hudi.utilities; +import org.apache.avro.Schema; +import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.HoodieWriteClient; import org.apache.hudi.WriteStatus; import org.apache.hudi.common.util.DFSPropertiesConfiguration; @@ -27,6 +29,7 @@ import 
org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -45,16 +48,31 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.launcher.SparkLauncher; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry; +import org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper; +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions; +import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils; +import org.apache.spark.sql.jdbc.JdbcDialect; +import org.apache.spark.sql.jdbc.JdbcDialects; +import org.apache.spark.sql.types.StructType; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.StringReader; import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.DriverManager; +import java.sql.Driver; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.Enumeration; /** * Bunch of helper methods. @@ -235,4 +253,86 @@ public class UtilHelpers { defaults.load(in); return defaults; } + + /** + * Returns a factory for creating connections to the given JDBC URL. + * @param options - JDBC options that contains url, table and other information. + * @return + * @throws SQLException if the driver could not open a JDBC connection. 
+ */ + private static Connection createConnectionFactory(Map options) throws SQLException { + String driverClass = options.get(JDBCOptions.JDBC_DRIVER_CLASS()); + DriverRegistry.register(driverClass); + Enumeration drivers = DriverManager.getDrivers(); + Driver driver = null; + while (drivers.hasMoreElements()) { + Driver d = drivers.nextElement(); + if (d instanceof DriverWrapper) { + if (((DriverWrapper) d).wrapped().getClass().getCanonicalName().equals(driverClass)) { + driver = d; + } + } else if (d.getClass().getCanonicalName().equals(driverClass)) { + driver = d; + } + if (driver != null) { + break; + } + } + + Preconditions.checkNotNull(driver, String.format("Did not find registered driver with class %s", driverClass)); + + Properties properties = new Properties(); + properties.putAll(options); + Connection connect = null; + String url = options.get(JDBCOptions.JDBC_URL()); + connect = driver.connect(url, properties); + Preconditions.checkNotNull(connect, String.format("The driver could not open a JDBC connection. Check the URL: %s", url)); + return connect; + } + + /** + * Returns true if the table already exists in the JDBC database. + */ + private static Boolean tableExists(Connection conn, Map options) { + JdbcDialect dialect = JdbcDialects.get(options.get(JDBCOptions.JDBC_URL())); + try (PreparedStatement statement = conn.prepareStatement(dialect.getTableExistsQuery(options.get(JDBCOptions.JDBC_TABLE_NAME())))) { + statement.setQueryTimeout(Integer.parseInt(options.get(JDBCOptions.JDBC_QUERY_TIMEOUT()))); + statement.executeQuery(); + } catch (SQLException e) { + return false; + } + return true; + } + + /** + * Calls the Spark JDBC utility functions to fetch the table schema through JDBC. + * The code logic implementation refers to spark 2.4.x and spark 3.x. 
+ * @param options + * @return + * @throws Exception + */ + public static Schema getJDBCSchema(Map options) throws Exception { + Connection conn = createConnectionFactory(options); + String url = options.get(JDBCOptions.JDBC_URL()); + String table = options.get(JDBCOptions.JDBC_TABLE_NAME()); + boolean tableExists = tableExists(conn,options); + + if (tableExists) { + JdbcDialect dialect = JdbcDialects.get(url); + try (PreparedStatement statement = conn.prepareStatement(dialect.getSchemaQuery(table))) { + statement.setQueryTimeout(Integer.parseInt(options.get("queryTimeout"))); + try (ResultSet rs = statement.executeQuery()) { + StructType structType; + if (Boolean.parseBoolean(options.get("nullable"))) { + structType = JdbcUtils.getSchema(rs, dialect, true); + } else { + structType = JdbcUtils.getSchema(rs, dialect, false); + } + return AvroConversionUtils.convertStructTypeToAvroSchema(structType, table, "hoodie." + table); + } + } + } else { + throw new HoodieException(String.format("%s table does not exists!", table)); + } + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java new file mode 100644 index 000000000..b5622fdb4 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema; + +import org.apache.avro.Schema; +import org.apache.hudi.common.util.TypedProperties; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.UtilHelpers; +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.HashMap; +import java.util.Map; + +/** + * A schema provider to get metadata through Jdbc. + */ +public class JdbcbasedSchemaProvider extends SchemaProvider { + private Schema sourceSchema; + private Map options = new HashMap<>(); + + /** + * Configs supported. + */ + public static class Config { + // The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. + // e.g., jdbc:postgresql://localhost/test?user=fred&password=secret + private static final String SOURCE_SCHEMA_JDBC_CONNECTION_URL = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url"; + // The class name of the JDBC driver to use to connect to this URL. 
such as org.h2.Driver + private static final String SOURCE_SCHEMA_JDBC_DRIVER_TYPE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type"; + private static final String SOURCE_SCHEMA_JDBC_USERNAME = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username"; + private static final String SOURCE_SCHEMA_JDBC_PASSWORD = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password"; + // example : test_database.test1_table or test1_table + private static final String SOURCE_SCHEMA_JDBC_DBTABLE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable"; + // The number of seconds the driver will wait for a Statement object to execute to the given number of seconds. + // Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, + // e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. It defaults to 0. + private static final String SOURCE_SCHEMA_JDBC_TIMEOUT = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout"; + // If true, all the columns are nullable. + private static final String SOURCE_SCHEMA_JDBC_NULLABLE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable"; + } + + public JdbcbasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + options.put("url", props.getString(Config.SOURCE_SCHEMA_JDBC_CONNECTION_URL)); + options.put("driver", props.getString(Config.SOURCE_SCHEMA_JDBC_DRIVER_TYPE)); + options.put("user", props.getString(Config.SOURCE_SCHEMA_JDBC_USERNAME)); + options.put("password", props.getString(Config.SOURCE_SCHEMA_JDBC_PASSWORD)); + options.put("dbtable", props.getString(Config.SOURCE_SCHEMA_JDBC_DBTABLE)); + // the number of seconds the driver will wait for a Statement object to execute to the given + // number of seconds. Zero means there is no limit. 
+ options.put("queryTimeout", props.getString(Config.SOURCE_SCHEMA_JDBC_TIMEOUT, "0")); + options.put("nullable", props.getString(Config.SOURCE_SCHEMA_JDBC_NULLABLE, "true")); + } + + @Override + public Schema getSourceSchema() { + if (this.sourceSchema != null) { + return sourceSchema; + } + + try { + sourceSchema = UtilHelpers.getJDBCSchema(options); + } catch (Exception e) { + throw new HoodieException("Failed to get Schema through jdbc. ", e); + } + return sourceSchema; + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java index cbf9db97b..9d324dce2 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java @@ -123,6 +123,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc"); props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc"); + // Hive Configs props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/"); props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb1"); @@ -526,7 +527,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { assertTrue(e.getMessage().contains("Please provide a valid schema provider class!")); } } - + @Test public void testPayloadClassUpdate() throws Exception { String dataSetBasePath = dfsBasePath + "/test_dataset_mor"; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestJdbcbasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestJdbcbasedSchemaProvider.java new file mode 100644 index 000000000..8a2fe78e0 --- 
/dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestJdbcbasedSchemaProvider.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.common.util.TypedProperties; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.schema.JdbcbasedSchemaProvider; + +import org.apache.avro.Schema; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import static org.junit.Assert.assertEquals; + +public class TestJdbcbasedSchemaProvider { + + private static final Logger LOG = LogManager.getLogger(TestJdbcbasedSchemaProvider.class); + private static final TypedProperties PROPS = new TypedProperties(); + protected transient JavaSparkContext jsc = null; + + @Before + public void init() { + jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]"); + 
PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url", "jdbc:h2:mem:test_mem"); + PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type", "org.h2.Driver"); + PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username", "sa"); + PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password", ""); + PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable", "triprec"); + PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout", "0"); + PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable", "false"); + } + + @After + public void teardown() throws Exception { + if (jsc != null) { + jsc.stop(); + } + } + + @Test + public void testJdbcbasedSchemaProvider() throws Exception { + try { + initH2Database(); + Schema sourceSchema = UtilHelpers.createSchemaProvider(JdbcbasedSchemaProvider.class.getName(), PROPS, jsc).getSourceSchema(); + assertEquals(sourceSchema.toString().toUpperCase(), new Schema.Parser().parse(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/source-jdbc.avsc")).toString().toUpperCase()); + } catch (HoodieException e) { + LOG.error("Failed to get connection through jdbc. ", e); + } + } + + /** + * Initialize the H2 database and obtain a connection, then create a table as a test. + * Based on the characteristics of the H2 in-memory database, we do not need to explicitly initialize the database beforehand. 
+ * @throws SQLException + * @throws IOException + */ + private void initH2Database() throws SQLException, IOException { + Connection conn = DriverManager.getConnection("jdbc:h2:mem:test_mem", "sa", ""); + PreparedStatement ps = conn.prepareStatement(UtilitiesTestBase.Helpers.readFile("delta-streamer-config/triprec.sql")); + ps.executeUpdate(); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java index c0ec189e6..1fcd99ad1 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java @@ -161,14 +161,20 @@ public class UtilitiesTestBase { // to get hold of resources bundled with jar private static ClassLoader classLoader = Helpers.class.getClassLoader(); - public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException { + public static String readFile(String testResourcePath) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(classLoader.getResourceAsStream(testResourcePath))); - PrintStream os = new PrintStream(fs.create(new Path(targetPath), true)); + StringBuffer sb = new StringBuffer(); String line; while ((line = reader.readLine()) != null) { - os.println(line); + sb.append(line + "\n"); } + return sb.toString(); + } + + public static void copyToDFS(String testResourcePath, FileSystem fs, String targetPath) throws IOException { + PrintStream os = new PrintStream(fs.create(new Path(targetPath), true)); + os.print(readFile(testResourcePath)); os.flush(); os.close(); } diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc new file mode 100644 index 000000000..cb8697da5 --- /dev/null +++ 
b/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type": "record", + "name": "triprec", + "namespace": "hoodie.triprec", + "fields": [ + { + "name":"ID", + "type": "int" + }, + { + "name": "TIMESTAMP", + "type": ["double", "null"] + }, + { + "name": "RIDER", + "type": ["string", "null"] + }, + { + "name": "DRIVER", + "type": ["string", "null"] + }, + { + "name": "BEGIN_LAT", + "type": ["double", "null"] + }, + { + "name": "BEGIN_LON", + "type": ["double", "null"] + }, + { + "name": "END_LAT", + "type": ["double", "null"] + }, + { + "name": "END_LON", + "type": ["double", "null"] + }, + { + "name": "FARE", + "type": ["double", "null"] + } ] +} \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/triprec.sql b/hudi-utilities/src/test/resources/delta-streamer-config/triprec.sql new file mode 100644 index 000000000..61f663dd3 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/triprec.sql @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +CREATE TABLE triprec ( + id INT NOT NULL, + timestamp DOUBLE, + rider VARCHAR(200), + driver VARCHAR(200), + begin_lat DOUBLE, + begin_lon DOUBLE, + end_lat DOUBLE, + end_lon DOUBLE, + fare DOUBLE +); \ No newline at end of file diff --git a/pom.xml b/pom.xml index 3dc6ec7e3..be4610591 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ 7.6.0.v20120127 1.2.3 1.9.13 + 1.4.199 false ${skipTests} ${skipTests}