This reverts commit d5c904e10e.
This commit is contained in:
@@ -88,7 +88,6 @@ stages:
|
|||||||
- stage: test
|
- stage: test
|
||||||
jobs:
|
jobs:
|
||||||
- job: UT_FT_1
|
- job: UT_FT_1
|
||||||
condition: false
|
|
||||||
displayName: UT FT common & flink & UT client/spark-client
|
displayName: UT FT common & flink & UT client/spark-client
|
||||||
timeoutInMinutes: '120'
|
timeoutInMinutes: '120'
|
||||||
steps:
|
steps:
|
||||||
@@ -119,7 +118,6 @@ stages:
|
|||||||
jdkVersionOption: '1.8'
|
jdkVersionOption: '1.8'
|
||||||
mavenOptions: '-Xmx4g'
|
mavenOptions: '-Xmx4g'
|
||||||
- job: UT_FT_2
|
- job: UT_FT_2
|
||||||
condition: false
|
|
||||||
displayName: FT client/spark-client
|
displayName: FT client/spark-client
|
||||||
timeoutInMinutes: '120'
|
timeoutInMinutes: '120'
|
||||||
steps:
|
steps:
|
||||||
@@ -171,7 +169,6 @@ stages:
|
|||||||
jdkVersionOption: '1.8'
|
jdkVersionOption: '1.8'
|
||||||
mavenOptions: '-Xmx4g'
|
mavenOptions: '-Xmx4g'
|
||||||
- job: UT_FT_4
|
- job: UT_FT_4
|
||||||
condition: false
|
|
||||||
displayName: UT FT other modules
|
displayName: UT FT other modules
|
||||||
timeoutInMinutes: '120'
|
timeoutInMinutes: '120'
|
||||||
steps:
|
steps:
|
||||||
@@ -202,7 +199,6 @@ stages:
|
|||||||
jdkVersionOption: '1.8'
|
jdkVersionOption: '1.8'
|
||||||
mavenOptions: '-Xmx4g'
|
mavenOptions: '-Xmx4g'
|
||||||
- job: IT
|
- job: IT
|
||||||
condition: false
|
|
||||||
displayName: IT modules
|
displayName: IT modules
|
||||||
timeoutInMinutes: '120'
|
timeoutInMinutes: '120'
|
||||||
steps:
|
steps:
|
||||||
|
|||||||
@@ -55,7 +55,6 @@ import java.util.UUID;
|
|||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
|
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
|
||||||
import static org.apache.hudi.hive.testutils.HiveTestService.HS2_JDBC_URL;
|
|
||||||
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
|
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
|
||||||
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
|
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
|
||||||
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
|
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
|
||||||
@@ -181,7 +180,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
|
|||||||
// Make path selection test suite specific
|
// Make path selection test suite specific
|
||||||
props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
|
props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
|
||||||
// Hive Configs
|
// Hive Configs
|
||||||
props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
|
props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
|
||||||
props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
|
props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
|
||||||
props.setProperty(META_SYNC_TABLE_NAME.key(), "table1");
|
props.setProperty(META_SYNC_TABLE_NAME.key(), "table1");
|
||||||
props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
|
props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
|
||||||
|
|||||||
@@ -63,8 +63,7 @@ public class HiveSyncConfig extends HoodieSyncConfig {
|
|||||||
|
|
||||||
public HiveSyncConfig(Properties props, Configuration hadoopConf) {
|
public HiveSyncConfig(Properties props, Configuration hadoopConf) {
|
||||||
super(props, hadoopConf);
|
super(props, hadoopConf);
|
||||||
HiveConf hiveConf = hadoopConf instanceof HiveConf
|
HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
|
||||||
? (HiveConf) hadoopConf : new HiveConf(hadoopConf, HiveConf.class);
|
|
||||||
// HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
|
// HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
|
||||||
hiveConf.addResource(getHadoopFileSystem().getConf());
|
hiveConf.addResource(getHadoopFileSystem().getConf());
|
||||||
setHadoopConf(hiveConf);
|
setHadoopConf(hiveConf);
|
||||||
|
|||||||
@@ -19,7 +19,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.hive.replication;
|
package org.apache.hudi.hive.replication;
|
||||||
|
|
||||||
import org.apache.hudi.hive.testutils.HiveTestCluster;
|
import org.apache.hudi.hive.testutils.TestCluster;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
@@ -53,9 +53,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
|
|||||||
public class TestHiveSyncGlobalCommitTool {
|
public class TestHiveSyncGlobalCommitTool {
|
||||||
|
|
||||||
@RegisterExtension
|
@RegisterExtension
|
||||||
public static HiveTestCluster localCluster = new HiveTestCluster();
|
public static TestCluster localCluster = new TestCluster();
|
||||||
@RegisterExtension
|
@RegisterExtension
|
||||||
public static HiveTestCluster remoteCluster = new HiveTestCluster();
|
public static TestCluster remoteCluster = new TestCluster();
|
||||||
|
|
||||||
private static final String DB_NAME = "foo";
|
private static final String DB_NAME = "foo";
|
||||||
private static final String TBL_NAME = "bar";
|
private static final String TBL_NAME = "bar";
|
||||||
|
|||||||
@@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.hive.testutils;
|
package org.apache.hudi.hive.testutils;
|
||||||
|
|
||||||
import org.apache.hudi.common.testutils.NetworkTestUtils;
|
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||||
import org.apache.hudi.common.util.FileIOUtils;
|
import org.apache.hudi.common.util.FileIOUtils;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@@ -62,40 +62,71 @@ import java.util.concurrent.Executors;
|
|||||||
public class HiveTestService {
|
public class HiveTestService {
|
||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(HiveTestService.class);
|
private static final Logger LOG = LogManager.getLogger(HiveTestService.class);
|
||||||
private static final int CONNECTION_TIMEOUT_MS = 30000;
|
|
||||||
private static final String BIND_HOST = "127.0.0.1";
|
|
||||||
private static final int HS2_THRIFT_PORT = 9999;
|
|
||||||
public static final String HS2_JDBC_URL = String.format("jdbc:hive2://%s:%s/", BIND_HOST, HS2_THRIFT_PORT);
|
|
||||||
|
|
||||||
private final Configuration hadoopConf;
|
private static final int CONNECTION_TIMEOUT = 30000;
|
||||||
private final String workDir;
|
|
||||||
private final Map<String, String> sysProps = new HashMap<>();
|
/**
|
||||||
|
* Configuration settings.
|
||||||
|
*/
|
||||||
|
private Configuration hadoopConf;
|
||||||
|
private String workDir;
|
||||||
|
private String bindIP = "127.0.0.1";
|
||||||
|
private int metastorePort = 9083;
|
||||||
|
private int serverPort = 9999;
|
||||||
|
private boolean clean = true;
|
||||||
|
|
||||||
|
private Map<String, String> sysProps = new HashMap<>();
|
||||||
private ExecutorService executorService;
|
private ExecutorService executorService;
|
||||||
private TServer tServer;
|
private TServer tServer;
|
||||||
private HiveServer2 hiveServer;
|
private HiveServer2 hiveServer;
|
||||||
private HiveConf hiveConf;
|
private HiveConf serverConf;
|
||||||
|
|
||||||
public HiveTestService(Configuration hadoopConf) throws IOException {
|
public HiveTestService(Configuration hadoopConf) throws IOException {
|
||||||
this.workDir = Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath();
|
this.workDir = Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath();
|
||||||
this.hadoopConf = hadoopConf;
|
this.hadoopConf = hadoopConf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Configuration getHadoopConf() {
|
||||||
|
return hadoopConf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TServer getHiveMetaStore() {
|
||||||
|
return tServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HiveConf getServerConf() {
|
||||||
|
return serverConf;
|
||||||
|
}
|
||||||
|
|
||||||
public HiveServer2 start() throws IOException {
|
public HiveServer2 start() throws IOException {
|
||||||
Objects.requireNonNull(workDir, "The work dir must be set before starting cluster.");
|
Objects.requireNonNull(workDir, "The work dir must be set before starting cluster.");
|
||||||
|
|
||||||
String localHiveLocation = getHiveLocation(workDir);
|
if (hadoopConf == null) {
|
||||||
LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
|
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
|
||||||
File file = new File(localHiveLocation);
|
}
|
||||||
FileIOUtils.deleteDirectory(file);
|
|
||||||
|
|
||||||
hiveConf = configureHive(hadoopConf, localHiveLocation);
|
String localHiveLocation = getHiveLocation(workDir);
|
||||||
|
if (clean) {
|
||||||
|
LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
|
||||||
|
File file = new File(localHiveLocation);
|
||||||
|
FileIOUtils.deleteDirectory(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
serverConf = configureHive(hadoopConf, localHiveLocation);
|
||||||
|
|
||||||
executorService = Executors.newSingleThreadExecutor();
|
executorService = Executors.newSingleThreadExecutor();
|
||||||
tServer = startMetaStore(hiveConf);
|
tServer = startMetaStore(bindIP, serverConf);
|
||||||
|
|
||||||
hiveServer = startHiveServer(hiveConf);
|
serverConf.set("hive.in.test", "true");
|
||||||
|
hiveServer = startHiveServer(serverConf);
|
||||||
|
|
||||||
if (!waitForServerUp(hiveConf)) {
|
String serverHostname;
|
||||||
|
if (bindIP.equals("0.0.0.0")) {
|
||||||
|
serverHostname = "localhost";
|
||||||
|
} else {
|
||||||
|
serverHostname = bindIP;
|
||||||
|
}
|
||||||
|
if (!waitForServerUp(serverConf, serverHostname, CONNECTION_TIMEOUT)) {
|
||||||
throw new IOException("Waiting for startup of standalone server");
|
throw new IOException("Waiting for startup of standalone server");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -125,69 +156,76 @@ public class HiveTestService {
|
|||||||
LOG.info("Hive Minicluster service shut down.");
|
LOG.info("Hive Minicluster service shut down.");
|
||||||
tServer = null;
|
tServer = null;
|
||||||
hiveServer = null;
|
hiveServer = null;
|
||||||
|
hadoopConf = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public HiveServer2 getHiveServer() {
|
public HiveServer2 getHiveServer() {
|
||||||
return hiveServer;
|
return hiveServer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public HiveConf getHiveConf() {
|
|
||||||
return hiveConf;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getHiveServerPort() {
|
public int getHiveServerPort() {
|
||||||
return hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT);
|
return serverPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getJdbcHive2Url() {
|
public String getJdbcHive2Url() {
|
||||||
return String.format("jdbc:hive2://%s:%s/",
|
return String.format("jdbc:hive2://%s:%s/default", bindIP, serverPort);
|
||||||
hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST), hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public HiveConf configureHive(Configuration hadoopConf, String localHiveLocation) throws IOException {
|
public HiveConf configureHive(Configuration conf, String localHiveLocation) throws IOException {
|
||||||
hadoopConf.set("hive.metastore.local", "false");
|
conf.set("hive.metastore.local", "false");
|
||||||
hadoopConf.set("datanucleus.schema.autoCreateTables", "true");
|
int port = metastorePort;
|
||||||
hadoopConf.set("datanucleus.autoCreateSchema", "true");
|
if (conf.get(HiveConf.ConfVars.METASTORE_SERVER_PORT.varname, null) == null) {
|
||||||
hadoopConf.set("datanucleus.fixedDatastore", "false");
|
conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
|
||||||
HiveConf conf = new HiveConf(hadoopConf, HiveConf.class);
|
} else {
|
||||||
conf.setBoolVar(ConfVars.HIVE_IN_TEST, true);
|
port = conf.getInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
|
||||||
conf.setBoolVar(ConfVars.METASTORE_SCHEMA_VERIFICATION, false);
|
}
|
||||||
conf.setIntVar(ConfVars.HIVE_SERVER2_THRIFT_PORT, HS2_THRIFT_PORT);
|
if (conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, null) == null) {
|
||||||
conf.setVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST, BIND_HOST);
|
conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
|
||||||
final int metastoreServerPort = NetworkTestUtils.nextFreePort();
|
}
|
||||||
conf.setIntVar(ConfVars.METASTORE_SERVER_PORT, metastoreServerPort);
|
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + port);
|
||||||
conf.setVar(ConfVars.METASTOREURIS, "thrift://" + BIND_HOST + ":" + metastoreServerPort);
|
conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
|
||||||
|
// The following line to turn of SASL has no effect since HiveAuthFactory calls
|
||||||
|
// 'new HiveConf()'. This is fixed by https://issues.apache.org/jira/browse/HIVE-6657,
|
||||||
|
// in Hive 0.14.
|
||||||
|
// As a workaround, the property is set in hive-site.xml in this module.
|
||||||
|
// conf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "NOSASL");
|
||||||
File localHiveDir = new File(localHiveLocation);
|
File localHiveDir = new File(localHiveLocation);
|
||||||
localHiveDir.mkdirs();
|
localHiveDir.mkdirs();
|
||||||
File metastoreDbDir = new File(localHiveDir, "metastore_db");
|
File metastoreDbDir = new File(localHiveDir, "metastore_db");
|
||||||
conf.setVar(ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
|
conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname,
|
||||||
|
"jdbc:derby:" + metastoreDbDir.getPath() + ";create=true");
|
||||||
File derbyLogFile = new File(localHiveDir, "derby.log");
|
File derbyLogFile = new File(localHiveDir, "derby.log");
|
||||||
derbyLogFile.createNewFile();
|
derbyLogFile.createNewFile();
|
||||||
setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
|
setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
|
||||||
setSystemProperty("derby.system.home", localHiveDir.getAbsolutePath());
|
setSystemProperty("derby.system.home", localHiveDir.getAbsolutePath());
|
||||||
File metastoreWarehouseDir = new File(localHiveDir, "warehouse");
|
conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
|
||||||
metastoreWarehouseDir.mkdir();
|
Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath());
|
||||||
conf.setVar(ConfVars.METASTOREWAREHOUSE, metastoreWarehouseDir.getAbsolutePath());
|
conf.set("datanucleus.schema.autoCreateTables", "true");
|
||||||
|
conf.set("hive.metastore.schema.verification", "false");
|
||||||
|
conf.set("datanucleus.autoCreateSchema", "true");
|
||||||
|
conf.set("datanucleus.fixedDatastore", "false");
|
||||||
|
setSystemProperty("derby.stream.error.file", derbyLogFile.getPath());
|
||||||
|
|
||||||
return conf;
|
return new HiveConf(conf, this.getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean waitForServerUp(HiveConf serverConf) {
|
private boolean waitForServerUp(HiveConf serverConf, String hostname, int timeout) {
|
||||||
LOG.info("waiting for " + serverConf.getVar(ConfVars.METASTOREURIS));
|
long start = System.currentTimeMillis();
|
||||||
final long start = System.currentTimeMillis();
|
int port = serverConf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
new HiveMetaStoreClient(serverConf);
|
new HiveMetaStoreClient(serverConf);
|
||||||
return true;
|
return true;
|
||||||
} catch (MetaException e) {
|
} catch (MetaException e) {
|
||||||
// ignore as this is expected
|
// ignore as this is expected
|
||||||
|
LOG.info("server " + hostname + ":" + port + " not up " + e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (System.currentTimeMillis() > start + CONNECTION_TIMEOUT_MS) {
|
if (System.currentTimeMillis() > start + timeout) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Thread.sleep(CONNECTION_TIMEOUT_MS / 10);
|
Thread.sleep(250);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// ignore
|
// ignore
|
||||||
}
|
}
|
||||||
@@ -269,23 +307,28 @@ public class HiveTestService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private TServer startMetaStore(HiveConf conf) throws IOException {
|
public TServer startMetaStore(String forceBindIP, HiveConf conf) throws IOException {
|
||||||
try {
|
try {
|
||||||
// Server will create new threads up to max as necessary. After an idle
|
// Server will create new threads up to max as necessary. After an idle
|
||||||
// period, it will destory threads to keep the number of threads in the
|
// period, it will destory threads to keep the number of threads in the
|
||||||
// pool to min.
|
// pool to min.
|
||||||
String host = conf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
|
int port = conf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
|
||||||
int port = conf.getIntVar(ConfVars.METASTORE_SERVER_PORT);
|
int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
|
||||||
int minWorkerThreads = conf.getIntVar(ConfVars.METASTORESERVERMINTHREADS);
|
int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
|
||||||
int maxWorkerThreads = conf.getIntVar(ConfVars.METASTORESERVERMAXTHREADS);
|
boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
|
||||||
boolean tcpKeepAlive = conf.getBoolVar(ConfVars.METASTORE_TCP_KEEP_ALIVE);
|
boolean useFramedTransport = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
|
||||||
boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
|
|
||||||
|
|
||||||
// don't support SASL yet
|
// don't support SASL yet
|
||||||
// boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
|
// boolean useSasl = conf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
|
||||||
|
|
||||||
InetSocketAddress address = new InetSocketAddress(host, port);
|
TServerTransport serverTransport;
|
||||||
TServerTransport serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);
|
if (forceBindIP != null) {
|
||||||
|
InetSocketAddress address = new InetSocketAddress(forceBindIP, port);
|
||||||
|
serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(address) : new TServerSocket(address);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
serverTransport = tcpKeepAlive ? new TServerSocketKeepAlive(port) : new TServerSocket(port);
|
||||||
|
}
|
||||||
|
|
||||||
TProcessor processor;
|
TProcessor processor;
|
||||||
TTransportFactory transFactory;
|
TTransportFactory transFactory;
|
||||||
@@ -293,7 +336,7 @@ public class HiveTestService {
|
|||||||
HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf, false);
|
HiveMetaStore.HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf, false);
|
||||||
IHMSHandler handler = RetryingHMSHandler.getProxy(conf, baseHandler, true);
|
IHMSHandler handler = RetryingHMSHandler.getProxy(conf, baseHandler, true);
|
||||||
|
|
||||||
if (conf.getBoolVar(ConfVars.METASTORE_EXECUTE_SET_UGI)) {
|
if (conf.getBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI)) {
|
||||||
transFactory = useFramedTransport
|
transFactory = useFramedTransport
|
||||||
? new ChainedTTransportFactory(new TFramedTransport.Factory(), new TUGIContainingTransport.Factory())
|
? new ChainedTTransportFactory(new TFramedTransport.Factory(), new TUGIContainingTransport.Factory())
|
||||||
: new TUGIContainingTransport.Factory();
|
: new TUGIContainingTransport.Factory();
|
||||||
|
|||||||
@@ -125,6 +125,7 @@ public class HiveTestUtil {
|
|||||||
hiveTestService = new HiveTestService(configuration);
|
hiveTestService = new HiveTestService(configuration);
|
||||||
hiveServer = hiveTestService.start();
|
hiveServer = hiveTestService.start();
|
||||||
}
|
}
|
||||||
|
fileSystem = FileSystem.get(configuration);
|
||||||
|
|
||||||
basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();
|
basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();
|
||||||
|
|
||||||
@@ -140,8 +141,7 @@ public class HiveTestUtil {
|
|||||||
hiveSyncProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
|
hiveSyncProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
|
||||||
hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");
|
hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");
|
||||||
|
|
||||||
hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, hiveTestService.getHiveConf());
|
hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, configuration);
|
||||||
fileSystem = hiveSyncConfig.getHadoopFileSystem();
|
|
||||||
|
|
||||||
dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
|
dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
|
||||||
ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig);
|
ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig);
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hive.conf.HiveConf;
|
import org.apache.hadoop.hive.conf.HiveConf;
|
||||||
|
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
|
||||||
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
|
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
|
||||||
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
|
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
|
||||||
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
|
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
|
||||||
@@ -56,6 +57,7 @@ import org.junit.jupiter.api.extension.AfterEachCallback;
|
|||||||
import org.junit.jupiter.api.extension.BeforeAllCallback;
|
import org.junit.jupiter.api.extension.BeforeAllCallback;
|
||||||
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
||||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||||
|
import org.junit.runners.model.InitializationError;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
@@ -63,7 +65,6 @@ import java.io.IOException;
|
|||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
@@ -74,15 +75,16 @@ import java.util.UUID;
|
|||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
import static org.junit.jupiter.api.Assertions.fail;
|
||||||
|
|
||||||
public class HiveTestCluster implements BeforeAllCallback, AfterAllCallback,
|
public class TestCluster implements BeforeAllCallback, AfterAllCallback,
|
||||||
BeforeEachCallback, AfterEachCallback {
|
BeforeEachCallback, AfterEachCallback {
|
||||||
public MiniDFSCluster dfsCluster;
|
|
||||||
private HdfsTestService hdfsTestService;
|
private HdfsTestService hdfsTestService;
|
||||||
private HiveTestService hiveTestService;
|
public HiveTestService hiveTestService;
|
||||||
private HiveConf conf;
|
private Configuration conf;
|
||||||
private HiveServer2 server2;
|
public HiveServer2 server2;
|
||||||
private DateTimeFormatter dtfOut;
|
private static volatile int port = 9083;
|
||||||
private File hiveSiteXml;
|
public MiniDFSCluster dfsCluster;
|
||||||
|
DateTimeFormatter dtfOut;
|
||||||
|
public File hiveSiteXml;
|
||||||
private IMetaStoreClient client;
|
private IMetaStoreClient client;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -107,18 +109,24 @@ public class HiveTestCluster implements BeforeAllCallback, AfterAllCallback,
|
|||||||
hdfsTestService = new HdfsTestService();
|
hdfsTestService = new HdfsTestService();
|
||||||
dfsCluster = hdfsTestService.start(true);
|
dfsCluster = hdfsTestService.start(true);
|
||||||
|
|
||||||
Configuration hadoopConf = hdfsTestService.getHadoopConf();
|
conf = hdfsTestService.getHadoopConf();
|
||||||
hiveTestService = new HiveTestService(hadoopConf);
|
conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, port++);
|
||||||
|
conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port++);
|
||||||
|
conf.setInt(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, port++);
|
||||||
|
hiveTestService = new HiveTestService(conf);
|
||||||
server2 = hiveTestService.start();
|
server2 = hiveTestService.start();
|
||||||
dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
|
dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
|
||||||
hiveSiteXml = File.createTempFile("hive-site", ".xml");
|
hiveSiteXml = File.createTempFile("hive-site", ".xml");
|
||||||
hiveSiteXml.deleteOnExit();
|
hiveSiteXml.deleteOnExit();
|
||||||
conf = hiveTestService.getHiveConf();
|
|
||||||
try (OutputStream os = new FileOutputStream(hiveSiteXml)) {
|
try (OutputStream os = new FileOutputStream(hiveSiteXml)) {
|
||||||
conf.writeXml(os);
|
hiveTestService.getServerConf().writeXml(os);
|
||||||
}
|
}
|
||||||
client = HiveMetaStoreClient.newSynchronizedClient(
|
client = HiveMetaStoreClient.newSynchronizedClient(
|
||||||
RetryingMetaStoreClient.getProxy(conf, true));
|
RetryingMetaStoreClient.getProxy(hiveTestService.getServerConf(), true));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Configuration getConf() {
|
||||||
|
return this.conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getHiveSiteXmlLocation() {
|
public String getHiveSiteXmlLocation() {
|
||||||
@@ -130,7 +138,7 @@ public class HiveTestCluster implements BeforeAllCallback, AfterAllCallback,
|
|||||||
}
|
}
|
||||||
|
|
||||||
public String getHiveJdBcUrl() {
|
public String getHiveJdBcUrl() {
|
||||||
return hiveTestService.getJdbcHive2Url();
|
return "jdbc:hive2://127.0.0.1:" + conf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname) + "";
|
||||||
}
|
}
|
||||||
|
|
||||||
public String tablePath(String dbName, String tableName) throws Exception {
|
public String tablePath(String dbName, String tableName) throws Exception {
|
||||||
@@ -143,12 +151,12 @@ public class HiveTestCluster implements BeforeAllCallback, AfterAllCallback,
|
|||||||
|
|
||||||
public void forceCreateDb(String dbName) throws Exception {
|
public void forceCreateDb(String dbName) throws Exception {
|
||||||
try {
|
try {
|
||||||
client.dropDatabase(dbName);
|
getHMSClient().dropDatabase(dbName);
|
||||||
} catch (NoSuchObjectException ignored) {
|
} catch (NoSuchObjectException e) {
|
||||||
// expected
|
System.out.println("db does not exist but its ok " + dbName);
|
||||||
}
|
}
|
||||||
Database db = new Database(dbName, "", dbPath(dbName), new HashMap<>());
|
Database db = new Database(dbName, "", dbPath(dbName), new HashMap<>());
|
||||||
client.createDatabase(db);
|
getHMSClient().createDatabase(db);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createCOWTable(String commitTime, int numberOfPartitions, String dbName, String tableName)
|
public void createCOWTable(String commitTime, int numberOfPartitions, String dbName, String tableName)
|
||||||
@@ -161,7 +169,10 @@ public class HiveTestCluster implements BeforeAllCallback, AfterAllCallback,
|
|||||||
.setTableName(tableName)
|
.setTableName(tableName)
|
||||||
.setPayloadClass(HoodieAvroPayload.class)
|
.setPayloadClass(HoodieAvroPayload.class)
|
||||||
.initTable(conf, path.toString());
|
.initTable(conf, path.toString());
|
||||||
dfsCluster.getFileSystem().mkdirs(path);
|
boolean result = dfsCluster.getFileSystem().mkdirs(path);
|
||||||
|
if (!result) {
|
||||||
|
throw new InitializationError("cannot initialize table");
|
||||||
|
}
|
||||||
ZonedDateTime dateTime = ZonedDateTime.now();
|
ZonedDateTime dateTime = ZonedDateTime.now();
|
||||||
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime, path.toString());
|
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime, path.toString());
|
||||||
createCommitFile(commitMetadata, commitTime, path.toString());
|
createCommitFile(commitMetadata, commitTime, path.toString());
|
||||||
@@ -228,7 +239,7 @@ public class HiveTestCluster implements BeforeAllCallback, AfterAllCallback,
|
|||||||
try {
|
try {
|
||||||
writer.write(s);
|
writer.write(s);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
fail("IOException while writing test records as parquet", e);
|
fail("IOException while writing test records as parquet" + e.toString());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
writer.close();
|
writer.close();
|
||||||
@@ -248,15 +259,15 @@ public class HiveTestCluster implements BeforeAllCallback, AfterAllCallback,
|
|||||||
public void startHiveServer2() {
|
public void startHiveServer2() {
|
||||||
if (server2 == null) {
|
if (server2 == null) {
|
||||||
server2 = new HiveServer2();
|
server2 = new HiveServer2();
|
||||||
server2.init(hiveTestService.getHiveConf());
|
server2.init(hiveTestService.getServerConf());
|
||||||
server2.start();
|
server2.start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutDown() throws IOException {
|
public void shutDown() {
|
||||||
Files.deleteIfExists(hiveSiteXml.toPath());
|
stopHiveServer2();
|
||||||
Hive.closeCurrent();
|
Hive.closeCurrent();
|
||||||
hiveTestService.stop();
|
hiveTestService.getHiveMetaStore().stop();
|
||||||
hdfsTestService.stop();
|
hdfsTestService.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -48,7 +48,6 @@ import java.util.Map;
|
|||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
|
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
|
||||||
import static org.apache.hudi.hive.testutils.HiveTestService.HS2_JDBC_URL;
|
|
||||||
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
|
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
|
||||||
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
|
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
|
||||||
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
|
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
|
||||||
@@ -187,7 +186,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
|
|||||||
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
|
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
|
||||||
|
|
||||||
// Hive Configs
|
// Hive Configs
|
||||||
props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
|
props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
|
||||||
props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
|
props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
|
||||||
props.setProperty(META_SYNC_TABLE_NAME.key(), "hive_trips");
|
props.setProperty(META_SYNC_TABLE_NAME.key(), "hive_trips");
|
||||||
props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
|
props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
|
||||||
@@ -247,7 +246,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
|
|||||||
|
|
||||||
protected static void populateCommonHiveProps(TypedProperties props) {
|
protected static void populateCommonHiveProps(TypedProperties props) {
|
||||||
// Hive Configs
|
// Hive Configs
|
||||||
props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
|
props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
|
||||||
props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb2");
|
props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb2");
|
||||||
props.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "false");
|
props.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "false");
|
||||||
props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
|
props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
|
||||||
|
|||||||
@@ -1359,7 +1359,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
// Test Hive integration
|
// Test Hive integration
|
||||||
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
|
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
|
||||||
hiveSyncConfig.setValue(META_SYNC_PARTITION_FIELDS, "year,month,day");
|
hiveSyncConfig.setValue(META_SYNC_PARTITION_FIELDS, "year,month,day");
|
||||||
hiveSyncConfig.setHadoopConf(hiveTestService.getHiveConf());
|
hiveSyncConfig.setHadoopConf(hiveServer.getHiveConf());
|
||||||
HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
|
HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
|
||||||
final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
|
final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
|
||||||
assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
|
assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
|
||||||
|
|||||||
@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.FileSystem;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hive.conf.HiveConf;
|
||||||
import org.apache.hive.service.server.HiveServer2;
|
import org.apache.hive.service.server.HiveServer2;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@@ -196,7 +197,7 @@ public class UtilitiesTestBase {
|
|||||||
*/
|
*/
|
||||||
protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableName) {
|
protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableName) {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.setProperty(HIVE_URL.key(), hiveTestService.getJdbcHive2Url());
|
props.setProperty(HIVE_URL.key(),"jdbc:hive2://127.0.0.1:9999/");
|
||||||
props.setProperty(HIVE_USER.key(), "");
|
props.setProperty(HIVE_USER.key(), "");
|
||||||
props.setProperty(HIVE_PASS.key(), "");
|
props.setProperty(HIVE_PASS.key(), "");
|
||||||
props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
|
props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
|
||||||
@@ -214,9 +215,11 @@ public class UtilitiesTestBase {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private static void clearHiveDb() throws Exception {
|
private static void clearHiveDb() throws Exception {
|
||||||
|
HiveConf hiveConf = new HiveConf();
|
||||||
// Create Dummy hive sync config
|
// Create Dummy hive sync config
|
||||||
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");
|
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");
|
||||||
hiveSyncConfig.setHadoopConf(hiveTestService.getHiveConf());
|
hiveConf.addResource(hiveServer.getHiveConf());
|
||||||
|
hiveSyncConfig.setHadoopConf(hiveConf);
|
||||||
HoodieTableMetaClient.withPropertyBuilder()
|
HoodieTableMetaClient.withPropertyBuilder()
|
||||||
.setTableType(HoodieTableType.COPY_ON_WRITE)
|
.setTableType(HoodieTableType.COPY_ON_WRITE)
|
||||||
.setTableName(hiveSyncConfig.getString(META_SYNC_TABLE_NAME))
|
.setTableName(hiveSyncConfig.getString(META_SYNC_TABLE_NAME))
|
||||||
|
|||||||
Reference in New Issue
Block a user