diff --git a/bin/build-all.sh b/bin/build-all.sh index 89704b5..355926f 100755 --- a/bin/build-all.sh +++ b/bin/build-all.sh @@ -3,7 +3,7 @@ root_path=$(dirname $(cd $(dirname $0);pwd)) source $root_path/bin/library.sh mvn deploy -N -D skipTests -P local -s ~/.m2/settings-development.xml deploy service-common service-dependencies service-configuration service-forest service-cli service-cli/service-cli-core service-executor service-executor/service-executor-core utils/executor -package service-api service-check service-cli/service-cli-runner service-cloud-query service-executor/service-executor-manager service-executor/service-executor-task service-command service-command-pro service-exporter service-flink-query service-gateway service-hudi-query service-info-query service-monitor service-loki-query service-pulsar-query service-queue service-scheduler service-uploader service-web service-yarn-query service-zookeeper-query utils/sync +package service-api service-check service-cli/service-cli-runner service-cloud-query service-executor/service-executor-manager service-executor/service-executor-task service-command service-command-pro service-exporter service-flink-query service-gateway service-hudi-query service-info-query service-monitor service-loki-query service-pulsar-query service-queue service-scheduler service-uploader service-web service-yarn-query service-zookeeper-query utils/patch utils/sync configs=(b2a4 b2b1 b2b5 b2b12) for config in ${configs[*]}; diff --git a/pom.xml b/pom.xml index cfe0cab..1c3ff22 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ service-yarn-query service-zookeeper-query utils/sync + utils/patch @@ -124,6 +125,11 @@ executor ${project.version} + + com.lanyuanxiaoyao + patch + ${project.version} + com.lanyuanxiaoyao sync diff --git a/utils/patch/pom.xml b/utils/patch/pom.xml new file mode 100644 index 0000000..e1128e9 --- /dev/null +++ b/utils/patch/pom.xml @@ -0,0 +1,69 @@ + + + + com.lanyuanxiaoyao + hudi-service + 1.0.0-SNAPSHOT + + 4.0.0 + + patch + + + 1.10.1 + + + + + com.lanyuanxiaoyao + service-common + + + org.apache.hudi + hudi-flink${flink.major.version}-bundle + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + false + true + + + reference.conf + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + log4j-surefire*.properties + + + + + + + package + + shade + + + + + + + + \ No newline at end of file diff --git a/utils/patch/src/main/java/org/apache/hudi/client/TraceWriteStatus.java b/utils/patch/src/main/java/org/apache/hudi/client/TraceWriteStatus.java new file mode 100644 index 0000000..b309217 --- /dev/null +++ b/utils/patch/src/main/java/org/apache/hudi/client/TraceWriteStatus.java @@ -0,0 +1,57 @@ +package org.apache.hudi.client; + +import com.lanyuanxiaoyao.service.common.Constants; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author lanyuanxiaoyao + * @date 2023-04-17 + */ +public class TraceWriteStatus extends WriteStatus { + private static final Logger logger = LoggerFactory.getLogger(TraceWriteStatus.class); + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private long latestOpts = 0L; + + public TraceWriteStatus() { + super(); + } + + public TraceWriteStatus(Boolean trackSuccessRecords, Double failureFraction) { + super(trackSuccessRecords, failureFraction); + } + + public long getLatestOpts() { + return latestOpts; + } + + @Override + public void markSuccess(HoodieRecord record, Option> optionalRecordMetadata) { + super.markSuccess(record, optionalRecordMetadata); + try { + optionalRecordMetadata.ifPresent(map -> { + if (map.containsKey(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)) { + String inOpts = map.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME); + if (StringUtils.nonEmpty(inOpts)) { + long current = LocalDateTime.parse(inOpts, FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli(); + latestOpts = Long.max(latestOpts, current); + } + } + }); + } catch (Throwable throwable) { + logger.error("Parse latest opts failure", throwable); + } + } + + @Override + public void markFailure(HoodieRecord record, Throwable t, Option> optionalRecordMetadata) { + super.markFailure(record, t, optionalRecordMetadata); + } +} diff --git a/utils/patch/src/main/java/org/apache/hudi/common/model/TraceEventTimeAvroPayload.java b/utils/patch/src/main/java/org/apache/hudi/common/model/TraceEventTimeAvroPayload.java new file mode 100644 index 0000000..e6819b3 --- /dev/null +++ b/utils/patch/src/main/java/org/apache/hudi/common/model/TraceEventTimeAvroPayload.java @@ -0,0 +1,57 @@ +package org.apache.hudi.common.model; + +import com.lanyuanxiaoyao.service.common.Constants; +import java.util.HashMap; +import java.util.Map; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.org.apache.avro.generic.GenericRecord; +import org.apache.hudi.org.apache.avro.util.Utf8; +import org.apache.hudi.table.HoodieTableFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 通过将latest_op_ts值写入payload进行传递,使压缩完成后可以从status中获取该值 + * Hudi默认使用{@link OverwriteWithLatestAvroPayload},由于{@link HoodieTableFactory}156行中的说明 + * 如果使用默认值,则改为使用{@link EventTimeAvroPayload},所以这里的payload直接继承EventTimeAvroPayload + * 从而符合原来的使用逻辑 + * + * @author lanyuanxiaoyao + * @date 2023-04-18 + */ +public class TraceEventTimeAvroPayload extends EventTimeAvroPayload { + private static final Logger logger = LoggerFactory.getLogger(TraceEventTimeAvroPayload.class); + + private final String latestOpts; + + public TraceEventTimeAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + this.latestOpts = updateLatestOpts(Option.ofNullable(record)); + } + + public TraceEventTimeAvroPayload(Option record) { + super(record); + this.latestOpts = updateLatestOpts(record); + } + + private String updateLatestOpts(Option record) { + try { + return record + .map(r -> ((Utf8) r.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)).toString()) + .orElse(null); + } catch (Throwable throwable) { + logger.error("Get latest opts failure", throwable); + } + return null; + } + + @Override + public Option> getMetadata() { + if (this.latestOpts == null) { + return Option.empty(); + } + Map metadata = super.getMetadata().orElse(new HashMap<>()); + metadata.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, this.latestOpts); + return Option.of(metadata); + } +} diff --git a/utils/sync/pom.xml b/utils/sync/pom.xml index 619a6ac..17137d4 100644 --- a/utils/sync/pom.xml +++ b/utils/sync/pom.xml @@ -20,6 +20,10 @@ com.lanyuanxiaoyao service-common + + com.lanyuanxiaoyao + patch + org.apache.hudi hudi-flink${flink.major.version}-bundle