feat(patch): 增加一个patch包用来放hudi相关代码

latest_op_ts相关的内容,直接放在业务包里,spark无法使用,单独打在patch包里,可以方便其他人加到spark jar里
This commit is contained in:
v-zhangjc9
2024-05-20 17:51:30 +08:00
parent e8fe8c4680
commit 52b58426e4
6 changed files with 194 additions and 1 deletions

View File

@@ -0,0 +1,57 @@
package org.apache.hudi.client;
import com.lanyuanxiaoyao.service.common.Constants;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link WriteStatus} that, in addition to the regular bookkeeping, remembers the largest
 * "latest operation timestamp" found in the metadata of records marked as successful.
 *
 * <p>The timestamp is looked up in the per-record metadata map under
 * {@link Constants#LATEST_OPERATION_TIMESTAMP_KEY_NAME}, parsed with the pattern
 * {@code yyyy-MM-dd HH:mm:ss} and converted to epoch milliseconds using a fixed UTC+8 offset.
 *
 * @author lanyuanxiaoyao
 * @date 2023-04-17
 */
public class TraceWriteStatus extends WriteStatus {
    private static final Logger logger = LoggerFactory.getLogger(TraceWriteStatus.class);
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Largest operation timestamp (epoch millis) observed so far; 0 until one has been parsed. */
    private long latestOpts = 0L;

    public TraceWriteStatus() {
        super();
    }

    public TraceWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
        super(trackSuccessRecords, failureFraction);
    }

    /**
     * @return the largest operation timestamp (epoch millis) seen among successful records,
     *         or 0 if none has been recorded
     */
    public long getLatestOpts() {
        return latestOpts;
    }

    /**
     * Delegates to the parent implementation and then folds the record's operation timestamp,
     * if its metadata carries one, into {@link #getLatestOpts()}.
     *
     * <p>Any failure while reading or parsing the timestamp is logged and otherwise ignored, so
     * the record is still counted as successful.
     */
    @Override
    public void markSuccess(HoodieRecord record, Option<Map<String, String>> optionalRecordMetadata) {
        super.markSuccess(record, optionalRecordMetadata);
        try {
            optionalRecordMetadata.ifPresent(this::absorbLatestOpts);
        } catch (Throwable throwable) {
            // Best effort: timestamp tracking must not break the write path.
            logger.error("Parse latest opts failure", throwable);
        }
    }

    /** No extra handling for failed records; simply forwards to the parent implementation. */
    @Override
    public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
        super.markFailure(record, t, optionalRecordMetadata);
    }

    /**
     * Raises {@code latestOpts} to the timestamp stored in {@code metadata} when that timestamp
     * is present, non-empty and later than the current value.
     */
    private void absorbLatestOpts(Map<String, String> metadata) {
        if (!metadata.containsKey(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)) {
            return;
        }
        String rawOpts = metadata.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME);
        if (!StringUtils.nonEmpty(rawOpts)) {
            return;
        }
        LocalDateTime parsed = LocalDateTime.parse(rawOpts, FORMATTER);
        long epochMillis = parsed.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
        if (epochMillis > latestOpts) {
            latestOpts = epochMillis;
        }
    }
}

View File

@@ -0,0 +1,57 @@
package org.apache.hudi.common.model;
import com.lanyuanxiaoyao.service.common.Constants;
import java.util.HashMap;
import java.util.Map;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.util.Utf8;
import org.apache.hudi.table.HoodieTableFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Carries the {@code latest_op_ts} value inside the payload metadata so that, once compaction
 * has finished, the value can be read back from the write status.
 *
 * <p>Hudi uses {@link OverwriteWithLatestAvroPayload} by default; according to the note around
 * line 156 of {@link HoodieTableFactory}, the default is replaced with
 * {@link EventTimeAvroPayload}. This payload therefore extends {@code EventTimeAvroPayload}
 * directly so that the original behaviour is kept.
 *
 * <p>The timestamp field is accepted in any {@link CharSequence} form (for example Avro's
 * {@link Utf8} or a plain {@link String}).
 *
 * @author lanyuanxiaoyao
 * @date 2023-04-18
 */
public class TraceEventTimeAvroPayload extends EventTimeAvroPayload {
    private static final Logger logger = LoggerFactory.getLogger(TraceEventTimeAvroPayload.class);

    /** Text of the record's latest-operation-timestamp field, or {@code null} when unavailable. */
    private final String latestOpts;

    /**
     * @param record      the Avro record wrapped by this payload; may be {@code null}
     * @param orderingVal ordering value forwarded to the parent payload
     */
    public TraceEventTimeAvroPayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
        this.latestOpts = updateLatestOpts(Option.ofNullable(record));
    }

    /**
     * @param record the optional Avro record wrapped by this payload
     */
    public TraceEventTimeAvroPayload(Option<GenericRecord> record) {
        super(record);
        this.latestOpts = updateLatestOpts(record);
    }

    /**
     * Reads the latest-operation-timestamp field from the record, if there is one.
     *
     * <p>An absent record or a {@code null} field yields {@code null} without raising (and then
     * logging) an exception. A value of an unexpected, non-textual type is reported and ignored.
     *
     * @param record the optional record to inspect
     * @return the field's text, or {@code null} when it cannot be obtained
     */
    private String updateLatestOpts(Option<GenericRecord> record) {
        try {
            GenericRecord current = record.orElse(null);
            if (current == null) {
                return null;
            }
            Object value = current.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME);
            if (value == null) {
                return null;
            }
            if (value instanceof CharSequence) {
                // Covers Utf8 as well as String, depending on how the record was decoded.
                return value.toString();
            }
            logger.error("Get latest opts failure, unexpected value type: {}", value.getClass().getName());
        } catch (Throwable throwable) {
            // Best effort: building the payload must not fail because of the trace field.
            logger.error("Get latest opts failure", throwable);
        }
        return null;
    }

    /**
     * Returns the parent's metadata with the latest operation timestamp added under
     * {@link Constants#LATEST_OPERATION_TIMESTAMP_KEY_NAME}.
     *
     * <p>When no timestamp was captured the parent's metadata is returned unchanged. Otherwise
     * the entries are copied into a new map, so the map returned by the parent is never mutated.
     *
     * @return the metadata to attach to the write status for this record
     */
    @Override
    public Option<Map<String, String>> getMetadata() {
        if (this.latestOpts == null) {
            return super.getMetadata();
        }
        Map<String, String> metadata = new HashMap<>();
        super.getMetadata().ifPresent(metadata::putAll);
        metadata.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, this.latestOpts);
        return Option.of(metadata);
    }
}