diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index 56f24f7..e3723f5 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -200,7 +200,7 @@ public interface Constants { String TAGS_NO_PRE_COMBINE = "NO_PRE_COMBINE"; String TAGS_PRE_COMBINE = "PRE_COMBINE"; String TAGS_NO_IGNORE_FAILED = "NO_IGNORE_FAILED"; - String TAGS_USE_HFLUSH = "USE_HFLUSH"; + String TAGS_DISABLE_HSYNC = "DISABLE_HSYNC"; String TAGS_DISABLE_CHAINING = "DISABLE_CHAINING"; String TAGS_TRACE_LATEST_OP_TS = "TRACE_LATEST_OP_TS"; String TAGS_SOURCE_READER = "SOURCE_READER"; diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js index 8cdd684..1250971 100644 --- a/service-web/src/main/resources/static/components/common.js +++ b/service-web/src/main/resources/static/components/common.js @@ -2037,7 +2037,7 @@ let tagsMapping = [ mappingItem('不忽略写日志错误', 'NO_IGNORE_FAILED'), mappingItem('取消算子合并', 'DISABLE_CHAINING'), mappingItem('跟踪压缩op_ts', 'TRACE_LATEST_OP_TS'), - mappingItem('HFlush', 'USE_HFLUSH'), + mappingItem('不使用HSync', 'DISABLE_HSYNC'), ] let hudiTableTypeMapping = [ diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java index d22629c..d63eb1b 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/ConfigurationUtils.java @@ -114,9 +114,9 @@ public class ConfigurationUtils { configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false); } - if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_USE_HFLUSH)) { - logger.info("Enable hflush"); - configuration.setBoolean(HoodieWriteConfig.USE_HFLUSH.key(), true); + if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_DISABLE_HSYNC)) { + logger.info("Disable hsync"); + configuration.setBoolean(HoodieWriteConfig.USE_HSYNC.key(), false); } configuration.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default"); diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java index ddaae00..8ab894f 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java @@ -1,21 +1,14 @@ package com.lanyuanxiaoyao.service.sync.utils; -import cn.hutool.core.collection.ListUtil; -import cn.hutool.core.util.EnumUtil; -import cn.hutool.core.util.StrUtil; -import com.lanyuanxiaoyao.service.common.Constants; import com.lanyuanxiaoyao.service.common.entity.FlinkJob; import com.lanyuanxiaoyao.service.common.entity.Record; import com.lanyuanxiaoyao.service.common.entity.TableMeta; -import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper; import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration; import com.lanyuanxiaoyao.service.sync.functions.OperationTypeFilter; import com.lanyuanxiaoyao.service.sync.functions.Record2RowDataFunction; import com.lanyuanxiaoyao.service.sync.functions.type.TypeConverter; import java.util.List; import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; @@ -25,33 +18,14 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; -import org.apache.hudi.client.TraceWriteStatus; -import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload; -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; -import org.apache.hudi.common.table.view.FileSystemViewStorageType; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieStorageConfig; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.config.metrics.HoodieMetricsConfig; -import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig; -import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; -import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator; -import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.org.apache.avro.Schema; import org.apache.hudi.sink.utils.Pipelines; -import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy; import org.apache.hudi.util.AvroSchemaConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.lanyuanxiaoyao.service.common.Constants.HOUR; - /** * Flink 相关的工具 *