feat(sync): 加入是否使用HSync的配置项

This commit is contained in:
v-zhangjc9
2024-05-20 15:22:30 +08:00
parent fd9f97d03b
commit 010ea83b5c
4 changed files with 5 additions and 31 deletions

View File

@@ -200,7 +200,7 @@ public interface Constants {
String TAGS_NO_PRE_COMBINE = "NO_PRE_COMBINE";
String TAGS_PRE_COMBINE = "PRE_COMBINE";
String TAGS_NO_IGNORE_FAILED = "NO_IGNORE_FAILED";
String TAGS_USE_HFLUSH = "USE_HFLUSH";
String TAGS_DISABLE_HSYNC = "DISABLE_HSYNC";
String TAGS_DISABLE_CHAINING = "DISABLE_CHAINING";
String TAGS_TRACE_LATEST_OP_TS = "TRACE_LATEST_OP_TS";
String TAGS_SOURCE_READER = "SOURCE_READER";

View File

@@ -2037,7 +2037,7 @@ let tagsMapping = [
mappingItem('不忽略写日志错误', 'NO_IGNORE_FAILED'),
mappingItem('取消算子合并', 'DISABLE_CHAINING'),
mappingItem('跟踪压缩op_ts', 'TRACE_LATEST_OP_TS'),
mappingItem('HFlush', 'USE_HFLUSH'),
mappingItem('不使用HSync', 'DISABLE_HSYNC'),
]
let hudiTableTypeMapping = [

View File

@@ -114,9 +114,9 @@ public class ConfigurationUtils {
configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false);
}
if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_USE_HFLUSH)) {
logger.info("Enable hflush");
configuration.setBoolean(HoodieWriteConfig.USE_HFLUSH.key(), true);
if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_DISABLE_HSYNC)) {
logger.info("Disable hsync");
configuration.setBoolean(HoodieWriteConfig.USE_HSYNC.key(), false);
}
configuration.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default");

View File

@@ -1,21 +1,14 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.EnumUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.Record;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import com.lanyuanxiaoyao.service.sync.functions.OperationTypeFilter;
import com.lanyuanxiaoyao.service.sync.functions.Record2RowDataFunction;
import com.lanyuanxiaoyao.service.sync.functions.type.TypeConverter;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
@@ -25,33 +18,14 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.client.TraceWriteStatus;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.hudi.util.AvroSchemaConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.Constants.HOUR;
/**
* Flink 相关的工具
*