1
0

[HUDI-1743] Added support for SqlFileBasedTransformer (#2747)

This commit is contained in:
Vinoth Govindarajan
2021-06-07 18:48:27 -07:00
committed by GitHub
parent 919590988a
commit 57611d10b5
5 changed files with 358 additions and 0 deletions

View File

@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.transform;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
/**
* A transformer that allows a sql template file be used to transform the source before writing to
* Hudi data-set.
*
* <p>The query should reference the source as a table named "\<SRC\>"
*
* <p>The final sql statement result is used as the write payload.
*
* <p>The SQL file is configured with this hoodie property:
* hoodie.deltastreamer.transformer.sql.file
*
* <p>Example Spark SQL Query:
*
* <p>CACHE TABLE tmp_personal_trips AS
* SELECT * FROM <SRC> WHERE trip_type='personal_trips';
* <p>
* SELECT * FROM tmp_personal_trips;
*/
public class SqlFileBasedTransformer implements Transformer {
private static final Logger LOG = LogManager.getLogger(SqlFileBasedTransformer.class);
private static final String SRC_PATTERN = "<SRC>";
private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
@Override
public Dataset<Row> apply(
final JavaSparkContext jsc,
final SparkSession sparkSession,
final Dataset<Row> rowDataset,
final TypedProperties props) {
final String sqlFile = props.getString(Config.TRANSFORMER_SQL_FILE);
if (null == sqlFile) {
throw new IllegalArgumentException(
"Missing required configuration : (" + Config.TRANSFORMER_SQL_FILE + ")");
}
final FileSystem fs = FSUtils.getFs(sqlFile, jsc.hadoopConfiguration(), true);
// tmp table name doesn't like dashes
final String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
LOG.info("Registering tmp table : " + tmpTable);
rowDataset.registerTempTable(tmpTable);
try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) {
Dataset<Row> rows = null;
// each sql statement is separated with semicolon hence set that as delimiter.
scanner.useDelimiter(";");
LOG.info("SQL Query for transformation : ");
while (scanner.hasNext()) {
String sqlStr = scanner.next();
sqlStr = sqlStr.replaceAll(SRC_PATTERN, tmpTable).trim();
if (!sqlStr.isEmpty()) {
LOG.info(sqlStr);
// overwrite the same dataset object until the last statement then return.
rows = sparkSession.sql(sqlStr);
}
}
return rows;
} catch (final IOException ioe) {
throw new HoodieIOException("Error reading transformer SQL file.", ioe);
}
}
/** Configs supported. */
private static class Config {
private static final String TRANSFORMER_SQL_FILE = "hoodie.deltastreamer.transformer.sql.file";
}
}

View File

@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
private TypedProperties props;
private SqlFileBasedTransformer sqlFileTransformer;
private Dataset<Row> inputDatasetRows;
private Dataset<Row> emptyDatasetRow;
@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initClass();
UtilitiesTestBase.Helpers.copyToDFS(
"delta-streamer-config/sql-file-transformer.sql",
UtilitiesTestBase.dfs,
UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
UtilitiesTestBase.Helpers.copyToDFS(
"delta-streamer-config/sql-file-transformer-invalid.sql",
UtilitiesTestBase.dfs,
UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
UtilitiesTestBase.Helpers.copyToDFS(
"delta-streamer-config/sql-file-transformer-empty.sql",
UtilitiesTestBase.dfs,
UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
}
@AfterAll
public static void cleanupClass() {
UtilitiesTestBase.cleanupClass();
}
@Override
@BeforeEach
public void setup() throws Exception {
super.setup();
props = new TypedProperties();
sqlFileTransformer = new SqlFileBasedTransformer();
inputDatasetRows = getInputDatasetRows();
emptyDatasetRow = getEmptyDatasetRow();
}
@Override
@AfterEach
public void teardown() throws Exception {
super.teardown();
}
@Test
public void testSqlFileBasedTransformerIllegalArguments() {
// Test if the class throws illegal argument exception when argument not present.
assertThrows(
IllegalArgumentException.class,
() -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props));
}
@Test
public void testSqlFileBasedTransformerIncorrectConfig() {
// Test if the class throws hoodie IO exception correctly when given a incorrect config.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
UtilitiesTestBase.dfsBasePath + "/non-exist-sql-file.sql");
assertThrows(
HoodieIOException.class,
() -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props));
}
@Test
public void testSqlFileBasedTransformerInvalidSQL() {
// Test if the SQL file based transformer works as expected for the invalid SQL statements.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
assertThrows(
ParseException.class,
() -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props));
}
@Test
public void testSqlFileBasedTransformerEmptyDataset() {
// Test if the SQL file based transformer works as expected for the empty SQL statements.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
Dataset<Row> emptyRow = sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
String[] actualRows = emptyRow.as(Encoders.STRING()).collectAsList().toArray(new String[0]);
String[] expectedRows = emptyDatasetRow.collectAsList().toArray(new String[0]);
assertArrayEquals(expectedRows, actualRows);
}
@Test
public void testSqlFileBasedTransformer() {
// Test if the SQL file based transformer works as expected for the correct input.
props.setProperty(
"hoodie.deltastreamer.transformer.sql.file",
UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
Dataset<Row> transformedRow =
sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
// Called distinct() and sort() to match the transformation in this file:
// hudi-utilities/src/test/resources/delta-streamer-config/sql-file-transformer.sql
String[] expectedRows =
inputDatasetRows
.distinct()
.sort("col1")
.as(Encoders.STRING())
.collectAsList()
.toArray(new String[0]);
String[] actualRows = transformedRow.as(Encoders.STRING()).collectAsList().toArray(new String[0]);
assertArrayEquals(expectedRows, actualRows);
}
private Dataset<Row> getInputDatasetRows() {
// Create few rows with duplicate data.
List<Row> list = new ArrayList<>();
list.add(RowFactory.create("one"));
list.add(RowFactory.create("two"));
list.add(RowFactory.create("three"));
list.add(RowFactory.create("four"));
list.add(RowFactory.create("four"));
// Create the schema struct.
List<org.apache.spark.sql.types.StructField> listOfStructField = new ArrayList<>();
listOfStructField.add(DataTypes.createStructField("col1", DataTypes.StringType, true));
StructType structType = DataTypes.createStructType(listOfStructField);
// Create the data frame with the rows and schema.
return sparkSession.createDataFrame(list, structType);
}
private Dataset<Row> getEmptyDatasetRow() {
// Create the schema struct.
List<org.apache.spark.sql.types.StructField> listOfStructField = new ArrayList<>();
listOfStructField.add(DataTypes.createStructField("col1", DataTypes.StringType, true));
StructType structType = DataTypes.createStructType(listOfStructField);
// Create the data frame with the rows and schema.
List<Row> list = new ArrayList<>();
// Create empty dataframe with the schema.
return sparkSession.createDataFrame(list, structType);
}
}

View File

@@ -0,0 +1,25 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"), you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
CACHE
TABLE tmp_trips
SELECT *
FROM <SRC>
WHERE
True = False;
SELECT *
FROM tmp_trips;

View File

@@ -0,0 +1,22 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"), you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
SEECT
DISTINCT col1
FROM
<SRC>
ORDER BY col1;

View File

@@ -0,0 +1,27 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"), you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
SELECT RAND();
SELECT 1;
SELECT
DISTINCT col1
FROM
<SRC>
ORDER BY col1;