From d46cd5697c00564b5afeb5ee535515c7d7d30f24 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Fri, 10 May 2024 14:14:18 +0800 Subject: [PATCH] =?UTF-8?q?feat(sync):=20=E5=A2=9E=E5=8A=A0=E4=BD=BF?= =?UTF-8?q?=E7=94=A8hflush=E7=9A=84=E6=A0=87=E8=AE=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/lanyuanxiaoyao/service/common/Constants.java | 1 + service-web/src/main/resources/static/components/common.js | 1 + .../com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java | 6 +++++- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index 8c4daac..27595ad 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -202,6 +202,7 @@ public interface Constants { String TAGS_NO_PRE_COMBINE = "NO_PRE_COMBINE"; String TAGS_PRE_COMBINE = "PRE_COMBINE"; String TAGS_NO_IGNORE_FAILED = "NO_IGNORE_FAILED"; + String TAGS_USE_HFLUSH = "USE_HFLUSH"; String TAGS_DISABLE_CHAINING = "DISABLE_CHAINING"; String TAGS_TRACE_LATEST_OP_TS = "TRACE_LATEST_OP_TS"; String TAGS_SOURCE_READER = "SOURCE_READER"; diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js index 4685923..527b9a0 100644 --- a/service-web/src/main/resources/static/components/common.js +++ b/service-web/src/main/resources/static/components/common.js @@ -2036,6 +2036,7 @@ let tagsMapping = [ mappingItem('不忽略写日志错误', 'NO_IGNORE_FAILED'), mappingItem('取消算子合并', 'DISABLE_CHAINING'), mappingItem('跟踪压缩op_ts', 'TRACE_LATEST_OP_TS'), + mappingItem('HFlush', 'USE_HFLUSH'), ] let hudiTableTypeMapping = [ diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java index a6f10e3..3201e50 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java @@ -59,7 +59,6 @@ import static com.lanyuanxiaoyao.service.common.Constants.HOUR; * @version 0.0.2 * @date 2022-04-20 */ -@SuppressWarnings("UnusedAssignment") public class SyncUtils { private static final Logger logger = LoggerFactory.getLogger(SyncUtils.class); @@ -144,6 +143,11 @@ public class SyncUtils { configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false); } + if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_USE_HFLUSH)) { + logger.info("Enable hflush"); + configuration.setBoolean(HoodieWriteConfig.USE_HFLUSH.key(), true); + } + configuration.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default"); configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, DefaultPartitionNameKeyGenerator.class.getName());