[HUDI-1290] fixing mysql debezium source (#4119)
This commit is contained in:
@@ -68,7 +68,6 @@ public class MysqlDebeziumSource extends DebeziumSource {
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.UPSTREAM_PROCESSING_TS_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_NAME_FIELD, DebeziumConstants.FLATTENED_SHARD_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TXID_FIELD, DebeziumConstants.FLATTENED_TX_ID_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_FILE_FIELD, DebeziumConstants.FLATTENED_FILE_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_POS_FIELD, DebeziumConstants.FLATTENED_POS_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_ROW_FIELD, DebeziumConstants.FLATTENED_ROW_COL_NAME),
|
||||
@@ -82,7 +81,6 @@ public class MysqlDebeziumSource extends DebeziumSource {
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.UPSTREAM_PROCESSING_TS_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_NAME_FIELD, DebeziumConstants.FLATTENED_SHARD_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TXID_FIELD, DebeziumConstants.FLATTENED_TX_ID_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_FILE_FIELD, DebeziumConstants.FLATTENED_FILE_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_POS_FIELD, DebeziumConstants.FLATTENED_POS_COL_NAME),
|
||||
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_ROW_FIELD, DebeziumConstants.FLATTENED_ROW_COL_NAME),
|
||||
|
||||
@@ -49,7 +49,6 @@ public class TestMysqlDebeziumSource extends TestAbstractDebeziumSource {
|
||||
private static final String TEST_DB = "ghschema";
|
||||
private static final String TEST_TABLE = "gharchive";
|
||||
private static final long TEST_TS_MS = 12345L;
|
||||
private static final long TEST_TXID = 543L;
|
||||
private static final String TEST_FILE = "mysql-bin.00007";
|
||||
private static final long TEST_POS = 98765L;
|
||||
private static final String EXPECTED_TEST_SEQ = "00007.98765";
|
||||
@@ -79,7 +78,6 @@ public class TestMysqlDebeziumSource extends TestAbstractDebeziumSource {
|
||||
sourceRecord.put("db", TEST_DB);
|
||||
sourceRecord.put("table", TEST_TABLE);
|
||||
sourceRecord.put("ts_ms", TEST_TS_MS);
|
||||
sourceRecord.put("txId", TEST_TXID);
|
||||
sourceRecord.put("file", TEST_FILE);
|
||||
sourceRecord.put("pos", TEST_POS);
|
||||
rec.put(DebeziumConstants.INCOMING_SOURCE_FIELD, sourceRecord);
|
||||
@@ -92,8 +90,6 @@ public class TestMysqlDebeziumSource extends TestAbstractDebeziumSource {
|
||||
.allMatch(r -> r.getString(0).equals(getIndexName())));
|
||||
assertTrue(records.select(DebeziumConstants.FLATTENED_TS_COL_NAME).collectAsList().stream()
|
||||
.allMatch(r -> r.getLong(0) == TEST_TS_MS));
|
||||
assertTrue(records.select(DebeziumConstants.FLATTENED_TX_ID_COL_NAME).collectAsList().stream()
|
||||
.allMatch(r -> r.getLong(0) == TEST_TXID));
|
||||
assertTrue(records.select(DebeziumConstants.ADDED_SEQ_COL_NAME).collectAsList().stream()
|
||||
.allMatch(r -> r.getString(0).equals(EXPECTED_TEST_SEQ)));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user