[HUDI-4160] Make database regex of MaxwellJsonKafkaSourcePostProcessor optional (#5697)
This commit is contained in:
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigProperty;
|
|||||||
import org.apache.hudi.common.config.TypedProperties;
|
import org.apache.hudi.common.config.TypedProperties;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.util.DateTimeUtils;
|
import org.apache.hudi.common.util.DateTimeUtils;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
|
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
|
||||||
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
|
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
|
||||||
@@ -29,8 +30,6 @@ import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
|
|||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
import org.apache.log4j.LogManager;
|
|
||||||
import org.apache.log4j.Logger;
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
@@ -49,12 +48,15 @@ import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFiel
|
|||||||
*/
|
*/
|
||||||
public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
|
public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(MaxwellJsonKafkaSourcePostProcessor.class);
|
|
||||||
|
|
||||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private final Option<String> databaseRegex;
|
||||||
|
private final String tableRegex;
|
||||||
|
|
||||||
public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
|
public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
|
||||||
super(props);
|
super(props);
|
||||||
|
databaseRegex = Option.ofNullable(props.getString(Config.DATABASE_NAME_REGEX_PROP.key(), null));
|
||||||
|
tableRegex = props.getString(Config.TABLE_NAME_REGEX_PROP.key());
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------------------------------------------------------------
|
// ------------------------------------------------------------------------
|
||||||
@@ -111,9 +113,6 @@ public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProc
|
|||||||
|
|
||||||
// filter out target databases and tables
|
// filter out target databases and tables
|
||||||
if (isTargetTable(database, table)) {
|
if (isTargetTable(database, table)) {
|
||||||
|
|
||||||
LOG.info(String.format("Maxwell source processor starts process table : %s.%s", database, table));
|
|
||||||
|
|
||||||
ObjectNode result = (ObjectNode) inputJson.get(DATA);
|
ObjectNode result = (ObjectNode) inputJson.get(DATA);
|
||||||
String type = inputJson.get(OPERATION_TYPE).textValue();
|
String type = inputJson.get(OPERATION_TYPE).textValue();
|
||||||
|
|
||||||
@@ -182,9 +181,11 @@ public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProc
|
|||||||
* @param table table the data belong to
|
* @param table table the data belong to
|
||||||
*/
|
*/
|
||||||
private boolean isTargetTable(String database, String table) {
|
private boolean isTargetTable(String database, String table) {
|
||||||
String databaseRegex = this.props.getString(Config.DATABASE_NAME_REGEX_PROP.key());
|
if (!databaseRegex.isPresent()) {
|
||||||
String tableRegex = this.props.getString(Config.TABLE_NAME_REGEX_PROP.key());
|
return Pattern.matches(tableRegex, table);
|
||||||
return Pattern.matches(databaseRegex, database) && Pattern.matches(tableRegex, table);
|
} else {
|
||||||
|
return Pattern.matches(databaseRegex.get(), database) && Pattern.matches(tableRegex, table);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -178,6 +178,12 @@ public class TestJsonKafkaSourcePostProcessor extends TestJsonKafkaSource {
|
|||||||
+ "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\","
|
+ "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\","
|
||||||
+ "\"update_time\":\"2022-03-12 08:31:56\"}}";
|
+ "\"update_time\":\"2022-03-12 08:31:56\"}}";
|
||||||
|
|
||||||
|
// database hudi_02, table hudi_maxwell_01, insert
|
||||||
|
String hudi02Maxwell01Insert = "{\"database\":\"hudi_02\",\"table\":\"hudi_maxwell_01\",\"type\":\"insert\","
|
||||||
|
+ "\"ts\":1647073916,\"xid\":4990,\"commit\":true,\"data\":{\"id\":\"9bb17f316ee8488cb107621ddf0f3cb0\","
|
||||||
|
+ "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\","
|
||||||
|
+ "\"update_time\":\"2022-03-12 08:31:56\"}}";
|
||||||
|
|
||||||
// ------------------------------------------------------------------------
|
// ------------------------------------------------------------------------
|
||||||
// Tests
|
// Tests
|
||||||
// ------------------------------------------------------------------------
|
// ------------------------------------------------------------------------
|
||||||
@@ -248,6 +254,14 @@ public class TestJsonKafkaSourcePostProcessor extends TestJsonKafkaSource {
|
|||||||
// ddl data will be ignored, ths count should be 0
|
// ddl data will be ignored, ths count should be 0
|
||||||
long ddlDataNum = processor.process(ddlData).count();
|
long ddlDataNum = processor.process(ddlData).count();
|
||||||
assertEquals(0, ddlDataNum);
|
assertEquals(0, ddlDataNum);
|
||||||
|
|
||||||
|
// test table regex without database regex
|
||||||
|
props.remove(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key());
|
||||||
|
props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell(_)?[0-9]{0,2}");
|
||||||
|
|
||||||
|
JavaRDD<String> dataWithoutDatabaseRegex = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudi02Maxwell01Insert));
|
||||||
|
long countWithoutDatabaseRegex = processor.process(dataWithoutDatabaseRegex).count();
|
||||||
|
assertEquals(2, countWithoutDatabaseRegex);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
Reference in New Issue
Block a user