[HUDI-311] : Support for AWS Database Migration Service in DeltaStreamer
- Add a transformer class, that adds `Op` fiels if not found in input frame - Add a payload implementation, that issues deletes when Op=D - Remove Parquet as a top level source type, consolidate with RowSource - Made delta streamer work without a property file, simply using overridden cli options - Unit tests for transformer/payload classes
This commit is contained in:
committed by
vinoth chandar
parent
313fab5fd1
commit
350b0ecb4d
@@ -60,6 +60,17 @@ public class DFSPropertiesConfiguration {
|
||||
visitFile(rootFile);
|
||||
}
|
||||
|
||||
public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) {
|
||||
this(fs, rootFile, new TypedProperties());
|
||||
}
|
||||
|
||||
public DFSPropertiesConfiguration() {
|
||||
this.fs = null;
|
||||
this.rootFile = null;
|
||||
this.props = new TypedProperties();
|
||||
this.visitedFiles = new HashSet<>();
|
||||
}
|
||||
|
||||
private String[] splitProperty(String line) {
|
||||
int ind = line.indexOf('=');
|
||||
String k = line.substring(0, ind).trim();
|
||||
@@ -106,10 +117,6 @@ public class DFSPropertiesConfiguration {
|
||||
}
|
||||
}
|
||||
|
||||
public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) {
|
||||
this(fs, rootFile, new TypedProperties());
|
||||
}
|
||||
|
||||
public TypedProperties getConfig() {
|
||||
return props;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.payload;
|
||||
|
||||
import org.apache.hudi.OverwriteWithLatestAvroPayload;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3.
|
||||
*
|
||||
* Typically, we get the following pattern of full change records corresponding to DML against the
|
||||
* source database
|
||||
*
|
||||
* - Full load records with no `Op` field
|
||||
* - For inserts against the source table, records contain full after image with `Op=I`
|
||||
* - For updates against the source table, records contain full after image with `Op=U`
|
||||
* - For deletes against the source table, records contain full before image with `Op=D`
|
||||
*
|
||||
* This payload implementation will issue matching insert, delete, updates against the hudi dataset
|
||||
*
|
||||
*/
|
||||
public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
|
||||
|
||||
public static final String OP_FIELD = "Op";
|
||||
|
||||
public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
|
||||
super(record, orderingVal);
|
||||
}
|
||||
|
||||
public AWSDmsAvroPayload(Option<GenericRecord> record) {
|
||||
this(record.get(), (record1) -> 0); // natural order
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
|
||||
throws IOException {
|
||||
IndexedRecord insertValue = getInsertValue(schema).get();
|
||||
boolean delete = false;
|
||||
if (insertValue instanceof GenericRecord) {
|
||||
GenericRecord record = (GenericRecord) insertValue;
|
||||
delete = record.get(OP_FIELD) != null && record.get(OP_FIELD).toString().equalsIgnoreCase("D");
|
||||
}
|
||||
|
||||
return delete ? Option.empty() : Option.of(insertValue);
|
||||
}
|
||||
}
|
||||
@@ -27,7 +27,7 @@ import org.apache.hudi.common.util.TypedProperties;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieIndexConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
import org.apache.hudi.utilities.sources.Source;
|
||||
@@ -92,16 +92,24 @@ public class UtilHelpers {
|
||||
/**
|
||||
*/
|
||||
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath, List<String> overriddenProps) {
|
||||
DFSPropertiesConfiguration conf;
|
||||
try {
|
||||
conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
|
||||
} catch (Exception e) {
|
||||
conf = new DFSPropertiesConfiguration();
|
||||
LOG.warn("Unexpected error read props file at :" + cfgPath, e);
|
||||
}
|
||||
|
||||
try {
|
||||
DFSPropertiesConfiguration conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
|
||||
if (!overriddenProps.isEmpty()) {
|
||||
LOG.info("Adding overridden properties to file properties.");
|
||||
conf.addProperties(new BufferedReader(new StringReader(String.join("\n", overriddenProps))));
|
||||
}
|
||||
return conf;
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Unable to read props file at :" + cfgPath, e);
|
||||
} catch (IOException ioe) {
|
||||
throw new HoodieIOException("Unexpected error adding config overrides", ioe);
|
||||
}
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
public static TypedProperties buildProperties(List<String> props) {
|
||||
|
||||
@@ -23,7 +23,6 @@ import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.utilities.sources.AvroSource;
|
||||
import org.apache.hudi.utilities.sources.InputBatch;
|
||||
import org.apache.hudi.utilities.sources.JsonSource;
|
||||
import org.apache.hudi.utilities.sources.ParquetSource;
|
||||
import org.apache.hudi.utilities.sources.RowSource;
|
||||
import org.apache.hudi.utilities.sources.Source;
|
||||
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
|
||||
@@ -60,8 +59,6 @@ public final class SourceFormatAdapter {
|
||||
switch (source.getSourceType()) {
|
||||
case AVRO:
|
||||
return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
|
||||
case PARQUET:
|
||||
return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
|
||||
case JSON: {
|
||||
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
|
||||
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
|
||||
@@ -102,18 +99,6 @@ public final class SourceFormatAdapter {
|
||||
.orElse(null)),
|
||||
r.getCheckpointForNextBatch(), r.getSchemaProvider());
|
||||
}
|
||||
case PARQUET: {
|
||||
InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
|
||||
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
|
||||
return new InputBatch<>(
|
||||
Option
|
||||
.ofNullable(
|
||||
r.getBatch()
|
||||
.map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
|
||||
source.getSparkSession()))
|
||||
.orElse(null)),
|
||||
r.getCheckpointForNextBatch(), r.getSchemaProvider());
|
||||
}
|
||||
case JSON: {
|
||||
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
|
||||
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
|
||||
|
||||
@@ -19,8 +19,10 @@
|
||||
package org.apache.hudi.utilities.schema;
|
||||
|
||||
import org.apache.hudi.AvroConversionUtils;
|
||||
import org.apache.hudi.common.util.TypedProperties;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.types.StructType;
|
||||
|
||||
public class RowBasedSchemaProvider extends SchemaProvider {
|
||||
@@ -31,6 +33,10 @@ public class RowBasedSchemaProvider extends SchemaProvider {
|
||||
|
||||
private StructType rowStruct;
|
||||
|
||||
public RowBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
|
||||
super(props, jssc);
|
||||
}
|
||||
|
||||
public RowBasedSchemaProvider(StructType rowStruct) {
|
||||
super(null, null);
|
||||
this.rowStruct = rowStruct;
|
||||
|
||||
@@ -24,17 +24,15 @@ import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.parquet.avro.AvroParquetInputFormat;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
/**
|
||||
* DFS Source that reads parquet data.
|
||||
*/
|
||||
public class ParquetDFSSource extends ParquetSource {
|
||||
public class ParquetDFSSource extends RowSource {
|
||||
|
||||
private final DFSPathSelector pathSelector;
|
||||
|
||||
@@ -45,17 +43,15 @@ public class ParquetDFSSource extends ParquetSource {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
|
||||
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
|
||||
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
|
||||
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
|
||||
return selectPathsWithMaxModificationTime.getLeft()
|
||||
.map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
|
||||
.orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
|
||||
.map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
|
||||
.orElseGet(() -> Pair.of(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
|
||||
}
|
||||
|
||||
private JavaRDD<GenericRecord> fromFiles(String pathStr) {
|
||||
JavaPairRDD<Void, GenericRecord> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class,
|
||||
Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
|
||||
return avroRDD.values();
|
||||
private Dataset<Row> fromFiles(String pathStr) {
|
||||
return sparkSession.read().parquet(pathStr.split(","));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ public abstract class Source<T> implements Serializable {
|
||||
private static final Logger LOG = LogManager.getLogger(Source.class);
|
||||
|
||||
public enum SourceType {
|
||||
JSON, AVRO, ROW, PARQUET
|
||||
JSON, AVRO, ROW
|
||||
}
|
||||
|
||||
protected transient TypedProperties props;
|
||||
|
||||
@@ -16,20 +16,36 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.utilities.sources;
|
||||
package org.apache.hudi.utilities.transform;
|
||||
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.TypedProperties;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
import org.apache.hudi.payload.AWSDmsAvroPayload;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> {
|
||||
import java.util.Arrays;
|
||||
|
||||
public ParquetSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
|
||||
SchemaProvider schemaProvider) {
|
||||
super(props, sparkContext, sparkSession, schemaProvider, SourceType.PARQUET);
|
||||
import static org.apache.spark.sql.functions.lit;
|
||||
|
||||
/**
|
||||
* A Simple transformer that adds `Op` field with value `I`, for AWS DMS data, if the field is not
|
||||
* present.
|
||||
*/
|
||||
public class AWSDmsTransformer implements Transformer {
|
||||
|
||||
@Override
|
||||
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
|
||||
TypedProperties properties) {
|
||||
Option<String> opColumnOpt = Option.fromJavaOptional(
|
||||
Arrays.stream(rowDataset.columns()).filter(c -> c.equals(AWSDmsAvroPayload.OP_FIELD)).findFirst());
|
||||
if (opColumnOpt.isPresent()) {
|
||||
return rowDataset;
|
||||
} else {
|
||||
return rowDataset.withColumn(AWSDmsAvroPayload.OP_FIELD, lit(""));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,107 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.utilities;
|
||||
|
||||
import org.apache.hudi.payload.AWSDmsAvroPayload;
|
||||
import org.apache.hudi.utilities.transform.AWSDmsTransformer;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestAWSDatabaseMigrationServiceSource {
|
||||
|
||||
private static JavaSparkContext jsc;
|
||||
private static SparkSession spark;
|
||||
|
||||
@BeforeClass
|
||||
public static void setupTest() {
|
||||
jsc = UtilHelpers.buildSparkContext("aws-dms-test", "local[2]");
|
||||
spark = SparkSession.builder().config(jsc.getConf()).getOrCreate();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownTest() {
|
||||
if (jsc != null) {
|
||||
jsc.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPayload() throws IOException {
|
||||
final Schema schema = Schema.createRecord(Arrays.asList(
|
||||
new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
|
||||
new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
|
||||
new Schema.Field(AWSDmsAvroPayload.OP_FIELD, Schema.create(Schema.Type.STRING), "", null)
|
||||
));
|
||||
final GenericRecord record = new GenericData.Record(schema);
|
||||
|
||||
record.put("id", "1");
|
||||
record.put("Op", "");
|
||||
record.put("ts", 0L);
|
||||
AWSDmsAvroPayload payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"));
|
||||
assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent());
|
||||
|
||||
record.put("Op", "I");
|
||||
payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"));
|
||||
assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent());
|
||||
|
||||
record.put("Op", "D");
|
||||
payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"));
|
||||
assertFalse(payload.combineAndGetUpdateValue(null, schema).isPresent());
|
||||
}
|
||||
|
||||
static class Record implements Serializable {
|
||||
String id;
|
||||
long ts;
|
||||
|
||||
Record(String id, long ts) {
|
||||
this.id = id;
|
||||
this.ts = ts;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformer() {
|
||||
AWSDmsTransformer transformer = new AWSDmsTransformer();
|
||||
Dataset<Row> inputFrame = spark.createDataFrame(Arrays.asList(
|
||||
new Record("1", 3433L),
|
||||
new Record("2", 3433L)), Record.class);
|
||||
|
||||
Dataset<Row> outputFrame = transformer.apply(jsc, spark, inputFrame, null);
|
||||
assertTrue(Arrays.asList(outputFrame.schema().fields()).stream()
|
||||
.map(f -> f.name()).anyMatch(n -> n.equals(AWSDmsAvroPayload.OP_FIELD)));
|
||||
assertTrue(outputFrame.select(AWSDmsAvroPayload.OP_FIELD).collectAsList().stream()
|
||||
.allMatch(r -> r.getString(0).equals("")));
|
||||
}
|
||||
}
|
||||
@@ -61,7 +61,6 @@ import java.util.List;
|
||||
/**
|
||||
* Abstract test that provides a dfs & spark contexts.
|
||||
*
|
||||
* TODO(vc): this needs to be done across the board.
|
||||
*/
|
||||
public class UtilitiesTestBase {
|
||||
|
||||
|
||||
@@ -139,7 +139,7 @@ public class TestDFSSource extends UtilitiesTestBase {
|
||||
|
||||
TypedProperties props = new TypedProperties();
|
||||
props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/parquetFiles");
|
||||
ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
|
||||
ParquetDFSSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
|
||||
SourceFormatAdapter parquetSource = new SourceFormatAdapter(parquetDFSSource);
|
||||
|
||||
// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
|
||||
|
||||
Reference in New Issue
Block a user