feat(all): migrate the common, sync, and executor projects

2024-02-29 15:32:14 +08:00
parent 0683068a02
commit 5a2e9fdfb8
73 changed files with 10204 additions and 1 deletion


@@ -0,0 +1,263 @@
package com.lanyuanxiaoyao.service.sync;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.RunMeta;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.functions.CompactionEventHandler;
import com.lanyuanxiaoyao.service.sync.utils.*;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.sink.compact.*;
import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies;
import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategy;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Single-table offline compaction
*
* @author ZhangJiacheng
* @date 2022-06-21
*/
public class Compactor {
private static final Logger logger = LoggerFactory.getLogger(Compactor.class);
private static final ObjectMapper mapper = JacksonUtils.getMapper();
public static void main(String[] args) throws Exception {
FlinkJob flinkJob = ArgumentsUtils.getFlinkJob(args);
TableMeta tableMeta = ArgumentsUtils.getTableMeta(args);
String selectedInstants = ArgumentsUtils.getInstants(args);
String cluster = ArgumentsUtils.getCluster(args);
logger.info("Bootstrap flink job: {}", mapper.writeValueAsString(flinkJob));
logger.info("Bootstrap table meta: {}", mapper.writeValueAsString(tableMeta));
logger.info("Bootstrap instants: {}", selectedInstants);
logger.info("Bootstrap cluster: {}", cluster);
String applicationId = System.getenv("_APP_ID");
RunMeta runMeta = new RunMeta(cluster, flinkJob.getId(), tableMeta.getAlias());
logger.info("Run meta: {}", runMeta);
ZkUtils.createCompactionLock(flinkJob, tableMeta, tableMeta.getConfig().getZookeeperUrl(), mapper.writeValueAsString(runMeta));
logger.info("Lock for {} {} success", flinkJob.getId(), tableMeta.getAlias());
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkCompactionConfig config = new FlinkCompactionConfig();
if (ObjectUtil.isEmpty(selectedInstants)) {
config.compactionPlanSelectStrategy = CompactionPlanStrategy.ALL;
} else {
config.compactionPlanSelectStrategy = CompactionPlanStrategy.INSTANTS;
config.compactionPlanInstant = selectedInstants;
}
GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta);
Configuration configuration = SyncUtils.getCompactionFlinkConfiguration(
globalConfiguration,
new Configuration(),
flinkJob,
tableMeta,
SyncUtils.avroSchemaWithExtraFields(tableMeta),
1
);
CompactionEventHandler eventHandler = new CompactionEventHandler(globalConfiguration, flinkJob, tableMeta);
HoodieFlinkWriteClient<?> writeClient = StreamerUtil.createWriteClient(configuration);
HoodieFlinkTable<?> table = writeClient.getHoodieTable();
table.getMetaClient().reloadActiveTimeline();
StatusUtils.compactionStart(globalConfiguration, flinkJob, tableMeta);
// Check the current state of the timeline
logger.info("{} timeline detail ({})", tableMeta.getAlias(), LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
table.getActiveTimeline()
.getInstants()
.forEach(instant -> logger.info("{} {} {}", instant.getTimestamp(), instant.getAction(), instant.getState()));
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
List<HoodieInstant> requested = CompactionPlanStrategies.getStrategy(config).select(pendingCompactionTimeline);
if (requested.isEmpty()) {
logger.info("No compaction plan scheduled");
eventHandler.closed("No compaction plan scheduled", null);
return;
}
List<String> compactionInstantTimes = requested.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
compactionInstantTimes.forEach(timestamp -> {
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(timestamp);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
logger.info("Rollback inflight compaction instant: [" + timestamp + "]");
table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
}
});
List<Pair<String, HoodieCompactionPlan>> compactionPlans = compactionInstantTimes.stream()
.map(timestamp -> {
try {
return Pair.of(timestamp, CompactionUtils.getCompactionPlan(table.getMetaClient(), timestamp));
} catch (IOException e) {
throw new HoodieException("Get compaction plan at instant " + timestamp + " error", e);
}
})
.filter(pair -> {
HoodieCompactionPlan plan = pair.getRight();
return plan != null && plan.getOperations() != null && plan.getOperations().size() > 0;
})
.collect(Collectors.toList());
if (compactionPlans.isEmpty()) {
logger.info("No compaction plan for instant " + String.join(",", compactionInstantTimes));
eventHandler.closed("No compaction plan for instant " + String.join(",", compactionInstantTimes), null);
return;
}
List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
logger.info("Start to compaction for instant " + compactionInstantTimes);
for (HoodieInstant instant : instants) {
if (!pendingCompactionTimeline.containsInstant(instant)) {
CompactionUtil.cleanInstant(table.getMetaClient(), instant);
}
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
}
table.getMetaClient().reloadActiveTimeline();
try {
compactionPlans.forEach(pair -> preCommit(globalConfiguration, flinkJob, tableMeta, table, pair.getLeft(), pair.getRight()));
} catch (Throwable e) {
logger.warn("Cannot submit pre-commit log");
}
environment.addSource(new CompactionPlanSourceFunction(compactionPlans))
.name("compaction_source")
.uid("uid_compaction_source")
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(configuration)))
.setParallelism(configuration.getInteger(FlinkOptions.COMPACTION_TASKS))
.addSink(new CompactionCommitSink(configuration, eventHandler))
.name("compaction_commit")
.uid("uid_compaction_commit")
.setParallelism(1);
environment.execute(NameHelper.compactionFlinkName(flinkJob.getId(), tableMeta.getSchema(), tableMeta.getAlias()));
}
private static void preCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, HoodieFlinkTable<?> table, String instant, HoodieCompactionPlan compactionPlan) {
HoodieTableMetaClient client = table.getMetaClient();
FileSystem fileSystem = client.getRawFs();
List<Path> deltaFilePaths = compactionPlan.getOperations()
.stream()
.flatMap(operation -> operation.getDeltaFilePaths().stream().map(path -> StrUtil.format("{}/{}", operation.getPartitionPath(), path)))
.map(path -> new Path(StrUtil.format("{}/{}", client.getBasePathV2(), path)))
.collect(Collectors.toList());
AtomicLong totalLogFilesCompacted = new AtomicLong(0);
AtomicLong totalLogFilesSize = new AtomicLong(0);
AtomicLong totalRecordsDeleted = new AtomicLong(0);
AtomicLong totalCompactedRecordsUpdated = new AtomicLong(0);
deltaFilePaths.parallelStream().forEach(path -> {
try {
FileStatus fileStatus = fileSystem.getFileStatus(path);
totalLogFilesCompacted.incrementAndGet();
totalLogFilesSize.addAndGet(fileStatus.getLen());
MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fileSystem, path);
if (ObjectUtil.isNull(messageType)) {
return;
}
Schema writerSchema = new AvroSchemaConverter().convert(messageType);
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(fileSystem, new HoodieLogFile(path), writerSchema)) {
while (reader.hasNext()) {
HoodieLogBlock block = reader.next();
switch (block.getBlockType()) {
case AVRO_DATA_BLOCK:
HoodieAvroDataBlock avroDataBlock = (HoodieAvroDataBlock) block;
try (ClosableIterator<IndexedRecord> avroDataBlockRecordIterator = avroDataBlock.getRecordIterator()) {
while (avroDataBlockRecordIterator.hasNext()) {
// logger.info("totalCompactedRecordsUpdated {}", totalCompactedRecordsUpdated.incrementAndGet());
totalCompactedRecordsUpdated.incrementAndGet();
avroDataBlockRecordIterator.next();
}
}
break;
case PARQUET_DATA_BLOCK:
HoodieParquetDataBlock parquetDataBlock = (HoodieParquetDataBlock) block;
try (ClosableIterator<IndexedRecord> parquetDataBlockRecordIterator = parquetDataBlock.getRecordIterator()) {
while (parquetDataBlockRecordIterator.hasNext()) {
// logger.info("totalCompactedRecordsUpdated {}", totalCompactedRecordsUpdated.incrementAndGet());
totalCompactedRecordsUpdated.incrementAndGet();
parquetDataBlockRecordIterator.next();
}
}
break;
case DELETE_BLOCK:
HoodieDeleteBlock deleteBlock = (HoodieDeleteBlock) block;
// logger.info("totalRecordsDeleted {}", totalRecordsDeleted.addAndGet(deleteBlock.getRecordsToDelete().length));
totalRecordsDeleted.addAndGet(deleteBlock.getRecordsToDelete().length);
break;
default:
break;
}
}
}
} catch (Exception e) {
logger.warn("Parse log file failure for " + path, e);
}
});
Map<String, Long> metadata = new ConcurrentHashMap<>(5);
metadata.put("totalLogFilesCompacted", totalLogFilesCompacted.get());
metadata.put("totalLogFilesSize", totalLogFilesSize.get());
metadata.put("totalRecordsDeleted", totalRecordsDeleted.get());
metadata.put("totalCompactedRecordsUpdated", totalCompactedRecordsUpdated.get());
metadata.put("totalLogRecordsCompacted", totalRecordsDeleted.get() + totalCompactedRecordsUpdated.get());
StatusUtils.compactionPreCommit(configuration, flinkJob, tableMeta, instant, metadata);
}
}
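
As a side note, the instant-selection step at the top of main() can be exercised on its own. A minimal sketch, assuming a reachable table base path (the path and instant timestamp below are hypothetical):

// Sketch only: select pending compaction instants the same way Compactor does.
FlinkCompactionConfig config = new FlinkCompactionConfig();
config.compactionPlanSelectStrategy = CompactionPlanStrategy.INSTANTS;
config.compactionPlanInstant = "20240229153214"; // hypothetical instant timestamp
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new org.apache.hadoop.conf.Configuration())
        .setBasePath("hdfs://b2/apps/datalake/example_table") // hypothetical
        .build();
HoodieTimeline pending = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
List<HoodieInstant> selected = CompactionPlanStrategies.getStrategy(config).select(pending);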


@@ -0,0 +1,170 @@
package com.lanyuanxiaoyao.service.sync;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.RunMeta;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.exception.CheckpointRootPathNotFoundException;
import com.lanyuanxiaoyao.service.common.exception.ZookeeperUrlNotFoundException;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.functions.PulsarMessage2RecordFunction;
import com.lanyuanxiaoyao.service.sync.functions.PulsarMessageSourceReader;
import com.lanyuanxiaoyao.service.sync.functions.ValidateRecordFilter;
import com.lanyuanxiaoyao.service.sync.utils.*;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.Constants.*;
/**
* Synchronization application
*
* @author lanyuanxiaoyao
* @version 0.0.1
* @date 2021-11-26
*/
public class Synchronizer {
private static final Logger logger = LoggerFactory.getLogger(Synchronizer.class);
private static final ObjectMapper mapper = JacksonUtils.getMapper();
private static String findConfigFromList(List<TableMeta> metas, Function<TableMeta, String> getter, Supplier<Exception> notFoundException) throws Exception {
return metas.stream()
.map(getter)
.distinct()
.findFirst()
.orElseThrow(notFoundException);
}
public static void main(String[] args) throws Exception {
FlinkJob flinkJob = ArgumentsUtils.getFlinkJob(args);
List<TableMeta> tableMetaList = ArgumentsUtils.getTableMetaList(args);
String cluster = ArgumentsUtils.getCluster(args);
logger.info("Bootstrap flink job: {}", mapper.writeValueAsString(flinkJob));
logger.info("Bootstrap table meta list: {}", mapper.writeValueAsString(tableMetaList));
logger.info("Bootstrap cluster: {}", cluster);
String applicationId = System.getenv("_APP_ID");
String zkUrl = findConfigFromList(tableMetaList, meta -> meta.getConfig().getZookeeperUrl(), ZookeeperUrlNotFoundException::new);
for (TableMeta tableMeta : tableMetaList) {
RunMeta runMeta = new RunMeta(cluster, flinkJob.getId(), tableMeta.getAlias());
logger.info("Run meta: {}", runMeta);
ZkUtils.createSynchronizerLock(flinkJob, tableMeta, zkUrl, mapper.writeValueAsString(runMeta));
}
RunMeta runMeta = new RunMeta(cluster, flinkJob.getId());
logger.info("Run meta: {}", runMeta);
ZkUtils.createSynchronizerLock(flinkJob, zkUrl, mapper.writeValueAsString(runMeta));
logger.info("Lock for {} success", flinkJob.getId());
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(15 * MINUTE);
String checkpointRootPath = findConfigFromList(tableMetaList, meta -> meta.getConfig().getCheckpointRootPath(), CheckpointRootPathNotFoundException::new);
environment.getCheckpointConfig().setCheckpointStorage(new Path(checkpointRootPath + "/" + flinkJob.getId()));
environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
environment.getCheckpointConfig().setCheckpointTimeout(2 * HOUR);
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(15 * MINUTE);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setForceUnalignedCheckpoints(true);
environment.getCheckpointConfig().enableUnalignedCheckpoints();
environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
if (tableMetaList.stream().anyMatch(meta -> TableMetaHelper.existsTag(meta, TAGS_DISABLE_CHAINING))) {
logger.warn("Disable operator chaining");
environment.disableOperatorChaining();
}
environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, MINUTE));
environment.setStateBackend(new HashMapStateBackend());
switch (flinkJob.getRunMode()) {
case ALL_IN_ONE:
for (TableMeta tableMeta : tableMetaList) {
GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta);
createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta);
publishSyncStart(globalConfiguration, flinkJob, tableMeta);
}
environment.execute(NameHelper.syncFlinkName(flinkJob.getId(), flinkJob.getName()));
break;
case ONE_IN_ONE:
for (TableMeta tableMeta : tableMetaList) {
GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta);
createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta);
publishSyncStart(globalConfiguration, flinkJob, tableMeta);
environment.execute(NameHelper.syncFlinkName(flinkJob.getId(), flinkJob.getName(), tableMeta.getAlias()));
}
break;
case ALL_IN_ONE_BY_TABLE:
scheduleOneInOneRegistryByField(environment, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getTable);
break;
case ALL_IN_ONE_BY_SCHEMA:
scheduleOneInOneRegistryByField(environment, cluster, applicationId, flinkJob, tableMetaList, TableMeta::getSchema);
break;
default:
throw new IllegalArgumentException("Unsupported run mode: " + flinkJob.getRunMode());
}
}
private static void scheduleOneInOneRegistryByField(StreamExecutionEnvironment environment, String cluster, String applicationId, FlinkJob flinkJob, List<TableMeta> tableMetaList, Function<TableMeta, String> field) throws Exception {
Map<String, List<TableMeta>> map = tableMetaList.stream()
.collect(Collectors.groupingBy(field));
for (Map.Entry<String, List<TableMeta>> entry : map.entrySet()) {
for (TableMeta tableMeta : entry.getValue()) {
GlobalConfiguration globalConfiguration = new GlobalConfiguration(cluster, applicationId, tableMeta);
createFlinkJob(environment, globalConfiguration, flinkJob, tableMeta);
publishSyncStart(globalConfiguration, flinkJob, tableMeta);
}
environment.execute(NameHelper.syncFlinkName(flinkJob.getId(), flinkJob.getName(), entry.getKey()));
}
}
private static void createFlinkJob(StreamExecutionEnvironment environment, GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) throws IOException {
logger.info("Table meta: {}", mapper.writeValueAsString(tableMeta));
logger.info("Config meta: {}", mapper.writeValueAsString(configuration));
SingleOutputStreamOperator<String> source = environment
.addSource(new PulsarMessageSourceReader(configuration, flinkJob, tableMeta))
.setParallelism(tableMeta.getHudi().getSourceTasks());
if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_PULSAR_BACKUP)) {
Path path = new Path(StrUtil.format("hdfs://b2/apps/datalake/hive_test/source/{}/{}", String.join("_", flinkJob.getName().split("\\s")), tableMeta.getAlias()));
StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(path, new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withInactivityInterval(HOUR)
.withMaxPartSize(GB)
.build())
.build();
source.addSink(fileSink).name("Backup pulsar data");
}
SingleOutputStreamOperator<Record> middle = source
.map(new PulsarMessage2RecordFunction(configuration, flinkJob, tableMeta))
.name("Json ( " + tableMeta.getSchema() + "-" + tableMeta.getAlias() + " )")
.filter(new ValidateRecordFilter(configuration, flinkJob, tableMeta))
.name("Reject json parse failure");
SyncUtils.sinkToHoodieByTable(configuration, flinkJob, tableMeta, environment, middle);
}
private static void publishSyncStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
StatusUtils.syncStart(configuration, flinkJob, tableMeta);
}
}
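
One subtlety worth noting: findConfigFromList takes the first distinct value of the getter and does not verify that every TableMeta agrees. A self-contained illustration with plain strings (values hypothetical):

List<String> urls = java.util.Arrays.asList("zk-a:2181", "zk-a:2181", "zk-b:2181"); // hypothetical
String chosen = urls.stream().distinct().findFirst().orElseThrow(IllegalStateException::new);
// chosen == "zk-a:2181"; the divergent "zk-b:2181" is silently ignored.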


@@ -0,0 +1,36 @@
package com.lanyuanxiaoyao.service.sync.configuration;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
/**
* @author ZhangJiacheng
*/
public class DefaultPartitionNameKeyGenerator extends SimpleAvroKeyGenerator {
private final String defaultPartitionName;
public DefaultPartitionNameKeyGenerator(TypedProperties props) {
super(props);
defaultPartitionName = props.getString(FlinkOptions.PARTITION_DEFAULT_NAME.key(), FlinkOptions.PARTITION_DEFAULT_NAME.defaultValue());
}
@Override
public String getPartitionPath(GenericRecord record) {
String partitionPathField = getPartitionPathFields().get(0);
String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true, consistentLogicalTimestampEnabled);
if (partitionPath == null || partitionPath.isEmpty()) {
partitionPath = defaultPartitionName;
}
if (encodePartitionPath) {
partitionPath = PartitionPathEncodeUtils.escapePathName(partitionPath);
}
if (hiveStylePartitioning) {
partitionPath = partitionPathField + "=" + partitionPath;
}
return partitionPath;
}
}
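
A key generator like this is normally registered through Hudi's standard key-generator option; a sketch of the presumed wiring (the actual registration happens elsewhere in this repo and is not part of this commit):

org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
// FlinkOptions.KEYGEN_CLASS_NAME is Hudi's hoodie.datasource.write.keygenerator.class option.
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, DefaultPartitionNameKeyGenerator.class.getName());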


@@ -0,0 +1,92 @@
package com.lanyuanxiaoyao.service.sync.configuration;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import java.io.Serializable;
/**
* Static configuration for synchronization jobs
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class GlobalConfiguration implements Serializable {
private final String cluster;
private final String applicationId;
private final Boolean metricEnable = false;
private final String metricPublishUrl;
private final String metricPublishPrometheusUrl;
private final String metricsPublishCustomUrl;
private final Integer metricPublishDelay;
private final Integer metricPublishPeriod;
private final Integer metricPublishTimeout;
private final Integer metricPublishBatch;
public GlobalConfiguration(String cluster, String applicationId, TableMeta meta) {
this.cluster = cluster;
this.applicationId = applicationId;
this.metricPublishUrl = meta.getConfig().getMetricPublishUrl();
this.metricPublishPrometheusUrl = meta.getConfig().getMetricPrometheusUrl();
this.metricsPublishCustomUrl = meta.getConfig().getMetricApiUrl();
this.metricPublishDelay = meta.getConfig().getMetricPublishDelay();
this.metricPublishPeriod = meta.getConfig().getMetricPublishPeriod();
this.metricPublishTimeout = meta.getConfig().getMetricPublishTimeout();
this.metricPublishBatch = meta.getConfig().getMetricPublishBatch();
}
public String getCluster() {
return cluster;
}
public String getApplicationId() {
return applicationId;
}
public Boolean getMetricEnable() {
return metricEnable;
}
public String getMetricPublishUrl() {
return metricPublishUrl;
}
public String getMetricPublishPrometheusUrl() {
return metricPublishPrometheusUrl;
}
public String getMetricsPublishCustomUrl() {
return metricsPublishCustomUrl;
}
public Integer getMetricPublishDelay() {
return metricPublishDelay;
}
public Integer getMetricPublishPeriod() {
return metricPublishPeriod;
}
public Integer getMetricPublishTimeout() {
return metricPublishTimeout;
}
public Integer getMetricPublishBatch() {
return metricPublishBatch;
}
@Override
public String toString() {
return "GlobalConfiguration{" +
"cluster='" + cluster + '\'' +
", applicationId='" + applicationId + '\'' +
", metricEnable=" + metricEnable +
", metricPublishUrl='" + metricPublishUrl + '\'' +
", metricPublishPrometheusUrl='" + metricPublishPrometheusUrl + '\'' +
", metricsPublishCustomUrl='" + metricsPublishCustomUrl + '\'' +
", metricPublishDelay=" + metricPublishDelay +
", metricPublishPeriod=" + metricPublishPeriod +
", metricPublishTimeout=" + metricPublishTimeout +
", metricPublishBatch=" + metricPublishBatch +
'}';
}
}


@@ -0,0 +1,12 @@
package com.lanyuanxiaoyao.service.sync.configuration;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
public interface RetryPolicyProvider {
RetryPolicy<String> HTTP_RETRY = RetryPolicy.<String>builder()
.handle(Throwable.class)
.withDelay(Duration.ofSeconds(1))
.withMaxAttempts(10)
.build();
}
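
HTTP_RETRY is consumed through Failsafe's standard entry point, mirroring how PulsarMessageSourceReader uses its own MESSAGE_ID_RETRY policy below. A minimal usage sketch (the URL is hypothetical):

String body = dev.failsafe.Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
        .get(() -> cn.hutool.http.HttpUtil.get("http://status.example.internal/api/health")); // hypothetical URL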


@@ -0,0 +1,52 @@
package com.lanyuanxiaoyao.service.sync.configuration;
import com.lanyuanxiaoyao.service.common.Constants;
import java.util.HashMap;
import java.util.Map;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.org.apache.avro.generic.GenericRecord;
import org.apache.hudi.org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author lanyuanxiaoyao
* @date 2023-04-18
*/
public class TraceOverwriteWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {
private static final Logger logger = LoggerFactory.getLogger(TraceOverwriteWithLatestAvroPayload.class);
private final String latestOpts;
public TraceOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
this.latestOpts = updateLatestOpts(Option.ofNullable(record));
}
public TraceOverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
super(record);
this.latestOpts = updateLatestOpts(record);
}
private String updateLatestOpts(Option<GenericRecord> record) {
try {
return record
.map(r -> ((Utf8) r.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)).toString())
.orElse(null);
} catch (Throwable throwable) {
logger.error("Get latest opts failure", throwable);
}
return null;
}
@Override
public Option<Map<String, String>> getMetadata() {
if (this.latestOpts == null) {
return Option.empty();
}
Map<String, String> metadata = super.getMetadata().orElse(new HashMap<>());
metadata.put(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, this.latestOpts);
return Option.of(metadata);
}
}
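
A payload class such as this is plugged in through Hudi's payload option; a minimal sketch of the presumed wiring (the actual configuration is assembled in SyncUtils, which is not part of this excerpt):

org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
// FlinkOptions.PAYLOAD_CLASS_NAME selects the HoodieRecordPayload implementation.
conf.setString(org.apache.hudi.configuration.FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName());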


@@ -0,0 +1,58 @@
package com.lanyuanxiaoyao.service.sync.configuration;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author lanyuanxiaoyao
* @date 2023-04-17
*/
public class TraceWriteStatus extends WriteStatus {
private static final Logger logger = LoggerFactory.getLogger(TraceWriteStatus.class);
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private long latestOpts = 0L;
public TraceWriteStatus() {
super();
}
public TraceWriteStatus(Boolean trackSuccessRecords, Double failureFraction) {
super(trackSuccessRecords, failureFraction);
}
public long getLatestOpts() {
return latestOpts;
}
@Override
public void markSuccess(HoodieRecord record, Option<Map<String, String>> optionalRecordMetadata) {
super.markSuccess(record, optionalRecordMetadata);
try {
optionalRecordMetadata.ifPresent(map -> {
if (map.containsKey(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME)) {
String inOpts = map.get(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME);
if (StrUtil.isNotBlank(inOpts)) {
long current = LocalDateTime.parse(inOpts, FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
latestOpts = Long.max(latestOpts, current);
}
}
});
} catch (Throwable throwable) {
logger.error("Parse latest opts failure", throwable);
}
}
@Override
public void markFailure(HoodieRecord record, Throwable t, Option<Map<String, String>> optionalRecordMetadata) {
super.markFailure(record, t, optionalRecordMetadata);
}
}
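
Hudi picks its WriteStatus implementation from the write config; a sketch of how TraceWriteStatus is presumably registered (path hypothetical, actual wiring not shown in this commit):

org.apache.hudi.config.HoodieWriteConfig writeConfig = org.apache.hudi.config.HoodieWriteConfig.newBuilder()
        .withPath("hdfs://b2/apps/datalake/example_table") // hypothetical
        .withWriteStatusClass(TraceWriteStatus.class)      // maps to hoodie.writestatus.class
        .build();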


@@ -0,0 +1,65 @@
package com.lanyuanxiaoyao.service.sync.functions;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.configuration.TraceWriteStatus;
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
import java.io.Serializable;
import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.sink.compact.CompactEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Compaction event handler
*
* @author ZhangJiacheng
* @date 2022-06-15
*/
public class CompactionEventHandler implements CompactEventHandler, Serializable {
private static final Logger logger = LoggerFactory.getLogger(CompactionEventHandler.class);
private final GlobalConfiguration configuration;
private final FlinkJob flinkJob;
private final TableMeta tableMeta;
public CompactionEventHandler(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
this.configuration = configuration;
this.flinkJob = flinkJob;
this.tableMeta = tableMeta;
}
@Override
public void failure(String instant) {
}
@Override
public void success(String instant, List<WriteStatus> statuses, HoodieCommitMetadata metadata) {
StatusUtils.compactionCommit(configuration, flinkJob, tableMeta, instant, metadata);
logger.info("WriteStatus: {}", statuses);
if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_TRACE_LATEST_OP_TS)) {
Long max = statuses.stream()
.map(status -> {
if (status instanceof TraceWriteStatus) {
TraceWriteStatus s = (TraceWriteStatus) status;
return s.getLatestOpts();
}
return 0L;
})
.max(Long::compare)
.orElse(0L);
logger.info("Latest op ts: {}", max);
StatusUtils.compactionLatestOpTs(configuration, flinkJob, tableMeta, max);
}
}
@Override
public void closed(String message, Exception exception) {
StatusUtils.compactionFinish(configuration, flinkJob, tableMeta, message, exception);
}
}


@@ -0,0 +1,90 @@
package com.lanyuanxiaoyao.service.sync.functions;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.metrics.CountMetric;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
/**
* Operation-type filter operator
*
* @author ZhangJiacheng
* @date 2022-06-12
*/
public class OperationTypeFilter extends RichFilterFunction<Record> implements CheckpointedFunction {
private final CountMetric insertRateMetric;
private final CountMetric updateRateMetric;
private final CountMetric deleteRateMetric;
private final CountMetric ddlRateMetric;
private final CountMetric unknownRateMetric;
private final List<CountMetric> metrics;
private final GlobalConfiguration globalConfiguration;
public OperationTypeFilter(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) {
this.globalConfiguration = globalConfiguration;
Function<String, Map<String, String>> fillTags = operator -> MapUtil.<String, String>builder()
.put(Constants.METRICS_LABEL_TYPE, operator)
.build();
insertRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.INSERT));
updateRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.UPDATE));
deleteRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.DELETE));
ddlRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.DDL));
unknownRateMetric = new CountMetric(globalConfiguration, Constants.METRICS_SYNC_SOURCE_OPERATION_TYPE_RECEIVE, flinkJob, tableMeta, fillTags.apply(Constants.UNKNOWN));
metrics = ListUtil.toList(insertRateMetric, updateRateMetric, deleteRateMetric, ddlRateMetric, unknownRateMetric);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize metrics
MetricsUtils.createMakePointTimer(globalConfiguration, metrics);
}
@Override
public boolean filter(Record record) {
String opType = record.getStatement().getOpType();
switch (opType) {
case Constants.INSERT:
insertRateMetric.increment();
break;
case Constants.UPDATE:
updateRateMetric.increment();
break;
case Constants.DELETE:
deleteRateMetric.increment();
break;
case Constants.DDL:
ddlRateMetric.increment();
break;
default:
unknownRateMetric.increment();
}
return !Constants.DDL.equals(opType);
}
@Override
public void initializeState(FunctionInitializationContext context) {
}
@Override
public void snapshotState(FunctionSnapshotContext context) {
MetricsUtils.publishAllMetrics(metrics);
}
}


@@ -0,0 +1,80 @@
package com.lanyuanxiaoyao.service.sync.functions;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils;
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Pulsar message to object
*
* @author ZhangJiacheng
* @date 2022-06-11
*/
public class PulsarMessage2RecordFunction extends RichMapFunction<String, Record> implements CheckpointedFunction {
private static final Logger logger = LoggerFactory.getLogger(PulsarMessage2RecordFunction.class);
private static final AtomicReference<String> lastOperationTime = new AtomicReference<>("");
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final Pattern OPTS_PATTERN = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}$");
private final GlobalConfiguration globalConfiguration;
private final FlinkJob flinkJob;
private final TableMeta tableMeta;
private final ObjectMapper mapper = JacksonUtils.getMapper();
public PulsarMessage2RecordFunction(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) {
this.globalConfiguration = globalConfiguration;
this.flinkJob = flinkJob;
this.tableMeta = tableMeta;
}
@Override
public Record map(String message) throws JsonProcessingException {
Record record = null;
try {
record = mapper.readValue(message, Record.class);
if (RecordHelper.isNotVersionUpdateRecord(record)) {
lastOperationTime.set(record.getStatement().getOpTs());
}
} catch (Exception exception) {
logger.error("Message json parse failure", exception);
}
return record;
}
@Override
public void snapshotState(FunctionSnapshotContext context) {
String opTs = lastOperationTime.get();
Long timestamp = null;
try {
if (StrUtil.isNotBlank(opTs) && OPTS_PATTERN.matcher(opTs).matches()) {
timestamp = LocalDateTime.parse(opTs, FORMATTER).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
} else {
throw new Exception("Opts is not match regex " + OPTS_PATTERN.pattern());
}
} catch (Exception e) {
logger.error("Parse operation time error", e);
}
StatusUtils.syncOperation(globalConfiguration, flinkJob, tableMeta, timestamp);
}
@Override
public void initializeState(FunctionInitializationContext context) {
}
}
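
For reference, the opTs-to-epoch conversion in snapshotState, with its fixed UTC+8 offset, works out as follows (sample value chosen to match OPTS_PATTERN):

String opTs = "2024-02-29 15:32:14"; // sample value matching OPTS_PATTERN
long millis = LocalDateTime.parse(opTs, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        .toInstant(ZoneOffset.ofHours(8))
        .toEpochMilli(); // 1709191934000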


@@ -0,0 +1,235 @@
package com.lanyuanxiaoyao.service.sync.functions;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.LogHelper;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.metrics.MessageSizeSizeMetric;
import com.lanyuanxiaoyao.service.sync.metrics.RateMetric;
import com.lanyuanxiaoyao.service.sync.utils.LoadBalance;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.*;
/**
* Pulsar Reader Source
*
* @author ZhangJiacheng
* @date 2022-06-11
*/
public class PulsarMessageSourceReader extends RichParallelSourceFunction<String> implements CheckpointedFunction, CheckpointListener {
private static final Logger logger = LoggerFactory.getLogger(PulsarMessageSourceReader.class);
private static final RetryPolicy<String> MESSAGE_ID_RETRY = RetryPolicy.<String>builder()
.handle(Exception.class)
.withDelay(Duration.ofSeconds(1))
.withMaxAttempts(10)
.build();
private final String topic;
private final GlobalConfiguration globalConfiguration;
private final FlinkJob flinkJob;
private final TableMeta tableMeta;
private final AtomicReference<MessageId> lastMessageId = new AtomicReference<>();
private final AtomicLong lastPublishTime = new AtomicLong(0);
private final RateMetric messageReceiveMetric;
private final MessageSizeSizeMetric messageSizeReceiveMetric;
private final Map<Long, MessageId> messageIdMap = new ConcurrentHashMap<>();
private boolean running = true;
private PulsarClient client = null;
private Reader<String> reader = null;
public PulsarMessageSourceReader(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) {
logger.info("Use PulsarMessageSourceReader");
this.globalConfiguration = globalConfiguration;
this.flinkJob = flinkJob;
this.topic = tableMeta.getTopic();
this.tableMeta = tableMeta;
String messageId = tableMeta.getConfig().getMessageId();
logger.info("{} {}", Constants.LOG_POINT_PULSAR_SOURCE_BOOTSTRAP_MESSAGE_ID, messageId);
if (StrUtil.isNotBlank(messageId)) {
lastMessageId.set(parseMessageId(messageId));
} else {
logger.warn("Message id is empty");
lastMessageId.set(MessageId.earliest);
}
messageReceiveMetric = new RateMetric(
globalConfiguration,
Constants.METRICS_SYNC_SOURCE_MESSAGE_RECEIVE,
flinkJob,
tableMeta
);
messageSizeReceiveMetric = new MessageSizeSizeMetric(
globalConfiguration,
Constants.METRICS_SYNC_SOURCE_MESSAGE_SIZE_RECEIVE_BYTES,
flinkJob, tableMeta
);
}
private static MessageId parseMessageId(String messageIdText) {
return DefaultImplementation.newMessageId(Long.parseLong(messageIdText.split(":")[0]), Long.parseLong(messageIdText.split(":")[1]), -1);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
LogHelper.info(logger, CHECKPOINT_INITIAL);
String queryUrl = StrUtil.format(
"{}/api/message_id?flink_job_id={}&alias={}",
LoadBalance.getCustomPublishUrl(globalConfiguration),
flinkJob.getId(),
tableMeta.getAlias()
);
logger.info("Query url: {}", queryUrl);
String messageId = Failsafe.with(MESSAGE_ID_RETRY)
.onFailure(event -> {
if (ObjectUtil.isNotNull(event.getException())) {
logger.error(StrUtil.format("{} Get message id error", Constants.LOG_POINT_PULSAR_SOURCE_GET_MESSAGE_ID_ERROR), event.getException());
}
})
.get(() ->
HttpUtil.createGet(queryUrl)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.execute()
.body()
);
LogHelper.info(logger, CHECKPOINT_INITIAL_MESSAGE_ID, "Get message id: {}", messageId);
if (StrUtil.isNotBlank(messageId)) {
lastMessageId.set(parseMessageId(messageId));
} else {
logger.warn(StrUtil.format("{} Message id is empty, now message id is {}", Constants.LOG_POINT_MESSAGE_ID_EMPTY, lastMessageId.get()));
}
}
@Override
public void run(SourceContext<String> context) throws Exception {
String currentValue = null;
while (running) {
Message<String> message;
try {
message = reader.readNext();
if (ObjectUtil.isNotNull(message)) {
String value = message.getValue();
currentValue = value;
if (ObjectUtil.isEmpty(value)) {
logger.warn("{} {}", message.getValue(), message.getMessageId());
}
synchronized (context.getCheckpointLock()) {
context.collect(value);
}
if (RecordHelper.isNotVersionUpdateRecord(value)) {
lastPublishTime.set(message.getPublishTime());
}
lastMessageId.set(message.getMessageId());
messageReceiveMetric.increment();
try {
messageSizeReceiveMetric.increment(message.getValue().getBytes().length);
} catch (Throwable t) {
logger.warn("Parse message size failure", t);
}
}
} catch (Throwable t) {
logger.error("Read message failure, current value: " + currentValue, t);
}
}
}
@Override
public void open(Configuration configuration) throws Exception {
super.open(configuration);
// Initialize metrics
MetricsUtils.createMakePointTimer(globalConfiguration, messageReceiveMetric);
MetricsUtils.createMakePointTimer(globalConfiguration, messageSizeReceiveMetric);
try {
client = PulsarClient.builder()
.serviceUrl(tableMeta.getPulsarAddress())
.build();
reader = client.newReader(new StringSchema())
.topic(topic)
.receiverQueueSize(10000)
.subscriptionName(NameHelper.pulsarSubscriptionName(flinkJob.getId(), tableMeta.getAlias()))
.startMessageId(lastMessageId.get())
.startMessageIdInclusive()
.create();
} catch (Exception exception) {
logger.error(StrUtil.format("Connect pulsar error ({} {})", tableMeta.getPulsarAddress(), topic), exception);
throw exception;
}
logger.info("Message id set to {}", lastMessageId.get());
}
@Override
public void cancel() {
running = false;
}
@Override
public void close() throws Exception {
super.close();
if (reader != null) {
try {
reader.close();
} catch (PulsarClientException e) {
logger.error("Pulsar reader close error", e);
}
}
if (client != null) {
try {
client.close();
} catch (PulsarClientException e) {
logger.error("Pulsar client close error", e);
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) {
MessageId messageId = lastMessageId.get();
messageIdMap.put(context.getCheckpointId(), messageId);
LogHelper.info(logger, CHECKPOINT_START, "Checkpoint start message id: {}, checkpoint id: {}", messageId, context.getCheckpointId());
messageReceiveMetric.publish();
messageSizeReceiveMetric.publish();
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
MessageId messageId = messageIdMap.getOrDefault(checkpointId, MessageId.earliest);
LogHelper.info(logger, CHECKPOINT_COMPLETE, "Checkpoint complete message id: {}, checkpoint id: {}", messageId, checkpointId);
StatusUtils.syncCheckpoint(globalConfiguration, flinkJob, tableMeta, messageId.toString(), lastPublishTime.get());
messageIdMap.remove(checkpointId);
}
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
CheckpointListener.super.notifyCheckpointAborted(checkpointId);
}
}
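
parseMessageId expects the persisted id in "ledgerId:entryId" form; a short illustration:

// "123:45" splits into ledger id 123 and entry id 45; the partition index is
// fixed at -1, i.e. a non-partitioned topic is assumed.
MessageId resumeFrom = parseMessageId("123:45");
// The reader resumes from this id inclusively via startMessageIdInclusive().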


@@ -0,0 +1,171 @@
package com.lanyuanxiaoyao.service.sync.functions;
import cn.hutool.core.collection.ListUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.LogHelper;
import com.lanyuanxiaoyao.service.common.utils.MapHelper;
import com.lanyuanxiaoyao.service.common.utils.RecordHelper;
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.functions.type.TypeConverter;
import com.lanyuanxiaoyao.service.sync.metrics.CountMetric;
import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import com.lanyuanxiaoyao.service.sync.utils.StatusUtils;
import com.lanyuanxiaoyao.service.sync.utils.SyncUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.utils.LogHelper.LogPoint.VERSION_UPDATE;
/**
* Convert Record to RowData
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class Record2RowDataFunction extends RichMapFunction<Record, List<RowData>> implements CheckpointedFunction {
private static final Logger logger = LoggerFactory.getLogger(Record2RowDataFunction.class);
private final GlobalConfiguration globalConfiguration;
private final TableMeta tableMeta;
private final CountMetric changeFilterMetric;
private final CountMetric changePartitionMetric;
private final ObjectMapper mapper = JacksonUtils.getMapper();
private final FlinkJob flinkJob;
private Schema schema;
public Record2RowDataFunction(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta) {
this.globalConfiguration = globalConfiguration;
this.flinkJob = flinkJob;
this.tableMeta = tableMeta;
changeFilterMetric = new CountMetric(
globalConfiguration,
Constants.METRICS_SYNC_SOURCE_CHANGE_FILTER,
flinkJob, tableMeta
);
changePartitionMetric = new CountMetric(
globalConfiguration,
Constants.METRICS_SYNC_SOURCE_CHANGE_PARTITION,
flinkJob, tableMeta
);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
schema = SyncUtils.avroSchemaWithExtraFields(tableMeta);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize metrics
MetricsUtils.createMakePointTimer(globalConfiguration, changeFilterMetric);
MetricsUtils.createMakePointTimer(globalConfiguration, changePartitionMetric);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
MetricsUtils.publishAllMetrics(changeFilterMetric);
MetricsUtils.publishAllMetrics(changePartitionMetric);
}
private GenericRowData convert2RowData(Schema schema, Map<String, Object> current) {
List<Schema.Field> fields = schema.getFields();
GenericRowData data = new GenericRowData(fields.size());
for (int index = 0; index < fields.size(); index++) {
Schema.Field field = fields.get(index);
// For telepg, field names are normalized to lowercase; the upstream naming is inconsistent, so the cleanup happens downstream
Object value = current.getOrDefault(Constants.FIELD_COVERT.apply(tableMeta, field.name()), null);
if (field.schema().getType().equals(Schema.Type.STRING)
|| (field.schema().isUnion() && field.schema().getTypes().contains(Schema.create(Schema.Type.STRING)))
|| value instanceof String) {
data.setField(index, StringData.fromString((String) value));
} else {
data.setField(index, value);
}
}
return data;
}
private Boolean isFilterOut(TableMeta tableMeta, Map<String, Object> current) {
if (!tableMeta.getFilterType().equals(TableMeta.FilterType.NONE)) {
if (current.containsKey(tableMeta.getFilterField())) {
String fieldValue = MapHelper.getStringWithoutCase(current, tableMeta.getFilterField());
if (tableMeta.getFilterType().equals(TableMeta.FilterType.EXCLUDE)) {
return tableMeta.getFilterValues().contains(fieldValue);
} else if (tableMeta.getFilterType().equals(TableMeta.FilterType.INCLUDE)) {
return !tableMeta.getFilterValues().contains(fieldValue);
}
}
}
return false;
}
@Override
public List<RowData> map(Record record) throws Exception {
List<Map<String, Object>> result = ListUtil.list(false);
if (RecordHelper.isVersionUpdateRecord(record)) {
Record.Statement statement = record.getStatement();
LogHelper.info(logger, VERSION_UPDATE, "{} {} version: {}", mapper.writeValueAsString(statement.getSchema()), statement.getVersion(), statement.getVersion());
LogHelper.info(logger, VERSION_UPDATE, "Raw: {}", mapper.writeValueAsString(record));
StatusUtils.versionUpdate(globalConfiguration, flinkJob, tableMeta, record.getStatement().getVersion(), statement.getOpTs());
return ListUtil.empty();
}
Map<String, Object> current = RecordHelper.getCurrentStatement(record);
if (Objects.isNull(current)) {
logger.error("Record: {}", mapper.writeValueAsString(record));
throw new RuntimeException("Current cannot be null");
}
// If an update changes the value of the filter field, the old record must be deleted first
boolean isChangeFilter = RecordHelper.isChangeField(tableMeta, record, TableMetaHelper::getFilterField);
if (isChangeFilter) {
logger.info("Change filter: {}", mapper.writeValueAsString(record));
changeFilterMetric.increment();
}
// If this is an update and city_id differs, delete the old record first
boolean isChangePartition = RecordHelper.isChangeField(tableMeta, record, TableMetaHelper::getPartitionField);
if (isChangePartition) {
logger.info("Change partition field: {}", mapper.writeValueAsString(record));
changePartitionMetric.increment();
}
if (isChangeFilter || isChangePartition) {
Map<String, Object> before = record.getStatement().getBefore();
result.add(0, RecordHelper.addExtraMetadata(before, tableMeta, record, true));
}
// Add the Hudi-specific metadata fields
result.add(RecordHelper.addExtraMetadata(current, tableMeta, record));
return result.stream()
// Drop records excluded by the filter field
.filter(r -> !isFilterOut(tableMeta, r))
.map(r -> TypeConverter.getInstance(tableMeta)
.convertToGenericRowData(tableMeta, schema, r))
.collect(Collectors.toList());
}
}
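
The row-population pattern used here (and again in TypeConverterV1) reduces to Flink's GenericRowData plus StringData for string columns; a miniature:

GenericRowData row = new GenericRowData(2);   // two hypothetical columns
row.setField(0, StringData.fromString("42")); // string columns need StringData
row.setField(1, null);                        // absent values stay null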


@@ -0,0 +1,40 @@
package com.lanyuanxiaoyao.service.sync.functions;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Validates that a Record is well-formed
*
* @author ZhangJiacheng
* @date 2022-11-15
*/
public class ValidateRecordFilter extends RichFilterFunction<Record> {
private static final Logger logger = LoggerFactory.getLogger(ValidateRecordFilter.class);
public ValidateRecordFilter(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
}
@Override
public boolean filter(Record record) {
if (ObjectUtil.isNull(record)) {
logger.warn("Record is null");
return false;
}
if (ObjectUtil.isNull(record.getSource())) {
logger.warn("Record Source is null");
return false;
}
if (ObjectUtil.isNull(record.getStatement())) {
logger.warn("Record Statement is null");
return false;
}
return true;
}
}


@@ -0,0 +1,43 @@
package com.lanyuanxiaoyao.service.sync.functions.type;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import java.io.Serializable;
import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.hudi.org.apache.avro.Schema;
/**
* Type-conversion support
*
* @author ZhangJiacheng
* @date 2023-07-20
*/
public interface TypeConverter extends Serializable {
LoadingCache<Integer, TypeConverter> CACHE = Caffeine.newBuilder()
.build(version -> {
switch (version) {
case 1:
return new TypeConverterV2();
case 0:
default:
return new TypeConverterV1();
}
});
static TypeConverter getInstance(TableMeta meta) {
return getInstance(meta.getVersion());
}
static TypeConverter getInstance(Integer version) {
if (version == null) {
version = 0;
}
return CACHE.get(version);
}
Schema convertToSchema(TableMeta meta);
GenericRowData convertToGenericRowData(TableMeta meta, Schema schema, Map<String, Object> data);
}
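
getInstance resolves one converter per TableMeta version and caches it, so repeated lookups return the same instance; for illustration:

TypeConverter legacy = TypeConverter.getInstance(0); // TypeConverterV1: every field as String
TypeConverter typed = TypeConverter.getInstance(1);  // TypeConverterV2: typed Avro schema
// TypeConverter.getInstance(1) == typed on later calls, courtesy of the Caffeine cache.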


@@ -0,0 +1,53 @@
package com.lanyuanxiaoyao.service.sync.functions.type;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.org.apache.avro.SchemaBuilder;
/**
* Type conversion, version 1:
* every field type is treated as String
*
* @author ZhangJiacheng
* @date 2023-07-20
*/
public class TypeConverterV1 implements TypeConverter {
@Override
public Schema convertToSchema(TableMeta meta) {
SchemaBuilder.FieldAssembler<Schema> fieldBuilder = SchemaBuilder.builder()
.record(meta.getTable())
.fields();
fieldBuilder.nullableBoolean(Constants.HUDI_DELETE_KEY_NAME, false);
fieldBuilder.nullableString(Constants.UNION_KEY_NAME, "");
meta.getFields().forEach(fieldMeta -> fieldBuilder.optionalString(fieldMeta.getName()));
fieldBuilder.nullableLong(Constants.UPDATE_TIMESTAMP_KEY_NAME, -1);
fieldBuilder.nullableString(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, "");
return fieldBuilder.endRecord();
}
@Override
public GenericRowData convertToGenericRowData(TableMeta meta, Schema schema, Map<String, Object> data) {
List<Schema.Field> fields = schema.getFields();
GenericRowData row = new GenericRowData(fields.size());
for (int index = 0; index < fields.size(); index++) {
Schema.Field field = fields.get(index);
// For telepg, field names are normalized to lowercase; the upstream naming is inconsistent, so the cleanup happens downstream
Object value = data.getOrDefault(Constants.FIELD_COVERT.apply(meta, field.name()), null);
if (field.schema().getType().equals(Schema.Type.STRING)
|| (field.schema().isUnion() && field.schema().getTypes().contains(Schema.create(Schema.Type.STRING)))
|| value instanceof String) {
row.setField(index, StringData.fromString((String) value));
} else {
row.setField(index, value);
}
}
return row;
}
}
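
The SchemaBuilder chain above is the standard Avro fluent API (here the Hudi-shaded copy); a self-contained miniature with two hypothetical columns:

Schema schema = SchemaBuilder.builder()
        .record("orders")           // hypothetical table name
        .fields()
        .optionalString("order_id") // union(null, string) with default null
        .optionalString("city_id")
        .endRecord();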


@@ -0,0 +1,141 @@
package com.lanyuanxiaoyao.service.sync.functions.type;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import java.math.BigDecimal;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.org.apache.avro.JsonProperties;
import org.apache.hudi.org.apache.avro.LogicalTypes;
import org.apache.hudi.org.apache.avro.Schema;
import static org.apache.hudi.org.apache.avro.Schema.*;
/**
* Type conversion, version 2
*
* @author ZhangJiacheng
* @date 2023-07-20
*/
public class TypeConverterV2 implements TypeConverter {
public static final Schema NULL_SCHEMA = create(Type.NULL);
public static final Schema BOOLEAN_SCHEMA = create(Type.BOOLEAN);
public static final Schema INT_SCHEMA = create(Type.INT);
public static final Schema LONG_SCHEMA = create(Type.LONG);
public static final Schema FLOAT_SCHEMA = create(Type.FLOAT);
public static final Schema DOUBLE_SCHEMA = create(Type.DOUBLE);
public static final Schema STRING_SCHEMA = create(Type.STRING);
public static final Function<Integer, Schema> FIXED_SCHEMA = length -> createFixed("decimal_" + length, null, null, length);
public static final BiFunction<Integer, Integer, Schema> DECIMAL_SCHEMA = (precision, scale) -> LogicalTypes.decimal(precision, scale).addToSchema(FIXED_SCHEMA.apply(precision));
public static final BiFunction<Integer, Integer, Schema> NULLABLE_DECIMAL_SCHEMA = (precision, scale) -> createUnion(NULL_SCHEMA, DECIMAL_SCHEMA.apply(precision, scale));
public static final Schema NULLABLE_BOOLEAN_SCHEMA = createUnion(NULL_SCHEMA, BOOLEAN_SCHEMA);
public static final Schema NULLABLE_INT_SCHEMA = createUnion(NULL_SCHEMA, INT_SCHEMA);
public static final Schema NULLABLE_LONG_SCHEMA = createUnion(NULL_SCHEMA, LONG_SCHEMA);
public static final Schema NULLABLE_FLOAT_SCHEMA = createUnion(NULL_SCHEMA, FLOAT_SCHEMA);
public static final Schema NULLABLE_DOUBLE_SCHEMA = createUnion(NULL_SCHEMA, DOUBLE_SCHEMA);
public static final Schema NULLABLE_STRING_SCHEMA = createUnion(NULL_SCHEMA, STRING_SCHEMA);
public static final Function<Integer, Schema> NULLABLE_FIXED_SCHEMA = length -> createUnion(NULL_SCHEMA, FIXED_SCHEMA.apply(length));
private static final Pattern BOOLEAN_REGEX = Pattern.compile("^(boolean|bool)$");
private static final Pattern INT_REGEX = Pattern.compile("^(tinyint|smallint|int|smallserial|integer)(\\(\\d+\\))?$");
private static final Pattern LONG_REGEX = Pattern.compile("^(bigint unsigned)|((bigint|serial|long)(\\(\\d+\\))?)$");
private static final Pattern DATE_REGEX = Pattern.compile("^(date|timestamp|timestamp without time zone|datetime|time)$");
private static final Pattern FLOAT_REGEX = Pattern.compile("^float(\\(\\d+\\))?$");
private static final Pattern DOUBLE_REGEX = Pattern.compile("^double(\\(\\d+\\))?$");
private static final Pattern FIXED_REGEX = Pattern.compile("^(number|money|bigserial)(\\(\\d+\\))?$");
private static final Pattern DECIMAL_REGEX = Pattern.compile("^(double precision)|(decimal(\\(\\s*\\d+\\s*(,\\s*\\d+\\s*)?\\))?)$");
private static final Pattern NUMERIC_REGEX = Pattern.compile("^numeric(\\(\\s*\\d+\\s*(,\\s*\\d+\\s*)?\\))?$");
private static final Pattern STRING_REGEX = Pattern.compile("^(character varying|(long|medium)text)|((varchar|char|text|clob|binary|bit)(\\(\\d+\\))?)$");
private static final Pattern YYYYMMDD = Pattern.compile("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}");
private static final DateTimeFormatter YYYYMMDD_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Override
public Schema convertToSchema(TableMeta meta) {
List<Field> fields = new ArrayList<>(meta.getFields().size() + 4);
fields.add(new Field(Constants.HUDI_DELETE_KEY_NAME, BOOLEAN_SCHEMA, null, false));
fields.add(new Field(Constants.UNION_KEY_NAME, STRING_SCHEMA, null, ""));
for (TableMeta.FieldMeta field : meta.getFields()) {
fields.add(new Field(field.getName(), convertType(field.getType(), field.getLength(), field.getScala()), null, JsonProperties.NULL_VALUE));
}
fields.add(new Field(Constants.UPDATE_TIMESTAMP_KEY_NAME, LONG_SCHEMA, null, -1));
fields.add(new Field(Constants.LATEST_OPERATION_TIMESTAMP_KEY_NAME, STRING_SCHEMA, null, ""));
return Schema.createRecord(meta.getTable(), null, null, false, fields);
}
private Schema convertType(String type, Long length, Integer scale) {
type = type.trim().toLowerCase();
if (BOOLEAN_REGEX.matcher(type).matches()) {
return NULLABLE_BOOLEAN_SCHEMA;
} else if (STRING_REGEX.matcher(type).matches() || DATE_REGEX.matcher(type).matches()) {
return NULLABLE_STRING_SCHEMA;
} else if (INT_REGEX.matcher(type).matches()) {
return NULLABLE_INT_SCHEMA;
} else if (LONG_REGEX.matcher(type).matches()) {
return NULLABLE_LONG_SCHEMA;
} else if (FLOAT_REGEX.matcher(type).matches()) {
return NULLABLE_FLOAT_SCHEMA;
} else if (DOUBLE_REGEX.matcher(type).matches()) {
return NULLABLE_DOUBLE_SCHEMA;
} else if (FIXED_REGEX.matcher(type).matches()) {
return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), 0);
} else if (DECIMAL_REGEX.matcher(type).matches() || NUMERIC_REGEX.matcher(type).matches()) {
if (ObjectUtil.isNull(scale)) {
return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), 6);
} else {
return NULLABLE_DECIMAL_SCHEMA.apply(length.intValue(), scale);
}
} else {
throw new RuntimeException(Constants.LOG_POINT_FIELD_TYPE_NOT_FOUND + " Cannot find correct type for source type: " + type + " length: " + length);
}
}
@Override
public GenericRowData convertToGenericRowData(TableMeta meta, Schema schema, Map<String, Object> data) {
List<Field> fields = schema.getFields();
GenericRowData row = new GenericRowData(fields.size());
for (int index = 0; index < fields.size(); index++) {
Field field = fields.get(index);
Object value = data.getOrDefault(Constants.FIELD_COVERT.apply(meta, field.name()), null);
row.setField(index, convertValue(field.schema(), value));
}
return row;
}
private Object convertValue(Schema schema, Object value) {
if (ObjectUtil.isNull(value)) {
return value;
} else if (NULLABLE_BOOLEAN_SCHEMA.equals(schema) || BOOLEAN_SCHEMA.equals(schema)) {
return value instanceof String ? Boolean.valueOf((String) value) : value;
} else if (NULLABLE_INT_SCHEMA.equals(schema) || INT_SCHEMA.equals(schema)) {
return value instanceof String ? Integer.valueOf((String) value) : value;
} else if (NULLABLE_LONG_SCHEMA.equals(schema) || LONG_SCHEMA.equals(schema)) {
return value instanceof String ? Long.valueOf((String) value) : value;
} else if (NULLABLE_FLOAT_SCHEMA.equals(schema) || FLOAT_SCHEMA.equals(schema)) {
return value instanceof String ? Float.valueOf((String) value) : value;
} else if (NULLABLE_DOUBLE_SCHEMA.equals(schema) || DOUBLE_SCHEMA.equals(schema)) {
return value instanceof String ? Double.valueOf((String) value) : value;
} else if (NULLABLE_STRING_SCHEMA.equals(schema) || STRING_SCHEMA.equals(schema)) {
return StringData.fromString((String) value);
} else {
for (Schema type : schema.getTypes()) {
if (type.getLogicalType() instanceof LogicalTypes.Decimal) {
LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) type.getLogicalType();
int precision = decimalType.getPrecision();
int scale = decimalType.getScale();
// The value may arrive as a non-String (e.g. a number parsed from JSON), so go through toString()
return DecimalData.fromBigDecimal(new BigDecimal(value.toString()), precision, scale);
}
}
return value;
}
}
}
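As a self-contained reference for what NULLABLE_DECIMAL_SCHEMA builds, a sketch using plain Avro coordinates (the class above uses the hudi-shaded org.apache.hudi.org.apache.avro package instead):
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class DecimalSchemaDemo {
    public static void main(String[] args) {
        int precision = 10;
        int scale = 2;
        Schema fixed = Schema.createFixed("decimal_" + precision, null, null, precision);
        Schema decimal = LogicalTypes.decimal(precision, scale).addToSchema(fixed);
        Schema nullable = Schema.createUnion(Schema.create(Schema.Type.NULL), decimal);
        // Prints a union of null and a fixed-backed decimal(10,2)
        System.out.println(nullable);
    }
}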

View File

@@ -0,0 +1,123 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.JacksonUtils;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for metrics: buffers points and publishes them over HTTP with retries.
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public abstract class AbstractMetric implements Metric {
private static final Logger logger = LoggerFactory.getLogger(AbstractMetric.class);
private static final ObjectMapper MAPPER = JacksonUtils.getMapper();
private static final RetryPolicy<HttpResponse> PUBLISH_RETRY = RetryPolicy.<HttpResponse>builder()
.handle(Exception.class)
.withDelay(Duration.ofSeconds(1))
.withMaxAttempts(5)
.build();
private final GlobalConfiguration globalConfiguration;
private final List<String> lineCache = ListUtil.toList();
private final LongAdder autoPublishCount = new LongAdder();
private List<HttpMetricsRequest> requests = new ArrayList<>();
public AbstractMetric(GlobalConfiguration globalConfiguration) {
this.globalConfiguration = globalConfiguration;
}
public void setRequests(HttpMetricsRequest... requests) {
setRequests(ListUtil.toList(requests));
}
public void setRequests(List<HttpMetricsRequest> requests) {
this.requests = requests;
}
public void addRequest(HttpMetricsRequest request) {
this.requests.add(request);
}
public void addRequests(HttpMetricsRequest... requests) {
addRequests(ListUtil.toList(requests));
}
public void addRequests(List<HttpMetricsRequest> requests) {
this.requests.addAll(requests);
}
@Override
public void addTag(String key, String value) {
requests.forEach(request -> request.addTag(key, value));
}
@Override
public void makePoint(boolean autoPublish, int batch) {
if (!globalConfiguration.getMetricEnable()) {
return;
}
if (autoPublish) {
if (autoPublishCount.sum() >= batch) {
publish();
}
autoPublishCount.increment();
}
makePoint();
}
public synchronized void publish() {
if (!globalConfiguration.getMetricEnable()) {
return;
}
try {
requests.stream()
.filter(request -> !request.isEmpty())
.map(request -> {
try {
String data = MAPPER.writeValueAsString(request);
request.clear();
return data;
} catch (JsonProcessingException e) {
logger.warn("Parse metrics failure: " + request, e);
}
return null;
})
.filter(Objects::nonNull)
.forEach(lineCache::add);
if (lineCache.isEmpty()) {
return;
}
String lines = String.join("\n", lineCache);
logger.debug("Push metrics: \n{}", lines);
HttpResponse response = Failsafe.with(PUBLISH_RETRY)
.get(() -> HttpUtil.createPost(globalConfiguration.getMetricPublishUrl())
.body(lines)
.basicAuth(Constants.VICTORIA_USERNAME, Constants.VICTORIA_PASSWORD)
.timeout(globalConfiguration.getMetricPublishTimeout())
.execute());
if (response.isOk()) {
logger.debug("Metrics push success");
}
} catch (Throwable throwable) {
logger.warn("Push metrics failure, url: " + globalConfiguration.getMetricPublishUrl(), throwable);
} finally {
lineCache.clear();
autoPublishCount.reset();
}
}
}
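A minimal, self-contained sketch of the Failsafe pattern PUBLISH_RETRY relies on (dev.failsafe 3.x API); the flaky supplier is illustrative only:
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryDemo {
    public static void main(String[] args) {
        RetryPolicy<String> retry = RetryPolicy.<String>builder()
                .handle(Exception.class)          // retry on any exception
                .withDelay(Duration.ofSeconds(1)) // fixed 1s delay, as in PUBLISH_RETRY
                .withMaxAttempts(5)
                .build();
        AtomicInteger attempts = new AtomicInteger();
        // Fails twice, then succeeds; Failsafe re-invokes the supplier transparently.
        String result = Failsafe.with(retry).get(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok after " + attempts.get() + " attempts";
        });
        System.out.println(result);
    }
}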

View File

@@ -0,0 +1,67 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
/**
 * Counter metric.
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class CountMetric extends AbstractMetric {
private final LongAdder count = new LongAdder();
private final HttpMetricsRequest countMetrics;
public CountMetric(GlobalConfiguration globalConfiguration, String name) {
this(globalConfiguration, name, MapUtil.empty());
}
public CountMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)).build());
}
public CountMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, String extraTagKey, String extraTagValue) {
this(globalConfiguration, name, job, meta, MapUtil.of(extraTagKey, extraTagValue));
}
public CountMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, Map<String, String> tags) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta))
.putAll(tags)
.build());
}
public CountMetric(GlobalConfiguration globalConfiguration, String name, Map<String, String> tags) {
super(globalConfiguration);
countMetrics = new HttpMetricsRequest(
name + "_count",
MapUtil.<String, String>builder().putAll(tags).build()
);
setRequests(countMetrics);
}
public void increment() {
count.increment();
}
@Override
public void makePoint() {
double count = this.count.doubleValue();
if (count != 0) {
countMetrics.addMetric(count);
}
}
@Override
public List<HttpMetricsRequest> getMetrics() {
return ListUtil.toList(countMetrics);
}
}
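A sketch of the intended lifecycle; GlobalConfiguration construction is project-specific, and the "sync_rows" name and "alias" tag are illustrative:
// Fragment; assumes the classes above plus a configured GlobalConfiguration.
void wireRowCounter(GlobalConfiguration conf) {
    CountMetric rows = new CountMetric(conf, "sync_rows", MapUtil.of("alias", "orders"));
    rows.increment();           // hot path: once per record
    rows.makePoint(true, 100);  // snapshot; publishes once ~100 auto points have accrued
    rows.publish();             // explicit flush, e.g. at checkpoint or shutdown
}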

View File

@@ -0,0 +1,88 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import java.beans.Transient;
import java.io.Serializable;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * Metric payload entity (the JSON-line shape pushed to VictoriaMetrics).
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class HttpMetricsRequest implements Serializable {
private final Map<String, String> metric;
private final List<Double> values;
private final List<Long> timestamps;
public HttpMetricsRequest(String name, Map<String, String> metrics) {
this.metric = MapUtil.<String, String>builder()
.put("__name__", name)
.build();
this.metric.putAll(metrics);
this.values = Collections.synchronizedList(ListUtil.list(true));
this.timestamps = Collections.synchronizedList(ListUtil.list(true));
}
public void addTag(String key, String value) {
this.metric.put(key, value);
}
public void addMetric(Double value) {
addMetric(value, Instant.now().toEpochMilli());
}
public void addMetric(Double value, Long timestamp) {
synchronized (this) {
values.add(value);
timestamps.add(timestamp);
}
}
public void clear() {
synchronized (this) {
this.values.clear();
this.timestamps.clear();
}
}
@Transient
public boolean isEmpty() {
return this.values.isEmpty() && this.timestamps.isEmpty();
}
@Transient
public boolean isNonEmpty() {
return !isEmpty();
}
public Map<String, String> getMetric() {
return metric;
}
public List<Double> getValues() {
return values;
}
public List<Long> getTimestamps() {
return timestamps;
}
@Override
public String toString() {
return "MetricsItem{" +
"metrics=" + metric +
", values=" + values +
", timestamps=" + timestamps +
'}';
}
}
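Serialized through the shared mapper, a request comes out as one JSON line per metric, which is what AbstractMetric joins and pushes; a sketch, assuming hutool's MapUtil and the project's JacksonUtils are on the classpath (output is approximate and assumes the @Transient getters are skipped):
public class MetricsLineDemo {
    public static void main(String[] args) throws Exception {
        HttpMetricsRequest request = new HttpMetricsRequest("sync_rows_count",
                MapUtil.of("alias", "orders"));
        request.addMetric(42.0, 1700000000000L);
        // Roughly: {"metric":{"__name__":"sync_rows_count","alias":"orders"},
        //           "values":[42.0],"timestamps":[1700000000000]}
        System.out.println(JacksonUtils.getMapper().writeValueAsString(request));
    }
}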

View File

@@ -0,0 +1,77 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
/**
 * Message size metric: tracks total bytes and average bytes per message.
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class MessageSizeSizeMetric extends AbstractMetric {
private final LongAdder count = new LongAdder();
private final DoubleAdder size = new DoubleAdder();
private final HttpMetricsRequest sizeMetrics;
private final HttpMetricsRequest perMessageSizeMetrics;
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name) {
this(globalConfiguration, name, MapUtil.empty());
}
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)).build());
}
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, String extraTagKey, String extraTagValue) {
this(globalConfiguration, name, job, meta, MapUtil.of(extraTagKey, extraTagValue));
}
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, Map<String, String> tags) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta))
.putAll(tags)
.build());
}
public MessageSizeSizeMetric(GlobalConfiguration globalConfiguration, String name, Map<String, String> tags) {
super(globalConfiguration);
sizeMetrics = new HttpMetricsRequest(
name + "_total",
MapUtil.<String, String>builder().putAll(tags).build()
);
perMessageSizeMetrics = new HttpMetricsRequest(
name + "_per_message",
MapUtil.<String, String>builder().putAll(tags).build()
);
setRequests(sizeMetrics, perMessageSizeMetrics);
}
public void increment(long size) {
this.count.increment();
this.size.add(size);
}
@Override
public void makePoint() {
double count = this.count.doubleValue();
double size = this.size.doubleValue();
if (size != 0 && count != 0) {
sizeMetrics.addMetric(size);
perMessageSizeMetrics.addMetric(size / count);
}
}
@Override
public List<HttpMetricsRequest> getMetrics() {
return ListUtil.toList(sizeMetrics, perMessageSizeMetrics);
}
}

View File

@@ -0,0 +1,20 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import java.io.Serializable;
import java.util.List;
/**
 * Metric contract.
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public interface Metric extends Serializable {
void addTag(String key, String value);
void makePoint(boolean autoPublish, int batch);
void makePoint();
List<HttpMetricsRequest> getMetrics();
}

View File

@@ -0,0 +1,87 @@
package com.lanyuanxiaoyao.service.sync.metrics;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.utils.MetricsUtils;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
/**
 * Rate metric: tracks a count, the elapsed milliseconds, and the per-millisecond rate.
*
* @author ZhangJiacheng
* @date 2022-06-13
*/
public class RateMetric extends AbstractMetric {
private final LongAdder count = new LongAdder();
private final HttpMetricsRequest countMetrics;
private final HttpMetricsRequest millisecondMetrics;
private final HttpMetricsRequest perMillisecondMetrics;
private final Instant startInstant;
public RateMetric(GlobalConfiguration globalConfiguration, String name) {
this(globalConfiguration, name, MapUtil.empty());
}
public RateMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta)).build());
}
public RateMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, String extraTagKey, String extraTagValue) {
this(globalConfiguration, name, job, meta, MapUtil.of(extraTagKey, extraTagValue));
}
public RateMetric(GlobalConfiguration globalConfiguration, String name, FlinkJob job, TableMeta meta, Map<String, String> tags) {
this(globalConfiguration, name, MapUtil.builder(MetricsUtils.commonTags(job, meta))
.putAll(tags)
.build());
}
public RateMetric(GlobalConfiguration globalConfiguration, String name, Map<String, String> tags) {
super(globalConfiguration);
startInstant = Instant.now();
countMetrics = new HttpMetricsRequest(
name + "_count",
MapUtil.<String, String>builder().putAll(tags).build()
);
millisecondMetrics = new HttpMetricsRequest(
name + "_millisecond",
MapUtil.<String, String>builder().putAll(tags).build()
);
perMillisecondMetrics = new HttpMetricsRequest(
name + "_per_millisecond",
MapUtil.<String, String>builder().putAll(tags).build()
);
setRequests(countMetrics, millisecondMetrics, perMillisecondMetrics);
}
public void increment() {
count.increment();
}
@Override
public void makePoint() {
double count = this.count.doubleValue();
if (count != 0) {
long millis = Duration.between(startInstant, Instant.now()).toMillis();
countMetrics.addMetric(count);
millisecondMetrics.addMetric((double) millis);
perMillisecondMetrics.addMetric(count / millis);
}
}
@Override
public List<HttpMetricsRequest> getMetrics() {
return ListUtil.toList(countMetrics, millisecondMetrics, perMillisecondMetrics);
}
}
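To make the units concrete: 3,000 increments observed 60,000 ms after construction yield count = 3000, millisecond = 60000, and per_millisecond = 3000 / 60000 = 0.05, i.e. roughly 50 events per second.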

View File

@@ -0,0 +1,98 @@
package com.lanyuanxiaoyao.service.sync.utils;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.exception.MissingArgumentException;
import java.util.List;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
/**
 * Command-line argument parsing helpers.
*
* @author ZhangJiacheng
* @date 2022-03-10
*/
public class ArgumentsUtils {
public static long getJobId(String[] args) throws MissingArgumentException {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.JOB_ID)) {
throw new MissingArgumentException(Constants.JOB_ID);
}
return argsTool.getLong(Constants.JOB_ID);
}
public static String getTable(String[] args) throws MissingArgumentException {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.TABLE_NAME)) {
throw new MissingArgumentException(Constants.TABLE_NAME);
}
return argsTool.get(Constants.TABLE_NAME);
}
public static Boolean getServiceMode(String[] args) {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.SERVICE_MODE)) {
return false;
}
return argsTool.getBoolean(Constants.SERVICE_MODE);
}
public static String getMessageId(String[] args) throws MissingArgumentException {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.MESSAGE_ID)) {
throw new MissingArgumentException(Constants.MESSAGE_ID);
}
return argsTool.get(Constants.MESSAGE_ID);
}
public static TableMeta getTableMeta(String[] args) throws Exception {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.TABLE_META)) {
throw new MissingArgumentException(Constants.TABLE_META);
}
return JacksonUtils.getMapper().readValue(argsTool.get(Constants.TABLE_META), TableMeta.class);
}
public static List<TableMeta> getTableMetaList(String[] args) throws Exception {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.TABLE_META_LIST)) {
throw new MissingArgumentException(Constants.TABLE_META_LIST);
}
return JacksonUtils.getMapper().readValue(argsTool.get(Constants.TABLE_META_LIST), new TypeReference<List<TableMeta>>() {});
}
public static FlinkJob getFlinkJob(String[] args) throws MissingArgumentException, JsonProcessingException {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.FLINK_JOB)) {
throw new MissingArgumentException(Constants.FLINK_JOB);
}
return JacksonUtils.getMapper().readValue(argsTool.get(Constants.FLINK_JOB), FlinkJob.class);
}
public static String getInstants(String[] args) {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.INSTANTS)) {
return "";
}
return argsTool.get(Constants.INSTANTS);
}
public static Boolean getBetaMode(String[] args) {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.BETA)) {
return false;
}
return argsTool.getBoolean(Constants.BETA);
}
public static String getCluster(String[] args) {
ParameterTool argsTool = ParameterTool.fromArgs(args);
if (!argsTool.has(Constants.CLUSTER)) {
return "";
}
return argsTool.get(Constants.CLUSTER);
}
}
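ParameterTool reads --key value pairs; a self-contained sketch, assuming Constants.JOB_ID and Constants.TABLE_META resolve to the literal keys used below:
import org.apache.flink.api.java.utils.ParameterTool;

public class ArgsDemo {
    public static void main(String[] args) {
        String[] argv = {"--job_id", "42", "--table_meta", "{\"table\":\"orders\"}"};
        ParameterTool tool = ParameterTool.fromArgs(argv);
        System.out.println(tool.getLong("job_id"));  // 42
        System.out.println(tool.get("table_meta"));  // raw JSON, deserialized upstream by JacksonUtils
    }
}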

View File

@@ -0,0 +1,22 @@
package com.lanyuanxiaoyao.service.sync.utils;
import com.lanyuanxiaoyao.service.common.Constants;
/**
 * Environment/system-property helpers.
*
* @author ZhangJiacheng
* @date 2022-06-21
*/
public class EnvUtils {
public static void setEnv() {
// Intentionally a no-op: there are currently no common properties to set.
}
public static void setCompactionEnv() {
System.setProperty(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION);
}
public static void setSyncEnv() {
System.setProperty(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC);
}
}

View File

@@ -0,0 +1,31 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.util.ObjectUtil;
import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * JSON parsing utilities.
*
* @author ZhangJiacheng
* @date 2022-06-12
*/
public class JacksonUtils implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(JacksonUtils.class);
private static ObjectMapper INSTANCE = null;
public static synchronized ObjectMapper getMapper() {
// Lazily build a single shared mapper; synchronized so concurrent first calls
// cannot observe a partially configured instance.
if (ObjectUtil.isNull(INSTANCE)) {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
INSTANCE = mapper;
}
return INSTANCE;
}
}

View File

@@ -0,0 +1,29 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Picks a metrics publish URL at random to balance load across the configured endpoints.
*
* @author ZhangJiacheng
* @date 2022-07-05
*/
public class LoadBalance {
private static final Logger logger = LoggerFactory.getLogger(LoadBalance.class);
private static String[] urls = null;
private static int length = 0;
public static String getCustomPublishUrl(GlobalConfiguration globalConfiguration) {
if (ObjectUtil.isNull(urls)) {
urls = globalConfiguration.getMetricsPublishCustomUrl().split(",");
length = urls.length;
}
String url = urls[RandomUtil.randomInt(length)];
logger.info("Random url: {}", url);
return url;
}
}

View File

@@ -0,0 +1,64 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.metrics.AbstractMetric;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Metrics helper utilities.
*
* @author ZhangJiacheng
* @date 2022-06-12
*/
public class MetricsUtils implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(MetricsUtils.class);
public static Map<String, String> commonTags(FlinkJob job, TableMeta meta) {
return MapUtil.<String, String>builder()
.put(Constants.METRICS_LABEL_FLINK_JOB_ID, job.getId().toString())
.put(Constants.METRICS_LABEL_FLINK_JOB_NAME, job.getName())
.put(Constants.METRICS_LABEL_SCHEMA, meta.getSchema())
.put(Constants.METRICS_LABEL_TABLE, meta.getTable())
.put(Constants.METRICS_LABEL_ALIAS, meta.getAlias())
.build();
}
@SafeVarargs
public static <T extends AbstractMetric> void createMakePointTimer(GlobalConfiguration globalConfiguration, T... metrics) {
createMakePointTimer(globalConfiguration, ListUtil.toList(metrics));
}
public static <T extends AbstractMetric> void createMakePointTimer(GlobalConfiguration globalConfiguration, List<T> metrics) {
logger.info("Create timer: {}", metrics);
// Daemon timer so the periodic metrics loop never blocks JVM shutdown
new Timer("metrics-make-point", true).schedule(new TimerTask() {
@Override
public void run() {
for (AbstractMetric metric : metrics) {
metric.makePoint(true, globalConfiguration.getMetricPublishBatch());
}
}
}, globalConfiguration.getMetricPublishDelay(), globalConfiguration.getMetricPublishPeriod());
}
@SafeVarargs
public static <T extends AbstractMetric> void publishAllMetrics(T... metrics) {
publishAllMetrics(ListUtil.toList(metrics));
}
public static <T extends AbstractMetric> void publishAllMetrics(List<T> metrics) {
for (AbstractMetric metric : metrics) {
metric.publish();
}
}
}

View File

@@ -0,0 +1,271 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.util.EnumUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.configuration.RetryPolicyProvider;
import dev.failsafe.Failsafe;
import java.time.Instant;
import java.util.Map;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Reports sync/compaction status back to the control-plane HTTP API.
*
* @author ZhangJiacheng
* @date 2022-07-05
*/
public class StatusUtils {
private static final Logger logger = LoggerFactory.getLogger(StatusUtils.class);
private static final ObjectMapper MAPPER = JacksonUtils.getMapper();
private static final int HTTP_TIMEOUT = (int) Constants.MINUTE;
public static void syncStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
logger.info("Enter method: syncStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_start?flink_job_id={}&alias={}&database={}&schema={}&table={}&cluster={}&application_id={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
tableMeta.getSource(),
tableMeta.getSchema(),
tableMeta.getTable(),
configuration.getCluster(),
configuration.getApplicationId()
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute()
);
} catch (Exception e) {
logger.warn("sync start metrics submit failure");
}
}
public static void syncCheckpoint(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String messageId, Long publishTime) {
logger.info("Enter method: syncCheckpoint[configuration, flinkJob, tableMeta, messageId, publishTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "messageId:" + messageId + "," + "publishTime:" + publishTime);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_checkpoint_state?flink_job_id={}&alias={}&message_id={}&publish_time={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
messageId,
publishTime
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute()
);
} catch (Exception e) {
logger.warn("sync checkpoint metrics submit failure");
}
}
public static void syncOperation(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
syncOperation(configuration, flinkJob, tableMeta, null);
}
public static void syncOperation(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long operationTime) {
logger.info("Enter method: syncOperation[configuration, flinkJob, tableMeta, operationTime]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "operationTime:" + operationTime);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> {
if (ObjectUtil.isNull(operationTime)) {
HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_operation_state?flink_job_id={}&alias={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias()
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute();
} else {
HttpUtil.createGet(
StrUtil.format(
"{}/api/sync_operation_state?flink_job_id={}&alias={}&operation_time={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
operationTime
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute();
}
});
} catch (Exception e) {
logger.warn("sync operation metrics submit failure");
}
}
public static void compactionStart(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta) {
logger.info("Enter method: compactionStart[configuration, flinkJob, tableMeta]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/compaction_start?flink_job_id={}&alias={}&type={}&cluster={}&application_id={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
EnumUtil.toString(tableMeta.getSourceType()),
configuration.getCluster(),
configuration.getApplicationId()
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute()
);
} catch (Exception e) {
logger.warn("compaction start metrics submit failure");
}
}
public static void compactionPreCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, Map<String, Long> metadata) {
logger.info("Enter method: compactionPreCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> HttpUtil.createPost(
StrUtil.format(
"{}/api/compaction_pre_commit?flink_job_id={}&alias={}&instant={}&cluster={}&application_id={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
instant,
configuration.getCluster(),
configuration.getApplicationId()
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.body(MAPPER.writeValueAsString(metadata))
.timeout(HTTP_TIMEOUT)
.execute()
);
} catch (Exception e) {
logger.warn("compaction pre commit metrics submit failure");
}
}
public static void compactionCommit(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String instant, HoodieCommitMetadata metadata) {
logger.info("Enter method: compactionCommit[configuration, flinkJob, tableMeta, instant, metadata]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "instant:" + instant + "," + "metadata:" + metadata);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> HttpUtil.createPost(
StrUtil.format(
"{}/api/compaction_commit?flink_job_id={}&alias={}&instant={}&cluster={}&application_id={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
instant,
configuration.getCluster(),
configuration.getApplicationId()
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.body(MAPPER.writeValueAsString(metadata))
.timeout(HTTP_TIMEOUT)
.execute()
);
} catch (Exception e) {
logger.warn("compaction commit metrics submit failure");
}
}
public static void compactionFinish(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String message, Exception exception) {
logger.info("Enter method: compactionFinish[configuration, flinkJob, tableMeta, message, exception]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "message:" + message + "," + "exception:" + exception);
try {
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() -> {
boolean success = (exception == null);
HttpUtil.createPost(StrUtil.format(
"{}/api/compaction_finish?flink_job_id={}&alias={}&time={}&state={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
Instant.now().toEpochMilli(),
success
))
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.body(success ? StrUtil.nullToEmpty(message) : exception.toString(), "text/plain")
.timeout(HTTP_TIMEOUT)
.execute()
.close();
});
} catch (Exception e) {
logger.warn("compaction finish metrics submit failure");
}
}
public static void versionUpdate(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, String version, String opts) {
logger.info("Enter method: versionUpdate[configuration, flinkJob, tableMeta, version, opts]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "version:" + version + "," + "opts:" + opts);
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/version_update?flink_job_id={}&alias={}&version={}&opts={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
version,
opts
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute()
);
}
public static void compactionLatestOpTs(GlobalConfiguration configuration, FlinkJob flinkJob, TableMeta tableMeta, Long latestOpTs) {
logger.info("Enter method: compactionLatestOpTs[configuration, flinkJob, tableMeta, latestOpTs]. " + "configuration:" + configuration + "," + "flinkJob:" + flinkJob + "," + "tableMeta:" + tableMeta + "," + "latestOpTs:" + latestOpTs);
Failsafe.with(RetryPolicyProvider.HTTP_RETRY)
.run(() ->
HttpUtil.createGet(
StrUtil.format(
"{}/api/compaction_latest_operation_time?flink_job_id={}&alias={}&latest_op_ts={}",
LoadBalance.getCustomPublishUrl(configuration),
flinkJob.getId(),
tableMeta.getAlias(),
latestOpTs
)
)
.header(Constants.API_HEADER_NAME, Constants.API_VERSION)
.basicAuth(Constants.SPRING_SECURITY_USERNAME, Constants.SPRING_SECURITY_PASSWORD_PLAIN)
.timeout(HTTP_TIMEOUT)
.execute()
);
}
}
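Each callback above reduces to one hutool request; a sketch with placeholder endpoint and credentials. Hutool's HttpResponse is Closeable, and only compactionFinish closes it above, so try-with-resources is the safer shape:
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;

public class CallbackDemo {
    public static void main(String[] args) {
        // Placeholder URL; real endpoints come from LoadBalance.getCustomPublishUrl(...)
        try (HttpResponse response = HttpUtil
                .createGet("http://example.invalid/api/sync_start?flink_job_id=42&alias=orders")
                .basicAuth("user", "password")
                .timeout(60_000)
                .execute()) {
            System.out.println(response.getStatus());
        }
    }
}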

View File

@@ -0,0 +1,257 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.EnumUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
import com.lanyuanxiaoyao.service.sync.configuration.DefaultPartitionNameKeyGenerator;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.configuration.TraceOverwriteWithLatestAvroPayload;
import com.lanyuanxiaoyao.service.sync.configuration.TraceWriteStatus;
import com.lanyuanxiaoyao.service.sync.functions.OperationTypeFilter;
import com.lanyuanxiaoyao.service.sync.functions.Record2RowDataFunction;
import com.lanyuanxiaoyao.service.sync.functions.type.TypeConverter;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.hudi.util.AvroSchemaConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.Constants.HOUR;
/**
 * Flink-related helpers for building the sync and compaction pipelines.
*
* @author lanyuanxiaoyao
* @version 0.0.2
* @date 2022-04-20
*/
@SuppressWarnings("UnusedAssignment")
public class SyncUtils {
private static final Logger logger = LoggerFactory.getLogger(SyncUtils.class);
private static final long K = 1024;
private static final long M = 1024 * K;
private static final long G = 1024 * M;
public static Schema avroSchemaWithExtraFields(TableMeta meta) {
return TypeConverter.getInstance(meta).convertToSchema(meta);
}
public static Configuration getSyncFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
Configuration configuration = new Configuration();
if (inputConfiguration != null) {
configuration = inputConfiguration;
}
configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), false);
configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
configuration.setBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE.key(), true);
configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME.key(), Constants.VICTORIA_USERNAME);
configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD.key(), Constants.VICTORIA_PASSWORD);
configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS.key(), ListUtil.toList(
Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC),
Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias())
).stream().map(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).collect(Collectors.joining(";")));
return getFlinkConfiguration(configuration, tableMeta, schema, defaultParallelism);
}
public static Configuration getCompactionFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
Configuration configuration = new Configuration();
if (inputConfiguration != null) {
configuration = inputConfiguration;
}
configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), false);
configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
configuration.setBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE.key(), true);
configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME.key(), Constants.VICTORIA_USERNAME);
configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD.key(), Constants.VICTORIA_PASSWORD);
configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS.key(), ListUtil.toList(
Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION),
Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias())
).stream().map(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).collect(Collectors.joining(";")));
return getFlinkConfiguration(configuration, tableMeta, schema, defaultParallelism);
}
public static Configuration getFlinkConfiguration(Configuration inputConfiguration, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
Configuration configuration = new Configuration();
if (inputConfiguration != null) {
configuration = inputConfiguration;
}
String tableType = tableMeta.getHudi().getTargetTableType();
logger.info("Hudi table type: {}", tableMeta.getHudi().getTargetTableType());
// Basic table information
configuration.setString(FlinkOptions.TABLE_NAME, tableMeta.getHudi().getTargetTable());
configuration.setString(FlinkOptions.TABLE_TYPE, tableType);
configuration.setString(FlinkOptions.PATH, tableMeta.getHudi().getTargetHdfsPath());
configuration.setString(FlinkOptions.RECORD_KEY_FIELD, Constants.UNION_KEY_NAME);
configuration.setBoolean(FlinkOptions.PRE_COMBINE, false);
if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_PRE_COMBINE)) {
configuration.setBoolean(FlinkOptions.PRE_COMBINE, true);
}
configuration.setString(FlinkOptions.PRECOMBINE_FIELD, Constants.UPDATE_TIMESTAMP_KEY_NAME);
configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema.toString());
if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_NO_IGNORE_FAILED)) {
configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false);
}
configuration.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default");
configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, DefaultPartitionNameKeyGenerator.class.getName());
Optional<String> partitionPath = TableMetaHelper.getPartitionField(tableMeta);
logger.info("Partition field: {}", partitionPath.orElse(""));
if (partitionPath.isPresent()) {
configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPath.get());
}
if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_TRACE_LATEST_OP_TS)) {
logger.info("Enable trace latest op ts");
configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName());
configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName());
}
configuration.setBoolean(FlinkOptions.METADATA_ENABLED, false);
configuration.setInteger(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), Integer.MAX_VALUE);
configuration.setString(FileSystemViewStorageConfig.SECONDARY_VIEW_TYPE.key(), FileSystemViewStorageType.SPILLABLE_DISK.name());
// Write
configuration.setInteger(FlinkOptions.WRITE_TASKS, tableMeta.getHudi().getWriteTasks() == 0 ? defaultParallelism : tableMeta.getHudi().getWriteTasks());
configuration.setInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY, 0);
configuration.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, tableMeta.getHudi().getWriteTaskMaxMemory() == 0 ? FlinkOptions.WRITE_TASK_MAX_SIZE.defaultValue() : tableMeta.getHudi().getWriteTaskMaxMemory());
configuration.setDouble(FlinkOptions.WRITE_BATCH_SIZE, tableMeta.getHudi().getWriteBatchSize() == 0 ? FlinkOptions.WRITE_BATCH_SIZE.defaultValue() : tableMeta.getHudi().getWriteBatchSize());
configuration.setLong(FlinkOptions.WRITE_RATE_LIMIT, tableMeta.getHudi().getWriteRateLimit());
configuration.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, HOUR);
// Index
configuration.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
configuration.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, tableMeta.getHudi().getBucketIndexNumber() == 0 ? 50 : tableMeta.getHudi().getBucketIndexNumber());
configuration.setString(FlinkOptions.INDEX_KEY_FIELD, Constants.UNION_KEY_NAME);
configuration.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false);
configuration.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, false);
configuration.setDouble(FlinkOptions.INDEX_STATE_TTL, -1);
// Increasing this causes OOM
// configuration.setDouble(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 64 * M);
// Increasing this also causes OOM
// configuration.setDouble(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), 128 * M);
// Compaction
configuration.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
if (EnumUtil.equals(HoodieTableType.COPY_ON_WRITE, tableType)) {
configuration.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
}
configuration.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
configuration.setInteger(FlinkOptions.COMPACTION_TASKS, tableMeta.getHudi().getCompactionTasks());
configuration.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, StrUtil.isBlank(tableMeta.getHudi().getCompactionStrategy()) ? FlinkOptions.NUM_OR_TIME : tableMeta.getHudi().getCompactionStrategy());
configuration.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
configuration.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, tableMeta.getHudi().getCompactionDeltaSeconds() == 0 ? 15 * 60 : tableMeta.getHudi().getCompactionDeltaSeconds());
configuration.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, tableMeta.getHudi().getCompactionDeltaCommits() == 0 ? 5 : tableMeta.getHudi().getCompactionDeltaCommits());
configuration.setString(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), UnBoundedCompactionStrategy.class.getName());
// configuration.setString(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), CombineAllCompactionStrategy.class.getName());
// configuration.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
// configuration.setInteger(FlinkOptions.COMPACTION_TASKS, tableMeta.getHudi().getCompactionTasks() == 0 ? defaultParallelism : tableMeta.getHudi().getCompactionTasks());
// configuration.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, tableMeta.getHudi().getCompactionMaxMemory());
// configuration.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, tableMeta.getHudi().getCompactionStrategy());
// configuration.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, tableMeta.getHudi().getCompactionDeltaCommits());
// configuration.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, tableMeta.getHudi().getCompactionDeltaSeconds());
// Number of commits to retain on the active timeline
configuration.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion());
// Minimum commits kept when archiving; must be larger than the previous setting
configuration.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 50);
// Maximum commits kept before archiving triggers; must be larger than the previous setting
configuration.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 100);
// Number of log/data file versions to retain
configuration.setString(FlinkOptions.CLEAN_POLICY, HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name());
configuration.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, tableMeta.getHudi().getKeepFileVersion());
// Disable the embedded timeline HTTP server
// configuration.setBoolean(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), false);
return configuration;
}
public static void sinkToHoodieByTable(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta, StreamExecutionEnvironment environment, DataStream<Record> inputDataStream) {
Schema schema = avroSchemaWithExtraFields(tableMeta);
DataStream<RowData> dataStream = inputDataStream
.filter(new OperationTypeFilter(globalConfiguration, flinkJob, tableMeta))
.name("Count operation type")
.map(new Record2RowDataFunction(globalConfiguration, flinkJob, tableMeta), TypeInformation.of(new TypeHint<List<RowData>>() {
}))
.name("Covert Row ( " + tableMeta.getSchema() + "-" + tableMeta.getTable() + " )")
.flatMap((list, collector) -> list.forEach(collector::collect), TypeInformation.of(RowData.class))
.filter(Objects::nonNull)
.name("Filter not null");
RowType rowType = (RowType) AvroSchemaConverter.convertToDataType(schema).getLogicalType();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(
environment,
EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build()
);
Configuration configuration = tableEnvironment.getConfig().getConfiguration();
int parallelism = configuration.getInteger("parallelism", 1);
configuration = getSyncFlinkConfiguration(globalConfiguration, configuration, flinkJob, tableMeta, schema, parallelism);
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism, hoodieRecordDataStream);
if (OptionsResolver.needsAsyncCompaction(configuration)) {
Pipelines.compact(configuration, pipeline);
} else {
Pipelines.clean(configuration, pipeline);
}
}
}
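To make the retention relations concrete: with keepCommitVersion = 30, the cleaner retains 30 commits (CLEAN_RETAIN_COMMITS = 30), and archiving trims the active timeline back to 80 commits (ARCHIVE_MIN_COMMITS = 30 + 50) once it grows past 130 (ARCHIVE_MAX_COMMITS = 30 + 100), so the required ordering CLEAN_RETAIN_COMMITS < ARCHIVE_MIN_COMMITS < ARCHIVE_MAX_COMMITS holds by construction.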

View File

@@ -0,0 +1,98 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.util.ObjectUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.NameHelper;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.flink.shaded.curator4.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.shaded.curator4.org.apache.curator.utils.CloseableUtils;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * ZooKeeper lock helpers.
*
* @author ZhangJiacheng
* @date 2023-05-10
*/
public class ZkUtils {
private static final Logger logger = LoggerFactory.getLogger(ZkUtils.class);
private static CuratorFramework client;
private static void createClient(String url) {
System.setProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, "false");
if (ObjectUtil.isNull(client)) {
client = CuratorFrameworkFactory.builder()
.connectString(url)
.retryPolicy(new ExponentialBackoffRetry((int) (5 * Constants.SECOND), 5))
.sessionTimeoutMs((int) (10 * Constants.SECOND))
.connectionTimeoutMs((int) (10 * Constants.SECOND))
.build();
}
if (!CuratorFrameworkState.STARTED.equals(client.getState())) {
client.start();
}
}
public static void closeClient() {
if (ObjectUtil.isNotNull(client)) {
CloseableUtils.closeQuietly(client);
}
}
public static void createSynchronizerLock(FlinkJob job, String zookeeperUrl, String runMeta) {
createLock(zookeeperUrl, runMeta, NameHelper.syncRunningLockPath(job.getId()));
}
public static void createSynchronizerLock(FlinkJob job, TableMeta meta, String zookeeperUrl, String runMeta) {
createLock(zookeeperUrl, runMeta, NameHelper.syncRunningLockPath(job.getId(), meta.getAlias()));
}
public static void releaseSynchronizerLock(FlinkJob job, TableMeta meta) {
releaseLock(NameHelper.syncRunningLockPath(job.getId(), meta.getAlias()));
}
public static void createCompactionLock(FlinkJob job, TableMeta meta, String zookeeperUrl, String runMeta) {
createLock(zookeeperUrl, runMeta, NameHelper.compactionRunningLockPath(job.getId(), meta.getAlias()));
}
public static void releaseCompactionLock(FlinkJob job, TableMeta meta) {
releaseLock(NameHelper.compactionRunningLockPath(job.getId(), meta.getAlias()));
}
private static void createLock(String zookeeperUrl, String runMeta, String lockPath) {
try {
createClient(zookeeperUrl);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(lockPath, runMeta.getBytes());
} catch (KeeperException.NodeExistsException e) {
logger.error("Lock exists for " + lockPath, e);
throw new RuntimeException(e);
} catch (Exception e) {
logger.error("Unknown error", e);
throw new RuntimeException(e);
}
}
private static void releaseLock(String lockPath) {
try {
if (ObjectUtil.isNotNull(client)) {
if (ObjectUtil.isNotNull(client.checkExists().forPath(lockPath))) {
client.delete().forPath(lockPath);
}
}
} catch (Exception e) {
logger.error("Unknown error", e);
throw new RuntimeException(e);
}
}
}
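Because the lock nodes are EPHEMERAL, they vanish with the session that created them; a self-contained sketch of that behavior using plain Curator coordinates (the class above uses the flink-shaded packages) against a placeholder ZooKeeper address:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class EphemeralLockDemo {
    public static void main(String[] args) throws Exception {
        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3))) {
            client.start();
            client.blockUntilConnected();
            // Throws KeeperException.NodeExistsException if another live session holds the lock.
            client.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath("/locks/sync/42", "run-meta".getBytes());
            // The node is deleted automatically when this session closes.
        }
    }
}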

View File

@@ -0,0 +1,87 @@
<configuration>
<appender name="Loki" class="pl.tkowalcz.tjahzi.logback.LokiAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
<url>${loki_push_url:- }</url>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%ex{full}</pattern>
</encoder>
<label>
<name>app</name>
<value>hudi-${run_type:- }</value>
</label>
<label>
<name>host</name>
<value>${HOSTNAME}</value>
</label>
<label>
<name>run_type</name>
<value>${run_type:- }</value>
</label>
<label>
<name>flink_job_id</name>
<value>${flink_job_id:- }</value>
</label>
<label>
<name>flink_job_name</name>
<value>${flink_job_name:- }</value>
</label>
<label>
<name>schema</name>
<value>${schema:- }</value>
</label>
<label>
<name>table</name>
<value>${table:- }</value>
</label>
<label>
<name>batch_id</name>
<value>${batch_id:- }</value>
</label>
<label>
<name>alias</name>
<value>${alias:- }</value>
</label>
<label>
<name>app_id</name>
<value>${_APP_ID:- }</value>
</label>
<label>
<name>container_id</name>
<value>${CONTAINER_ID:- }</value>
</label>
<logLevelLabel>level</logLevelLabel>
</appender>
<appender name="File" class="ch.qos.logback.core.FileAppender">
<file>run.log</file>
<append>false</append>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%ex{full}</pattern>
</encoder>
</appender>
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %p [${HOSTNAME}] [%t] %logger #@# %m%n%ex{full}</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="Loki"/>
<appender-ref ref="File"/>
<appender-ref ref="Console"/>
</root>
<logger name="org.apache.hadoop.conf.Configuration" level="ERROR"/>
<logger name="org.apache.hadoop.util.NativeCodeLoader" level="ERROR"/>
<logger name="org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory" level="ERROR"/>
<logger name="akka" level="ERROR"/>
<logger name="org.apache.flink.runtime" level="ERROR"/>
<logger name="org.apache.flink.runtime.taskexecutor.TaskExecutor" level="WARN"/>
<logger name="org.apache.flink.core.plugin.PluginConfig" level="ERROR"/>
<logger name="org.apache.hudi" level="INFO"/>
<logger name="com.eshore.odcp.hudi.connector.sync" level="INFO"/>
</configuration>