[HUDI-3547] Introduce MaxwellSourcePostProcessor to extract data from Maxwell json string (#4987)
* [HUDI-3547] Introduce MaxwellSourcePostProcessor to extract data from Maxwell json string
* add ut
* Address comment
@@ -21,6 +21,9 @@ package org.apache.hudi.common.util;
 import java.time.Duration;
 import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
@@ -126,6 +129,20 @@ public class DateTimeUtils {
     return labelToUnit;
   }
 
+  /**
+   * Convert UNIX_TIMESTAMP to string in given format.
+   *
+   * @param unixTimestamp UNIX_TIMESTAMP
+   * @param timeFormat    string time format
+   */
+  public static String formatUnixTimestamp(long unixTimestamp, String timeFormat) {
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(timeFormat));
+    DateTimeFormatter dtf = DateTimeFormatter.ofPattern(timeFormat);
+    return LocalDateTime
+        .ofInstant(Instant.ofEpochSecond(unixTimestamp), ZoneId.systemDefault())
+        .format(dtf);
+  }
+
   /**
    * Enum which defines time unit, mostly used to parse value from configuration file.
    */
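For reference, a minimal usage sketch of the new helper (not part of the diff; the literal values are illustrative, and the output depends on the JVM's default time zone via ZoneId.systemDefault()):

    // Hypothetical call: format Maxwell's epoch-second `ts` as a date string.
    long ts = 1647074555L; // epoch seconds, as Maxwell emits in its `ts` field
    String formatted = DateTimeUtils.formatUnixTimestamp(ts, "yyyy-MM-dd HH:mm:ss");
    // e.g. "2022-03-12 08:42:35" when the default zone is UTC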
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.processor.maxwell;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.DateTimeUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
+import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.Locale;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.DATE_STRING;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.EPOCHMILLISECONDS;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.NON_TIMESTAMP;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.UNIX_TIMESTAMP;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.valueOf;
+
+/**
+ * A {@link JsonKafkaSourcePostProcessor} that helps to extract fresh data from a Maxwell JSON string and tag each
+ * record as a delete or not.
+ */
+public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
+
+  private static final Logger LOG = LogManager.getLogger(MaxwellJsonKafkaSourcePostProcessor.class);
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
+    super(props);
+  }
+
+  // ------------------------------------------------------------------------
+  //  Partial fields in maxwell json string
+  // ------------------------------------------------------------------------
+
+  private static final String DATABASE = "database";
+  private static final String TABLE = "table";
+  private static final String DATA = "data";
+  private static final String OPERATION_TYPE = "type";
+  private static final String TS = "ts";
+
+  // ------------------------------------------------------------------------
+  //  Operation types
+  // ------------------------------------------------------------------------
+
+  private static final String INSERT = "insert";
+  private static final String UPDATE = "update";
+  private static final String DELETE = "delete";
+
+  /**
+   * Configs to be passed for this processor.
+   */
+  public static class Config {
+    public static final ConfigProperty<String> DATABASE_NAME_REGEX_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.database.regex")
+        .noDefaultValue()
+        .withDocumentation("Database name regex.");
+
+    public static final ConfigProperty<String> TABLE_NAME_REGEX_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.table.regex")
+        .noDefaultValue()
+        .withDocumentation("Table name regex.");
+
+    public static final ConfigProperty<String> PRECOMBINE_FIELD_TYPE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.type")
+        .defaultValue("DATE_STRING")
+        .withDocumentation("Data type of the preCombine field. Could be NON_TIMESTAMP, DATE_STRING, "
+            + "UNIX_TIMESTAMP or EPOCHMILLISECONDS. DATE_STRING by default.");
+
+    public static final ConfigProperty<String> PRECOMBINE_FIELD_FORMAT_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.format")
+        .defaultValue("yyyy-MM-dd HH:mm:ss")
+        .withDocumentation("When the preCombine field is in DATE_STRING format, the user should tell hoodie "
+            + "what format it is. 'yyyy-MM-dd HH:mm:ss' by default.");
+  }
+
+  @Override
+  public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
+    return maxwellJsonRecords.map(record -> {
+      JsonNode inputJson = MAPPER.readTree(record);
+      String database = inputJson.get(DATABASE).textValue();
+      String table = inputJson.get(TABLE).textValue();
+
+      // keep only data from the target databases and tables
+      if (isTargetTable(database, table)) {
+
+        LOG.info(String.format("Maxwell source processor starts to process table : %s.%s", database, table));
+
+        ObjectNode result = (ObjectNode) inputJson.get(DATA);
+        String type = inputJson.get(OPERATION_TYPE).textValue();
+
+        // insert or update
+        if (INSERT.equals(type) || UPDATE.equals(type)) {
+          // tag this record as not deleted.
+          result.put(HoodieRecord.HOODIE_IS_DELETED, false);
+          return result.toString();
+
+        // delete
+        } else if (DELETE.equals(type)) {
+          return processDelete(inputJson, result);
+        } else {
+          // there might be some DDL records; ignore them
+          return null;
+        }
+      } else {
+        // not data from the target table(s), ignore it
+        return null;
+      }
+    }).filter(Objects::nonNull);
+  }
+
+  private String processDelete(JsonNode inputJson, ObjectNode result) {
+    // tag this record as deleted.
+    result.put(HoodieRecord.HOODIE_IS_DELETED, true);
+
+    PreCombineFieldType preCombineFieldType =
+        valueOf(this.props.getString(Config.PRECOMBINE_FIELD_TYPE_PROP.key(),
+            Config.PRECOMBINE_FIELD_TYPE_PROP.defaultValue()).toUpperCase(Locale.ROOT));
+
+    // Maxwell won't update the `update_time` (delete time) field of a record that is tagged as delete, so if we
+    // want to delete this record correctly, we should update its `update_time` to a time closer to when the
+    // delete operation actually occurred. Here we use `ts` from the Maxwell JSON string as this 'delete' time.
+
+    // we can update the `update_time` (delete time) only when it is in a timestamp format.
+    if (!preCombineFieldType.equals(NON_TIMESTAMP)) {
+      String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
+          HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());
+
+      // ts from Maxwell, in epoch seconds
+      long ts = inputJson.get(TS).longValue();
+
+      // convert the `update_time` (delete time) to the proper format.
+      if (preCombineFieldType.equals(DATE_STRING)) {
+        // DATE_STRING format
+        String timeFormat = this.props.getString(Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), Config.PRECOMBINE_FIELD_FORMAT_PROP.defaultValue());
+        result.put(preCombineField, DateTimeUtils.formatUnixTimestamp(ts, timeFormat));
+      } else if (preCombineFieldType.equals(EPOCHMILLISECONDS)) {
+        // EPOCHMILLISECONDS format
+        result.put(preCombineField, ts * 1000L);
+      } else if (preCombineFieldType.equals(UNIX_TIMESTAMP)) {
+        // UNIX_TIMESTAMP format
+        result.put(preCombineField, ts);
+      } else {
+        throw new HoodieSourcePostProcessException("Unsupported preCombine time format " + preCombineFieldType);
+      }
+    }
+    return result.toString();
+  }
/**
|
||||||
|
* Check if it is the right table we want to consume from.
|
||||||
|
*
|
||||||
|
* @param database database the data belong to
|
||||||
|
* @param table table the data belong to
|
||||||
|
*/
|
||||||
|
private boolean isTargetTable(String database, String table) {
|
||||||
|
String databaseRegex = this.props.getString(Config.DATABASE_NAME_REGEX_PROP.key());
|
||||||
|
String tableRegex = this.props.getString(Config.TABLE_NAME_REGEX_PROP.key());
|
||||||
|
return Pattern.matches(databaseRegex, database) && Pattern.matches(tableRegex, table);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
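For orientation, a minimal wiring sketch of the new processor (not part of the diff; the property values and the `rawMaxwellJson` RDD are illustrative assumptions):

    // Hypothetical setup: feed Maxwell change events through the processor.
    TypedProperties props = new TypedProperties();
    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key(), "hudi");
    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell_01");
    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "UNIX_TIMESTAMP");
    props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "update_time");

    MaxwellJsonKafkaSourcePostProcessor processor = new MaxwellJsonKafkaSourcePostProcessor(props);
    // rawMaxwellJson: a JavaRDD<String> of Maxwell events, assumed to come from a JSON Kafka source
    JavaRDD<String> cleaned = processor.process(rawMaxwellJson);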
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.processor.maxwell;
+
+/**
+ * Enum of preCombine field time type.
+ */
+public enum PreCombineFieldType {
+  /**
+   * Not a timestamp type field.
+   */
+  NON_TIMESTAMP,
+
+  /**
+   * Timestamp type field in string format.
+   */
+  DATE_STRING,
+
+  /**
+   * Timestamp type field in UNIX_TIMESTAMP format.
+   */
+  UNIX_TIMESTAMP,
+
+  /**
+   * Timestamp type field in EPOCHMILLISECONDS format.
+   */
+  EPOCHMILLISECONDS
+}
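To make the processor's effect concrete, a sketch of what a Maxwell delete event becomes when the preCombine field type is UNIX_TIMESTAMP (values borrowed from the test below; the marker field name comes from HoodieRecord.HOODIE_IS_DELETED, and the output keeps only the `data` subtree):

    // input (Maxwell):
    {"database":"hudi","table":"hudi_maxwell_01","type":"delete","ts":1647074555,"data":{"id":"...","update_time":"2022-03-12 04:42:25", ...}}
    // output (assuming the preCombine field is configured as update_time):
    {"id":"...","update_time":1647074555, ..., "_hoodie_is_deleted":true}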
@@ -18,24 +18,36 @@
 package org.apache.hudi.utilities.sources;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.DateTimeUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+import org.apache.hudi.utilities.sources.processor.maxwell.MaxwellJsonKafkaSourcePostProcessor;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Objects;
 
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.JSON_KAFKA_PROCESSOR_CLASS_OPT;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestJsonKafkaSourcePostProcessor extends TestJsonKafkaSource {
 
@@ -120,6 +132,124 @@ public class TestJsonKafkaSourcePostProcessor extends TestJsonKafkaSource {
     assertEquals(0, fetch1.getBatch().get().count());
   }
 
+  @Test
+  public void testMaxwellJsonKafkaSourcePostProcessor() throws IOException {
+    // ------------------------------------------------------------------------
+    //  Maxwell data
+    // ------------------------------------------------------------------------
+
+    // database hudi, table hudi_maxwell_01 (insert, update and delete)
+    String hudiMaxwell01Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"insert\","
+        + "\"ts\":1647074402,\"xid\":6233,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\","
+        + "\"name\":\"mathieu\",\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\","
+        + "\"update_time\":\"2022-03-12 08:40:02\"}}";
+
+    String hudiMaxwell01Update = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"update\","
+        + "\"ts\":1647074482,\"xid\":6440,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\","
+        + "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"},"
+        + "\"old\":{\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\",\"update_time\":\"2022-03-12 08:40:02\"}}";
+
+    String hudiMaxwell01Delete = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"delete\","
+        + "\"ts\":1647074555,\"xid\":6631,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\","
+        + "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"}}";
+
+    String hudiMaxwell01Ddl = "{\"type\":\"table-alter\",\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\","
+        + "\"old\":{\"database\":\"hudi\",\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\","
+        + "\"primary-key\":[\"id\"],\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},"
+        + "{\"type\":\"varchar\",\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\","
+        + "\"signed\":true},{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},"
+        + "{\"type\":\"timestamp\",\"name\":\"update_time\",\"column-length\":0}]},\"def\":{\"database\":\"hudi\","
+        + "\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\",\"primary-key\":[\"id\"],"
+        + "\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},{\"type\":\"varchar\","
+        + "\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\",\"signed\":true},"
+        + "{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},{\"type\":\"timestamp\","
+        + "\"name\":\"update_time\",\"column-length\":0}]},\"ts\":1647072305000,\"sql\":\"/* ApplicationName=DBeaver "
+        + "21.0.4 - Main */ ALTER TABLE hudi.hudi_maxwell_01 MODIFY COLUMN age int(3) NULL\"}";
+
+    // database hudi, table hudi_maxwell_010, insert
+    String hudiMaxwell010Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_010\",\"type\":\"insert\","
+        + "\"ts\":1647073982,\"xid\":5164,\"commit\":true,\"data\":{\"id\":\"f3eaf4cdf7534e47a88cdf93d19b2ee6\","
+        + "\"name\":\"wangxianghu\",\"age\":18,\"insert_time\":\"2022-03-12 08:33:02\","
+        + "\"update_time\":\"2022-03-12 08:33:02\"}}";
+
+    // database hudi_02, table hudi_maxwell_02, insert
+    String hudi02Maxwell02Insert = "{\"database\":\"hudi_02\",\"table\":\"hudi_maxwell_02\",\"type\":\"insert\","
+        + "\"ts\":1647073916,\"xid\":4990,\"commit\":true,\"data\":{\"id\":\"9bb17f316ee8488cb107621ddf0f3cb0\","
+        + "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\","
+        + "\"update_time\":\"2022-03-12 08:31:56\"}}";
+
+    // ------------------------------------------------------------------------
+    //  Tests
+    // ------------------------------------------------------------------------
+
+    ObjectMapper mapper = new ObjectMapper();
+    TypedProperties props = new TypedProperties();
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key(), "hudi(_)?[0-9]{0,2}");
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell(_)?[0-9]{0,2}");
+
+    // test insert and update
+    JavaRDD<String> inputInsertAndUpdate = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell01Update));
+    MaxwellJsonKafkaSourcePostProcessor processor = new MaxwellJsonKafkaSourcePostProcessor(props);
+    processor.process(inputInsertAndUpdate).map(mapper::readTree).foreach(record -> {
+      // the database field should not be present in the output
+      JsonNode database = record.get("database");
+      // insert and update records should be tagged as not deleted
+      boolean isDelete = record.get(HoodieRecord.HOODIE_IS_DELETED).booleanValue();
+
+      assertFalse(isDelete);
+      assertNull(database);
+    });
+
+    // test delete
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "DATE_STRING");
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), "yyyy-MM-dd HH:mm:ss");
+    props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "update_time");
+
+    JavaRDD<String> inputDelete = jsc().parallelize(Collections.singletonList(hudiMaxwell01Delete));
+
+    long ts = mapper.readTree(hudiMaxwell01Delete).get("ts").longValue();
+    String formatTs = DateTimeUtils.formatUnixTimestamp(ts, "yyyy-MM-dd HH:mm:ss");
+
+    new MaxwellJsonKafkaSourcePostProcessor(props)
+        .process(inputDelete).map(mapper::readTree).foreach(record -> {
+          // delete records should be tagged as deleted
+          boolean isDelete = record.get(HoodieRecord.HOODIE_IS_DELETED).booleanValue();
+          // update_time should equal the formatted ts
+          String updateTime = record.get("update_time").textValue();
+
+          assertEquals(formatTs, updateTime);
+          assertTrue(isDelete);
+        });
+
+    // test a preCombine field that is not a timestamp
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "NON_TIMESTAMP");
+    props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "id");
+
+    JavaRDD<String> inputDelete2 = jsc().parallelize(Collections.singletonList(hudiMaxwell01Delete));
+
+    String updateTimeInUpdate = mapper.readTree(hudiMaxwell01Update).get("data").get("update_time").textValue();
+    new MaxwellJsonKafkaSourcePostProcessor(props)
+        .process(inputDelete2).map(mapper::readTree).foreach(record -> {
+          // updateTimeInUpdate should equal updateTimeInDelete, since NON_TIMESTAMP leaves the field untouched
+          String updateTimeInDelete = record.get("update_time").textValue();
+          assertEquals(updateTimeInUpdate, updateTimeInDelete);
+        });
+
+    // test database, table regex
+    JavaRDD<String> dirtyData = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell010Insert, hudi02Maxwell02Insert));
+    long validDataNum = processor.process(dirtyData).count();
+    // hudiMaxwell010Insert is dirty data
+    assertEquals(2, validDataNum);
+
+    // test ddl
+    JavaRDD<String> ddlData = jsc().parallelize(Collections.singletonList(hudiMaxwell01Ddl));
+    // DDL data will be ignored, thus the count should be 0
+    long ddlDataNum = processor.process(ddlData).count();
+    assertEquals(0, ddlDataNum);
+  }
+
   /**
    * JsonKafkaSourcePostProcessor that returns a sub RDD of the incoming data, sampled via the
    * {org.apache.spark.api.java.JavaRDD#sample(boolean, double, long)} method.
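A quick sanity check of the regexes used in the test above (a sketch; note that Pattern.matches requires the whole name to match):

    Pattern.matches("hudi_maxwell(_)?[0-9]{0,2}", "hudi_maxwell_01");  // true
    Pattern.matches("hudi_maxwell(_)?[0-9]{0,2}", "hudi_maxwell_010"); // false: three trailing digits exceed {0,2},
                                                                       // which is why hudiMaxwell010Insert is filtered out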