feat(sync): 增加使用hflush的标记

This commit is contained in:
v-zhangjc9
2024-05-10 14:14:18 +08:00
parent d457b5d2f6
commit d46cd5697c
3 changed files with 7 additions and 1 deletions

View File

@@ -202,6 +202,7 @@ public interface Constants {
String TAGS_NO_PRE_COMBINE = "NO_PRE_COMBINE";
String TAGS_PRE_COMBINE = "PRE_COMBINE";
String TAGS_NO_IGNORE_FAILED = "NO_IGNORE_FAILED";
String TAGS_USE_HFLUSH = "USE_HFLUSH";
String TAGS_DISABLE_CHAINING = "DISABLE_CHAINING";
String TAGS_TRACE_LATEST_OP_TS = "TRACE_LATEST_OP_TS";
String TAGS_SOURCE_READER = "SOURCE_READER";

View File

@@ -2036,6 +2036,7 @@ let tagsMapping = [
mappingItem('不忽略写日志错误', 'NO_IGNORE_FAILED'),
mappingItem('取消算子合并', 'DISABLE_CHAINING'),
mappingItem('跟踪压缩op_ts', 'TRACE_LATEST_OP_TS'),
mappingItem('HFlush', 'USE_HFLUSH'),
]
let hudiTableTypeMapping = [

View File

@@ -59,7 +59,6 @@ import static com.lanyuanxiaoyao.service.common.Constants.HOUR;
* @version 0.0.2
* @date 2022-04-20
*/
@SuppressWarnings("UnusedAssignment")
public class SyncUtils {
private static final Logger logger = LoggerFactory.getLogger(SyncUtils.class);
@@ -144,6 +143,11 @@ public class SyncUtils {
configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false);
}
if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_USE_HFLUSH)) {
logger.info("Enable hflush");
configuration.setBoolean(HoodieWriteConfig.USE_HFLUSH.key(), true);
}
configuration.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default");
configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, DefaultPartitionNameKeyGenerator.class.getName());