diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java new file mode 100644 index 000000000..04264bf4c --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.transform; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.util.Scanner; +import java.util.UUID; + +/** + * A transformer that allows a sql template file be used to transform the source before writing to + * Hudi data-set. + * + *

The query should reference the source as a table named "\" + * + *

The final sql statement result is used as the write payload. + * + *

The SQL file is configured with this hoodie property: + * hoodie.deltastreamer.transformer.sql.file + * + *

Example Spark SQL Query: + * + *

CACHE TABLE tmp_personal_trips AS + * SELECT * FROM WHERE trip_type='personal_trips'; + *

+ * SELECT * FROM tmp_personal_trips; + */ +public class SqlFileBasedTransformer implements Transformer { + + private static final Logger LOG = LogManager.getLogger(SqlFileBasedTransformer.class); + + private static final String SRC_PATTERN = ""; + private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_"; + + @Override + public Dataset apply( + final JavaSparkContext jsc, + final SparkSession sparkSession, + final Dataset rowDataset, + final TypedProperties props) { + + final String sqlFile = props.getString(Config.TRANSFORMER_SQL_FILE); + if (null == sqlFile) { + throw new IllegalArgumentException( + "Missing required configuration : (" + Config.TRANSFORMER_SQL_FILE + ")"); + } + + final FileSystem fs = FSUtils.getFs(sqlFile, jsc.hadoopConfiguration(), true); + // tmp table name doesn't like dashes + final String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); + LOG.info("Registering tmp table : " + tmpTable); + rowDataset.registerTempTable(tmpTable); + + try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) { + Dataset rows = null; + // each sql statement is separated with semicolon hence set that as delimiter. + scanner.useDelimiter(";"); + LOG.info("SQL Query for transformation : "); + while (scanner.hasNext()) { + String sqlStr = scanner.next(); + sqlStr = sqlStr.replaceAll(SRC_PATTERN, tmpTable).trim(); + if (!sqlStr.isEmpty()) { + LOG.info(sqlStr); + // overwrite the same dataset object until the last statement then return. + rows = sparkSession.sql(sqlStr); + } + } + return rows; + } catch (final IOException ioe) { + throw new HoodieIOException("Error reading transformer SQL file.", ioe); + } + } + + /** Configs supported. */ + private static class Config { + + private static final String TRANSFORMER_SQL_FILE = "hoodie.deltastreamer.transformer.sql.file"; + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java new file mode 100644 index 000000000..833d7b449 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.transform; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.catalyst.parser.ParseException; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestSqlFileBasedTransformer extends UtilitiesTestBase { + private TypedProperties props; + private SqlFileBasedTransformer sqlFileTransformer; + private Dataset inputDatasetRows; + private Dataset emptyDatasetRow; + + @BeforeAll + public static void initClass() throws Exception { + UtilitiesTestBase.initClass(); + UtilitiesTestBase.Helpers.copyToDFS( + "delta-streamer-config/sql-file-transformer.sql", + UtilitiesTestBase.dfs, + UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql"); + UtilitiesTestBase.Helpers.copyToDFS( + "delta-streamer-config/sql-file-transformer-invalid.sql", + UtilitiesTestBase.dfs, + UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql"); + UtilitiesTestBase.Helpers.copyToDFS( + "delta-streamer-config/sql-file-transformer-empty.sql", + UtilitiesTestBase.dfs, + UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql"); + } + + @AfterAll + public static void cleanupClass() { + UtilitiesTestBase.cleanupClass(); + } + + @Override + @BeforeEach + public void setup() throws Exception { + super.setup(); + props = new TypedProperties(); + sqlFileTransformer = new SqlFileBasedTransformer(); + inputDatasetRows = getInputDatasetRows(); + emptyDatasetRow = getEmptyDatasetRow(); + } + + @Override + @AfterEach + public void teardown() throws Exception { + super.teardown(); + } + + @Test + public void testSqlFileBasedTransformerIllegalArguments() { + // Test if the class throws illegal argument exception when argument not present. + assertThrows( + IllegalArgumentException.class, + () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props)); + } + + @Test + public void testSqlFileBasedTransformerIncorrectConfig() { + // Test if the class throws hoodie IO exception correctly when given a incorrect config. + props.setProperty( + "hoodie.deltastreamer.transformer.sql.file", + UtilitiesTestBase.dfsBasePath + "/non-exist-sql-file.sql"); + assertThrows( + HoodieIOException.class, + () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props)); + } + + @Test + public void testSqlFileBasedTransformerInvalidSQL() { + // Test if the SQL file based transformer works as expected for the invalid SQL statements. + props.setProperty( + "hoodie.deltastreamer.transformer.sql.file", + UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql"); + assertThrows( + ParseException.class, + () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props)); + } + + @Test + public void testSqlFileBasedTransformerEmptyDataset() { + // Test if the SQL file based transformer works as expected for the empty SQL statements. + props.setProperty( + "hoodie.deltastreamer.transformer.sql.file", + UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql"); + Dataset emptyRow = sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props); + String[] actualRows = emptyRow.as(Encoders.STRING()).collectAsList().toArray(new String[0]); + String[] expectedRows = emptyDatasetRow.collectAsList().toArray(new String[0]); + assertArrayEquals(expectedRows, actualRows); + } + + @Test + public void testSqlFileBasedTransformer() { + // Test if the SQL file based transformer works as expected for the correct input. + props.setProperty( + "hoodie.deltastreamer.transformer.sql.file", + UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql"); + Dataset transformedRow = + sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props); + + // Called distinct() and sort() to match the transformation in this file: + // hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql + String[] expectedRows = + inputDatasetRows + .distinct() + .sort("col1") + .as(Encoders.STRING()) + .collectAsList() + .toArray(new String[0]); + String[] actualRows = transformedRow.as(Encoders.STRING()).collectAsList().toArray(new String[0]); + assertArrayEquals(expectedRows, actualRows); + } + + private Dataset getInputDatasetRows() { + // Create few rows with duplicate data. + List list = new ArrayList<>(); + list.add(RowFactory.create("one")); + list.add(RowFactory.create("two")); + list.add(RowFactory.create("three")); + list.add(RowFactory.create("four")); + list.add(RowFactory.create("four")); + // Create the schema struct. + List listOfStructField = new ArrayList<>(); + listOfStructField.add(DataTypes.createStructField("col1", DataTypes.StringType, true)); + StructType structType = DataTypes.createStructType(listOfStructField); + // Create the data frame with the rows and schema. + return sparkSession.createDataFrame(list, structType); + } + + private Dataset getEmptyDatasetRow() { + // Create the schema struct. + List listOfStructField = new ArrayList<>(); + listOfStructField.add(DataTypes.createStructField("col1", DataTypes.StringType, true)); + StructType structType = DataTypes.createStructType(listOfStructField); + // Create the data frame with the rows and schema. + List list = new ArrayList<>(); + // Create empty dataframe with the schema. + return sparkSession.createDataFrame(list, structType); + } +} diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-empty.sql b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-empty.sql new file mode 100644 index 000000000..ae83b47c3 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-empty.sql @@ -0,0 +1,25 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"), you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +CACHE +TABLE tmp_trips +SELECT * +FROM +WHERE + True = False; + +SELECT * +FROM tmp_trips; \ No newline at end of file diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-invalid.sql b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-invalid.sql new file mode 100644 index 000000000..42b2e7eca --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer-invalid.sql @@ -0,0 +1,22 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"), you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +SEECT +DISTINCT col1 +FROM + +ORDER BY col1; + diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql new file mode 100644 index 000000000..ce14b7917 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql @@ -0,0 +1,27 @@ + +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"), you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +SELECT RAND(); + +SELECT 1; + +SELECT + DISTINCT col1 +FROM + +ORDER BY col1; +