diff --git a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java index 8c4daac..27595ad 100644 --- a/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java +++ b/service-common/src/main/java/com/lanyuanxiaoyao/service/common/Constants.java @@ -202,6 +202,7 @@ public interface Constants { String TAGS_NO_PRE_COMBINE = "NO_PRE_COMBINE"; String TAGS_PRE_COMBINE = "PRE_COMBINE"; String TAGS_NO_IGNORE_FAILED = "NO_IGNORE_FAILED"; + String TAGS_USE_HFLUSH = "USE_HFLUSH"; String TAGS_DISABLE_CHAINING = "DISABLE_CHAINING"; String TAGS_TRACE_LATEST_OP_TS = "TRACE_LATEST_OP_TS"; String TAGS_SOURCE_READER = "SOURCE_READER"; diff --git a/service-web/src/main/resources/static/components/common.js b/service-web/src/main/resources/static/components/common.js index 4685923..527b9a0 100644 --- a/service-web/src/main/resources/static/components/common.js +++ b/service-web/src/main/resources/static/components/common.js @@ -2036,6 +2036,7 @@ let tagsMapping = [ mappingItem('不忽略写日志错误', 'NO_IGNORE_FAILED'), mappingItem('取消算子合并', 'DISABLE_CHAINING'), mappingItem('跟踪压缩op_ts', 'TRACE_LATEST_OP_TS'), + mappingItem('HFlush', 'USE_HFLUSH'), ] let hudiTableTypeMapping = [ diff --git a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java index a6f10e3..3201e50 100644 --- a/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java +++ b/utils/sync/src/main/java/com/lanyuanxiaoyao/service/sync/utils/SyncUtils.java @@ -59,7 +59,6 @@ import static com.lanyuanxiaoyao.service.common.Constants.HOUR; * @version 0.0.2 * @date 2022-04-20 */ -@SuppressWarnings("UnusedAssignment") public class SyncUtils { private static final Logger logger = LoggerFactory.getLogger(SyncUtils.class); @@ -144,6 +143,11 @@ public class SyncUtils { configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false); } + if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_USE_HFLUSH)) { + logger.info("Enable hflush"); + configuration.setBoolean(HoodieWriteConfig.USE_HFLUSH.key(), true); + } + configuration.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default"); configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, DefaultPartitionNameKeyGenerator.class.getName());