feat(sync): Trace latest opts 的各个类迁移到hudi flink包下,避免影响业务查询使用
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
package org.apache.hudi.keygen;
|
||||
package com.lanyuanxiaoyao.service.sync.configuration;
|
||||
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
@@ -7,6 +7,7 @@ import com.lanyuanxiaoyao.service.common.Constants;
|
||||
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
|
||||
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
|
||||
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
|
||||
import com.lanyuanxiaoyao.service.sync.configuration.DefaultPartitionNameKeyGenerator;
|
||||
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -14,7 +15,7 @@ import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.hudi.client.TraceWriteStatus;
|
||||
import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload;
|
||||
import org.apache.hudi.common.model.TraceEventTimeAvroPayload;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
@@ -25,7 +26,6 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig;
|
||||
import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator;
|
||||
import org.apache.hudi.metrics.MetricsReporterType;
|
||||
import org.apache.hudi.org.apache.avro.Schema;
|
||||
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
|
||||
@@ -128,7 +128,7 @@ public class ConfigurationUtils {
|
||||
configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPath.get());
|
||||
}
|
||||
|
||||
configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName());
|
||||
configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceEventTimeAvroPayload.class.getName());
|
||||
configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName());
|
||||
|
||||
configuration.setBoolean(FlinkOptions.METADATA_ENABLED, false);
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
package org.apache.hudi.client;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.lanyuanxiaoyao.service.common.Constants;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Map;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-04-17
|
||||
*/
|
||||
public class TraceWriteStatus extends WriteStatus {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TraceWriteStatus.class);
|
||||
private final static DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
private long latestOpts = 0L;
|
||||
|
||||
public TraceWriteStatus() {
|
||||
super();
|
||||
}
|
||||
|
||||
public TraceWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
|
||||
super(trackSuccessRecords, failureFraction);
|
||||
}
|
||||
|
||||
public long getLatestOpts() {
|
||||
return latestOpts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markSuccess(HoodieRecord record, Option<Map<String, String>> optionalRecordMetadata) {
|
||||
super.markSuccess(record, optionalRecordMetadata);
|
||||
try {
|
||||
optionalRecordMetadata.ifPresent(map -> {
|
||||
if (map.containsKey(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)) {
|
||||
String inOpts = map.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME);
|
||||
if (StrUtil.isNotBlank(inOpts)) {
|
||||
long current = LocalDateTime.parse(inOpts, FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
|
||||
latestOpts = Long.max(latestOpts, current);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Throwable throwable) {
|
||||
logger.error("Parse latest opts failure", throwable);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
|
||||
super.markFailure(record, t, optionalRecordMetadata);
|
||||
}
|
||||
}
|
||||
@@ -1,57 +0,0 @@
|
||||
package org.apache.hudi.common.model;
|
||||
|
||||
import com.lanyuanxiaoyao.service.common.Constants;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hudi.org.apache.avro.util.Utf8;
|
||||
import org.apache.hudi.table.HoodieTableFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* 通过将latest_op_ts值写入payload进行传递,使压缩完成后可以从status中获取该值
|
||||
* Hudi默认使用{@link OverwriteWithLatestAvroPayload},由于{@link HoodieTableFactory}156行中的说明
|
||||
* 如果使用默认值,则改为使用{@link EventTimeAvroPayload},所以这里的payload直接继承EventTimeAvroPayload
|
||||
* 从而符合原来的使用逻辑
|
||||
*
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2023-04-18
|
||||
*/
|
||||
public class TraceOverwriteWithLatestAvroPayload extends EventTimeAvroPayload {
|
||||
private static final Logger logger = LoggerFactory.getLogger(TraceOverwriteWithLatestAvroPayload.class);
|
||||
|
||||
private final String latestOpts;
|
||||
|
||||
public TraceOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
|
||||
super(record, orderingVal);
|
||||
this.latestOpts = updateLatestOpts(Option.ofNullable(record));
|
||||
}
|
||||
|
||||
public TraceOverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
|
||||
super(record);
|
||||
this.latestOpts = updateLatestOpts(record);
|
||||
}
|
||||
|
||||
private String updateLatestOpts(Option<GenericRecord> record) {
|
||||
try {
|
||||
return record
|
||||
.map(r -> ((Utf8) r.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)).toString())
|
||||
.orElse(null);
|
||||
} catch (Throwable throwable) {
|
||||
logger.error("Get latest opts failure", throwable);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<Map<String, String>> getMetadata() {
|
||||
if (this.latestOpts == null) {
|
||||
return Option.empty();
|
||||
}
|
||||
Map<String, String> metadata = super.getMetadata().orElse(new HashMap<>());
|
||||
metadata.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, this.latestOpts);
|
||||
return Option.of(metadata);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user