feat(command-pro): 增加一个命令行工具用于直接操作hadoop等组件

基于微服务的命令行适合日常产品化运维操作,但不能直接操作Hadoop等组件;能够直接操作这些组件便于开发测试,因此增加一个模块用于开发过程中测试使用
This commit is contained in:
v-zhangjc9
2024-05-10 16:22:49 +08:00
parent d46cd5697c
commit 835cc6729b
17 changed files with 586 additions and 162 deletions

View File

@@ -96,7 +96,7 @@ public class Compactor {
}
GlobalConfiguration globalConfiguration = new GlobalConfiguration(signature, cluster, applicationId, tableMeta);
Configuration configuration = SyncUtils.getCompactionFlinkConfiguration(
Configuration configuration = ConfigurationUtils.getCompactionFlinkConfiguration(
globalConfiguration,
new Configuration(),
flinkJob,

View File

@@ -0,0 +1,196 @@
package com.lanyuanxiaoyao.service.sync.utils;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.EnumUtil;
import cn.hutool.core.util.StrUtil;
import com.lanyuanxiaoyao.service.common.Constants;
import com.lanyuanxiaoyao.service.common.entity.FlinkJob;
import com.lanyuanxiaoyao.service.common.entity.TableMeta;
import com.lanyuanxiaoyao.service.common.utils.TableMetaHelper;
import com.lanyuanxiaoyao.service.sync.configuration.GlobalConfiguration;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.client.TraceWriteStatus;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.TraceOverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsVictoriaConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.DefaultPartitionNameKeyGenerator;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.org.apache.avro.Schema;
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.lanyuanxiaoyao.service.common.Constants.HOUR;
/**
* @author lanyuanxiaoyao
* @date 2024-05-10
*/
/**
 * Builders for the Flink/Hudi {@link Configuration} used by sync and compaction jobs.
 *
 * <p>The sync and compaction entry points differ only in the metrics run-type label;
 * both delegate to a shared metrics setup and then to {@link #getFlinkConfiguration}.
 */
public class ConfigurationUtils {

    private static final Logger logger = LoggerFactory.getLogger(ConfigurationUtils.class);

    /**
     * Builds the Flink configuration for a streaming sync job: Victoria metrics
     * reporting tagged with the sync run type, plus the common Hudi options from
     * {@link #getFlinkConfiguration}.
     *
     * @param globalConfiguration global settings; supplies the metrics endpoint
     * @param inputConfiguration  configuration populated in place; a fresh one is created when null
     * @param flinkJob            job whose id/name become metric labels
     * @param tableMeta           table metadata driving the Hudi options
     * @param schema              Avro schema of the source records
     * @param defaultParallelism  fallback for write tasks when table meta specifies 0
     * @return the populated configuration (the same instance as {@code inputConfiguration} when non-null)
     */
    public static Configuration getSyncFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
        Configuration configuration = applyMetricsConfiguration(globalConfiguration, inputConfiguration, flinkJob, tableMeta, Constants.METRICS_RUN_TYPE_SYNC);
        return getFlinkConfiguration(configuration, tableMeta, schema, defaultParallelism);
    }

    /**
     * Same as {@link #getSyncFlinkConfiguration} but tags the metrics with the
     * compaction run type.
     *
     * @see #getSyncFlinkConfiguration
     */
    public static Configuration getCompactionFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
        Configuration configuration = applyMetricsConfiguration(globalConfiguration, inputConfiguration, flinkJob, tableMeta, Constants.METRICS_RUN_TYPE_COMPACTION);
        return getFlinkConfiguration(configuration, tableMeta, schema, defaultParallelism);
    }

    /**
     * Applies the Victoria metrics reporter settings shared by the sync and
     * compaction entry points. Previously duplicated in both; extracted so the
     * only difference — the run-type label — is a parameter.
     *
     * @param runType value for the {@code run_type} metric label
     * @return {@code inputConfiguration} when non-null, otherwise a new configuration
     */
    private static Configuration applyMetricsConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, String runType) {
        Configuration configuration = inputConfiguration != null ? inputConfiguration : new Configuration();
        configuration.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true);
        configuration.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
        configuration.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
        configuration.setBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE.key(), true);
        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME.key(), Constants.VICTORIA_USERNAME);
        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD.key(), Constants.VICTORIA_PASSWORD);
        // Labels are rendered as "key=value" pairs joined by ';'. Whitespace in the
        // job name is replaced so the label value stays a single token.
        configuration.setString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS.key(), ListUtil.toList(
                Pair.of(Constants.METRICS_LABEL_RUN_TYPE, runType),
                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
                Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
                Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
                Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
                Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias())
        ).stream().map(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).collect(Collectors.joining(";")));
        return configuration;
    }

    /**
     * Populates the common Hudi table/write/index/compaction/cleaning options
     * derived from the table metadata.
     *
     * @param inputConfiguration configuration populated in place; a fresh one is created when null
     * @param tableMeta          table metadata (target table, partitioning, sizing knobs)
     * @param schema             Avro schema of the source records
     * @param defaultParallelism fallback for write tasks when table meta specifies 0
     * @return the populated configuration
     */
    public static Configuration getFlinkConfiguration(Configuration inputConfiguration, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
        Configuration configuration = inputConfiguration != null ? inputConfiguration : new Configuration();
        String tableType = tableMeta.getHudi().getTargetTableType();
        logger.info("Hudi table type: {}", tableType);
        // Basic table information.
        configuration.setString(FlinkOptions.TABLE_NAME, tableMeta.getHudi().getTargetTable());
        configuration.setString(FlinkOptions.TABLE_TYPE, tableType);
        configuration.setString(FlinkOptions.PATH, tableMeta.getHudi().getTargetHdfsPath());
        configuration.setString(FlinkOptions.RECORD_KEY_FIELD, Constants.UNION_KEY_NAME);
        // Pre-combine is off by default and only enabled via an explicit table tag.
        configuration.setBoolean(FlinkOptions.PRE_COMBINE, false);
        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_PRE_COMBINE)) {
            configuration.setBoolean(FlinkOptions.PRE_COMBINE, true);
        }
        configuration.setString(FlinkOptions.PRECOMBINE_FIELD, Constants.UPDATE_TIMESTAMP_KEY_NAME);
        configuration.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema.toString());
        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_NO_IGNORE_FAILED)) {
            configuration.setBoolean(FlinkOptions.IGNORE_FAILED, false);
        }
        if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_USE_HFLUSH)) {
            logger.info("Enable hflush");
            configuration.setBoolean(HoodieWriteConfig.USE_HFLUSH.key(), true);
        }
        configuration.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default");
        configuration.setString(FlinkOptions.KEYGEN_CLASS_NAME, DefaultPartitionNameKeyGenerator.class.getName());
        Optional<String> partitionPath = TableMetaHelper.getPartitionField(tableMeta);
        logger.info("Partition field: {}", partitionPath.orElse(""));
        if (partitionPath.isPresent()) {
            configuration.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPath.get());
        }
        configuration.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName());
        configuration.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName());
        configuration.setBoolean(FlinkOptions.METADATA_ENABLED, false);
        configuration.setInteger(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), Integer.MAX_VALUE);
        configuration.setString(FileSystemViewStorageConfig.SECONDARY_VIEW_TYPE.key(), FileSystemViewStorageType.SPILLABLE_DISK.name());
        // Write: a 0 in table meta means "not configured" and falls back to the default.
        configuration.setInteger(FlinkOptions.WRITE_TASKS, tableMeta.getHudi().getWriteTasks() == 0 ? defaultParallelism : tableMeta.getHudi().getWriteTasks());
        configuration.setInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY, 0);
        configuration.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, tableMeta.getHudi().getWriteTaskMaxMemory() == 0 ? FlinkOptions.WRITE_TASK_MAX_SIZE.defaultValue() : tableMeta.getHudi().getWriteTaskMaxMemory());
        configuration.setDouble(FlinkOptions.WRITE_BATCH_SIZE, tableMeta.getHudi().getWriteBatchSize() == 0 ? FlinkOptions.WRITE_BATCH_SIZE.defaultValue() : tableMeta.getHudi().getWriteBatchSize());
        configuration.setLong(FlinkOptions.WRITE_RATE_LIMIT, tableMeta.getHudi().getWriteRateLimit());
        configuration.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, HOUR);
        // Index: non-global bucket index keyed on the union key.
        configuration.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
        configuration.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, tableMeta.getHudi().getBucketIndexNumber() == 0 ? 50 : tableMeta.getHudi().getBucketIndexNumber());
        configuration.setString(FlinkOptions.INDEX_KEY_FIELD, Constants.UNION_KEY_NAME);
        configuration.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false);
        configuration.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, false);
        configuration.setDouble(FlinkOptions.INDEX_STATE_TTL, -1);
        // NOTE: raising HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE or
        // HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE here has caused OOM before;
        // keep them at their current values.
        // Compaction.
        configuration.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        if (EnumUtil.equals(HoodieTableType.COPY_ON_WRITE, tableType)) {
            // NOTE(review): async compaction is enabled for COPY_ON_WRITE here while
            // MERGE_ON_READ keeps it off — confirm this inversion is intentional.
            configuration.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        }
        configuration.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
        configuration.setInteger(FlinkOptions.COMPACTION_TASKS, tableMeta.getHudi().getCompactionTasks());
        configuration.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, StrUtil.isBlank(tableMeta.getHudi().getCompactionStrategy()) ? FlinkOptions.NUM_OR_TIME : tableMeta.getHudi().getCompactionStrategy());
        configuration.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
        configuration.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, tableMeta.getHudi().getCompactionDeltaSeconds() == 0 ? 15 * 60 : tableMeta.getHudi().getCompactionDeltaSeconds());
        configuration.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, tableMeta.getHudi().getCompactionDeltaCommits() == 0 ? 5 : tableMeta.getHudi().getCompactionDeltaCommits());
        configuration.setString(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), UnBoundedCompactionStrategy.class.getName());
        // Number of commits kept on the active timeline.
        configuration.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion());
        // Archive thresholds must both exceed CLEAN_RETAIN_COMMITS, and max > min.
        configuration.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 50);
        configuration.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 100);
        // Number of log/data file versions retained by the cleaner.
        configuration.setString(FlinkOptions.CLEAN_POLICY, HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name());
        configuration.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, tableMeta.getHudi().getKeepFileVersion());
        // HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE can be set to false here
        // to disable the built-in HTTP timeline server if it ever becomes a problem.
        return configuration;
    }
}

View File

@@ -70,159 +70,6 @@ public class SyncUtils {
return TypeConverter.getInstance(meta).convertToSchema(meta);
}
/**
 * Builds the Flink configuration for a sync job: Victoria metrics reporting
 * labelled with the sync run type, then the shared Hudi options.
 *
 * @return the populated configuration (same instance as {@code inputConfiguration} when non-null)
 */
public static Configuration getSyncFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
    // Reuse the caller-supplied configuration when present, otherwise start fresh.
    Configuration conf = inputConfiguration == null ? new Configuration() : inputConfiguration;
    // Victoria metrics reporter with basic auth.
    conf.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true);
    conf.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
    conf.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
    conf.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
    conf.setBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE.key(), true);
    conf.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME.key(), Constants.VICTORIA_USERNAME);
    conf.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD.key(), Constants.VICTORIA_PASSWORD);
    // Metric labels rendered as "key=value" pairs joined by ';'.
    String tags = ListUtil.toList(
            Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_SYNC),
            Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
            Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
            Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
            Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
            Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias())
    ).stream().map(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).collect(Collectors.joining(";"));
    conf.setString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS.key(), tags);
    return getFlinkConfiguration(conf, tableMeta, schema, defaultParallelism);
}
/**
 * Builds the Flink configuration for a compaction job: Victoria metrics reporting
 * labelled with the compaction run type, then the shared Hudi options.
 *
 * @return the populated configuration (same instance as {@code inputConfiguration} when non-null)
 */
public static Configuration getCompactionFlinkConfiguration(GlobalConfiguration globalConfiguration, Configuration inputConfiguration, FlinkJob flinkJob, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
    // Reuse the caller-supplied configuration when present, otherwise start fresh.
    Configuration conf = inputConfiguration == null ? new Configuration() : inputConfiguration;
    // Victoria metrics reporter with basic auth.
    conf.setBoolean(HoodieMetricsConfig.TURN_METRICS_ON.key(), true);
    conf.setString(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), MetricsReporterType.VICTORIA.name());
    conf.setString(HoodieMetricsVictoriaConfig.VICTORIA_ENDPOINT.key(), globalConfiguration.getMetricPublishPrometheusUrl());
    conf.setInteger(HoodieMetricsVictoriaConfig.VICTORIA_TIMEOUT.key(), 60000);
    conf.setBoolean(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_ENABLE.key(), true);
    conf.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_USERNAME.key(), Constants.VICTORIA_USERNAME);
    conf.setString(HoodieMetricsVictoriaConfig.VICTORIA_BASIC_AUTH_PASSWORD.key(), Constants.VICTORIA_PASSWORD);
    // Metric labels rendered as "key=value" pairs joined by ';'.
    String tags = ListUtil.toList(
            Pair.of(Constants.METRICS_LABEL_RUN_TYPE, Constants.METRICS_RUN_TYPE_COMPACTION),
            Pair.of(Constants.METRICS_LABEL_FLINK_JOB_ID, flinkJob.getId()),
            Pair.of(Constants.METRICS_LABEL_FLINK_JOB_NAME, flinkJob.getName().replaceAll("\\s", "_")),
            Pair.of(Constants.METRICS_LABEL_SCHEMA, tableMeta.getSchema()),
            Pair.of(Constants.METRICS_LABEL_TABLE, tableMeta.getTable()),
            Pair.of(Constants.METRICS_LABEL_ALIAS, tableMeta.getAlias())
    ).stream().map(pair -> StrUtil.format("{}={}", pair.getLeft(), pair.getRight())).collect(Collectors.joining(";"));
    conf.setString(HoodieMetricsVictoriaConfig.VICTORIA_TAGS.key(), tags);
    return getFlinkConfiguration(conf, tableMeta, schema, defaultParallelism);
}
/**
 * Populates the common Hudi table/write/index/compaction/cleaning options
 * derived from the table metadata.
 *
 * @param inputConfiguration configuration populated in place; a fresh one is created when null
 * @param tableMeta          table metadata (target table, partitioning, sizing knobs)
 * @param schema             Avro schema of the source records
 * @param defaultParallelism fallback for write tasks when table meta specifies 0
 * @return the populated configuration
 */
public static Configuration getFlinkConfiguration(Configuration inputConfiguration, TableMeta tableMeta, Schema schema, Integer defaultParallelism) {
    Configuration conf = inputConfiguration == null ? new Configuration() : inputConfiguration;
    String tableType = tableMeta.getHudi().getTargetTableType();
    logger.info("Hudi table type: {}", tableMeta.getHudi().getTargetTableType());
    // Basic table information.
    conf.setString(FlinkOptions.TABLE_NAME, tableMeta.getHudi().getTargetTable());
    conf.setString(FlinkOptions.TABLE_TYPE, tableType);
    conf.setString(FlinkOptions.PATH, tableMeta.getHudi().getTargetHdfsPath());
    conf.setString(FlinkOptions.RECORD_KEY_FIELD, Constants.UNION_KEY_NAME);
    // Pre-combine is enabled only when the table carries the explicit tag.
    conf.setBoolean(FlinkOptions.PRE_COMBINE, TableMetaHelper.existsTag(tableMeta, Constants.TAGS_PRE_COMBINE));
    conf.setString(FlinkOptions.PRECOMBINE_FIELD, Constants.UPDATE_TIMESTAMP_KEY_NAME);
    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema.toString());
    if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_NO_IGNORE_FAILED)) {
        conf.setBoolean(FlinkOptions.IGNORE_FAILED, false);
    }
    if (TableMetaHelper.existsTag(tableMeta, Constants.TAGS_USE_HFLUSH)) {
        logger.info("Enable hflush");
        conf.setBoolean(HoodieWriteConfig.USE_HFLUSH.key(), true);
    }
    conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, "default");
    conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, DefaultPartitionNameKeyGenerator.class.getName());
    Optional<String> partitionPath = TableMetaHelper.getPartitionField(tableMeta);
    logger.info("Partition field: {}", partitionPath.orElse(""));
    partitionPath.ifPresent(field -> conf.setString(FlinkOptions.PARTITION_PATH_FIELD, field));
    conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, TraceOverwriteWithLatestAvroPayload.class.getName());
    conf.setString(HoodieWriteConfig.WRITE_STATUS_CLASS_NAME.key(), TraceWriteStatus.class.getName());
    conf.setBoolean(FlinkOptions.METADATA_ENABLED, false);
    conf.setInteger(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), Integer.MAX_VALUE);
    conf.setString(FileSystemViewStorageConfig.SECONDARY_VIEW_TYPE.key(), FileSystemViewStorageType.SPILLABLE_DISK.name());
    // Write: a 0 in table meta means "not configured" and falls back to a default.
    conf.setInteger(FlinkOptions.WRITE_TASKS, tableMeta.getHudi().getWriteTasks() == 0 ? defaultParallelism : tableMeta.getHudi().getWriteTasks());
    conf.setInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY, 0);
    conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, tableMeta.getHudi().getWriteTaskMaxMemory() == 0 ? FlinkOptions.WRITE_TASK_MAX_SIZE.defaultValue() : tableMeta.getHudi().getWriteTaskMaxMemory());
    conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, tableMeta.getHudi().getWriteBatchSize() == 0 ? FlinkOptions.WRITE_BATCH_SIZE.defaultValue() : tableMeta.getHudi().getWriteBatchSize());
    conf.setLong(FlinkOptions.WRITE_RATE_LIMIT, tableMeta.getHudi().getWriteRateLimit());
    conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, HOUR);
    // Index: non-global bucket index keyed on the union key.
    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.BUCKET.name());
    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, tableMeta.getHudi().getBucketIndexNumber() == 0 ? 50 : tableMeta.getHudi().getBucketIndexNumber());
    conf.setString(FlinkOptions.INDEX_KEY_FIELD, Constants.UNION_KEY_NAME);
    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false);
    conf.setBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED, false);
    conf.setDouble(FlinkOptions.INDEX_STATE_TTL, -1);
    // NOTE: raising HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE or
    // HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE here has caused OOM before.
    // Compaction: async only for the COPY_ON_WRITE table type.
    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, EnumUtil.equals(HoodieTableType.COPY_ON_WRITE, tableType));
    conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, true);
    conf.setInteger(FlinkOptions.COMPACTION_TASKS, tableMeta.getHudi().getCompactionTasks());
    conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, StrUtil.isBlank(tableMeta.getHudi().getCompactionStrategy()) ? FlinkOptions.NUM_OR_TIME : tableMeta.getHudi().getCompactionStrategy());
    conf.setInteger(FlinkOptions.COMPACTION_MAX_MEMORY, 1024);
    conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, tableMeta.getHudi().getCompactionDeltaSeconds() == 0 ? 15 * 60 : tableMeta.getHudi().getCompactionDeltaSeconds());
    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, tableMeta.getHudi().getCompactionDeltaCommits() == 0 ? 5 : tableMeta.getHudi().getCompactionDeltaCommits());
    conf.setString(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), UnBoundedCompactionStrategy.class.getName());
    // Number of commits kept on the active timeline.
    conf.setInteger(FlinkOptions.CLEAN_RETAIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion());
    // Archive thresholds must both exceed CLEAN_RETAIN_COMMITS, and max > min.
    conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 50);
    conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, tableMeta.getHudi().getKeepCommitVersion() + 100);
    // Number of log/data file versions retained by the cleaner.
    conf.setString(FlinkOptions.CLEAN_POLICY, HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name());
    conf.setInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS, tableMeta.getHudi().getKeepFileVersion());
    // HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE can be set to false here
    // to disable the built-in HTTP timeline server if needed.
    return conf;
}
public static void sinkToHoodieByTable(GlobalConfiguration globalConfiguration, FlinkJob flinkJob, TableMeta tableMeta, StreamExecutionEnvironment environment, DataStream<Record> inputDataStream) {
Schema schema = avroSchemaWithExtraFields(tableMeta);
DataStream<RowData> dataStream = inputDataStream
@@ -245,7 +92,7 @@ public class SyncUtils {
);
Configuration configuration = tableEnvironment.getConfig().getConfiguration();
int parallelism = configuration.getInteger("parallelism", 1);
configuration = getSyncFlinkConfiguration(globalConfiguration, configuration, flinkJob, tableMeta, schema, parallelism);
configuration = ConfigurationUtils.getSyncFlinkConfiguration(globalConfiguration, configuration, flinkJob, tableMeta, schema, parallelism);
DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(configuration, rowType, parallelism, dataStream);
DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(configuration, parallelism, hoodieRecordDataStream);