[HUDI-3112] Fix KafkaConnect cannot sync to Hive Problem (#4458)
This commit is contained in:
@@ -32,6 +32,8 @@ import org.apache.hudi.common.util.StringUtils;
|
|||||||
import org.apache.hudi.connect.ControlMessage;
|
import org.apache.hudi.connect.ControlMessage;
|
||||||
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
|
import org.apache.hudi.connect.writers.KafkaConnectConfigs;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
import org.apache.hudi.hive.HiveSyncConfig;
|
||||||
|
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
|
||||||
import org.apache.hudi.keygen.BaseKeyGenerator;
|
import org.apache.hudi.keygen.BaseKeyGenerator;
|
||||||
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
|
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
|
||||||
import org.apache.hudi.keygen.CustomKeyGenerator;
|
import org.apache.hudi.keygen.CustomKeyGenerator;
|
||||||
@@ -57,6 +59,7 @@ import java.security.MessageDigest;
|
|||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
@@ -266,4 +269,32 @@ public class KafkaConnectUtils {
|
|||||||
ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
|
ControlMessage.ConnectWriteStatus connectWriteStatus = participantInfo.getWriteStatus();
|
||||||
return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
|
return SerializationUtils.deserialize(connectWriteStatus.getSerializedWriteStatus().toByteArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build Hive Sync Config
|
||||||
|
* Note: This method is a temporary solution.
|
||||||
|
* Future solutions can be referred to: https://issues.apache.org/jira/browse/HUDI-3199
|
||||||
|
*/
|
||||||
|
public static HiveSyncConfig buildSyncConfig(TypedProperties props, String tableBasePath) {
|
||||||
|
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
|
||||||
|
hiveSyncConfig.basePath = tableBasePath;
|
||||||
|
hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(KafkaConnectConfigs.HIVE_USE_PRE_APACHE_INPUT_FORMAT, false);
|
||||||
|
hiveSyncConfig.databaseName = props.getString(KafkaConnectConfigs.HIVE_DATABASE, "default");
|
||||||
|
hiveSyncConfig.tableName = props.getString(KafkaConnectConfigs.HIVE_TABLE, "");
|
||||||
|
hiveSyncConfig.hiveUser = props.getString(KafkaConnectConfigs.HIVE_USER, "");
|
||||||
|
hiveSyncConfig.hivePass = props.getString(KafkaConnectConfigs.HIVE_PASS, "");
|
||||||
|
hiveSyncConfig.jdbcUrl = props.getString(KafkaConnectConfigs.HIVE_URL, "");
|
||||||
|
hiveSyncConfig.partitionFields = props.getStringList(KafkaConnectConfigs.HIVE_PARTITION_FIELDS, ",", Collections.emptyList());
|
||||||
|
hiveSyncConfig.partitionValueExtractorClass =
|
||||||
|
props.getString(KafkaConnectConfigs.HIVE_PARTITION_EXTRACTOR_CLASS, SlashEncodedDayPartitionValueExtractor.class.getName());
|
||||||
|
hiveSyncConfig.useJdbc = props.getBoolean(KafkaConnectConfigs.HIVE_USE_JDBC, true);
|
||||||
|
if (props.containsKey(KafkaConnectConfigs.HIVE_SYNC_MODE)) {
|
||||||
|
hiveSyncConfig.syncMode = props.getString(KafkaConnectConfigs.HIVE_SYNC_MODE);
|
||||||
|
}
|
||||||
|
hiveSyncConfig.autoCreateDatabase = props.getBoolean(KafkaConnectConfigs.HIVE_AUTO_CREATE_DATABASE, true);
|
||||||
|
hiveSyncConfig.ignoreExceptions = props.getBoolean(KafkaConnectConfigs.HIVE_IGNORE_EXCEPTIONS, false);
|
||||||
|
hiveSyncConfig.skipROSuffix = props.getBoolean(KafkaConnectConfigs.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE, false);
|
||||||
|
hiveSyncConfig.supportTimestamp = props.getBoolean(KafkaConnectConfigs.HIVE_SUPPORT_TIMESTAMP_TYPE, false);
|
||||||
|
return hiveSyncConfig;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -164,6 +164,22 @@ public class KafkaConnectConfigs extends HoodieConfig {
|
|||||||
return getString(HADOOP_HOME);
|
return getString(HADOOP_HOME);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final String HIVE_USE_PRE_APACHE_INPUT_FORMAT = "hoodie.datasource.hive_sync.use_pre_apache_input_format";
|
||||||
|
public static final String HIVE_DATABASE = "hoodie.datasource.hive_sync.database";
|
||||||
|
public static final String HIVE_TABLE = "hoodie.datasource.hive_sync.table";
|
||||||
|
public static final String HIVE_USER = "hoodie.datasource.hive_sync.username";
|
||||||
|
public static final String HIVE_PASS = "hoodie.datasource.hive_sync.password";
|
||||||
|
public static final String HIVE_URL = "hoodie.datasource.hive_sync.jdbcurl";
|
||||||
|
public static final String HIVE_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
|
||||||
|
public static final String HIVE_PARTITION_EXTRACTOR_CLASS = "hoodie.datasource.hive_sync.partition_extractor_class";
|
||||||
|
public static final String HIVE_USE_JDBC = "hoodie.datasource.hive_sync.use_jdbc";
|
||||||
|
public static final String HIVE_SYNC_MODE = "hoodie.datasource.hive_sync.mode";
|
||||||
|
public static final String HIVE_AUTO_CREATE_DATABASE = "hoodie.datasource.hive_sync.auto_create_database";
|
||||||
|
public static final String HIVE_IGNORE_EXCEPTIONS = "hoodie.datasource.hive_sync.ignore_exceptions";
|
||||||
|
public static final String HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE = "hoodie.datasource.hive_sync.skip_ro_suffix";
|
||||||
|
public static final String HIVE_SUPPORT_TIMESTAMP_TYPE = "hoodie.datasource.hive_sync.support_timestamp";
|
||||||
|
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
|
||||||
|
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
|
|
||||||
protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
|
protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
|
||||||
|
|||||||
@@ -18,7 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hudi.connect.writers;
|
package org.apache.hudi.connect.writers;
|
||||||
|
|
||||||
import org.apache.hudi.DataSourceUtils;
|
|
||||||
import org.apache.hudi.client.HoodieJavaWriteClient;
|
import org.apache.hudi.client.HoodieJavaWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
||||||
@@ -32,12 +31,14 @@ import org.apache.hudi.common.model.HoodieTableType;
|
|||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.ReflectionUtils;
|
import org.apache.hudi.common.util.ReflectionUtils;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.connect.transaction.TransactionCoordinator;
|
import org.apache.hudi.connect.transaction.TransactionCoordinator;
|
||||||
import org.apache.hudi.connect.utils.KafkaConnectUtils;
|
import org.apache.hudi.connect.utils.KafkaConnectUtils;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.hive.HiveSyncConfig;
|
import org.apache.hudi.hive.HiveSyncConfig;
|
||||||
import org.apache.hudi.hive.HiveSyncTool;
|
import org.apache.hudi.hive.HiveSyncTool;
|
||||||
|
import org.apache.hudi.hive.ddl.HiveSyncMode;
|
||||||
import org.apache.hudi.keygen.KeyGenerator;
|
import org.apache.hudi.keygen.KeyGenerator;
|
||||||
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
|
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
|
||||||
import org.apache.hudi.sync.common.AbstractSyncTool;
|
import org.apache.hudi.sync.common.AbstractSyncTool;
|
||||||
@@ -163,9 +164,9 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void syncMeta() {
|
private void syncMeta() {
|
||||||
|
if (connectConfigs.isMetaSyncEnabled()) {
|
||||||
Set<String> syncClientToolClasses = new HashSet<>(
|
Set<String> syncClientToolClasses = new HashSet<>(
|
||||||
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
|
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
|
||||||
if (connectConfigs.isMetaSyncEnabled()) {
|
|
||||||
for (String impl : syncClientToolClasses) {
|
for (String impl : syncClientToolClasses) {
|
||||||
impl = impl.trim();
|
impl = impl.trim();
|
||||||
switch (impl) {
|
switch (impl) {
|
||||||
@@ -185,16 +186,20 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void syncHive() {
|
private void syncHive() {
|
||||||
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(
|
HiveSyncConfig hiveSyncConfig = KafkaConnectUtils.buildSyncConfig(new TypedProperties(connectConfigs.getProps()), tableBasePath);
|
||||||
new TypedProperties(connectConfigs.getProps()),
|
String url;
|
||||||
tableBasePath,
|
if (!StringUtils.isNullOrEmpty(hiveSyncConfig.syncMode) && HiveSyncMode.of(hiveSyncConfig.syncMode) == HiveSyncMode.HMS) {
|
||||||
"PARQUET");
|
url = hadoopConf.get(KafkaConnectConfigs.HIVE_METASTORE_URIS);
|
||||||
|
} else {
|
||||||
|
url = hiveSyncConfig.jdbcUrl;
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Syncing target hoodie table with hive table("
|
LOG.info("Syncing target hoodie table with hive table("
|
||||||
+ hiveSyncConfig.tableName
|
+ hiveSyncConfig.tableName
|
||||||
+ "). Hive metastore URL :"
|
+ "). Hive URL :"
|
||||||
+ hiveSyncConfig.jdbcUrl
|
+ url
|
||||||
+ ", basePath :" + tableBasePath);
|
+ ", basePath :" + tableBasePath);
|
||||||
LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
|
LOG.info("Hive Sync Conf => " + hiveSyncConfig);
|
||||||
FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
|
FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
|
||||||
HiveConf hiveConf = new HiveConf();
|
HiveConf hiveConf = new HiveConf();
|
||||||
hiveConf.addResource(fs.getConf());
|
hiveConf.addResource(fs.getConf());
|
||||||
|
|||||||
Reference in New Issue
Block a user