feat(sync): add latest_op_ts output capability

v-zhangjc9
2024-05-07 17:37:01 +08:00
parent 9967d1258f
commit 5c9089419f
6 changed files with 37 additions and 40 deletions

CompactionEventHandler.java

@@ -1,14 +1,12 @@
 package com.lanyuanxiaoyao.service.sync.functions;
 import com.lanyuanxiaoyao.service.common.Constants;
 import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
 import com.lanyuanxiaoyao.service.common.entity.TableMeta;
 import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
 import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
-import com.lanyuanxiaoyao.service.sync.configuration.TraceWriteStatus;
 import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.hudi.client.TraceWriteStatus;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.sink.compact.CompactEventHandler;
@@ -41,21 +39,19 @@ public class CompactionEventHandler implements CompactEventHandler, Serializable
     @Override
     public void success(String instant, List<WriteStatus> statuses, HoodieCommitMetadata metadata) {
         StatusUtils.compactionCommit(configuration, flinkJob, tableMeta, instant, metadata);
-        logger.info("WriteStatus: {}", statuses);
-        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_TRACE_LATEST_OP_TS)) {
-            Long max = statuses.stream()
-                .map(status -> {
-                    if (status instanceof TraceWriteStatus) {
-                        TraceWriteStatus s = (TraceWriteStatus) status;
-                        return s.getLatestOpts();
-                    }
-                    return 0L;
-                })
-                .max(Long::compare)
-                .orElse(0L);
-            logger.info("Latest op ts: {}", max);
-            StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max);
-        }
+        // Trace latest_op_ts
+        Long max = statuses.stream()
+            .map(status -> {
+                if (status instanceof TraceWriteStatus) {
+                    TraceWriteStatus s = (TraceWriteStatus) status;
+                    return s.getLatestOpts();
+                }
+                return 0L;
+            })
+            .max(Long::compare)
+            .orElse(0L);
+        logger.info("Latest op ts: {}", max);
+        StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max);
     }
     @Override
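The success() handler now reduces every WriteStatus to a single maximum latest_op_ts, with non-trace statuses contributing 0L. The same reduction can be exercised in isolation; the sketch below uses hypothetical stand-ins for the Hudi classes (the real WriteStatus and TraceWriteStatus live in org.apache.hudi.client) and assumes getLatestOpts() returns an epoch-millis Long, which is what the Long::compare and orElse(0L) chain implies.

import java.util.List;

// Stand-ins for the Hudi classes; only what the reduction needs is sketched.
class WriteStatus {}

class TraceWriteStatus extends WriteStatus {
    private final long latestOpts;
    TraceWriteStatus(long latestOpts) { this.latestOpts = latestOpts; }
    long getLatestOpts() { return latestOpts; }
}

public class LatestOpTsReduction {
    // Same reduction as success(), written with a primitive stream:
    // statuses that carry no trace information contribute 0L.
    static long maxLatestOpTs(List<WriteStatus> statuses) {
        return statuses.stream()
            .mapToLong(s -> s instanceof TraceWriteStatus
                ? ((TraceWriteStatus) s).getLatestOpts()
                : 0L)
            .max()
            .orElse(0L);
    }

    public static void main(String[] args) {
        List<WriteStatus> statuses = List.of(
            new WriteStatus(),
            new TraceWriteStatus(1715072221000L),
            new TraceWriteStatus(1715072220000L));
        System.out.println(maxLatestOpTs(statuses)); // prints 1715072221000
    }
}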

StatusUtils.java

@@ -30,7 +30,7 @@ public class StatusUtils {
     private static final int HTTP_TIMEOUT = (int) Constants.MINUTE;
     public static void syncStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
-        logger.info("Enter method: syncStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta);
+        // logger.info("Enter method: syncStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta);
         try {
             Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
                 .run(() ->
@@ -86,7 +86,7 @@ public class StatusUtils {
     }
     public static void syncOperation(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long operationTime) {
-        logger.info("Enter method: syncOperation[configuration, flinkJob, tableMeta, operationTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "operationTime:" + operationTime);
+        // logger.info("Enter method: syncOperation[configuration, flinkJob, tableMeta, operationTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "operationTime:" + operationTime);
         try {
             Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
                 .run(() -> {
@@ -125,7 +125,7 @@ public class StatusUtils {
     }
     public static void compactionStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
-        logger.info("Enter method: compactionStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta);
+        // logger.info("Enter method: compactionStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta);
         try {
             Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
                 .run(() ->
@@ -151,7 +151,7 @@ public class StatusUtils {
     }
     public static void compactionPreCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, Map<String, Long> metadata) {
-        logger.info("Enter method: compactionPreCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata);
+        // logger.info("Enter method: compactionPreCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata);
         try {
             Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
                 .run(() -> HttpUtil.createPost(
@@ -177,7 +177,7 @@ public class StatusUtils {
     }
     public static void compactionCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, HoodieCommitMetadata metadata) {
-        logger.info("Enter method: compactionCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata);
+        // logger.info("Enter method: compactionCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata);
         try {
             Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
                 .run(() -> HttpUtil.createPost(
@@ -203,7 +203,7 @@ public class StatusUtils {
     }
     public static void compactionFinish(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String message, Exception exception) {
-        logger.info("Enter method: compactionFinish[configuration, flinkJob, tableMeta, message, exception]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "message:" + message + "," + "exception:" + exception);
+        // logger.info("Enter method: compactionFinish[configuration, flinkJob, tableMeta, message, exception]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "message:" + message + "," + "exception:" + exception);
         try {
             Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
                 .run(() -> {
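Every method in StatusUtils wraps its HTTP call in Failsafe.with(RetryPolicyProvider.HTTP_RETRY). The provider itself is not part of this diff; below is a minimal sketch of what such a shared policy could look like, assuming Failsafe 3.x (dev.failsafe). The retry count and backoff values are illustrative, not taken from the repository.

import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;

public final class RetryPolicyProvider {
    // Shared policy mirroring the Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
    // call sites in StatusUtils; retry count and backoff are assumptions.
    public static final RetryPolicy<Object> HTTP_RETRY = RetryPolicy.builder()
            .handle(Exception.class)                                     // retry on any exception from the HTTP call
            .withBackoff(Duration.ofSeconds(1), Duration.ofSeconds(30))  // exponential backoff between attempts
            .withMaxRetries(3)
            .build();

    private RetryPolicyProvider() {
    }

    public static void main(String[] args) {
        // Same call shape as the methods above: run the HTTP post under the policy.
        Failsafe.with(HTTP_RETRY).run(() -> System.out.println("posting status..."));
    }
}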

SyncUtils.java

@@ -8,10 +8,7 @@ import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
 import com.lanyuanxiaoyao.service.common.entity.Record;
 import com.lanyuanxiaoyao.service.common.entity.TableMeta;
 import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
-import com.lanyuanxiaoyao.service.sync.configuration.DefaultPartitionNameKeyGenerator;
 import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
-import com.lanyuanxiaoyao.service.sync.configuration.TraceOverwriteWithLatestAvroPayload;
-import com.lanyuanxiaoyao.service.sync.configuration.TraceWriteStatus;
 import com.lanyuanxiaoyao.service.sync.functions.OperationTypeFilter;
 import com.lanyuanxiaoyao.service.sync.functions.Record2RowDataFunction;
 import com.lanyuanxiaoyao.service.sync.functions.type.TypeConverter;
@@ -28,9 +25,11 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.client.TraceWriteStatus;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.util.collection.Pair;
@@ -42,6 +41,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.org.apache.avro.Schema;
 import org.apache.hudi.sink.utils.Pipelines;
@@ -77,7 +77,7 @@ public class SyncUtils {
             configuration = inputConfiguration;
         }
-        configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), false);
+        configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true);
         configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
         configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
         configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
@@ -102,7 +102,7 @@ public class SyncUtils {
             configuration = inputConfiguration;
         }
-        configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), false);
+        configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true);
         configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
         configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
         configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
@@ -153,11 +153,8 @@ public class SyncUtils {
             configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPath.get());
         }
-        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_TRACE_LATEST_OP_TS)) {
-            logger.info("Enable trace latest op ts");
-            configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName());
-            configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName());
-        }
+        configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName());
+        configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName());
         configuration.setBoolean(FlinkOptions.METADATA_ENABLED, false);
         configuration.setInteger(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), Integer.MAX_VALUE);
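The trace options set above are plain string configs under the hood, so the wiring can be shown standalone. Below is a minimal sketch using the literal key names that FlinkOptions.PAYLOAD_CLASS_NAME, HoodieWriteConfig.WRITE_STATUS_CLASS_NAME, and HoodieMetricsConfig.TURN_METRICS_ON resolve to in stock Hudi; the Victoria reporter options are fork-specific and are omitted here.

import org.apache.flink.configuration.Configuration;

// Standalone sketch of the option wiring above, using literal key strings
// so the snippet needs only Flink on the classpath.
public class TraceOptionsExample {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        // latest_op_ts tracing is now unconditional: custom payload + write status
        configuration.setString("payload.class",
            "org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload");
        configuration.setString("hoodie.writestatus.class",
            "org.apache.hudi.client.TraceWriteStatus");
        // metrics flipped from false to true in this commit
        configuration.setBoolean("hoodie.metrics.on", true);
        System.out.println(configuration);
    }
}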

TraceWriteStatus.java

@@ -1,4 +1,4 @@
-package com.lanyuanxiaoyao.service.sync.configuration;
+package org.apache.hudi.client;
 
 import cn.hutool.core.util.StrUtil;
 import com.lanyuanxiaoyao.service.common.Constants;
@@ -6,7 +6,6 @@ import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.Map;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.slf4j.Logger;
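TraceWriteStatus is only touched by a package move here, into org.apache.hudi.client, which likely lets it sit alongside WriteStatus without an import. Its body is outside the diff; judging from the imports (StrUtil, LocalDateTime, DateTimeFormatter, ZoneOffset) and from CompactionEventHandler comparing getLatestOpts() as a Long, it plausibly overrides markSuccess, pulls latest_op_ts from the per-record metadata, and keeps a running maximum. The sketch below is a hypothetical reconstruction under those assumptions; the metadata key and the plain epoch-millis parsing are guesses (the real class appears to parse a date-time string).

package org.apache.hudi.client;

import java.util.Map;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

// Hypothetical reconstruction; the real class body is not in this diff.
public class TraceWriteStatus extends WriteStatus {
    private static final String LATEST_OP_TS_KEY = "latest_op_ts"; // assumed metadata key

    private Long latestOpts = 0L;

    public TraceWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
        super(trackSuccessRecords, failureFraction);
    }

    @Override
    public void markSuccess(HoodieRecord record, Option<Map<String, String>> optionalRecordMetadata) {
        super.markSuccess(record, optionalRecordMetadata);
        // Keep the largest latest_op_ts seen across all records in this bucket.
        if (optionalRecordMetadata.isPresent()) {
            String ts = optionalRecordMetadata.get().get(LATEST_OP_TS_KEY);
            if (ts != null) {
                latestOpts = Math.max(latestOpts, Long.parseLong(ts)); // assumed epoch millis
            }
        }
    }

    public Long getLatestOpts() {
        return latestOpts;
    }
}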

TraceOverwriteWithLatestAvroPayload.java

@@ -1,20 +1,25 @@
-package com.lanyuanxiaoyao.service.sync.configuration;
+package org.apache.hudi.common.model;
 
 import com.lanyuanxiaoyao.service.common.Constants;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.org.apache.avro.util.Utf8;
 import org.apache.hudi.table.HoodieTableFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Passes the latest_op_ts value along by writing it into the payload, so that
  * once compaction finishes the value can be read back from the write status.
  * Hudi uses {@link OverwriteWithLatestAvroPayload} by default, but per the note
  * at line 156 of {@link HoodieTableFactory}, configuring the default payload
  * switches it to {@link EventTimeAvroPayload}; this payload therefore extends
  * EventTimeAvroPayload directly, preserving the original semantics.
  *
  * @author lanyuanxiaoyao
  * @date 2023-04-18
  */
-public class TraceOverwriteWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {
+public class TraceOverwriteWithLatestAvroPayload extends EventTimeAvroPayload {
     private static final Logger logger = LoggerFactory.getLogger(TraceOverwriteWithLatestAvroPayload.class);
     private final String latestOpts;
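The class body is mostly outside this hunk, but the imports (HashMap, Map, Option, GenericRecord, Utf8) plus the latestOpts field hint at the transport described in the Javadoc: capture latest_op_ts from the source record, then republish it via HoodieRecordPayload#getMetadata, which Hudi hands to WriteStatus#markSuccess during the write. Below is a hypothetical sketch under those assumptions; the source field name, metadata key, and constructor choice are guesses, not code from this commit.

package org.apache.hudi.common.model;

import java.util.HashMap;
import java.util.Map;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;

// Hypothetical reconstruction; only the package/superclass changes are in the diff.
public class TraceOverwriteWithLatestAvroPayload extends EventTimeAvroPayload {
    private final String latestOpts; // carried over from the source record

    public TraceOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
        Object ts = record.get("latest_op_ts"); // assumed source field name
        this.latestOpts = ts == null ? null : ts.toString();
    }

    @Override
    public Option<Map<String, String>> getMetadata() {
        // Republish the captured value so TraceWriteStatus can read it back.
        if (latestOpts == null) {
            return Option.empty();
        }
        Map<String, String> metadata = new HashMap<>();
        metadata.put("latest_op_ts", latestOpts); // assumed metadata key
        return Option.of(metadata);
    }
}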

DefaultPartitionNameKeyGenerator.java

@@ -1,4 +1,4 @@
-package com.lanyuanxiaoyao.service.sync.configuration;
+package org.apache.hudi.keygen;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
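Only the package move is visible for this file; the diff is truncated after the imports. From the class name and the HoodieAvroUtils and TypedProperties imports, a plausible shape is a SimpleKeyGenerator variant that substitutes a fixed partition name when the partition field is missing. Everything below is hypothetical, including the fallback value, the superclass, and the helper signature (which varies across Hudi releases).

package org.apache.hudi.keygen;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;

// Entirely hypothetical sketch; only the package move is in the diff.
public class DefaultPartitionNameKeyGenerator extends SimpleKeyGenerator {
    private static final String DEFAULT_PARTITION_NAME = "default"; // assumed fallback

    public DefaultPartitionNameKeyGenerator(TypedProperties props) {
        super(props);
    }

    @Override
    public String getPartitionPath(GenericRecord record) {
        // Read the configured partition field; fall back to a fixed name when absent.
        String value = HoodieAvroUtils.getNestedFieldValAsString(
                record, getPartitionPathFields().get(0), true, false);
        return value == null || value.isEmpty() ? DEFAULT_PARTITION_NAME : value;
    }
}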