feat(patch): 适配删除记录latest_op_ts的改动
This commit is contained in:
@@ -22,36 +22,30 @@ import org.slf4j.LoggerFactory;
|
|||||||
public class TraceEventTimeAvroPayload extends EventTimeAvroPayload {
|
public class TraceEventTimeAvroPayload extends EventTimeAvroPayload {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(TraceEventTimeAvroPayload.class);
|
private static final Logger logger = LoggerFactory.getLogger(TraceEventTimeAvroPayload.class);
|
||||||
|
|
||||||
private final String latestOpts;
|
private final Map<String, String> metadata = new HashMap<>();
|
||||||
|
|
||||||
public TraceEventTimeAvroPayload(GenericRecord record, Comparable orderingVal) {
|
public TraceEventTimeAvroPayload(GenericRecord record, Comparable orderingVal) {
|
||||||
super(record, orderingVal);
|
super(record, orderingVal);
|
||||||
this.latestOpts = updateLatestOpts(Option.ofNullable(record));
|
updateLatestOpts(Option.ofNullable(record));
|
||||||
}
|
}
|
||||||
|
|
||||||
public TraceEventTimeAvroPayload(Option<GenericRecord> record) {
|
public TraceEventTimeAvroPayload(Option<GenericRecord> record) {
|
||||||
super(record);
|
super(record);
|
||||||
this.latestOpts = updateLatestOpts(record);
|
updateLatestOpts(record);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String updateLatestOpts(Option<GenericRecord> record) {
|
private void updateLatestOpts(Option<GenericRecord> record) {
|
||||||
try {
|
try {
|
||||||
return record
|
record
|
||||||
.map(r -> ((Utf8) r.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)).toString())
|
.map(r -> ((Utf8) r.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)).toString())
|
||||||
.orElse(null);
|
.ifPresent(latestOpts -> metadata.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, latestOpts));
|
||||||
} catch (Throwable throwable) {
|
} catch (Throwable throwable) {
|
||||||
logger.error("Get latest opts failure", throwable);
|
logger.error("Get latest opts failure", throwable);
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Option<Map<String, String>> getMetadata() {
|
public Option<Map<String, String>> getMetadata() {
|
||||||
if (this.latestOpts == null) {
|
|
||||||
return Option.empty();
|
|
||||||
}
|
|
||||||
Map<String, String> metadata = super.getMetadata().orElse(new HashMap<>());
|
|
||||||
metadata.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, this.latestOpts);
|
|
||||||
return Option.of(metadata);
|
return Option.of(metadata);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user