[HUDI-3980] Support kerberos hbase index (#5464)
- Add configurations in HoodieHBaseIndexConfig.java to support kerberos hbase connection. Co-authored-by: xicm <xicm@asiainfo.com>
This commit is contained in:
@@ -157,6 +157,33 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
|
||||
.withDocumentation("When set to true, the rollback method will delete the last failed task index. "
|
||||
+ "The default value is false. Because deleting the index will add extra load on the Hbase cluster for each rollback");
|
||||
|
||||
public static final ConfigProperty<String> SECURITY_AUTHENTICATION = ConfigProperty
|
||||
.key("hoodie.index.hbase.security.authentication")
|
||||
.defaultValue("simple")
|
||||
.withDocumentation("Property to decide if the hbase cluster secure authentication is enabled or not. "
|
||||
+ "Possible values are 'simple' (no authentication), and 'kerberos'.");
|
||||
|
||||
public static final ConfigProperty<String> KERBEROS_USER_KEYTAB = ConfigProperty
|
||||
.key("hoodie.index.hbase.kerberos.user.keytab")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("File name of the kerberos keytab file for connecting to the hbase cluster.");
|
||||
|
||||
public static final ConfigProperty<String> KERBEROS_USER_PRINCIPAL = ConfigProperty
|
||||
.key("hoodie.index.hbase.kerberos.user.principal")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("The kerberos principal name for connecting to the hbase cluster.");
|
||||
|
||||
public static final ConfigProperty<String> REGIONSERVER_PRINCIPAL = ConfigProperty
|
||||
.key("hoodie.index.hbase.regionserver.kerberos.principal")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("The value of hbase.regionserver.kerberos.principal in hbase cluster.");
|
||||
|
||||
public static final ConfigProperty<String> MASTER_PRINCIPAL = ConfigProperty
|
||||
.key("hoodie.index.hbase.master.kerberos.principal")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("The value of hbase.master.kerberos.principal in hbase cluster.");
|
||||
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link #ZKQUORUM} and its methods instead
|
||||
*/
|
||||
@@ -444,6 +471,31 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder hbaseSecurityAuthentication(String authentication) {
|
||||
hBaseIndexConfig.setValue(SECURITY_AUTHENTICATION, authentication);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder hbaseKerberosUserKeytab(String keytab) {
|
||||
hBaseIndexConfig.setValue(KERBEROS_USER_KEYTAB, keytab);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder hbaseKerberosUserPrincipal(String principal) {
|
||||
hBaseIndexConfig.setValue(KERBEROS_USER_PRINCIPAL, principal);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder hbaseKerberosRegionserverPrincipal(String principal) {
|
||||
hBaseIndexConfig.setValue(REGIONSERVER_PRINCIPAL, principal);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder hbaseKerberosMasterPrincipal(String principal) {
|
||||
hBaseIndexConfig.setValue(MASTER_PRINCIPAL, principal);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Method to set maximum QPS allowed per Region Server. This should be same across various jobs. This is intended to
|
||||
|
||||
@@ -1488,6 +1488,26 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
return getBoolean(HoodieHBaseIndexConfig.COMPUTE_QPS_DYNAMICALLY);
|
||||
}
|
||||
|
||||
public String getHBaseIndexSecurityAuthentication() {
|
||||
return getString(HoodieHBaseIndexConfig.SECURITY_AUTHENTICATION);
|
||||
}
|
||||
|
||||
public String getHBaseIndexKerberosUserKeytab() {
|
||||
return getString(HoodieHBaseIndexConfig.KERBEROS_USER_KEYTAB);
|
||||
}
|
||||
|
||||
public String getHBaseIndexKerberosUserPrincipal() {
|
||||
return getString(HoodieHBaseIndexConfig.KERBEROS_USER_PRINCIPAL);
|
||||
}
|
||||
|
||||
public String getHBaseIndexRegionserverPrincipal() {
|
||||
return getString(HoodieHBaseIndexConfig.REGIONSERVER_PRINCIPAL);
|
||||
}
|
||||
|
||||
public String getHBaseIndexMasterPrincipal() {
|
||||
return getString(HoodieHBaseIndexConfig.MASTER_PRINCIPAL);
|
||||
}
|
||||
|
||||
public int getHBaseIndexDesiredPutsTime() {
|
||||
return getInt(HoodieHBaseIndexConfig.DESIRED_PUTS_TIME_IN_SECONDS);
|
||||
}
|
||||
|
||||
@@ -42,7 +42,6 @@ import org.apache.hudi.exception.HoodieDependentSystemUnavailableException;
|
||||
import org.apache.hudi.exception.HoodieIndexException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
@@ -60,10 +59,12 @@ import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.Partitioner;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.SparkFiles;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
@@ -72,6 +73,7 @@ import org.joda.time.DateTime;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
@@ -150,9 +152,28 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
|
||||
}
|
||||
String port = String.valueOf(config.getHbaseZkPort());
|
||||
hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
|
||||
|
||||
try {
|
||||
return ConnectionFactory.createConnection(hbaseConfig);
|
||||
} catch (IOException e) {
|
||||
String authentication = config.getHBaseIndexSecurityAuthentication();
|
||||
if (authentication.equals("kerberos")) {
|
||||
hbaseConfig.set("hbase.security.authentication", "kerberos");
|
||||
hbaseConfig.set("hadoop.security.authentication", "kerberos");
|
||||
hbaseConfig.set("hbase.security.authorization", "true");
|
||||
hbaseConfig.set("hbase.regionserver.kerberos.principal", config.getHBaseIndexRegionserverPrincipal());
|
||||
hbaseConfig.set("hbase.master.kerberos.principal", config.getHBaseIndexMasterPrincipal());
|
||||
|
||||
String principal = config.getHBaseIndexKerberosUserPrincipal();
|
||||
String keytab = SparkFiles.get(config.getHBaseIndexKerberosUserKeytab());
|
||||
|
||||
UserGroupInformation.setConfiguration(hbaseConfig);
|
||||
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
|
||||
return ugi.doAs((PrivilegedExceptionAction<Connection>) () ->
|
||||
(Connection) ConnectionFactory.createConnection(hbaseConfig)
|
||||
);
|
||||
} else {
|
||||
return ConnectionFactory.createConnection(hbaseConfig);
|
||||
}
|
||||
} catch (IOException | InterruptedException e) {
|
||||
throw new HoodieDependentSystemUnavailableException(HoodieDependentSystemUnavailableException.HBASE,
|
||||
quorum + ":" + port, e);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user