[HUDI-731] Add ChainedTransformer (#1440)
* [HUDI-731] Add ChainedTransformer
This commit is contained in:
@@ -78,6 +78,7 @@ import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
@@ -183,39 +184,39 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
static class TestHelpers {
|
||||
|
||||
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, Operation op) {
|
||||
return makeConfig(basePath, op, DropAllTransformer.class.getName());
|
||||
return makeConfig(basePath, op, Collections.singletonList(DropAllTransformer.class.getName()));
|
||||
}
|
||||
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op) {
|
||||
return makeConfig(basePath, op, TripsWithDistanceTransformer.class.getName());
|
||||
return makeConfig(basePath, op, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
||||
}
|
||||
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName) {
|
||||
return makeConfig(basePath, op, transformerClassName, PROPS_FILENAME_TEST_SOURCE, false);
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames) {
|
||||
return makeConfig(basePath, op, transformerClassNames, PROPS_FILENAME_TEST_SOURCE, false);
|
||||
}
|
||||
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
|
||||
String propsFilename, boolean enableHiveSync) {
|
||||
return makeConfig(basePath, op, transformerClassName, propsFilename, enableHiveSync, true,
|
||||
false, null, null);
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
|
||||
String propsFilename, boolean enableHiveSync) {
|
||||
return makeConfig(basePath, op, transformerClassNames, propsFilename, enableHiveSync, true,
|
||||
false, null, null);
|
||||
}
|
||||
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, List<String> transformerClassNames,
|
||||
String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
|
||||
String payloadClassName, String tableType) {
|
||||
return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
|
||||
String payloadClassName, String tableType) {
|
||||
return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassNames, propsFilename, enableHiveSync,
|
||||
useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType, "timestamp");
|
||||
}
|
||||
|
||||
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
|
||||
String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
|
||||
List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
|
||||
int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField) {
|
||||
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
|
||||
cfg.targetBasePath = basePath;
|
||||
cfg.targetTableName = "hoodie_trips";
|
||||
cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
|
||||
cfg.sourceClassName = sourceClassName;
|
||||
cfg.transformerClassName = transformerClassName;
|
||||
cfg.transformerClassNames = transformerClassNames;
|
||||
cfg.operation = op;
|
||||
cfg.enableHiveSync = enableHiveSync;
|
||||
cfg.sourceOrderingField = sourceOrderingField;
|
||||
@@ -339,7 +340,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
String tableBasePath = dfsBasePath + "/test_table";
|
||||
HoodieDeltaStreamer deltaStreamer =
|
||||
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
||||
TripsWithDistanceTransformer.class.getName(), PROPS_FILENAME_TEST_INVALID, false), jsc);
|
||||
Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
|
||||
deltaStreamer.sync();
|
||||
fail("Should error out when setting the key generator class property to an invalid value");
|
||||
} catch (IOException e) {
|
||||
@@ -451,7 +452,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
|
||||
// Initial bulk insert to ingest to first hudi table
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true);
|
||||
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true);
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
||||
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
@@ -524,7 +525,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
public void testNullSchemaProvider() throws Exception {
|
||||
String tableBasePath = dfsBasePath + "/test_table";
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
false, false, null, null);
|
||||
try {
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
||||
@@ -539,15 +540,15 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
public void testPayloadClassUpdate() throws Exception {
|
||||
String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
true, false, null, "MERGE_ON_READ");
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
||||
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
|
||||
|
||||
//now create one more deltaStreamer instance and update payload class
|
||||
cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
|
||||
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
|
||||
|
||||
//now assert that hoodie.properties file now has updated payload class name
|
||||
@@ -565,14 +566,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
public void testPayloadClassUpdateWithCOWTable() throws Exception {
|
||||
String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
true, false, null, null);
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
||||
TestHelpers.assertRecordCount(1000, dataSetBasePath + "/*/*.parquet", sqlContext);
|
||||
|
||||
//now create one more deltaStreamer instance and update payload class
|
||||
cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
|
||||
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, true,
|
||||
true, true, DummyAvroPayload.class.getName(), null);
|
||||
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
|
||||
|
||||
@@ -668,12 +669,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
|
||||
}
|
||||
|
||||
private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
|
||||
prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
|
||||
private void testParquetDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
|
||||
prepareParquetDFSSource(useSchemaProvider, transformerClassNames != null);
|
||||
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
|
||||
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
|
||||
TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
|
||||
transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
|
||||
transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
|
||||
useSchemaProvider, 100000, false, null, null, "timestamp"), jsc);
|
||||
deltaStreamer.sync();
|
||||
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
@@ -687,7 +688,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
|
||||
@Test
|
||||
public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
|
||||
testParquetDFSSource(false, TripsWithDistanceTransformer.class.getName());
|
||||
testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -697,7 +698,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
|
||||
@Test
|
||||
public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
|
||||
testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
|
||||
testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
||||
}
|
||||
|
||||
private void prepareCsvDFSSource(
|
||||
@@ -740,14 +741,14 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
}
|
||||
|
||||
private void testCsvDFSSource(
|
||||
boolean hasHeader, char sep, boolean useSchemaProvider, String transformerClassName) throws Exception {
|
||||
prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassName != null);
|
||||
boolean hasHeader, char sep, boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
|
||||
prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, transformerClassNames != null);
|
||||
String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
|
||||
String sourceOrderingField = (hasHeader || useSchemaProvider) ? "timestamp" : "_c0";
|
||||
HoodieDeltaStreamer deltaStreamer =
|
||||
new HoodieDeltaStreamer(TestHelpers.makeConfig(
|
||||
tableBasePath, Operation.INSERT, CsvDFSSource.class.getName(),
|
||||
transformerClassName, PROPS_FILENAME_TEST_CSV, false,
|
||||
transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
|
||||
useSchemaProvider, 1000, false, null, null, sourceOrderingField), jsc);
|
||||
deltaStreamer.sync();
|
||||
TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
@@ -785,7 +786,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
// No schema provider is specified, transformer is applied
|
||||
// In this case, the source schema comes from the inferred schema of the CSV files.
|
||||
// Target schema is determined based on the Dataframe after transformation
|
||||
testCsvDFSSource(true, '\t', false, TripsWithDistanceTransformer.class.getName());
|
||||
testCsvDFSSource(true, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -793,7 +794,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
// The CSV files have header, the columns are separated by '\t'
|
||||
// File schema provider is used, transformer is applied
|
||||
// In this case, the source and target schema come from the Avro schema files
|
||||
testCsvDFSSource(true, '\t', true, TripsWithDistanceTransformer.class.getName());
|
||||
testCsvDFSSource(true, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -824,7 +825,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
// No CSV header and no schema provider at the same time are not recommended,
|
||||
// as the transformer behavior may be unexpected
|
||||
try {
|
||||
testCsvDFSSource(false, '\t', false, TripsWithDistanceTransformer.class.getName());
|
||||
testCsvDFSSource(false, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
||||
fail("Should error out when doing the transformation.");
|
||||
} catch (AnalysisException e) {
|
||||
LOG.error("Expected error during transformation", e);
|
||||
@@ -837,7 +838,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
// The CSV files do not have header, the columns are separated by '\t'
|
||||
// File schema provider is used, transformer is applied
|
||||
// In this case, the source and target schema come from the Avro schema files
|
||||
testCsvDFSSource(false, '\t', true, TripsWithDistanceTransformer.class.getName());
|
||||
testCsvDFSSource(false, '\t', true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.utilities;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.utilities.transform.ChainedTransformer;
|
||||
import org.apache.hudi.utilities.transform.Transformer;
|
||||
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.runners.Enclosed;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@RunWith(Enclosed.class)
|
||||
public class TestUtilHelpers {
|
||||
|
||||
public static class TestCreateTransformer {
|
||||
|
||||
public static class TransformerFoo implements Transformer {
|
||||
|
||||
@Override
|
||||
public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TransformerBar implements Transformer {
|
||||
|
||||
@Override
|
||||
public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException exceptionRule = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testCreateTransformerNotPresent() throws IOException {
|
||||
assertFalse(UtilHelpers.createTransformer(null).isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateTransformerLoadOneClass() throws IOException {
|
||||
Transformer transformer = UtilHelpers.createTransformer(Collections.singletonList(TransformerFoo.class.getName())).get();
|
||||
assertTrue(transformer instanceof ChainedTransformer);
|
||||
List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
|
||||
assertEquals(1, transformerNames.size());
|
||||
assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateTransformerLoadMultipleClasses() throws IOException {
|
||||
List<String> classNames = Arrays.asList(TransformerFoo.class.getName(), TransformerBar.class.getName());
|
||||
Transformer transformer = UtilHelpers.createTransformer(classNames).get();
|
||||
assertTrue(transformer instanceof ChainedTransformer);
|
||||
List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
|
||||
assertEquals(2, transformerNames.size());
|
||||
assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
|
||||
assertEquals(TransformerBar.class.getName(), transformerNames.get(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateTransformerThrowsException() throws IOException {
|
||||
exceptionRule.expect(IOException.class);
|
||||
exceptionRule.expectMessage("Could not load transformer class(es) [foo, bar]");
|
||||
UtilHelpers.createTransformer(Arrays.asList("foo", "bar"));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.utilities.transform;
|
||||
|
||||
import org.apache.hudi.utilities.UtilHelpers;
|
||||
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.RowFactory;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.StructField;
|
||||
import org.apache.spark.sql.types.StructType;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.spark.sql.types.DataTypes.IntegerType;
|
||||
import static org.apache.spark.sql.types.DataTypes.StringType;
|
||||
import static org.apache.spark.sql.types.DataTypes.createStructField;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestChainedTransformer {
|
||||
|
||||
private JavaSparkContext jsc;
|
||||
private SparkSession sparkSession;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
jsc = UtilHelpers.buildSparkContext(this.getClass().getName() + "-hoodie", "local[2]");
|
||||
sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
jsc.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChainedTransformation() {
|
||||
StructType schema = DataTypes.createStructType(
|
||||
new StructField[] {
|
||||
createStructField("foo", StringType, false)
|
||||
});
|
||||
Row r1 = RowFactory.create("100");
|
||||
Row r2 = RowFactory.create("200");
|
||||
Dataset<Row> original = sparkSession.sqlContext().createDataFrame(Arrays.asList(r1, r2), schema);
|
||||
|
||||
Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");
|
||||
Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
|
||||
ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2));
|
||||
Dataset<Row> transformed = transformer.apply(jsc, sparkSession, original, null);
|
||||
|
||||
assertEquals(2, transformed.count());
|
||||
assertArrayEquals(new String[] {"bar"}, transformed.columns());
|
||||
List<Row> rows = transformed.collectAsList();
|
||||
assertEquals(100, rows.get(0).getInt(0));
|
||||
assertEquals(200, rows.get(1).getInt(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetTransformersNames() {
|
||||
Transformer t1 = (jsc, sparkSession, dataset, properties) -> dataset.withColumnRenamed("foo", "bar");
|
||||
Transformer t2 = (jsc, sparkSession, dataset, properties) -> dataset.withColumn("bar", dataset.col("bar").cast(IntegerType));
|
||||
ChainedTransformer transformer = new ChainedTransformer(Arrays.asList(t1, t2));
|
||||
List<String> classNames = transformer.getTransformersNames();
|
||||
assertEquals(t1.getClass().getName(), classNames.get(0));
|
||||
assertEquals(t2.getClass().getName(), classNames.get(1));
|
||||
}
|
||||
}
|
||||
@@ -16,9 +16,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.utilities;
|
||||
|
||||
import org.apache.hudi.utilities.transform.FlatteningTransformer;
|
||||
package org.apache.hudi.utilities.transform;
|
||||
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
import org.apache.spark.sql.types.Metadata;
|
||||
Reference in New Issue
Block a user