[HUDI-2325] Add hive sync support to kafka connect (#3660)
Co-authored-by: Rajesh Mahindra <rmahindra@Rajeshs-MacBook-Pro.local>

@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SerializationUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.connect.ControlMessage;
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.CustomAvroKeyGenerator;

@@ -63,6 +64,7 @@ import java.util.stream.Collectors;
 public class KafkaConnectUtils {

   private static final Logger LOG = LogManager.getLogger(KafkaConnectUtils.class);
+  private static final String HOODIE_CONF_PREFIX = "hoodie.";

   public static int getLatestNumPartitions(String bootstrapServers, String topicName) {
     Properties props = new Properties();

@@ -85,9 +87,15 @@ public class KafkaConnectUtils {
    *
    * @return
    */
-  public static Configuration getDefaultHadoopConf() {
+  public static Configuration getDefaultHadoopConf(KafkaConnectConfigs connectConfigs) {
     Configuration hadoopConf = new Configuration();
     hadoopConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+    connectConfigs.getProps().keySet().stream().filter(prop -> {
+      // Filter out the hoodie.* configuration items before passing to the hadoop/hive
+      // configs, to avoid printing unnecessary warning logs
+      return !prop.toString().startsWith(HOODIE_CONF_PREFIX);
+    }).forEach(prop -> {
+      hadoopConf.set(prop.toString(), connectConfigs.getProps().get(prop.toString()).toString());
+    });
     return hadoopConf;
   }
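
The practical effect of the new connectConfigs parameter: any connector property that does not start with "hoodie." is copied verbatim into the returned Hadoop configuration, which the hive sync path later folds into its HiveConf. A minimal sketch of that pass-through under stated assumptions: the property values are made up, and withProperties(...) is an assumed builder helper that should be checked against the actual KafkaConnectConfigs.Builder API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.connect.writers.KafkaConnectConfigs;

import java.util.Properties;

public class HadoopConfPassThroughSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("hoodie.table.name", "orders");                    // hoodie.* is filtered out
    props.put("hive.metastore.uris", "thrift://localhost:9083"); // forwarded verbatim

    // withProperties(...) is an assumed builder helper, not confirmed by this diff
    KafkaConnectConfigs configs = KafkaConnectConfigs.newBuilder()
        .withProperties(props)
        .build();

    Configuration hadoopConf = KafkaConnectUtils.getDefaultHadoopConf(configs);
    System.out.println(hadoopConf.get("hive.metastore.uris")); // thrift://localhost:9083
    System.out.println(hadoopConf.get("hoodie.table.name"));   // null: never copied over
  }
}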

@@ -94,8 +94,6 @@ public class KafkaConnectConfigs extends HoodieConfig {

   protected KafkaConnectConfigs(Properties props) {
     super(props);
-    Properties newProps = new Properties();
-    newProps.putAll(props);
   }

   public static KafkaConnectConfigs.Builder newBuilder() {

(The removed newProps was populated but never read afterwards, so this hunk just drops dead code from the constructor.)

@@ -18,31 +18,43 @@

package org.apache.hudi.connect.writers;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.transaction.TransactionCoordinator;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.sync.common.AbstractSyncTool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Implementation of Transaction service APIs used by

@@ -53,10 +65,10 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServices {

   private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);

+  private final KafkaConnectConfigs connectConfigs;
   private final Option<HoodieTableMetaClient> tableMetaClient;
   private final Configuration hadoopConf;
   private final HoodieWriteConfig writeConfig;
-  private final KafkaConnectConfigs connectConfigs;
   private final String tableBasePath;
   private final String tableName;
   private final HoodieEngineContext context;

@@ -72,7 +84,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServices {

     tableBasePath = writeConfig.getBasePath();
     tableName = writeConfig.getTableName();
-    hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
+    hadoopConf = KafkaConnectUtils.getDefaultHadoopConf(connectConfigs);
     context = new HoodieJavaEngineContext(hadoopConf);

     try {

@@ -123,6 +135,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServices {
       javaClient.scheduleCompaction(Option.empty()).ifPresent(
           instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
     }
+    syncMeta();
   }

   @Override
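
The syncHive() path added in the next hunk builds its HiveSyncConfig from the connector's own properties through DataSourceUtils.buildHiveSyncConfig, so hive sync is driven entirely by connector configuration. A sketch of the relevant settings as Java property puts; the hoodie.meta.sync.* and hoodie.datasource.hive_sync.* key names are assumptions based on Hudi's usual conventions and should be verified against KafkaConnectConfigs and DataSourceWriteOptions.

Properties props = new Properties();
// Enable meta sync and pick the tool(s); comma-separated class names, per syncMeta() below
props.put("hoodie.meta.sync.enable", "true");
props.put("hoodie.meta.sync.classes", "org.apache.hudi.hive.HiveSyncTool");
// Hive sync settings consumed by DataSourceUtils.buildHiveSyncConfig (assumed key names)
props.put("hoodie.datasource.hive_sync.database", "default");
props.put("hoodie.datasource.hive_sync.table", "orders");
props.put("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://localhost:10000");
props.put("hoodie.datasource.hive_sync.partition_fields", "date");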

@@ -144,4 +157,44 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServices {
         && HoodieTableType.MERGE_ON_READ.equals(tableMetaClient.get().getTableType())
         && connectConfigs.isAsyncCompactEnabled();
   }
+
+  private void syncMeta() {
+    Set<String> syncClientToolClasses = new HashSet<>(
+        Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
+    if (connectConfigs.isMetaSyncEnabled()) {
+      for (String impl : syncClientToolClasses) {
+        impl = impl.trim();
+        switch (impl) {
+          case "org.apache.hudi.hive.HiveSyncTool":
+            syncHive();
+            break;
+          default:
+            FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
+            Properties properties = new Properties();
+            properties.putAll(connectConfigs.getProps());
+            properties.put("basePath", tableBasePath);
+            AbstractSyncTool syncTool = (AbstractSyncTool) ReflectionUtils.loadClass(
+                impl, new Class[] {Properties.class, FileSystem.class}, properties, fs);
+            syncTool.syncHoodieTable();
+        }
+      }
+    }
+  }
+
+  private void syncHive() {
+    HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(
+        new TypedProperties(connectConfigs.getProps()),
+        tableBasePath,
+        "PARQUET");
+    LOG.info("Syncing target hoodie table with hive table("
+        + hiveSyncConfig.tableName
+        + "). Hive metastore URL :"
+        + hiveSyncConfig.jdbcUrl
+        + ", basePath :" + tableBasePath);
+    LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
+    FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConf);
+    HiveConf hiveConf = new HiveConf();
+    hiveConf.addResource(fs.getConf());
+    LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
+    new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
+  }
 }
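
The default branch of syncMeta() defines an implicit plugin contract: any class listed in the meta sync classes must extend AbstractSyncTool and expose a (Properties, FileSystem) constructor, and it receives the table's base path under the "basePath" key. A minimal sketch of a conforming tool; the class itself is hypothetical, and passing the arguments through to super assumes AbstractSyncTool's constructor takes (Properties, FileSystem).

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.sync.common.AbstractSyncTool;

import java.util.Properties;

// Hypothetical example of a pluggable sync tool for syncMeta()'s reflective fallback.
public class LoggingSyncTool extends AbstractSyncTool {

  private final String basePath;

  // Must match the reflective call:
  // loadClass(impl, new Class[] {Properties.class, FileSystem.class}, properties, fs)
  public LoggingSyncTool(Properties props, FileSystem fs) {
    super(props, fs);
    // syncMeta() puts the table base path under "basePath" before instantiating the tool
    this.basePath = props.getProperty("basePath");
  }

  @Override
  public void syncHoodieTable() {
    System.out.println("Would sync the Hudi table at " + basePath + " to an external catalog");
  }
}

Registering it would then be a matter of adding the class name to the (assumed) hoodie.meta.sync.classes list shown earlier.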

@@ -62,7 +62,7 @@ public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteStatus> {
       KafkaConnectConfigs connectConfigs,
       TopicPartition partition) throws HoodieException {
     this.connectConfigs = connectConfigs;
-    Configuration hadoopConf = KafkaConnectUtils.getDefaultHadoopConf();
+    Configuration hadoopConf = KafkaConnectUtils.getDefaultHadoopConf(connectConfigs);

     try {
       this.schemaProvider = StringUtils.isNullOrEmpty(connectConfigs.getSchemaProviderClass()) ? null