Add ability to provide multi-region (global) data consistency across HMS in different regions (#2542)

[global-hive-sync-tool] Add a global Hive sync tool to sync a Hudi table across clusters. Add a way to roll back the replicated timestamp if the sync fails or only partially succeeds

Co-authored-by: Jagmeet Bali <jsbali@uber.com>
s-sanjay
2021-06-25 08:56:26 +05:30
committed by GitHub
parent e64fe55054
commit 0fb8556b0d
27 changed files with 1731 additions and 71 deletions

View File

@@ -30,6 +30,8 @@
<properties>
<main.basedir>${project.parent.basedir}</main.basedir>
<jetty.version>7.6.0.v20120127</jetty.version>
</properties>
<dependencies>
@@ -148,6 +150,14 @@
<scope>test</scope>
</dependency>
<!-- Needed for running HiveServer for Tests -->
<dependency>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
<scope>test</scope>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>

View File

@@ -0,0 +1,69 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# A tool to sync a Hudi table to Hive across different clusters. Similar to HiveSyncTool, but it syncs to more
# than one Hive cluster (currently a local and a remote cluster). The common timestamp that was synced is stored as a new table property.
# This is most useful when consistent reads across different Hive clusters are required. If that is not a requirement,
# it is better to run HiveSyncTool separately.
# Note:
# The tool tries to be transactional but does not guarantee it. If the sync fails midway on one cluster, it tries to roll back the committed
# timestamp from the already successful syncs on the other clusters, but that rollback can also fail.
# The tool only rolls back the timestamp, never the synced partitions.
function error_exit {
echo "$1" >&2 ## Send message to stderr. Exclude >&2 if you don't want it that way.
exit "${2:-1}" ## Return a code specified by $2 or 1 by default.
}
if [ -z "${HADOOP_HOME}" ]; then
error_exit "Please make sure the environment variable HADOOP_HOME is set"
fi
if [ -z "${HIVE_HOME}" ]; then
error_exit "Please make sure the environment variable HIVE_HOME is set"
fi
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
#Ensure we pick the right jar even for hive11 builds
HUDI_HIVE_UBER_JAR=`ls -c $DIR/../packaging/hudi-hive-bundle/target/hudi-hive-*.jar | grep -v source | head -1`
if [ -z "$HADOOP_CONF_DIR" ]; then
echo "setting hadoop conf dir"
HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
fi
## Include only specific packages from HIVE_HOME/lib to avoid version mismatches
HIVE_EXEC=`ls ${HIVE_HOME}/lib/hive-exec-*.jar | tr '\n' ':'`
HIVE_SERVICE=`ls ${HIVE_HOME}/lib/hive-service-*.jar | grep -v rpc | tr '\n' ':'`
HIVE_METASTORE=`ls ${HIVE_HOME}/lib/hive-metastore-*.jar | tr '\n' ':'`
HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | tr '\n' ':'`
if [ -z "${HIVE_JDBC}" ]; then
HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' ':'`
fi
HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'`
HIVE_NUCLEUS=`ls ${HIVE_HOME}/lib/datanucleus*.jar | tr '\n' ':'`
HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON:$HIVE_NUCLEUS
HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*
if ! [ -z "$HIVE_CONF_DIR" ]; then
error_exit "Don't set HIVE_CONF_DIR; use config xml file"
fi
echo "Running Command : java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}/lib/* org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool $@"
java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}/lib/* org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool "$@"

View File

@@ -104,6 +104,7 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has been encoded during writing")
public Boolean decodePartition = false;
// child classes enhance this copy method with their own fields
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
newConfig.basePath = cfg.basePath;

View File

@@ -58,10 +58,10 @@ public class HiveSyncTool extends AbstractSyncTool {
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
private final HiveSyncConfig cfg;
private HoodieHiveClient hoodieHiveClient = null;
private String snapshotTableName = null;
private Option<String> roTableName = null;
protected final HiveSyncConfig cfg;
protected HoodieHiveClient hoodieHiveClient = null;
protected String snapshotTableName = null;
protected Option<String> roTableName = null;
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
super(configuration.getAllProperties(), fs);
@@ -127,8 +127,8 @@ public class HiveSyncTool extends AbstractSyncTool {
}
}
}
private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
boolean readAsOptimized) {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath()
+ " of type " + hoodieHiveClient.getTableType());

View File

@@ -18,12 +18,6 @@
package org.apache.hudi.hive;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -32,15 +26,22 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -60,6 +61,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
public class HoodieHiveClient extends AbstractSyncHoodieClient {
private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
@@ -402,7 +405,19 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
closeQuietly(null, stmt);
}
} else {
updateHiveSQLUsingHiveDriver(s);
CommandProcessorResponse response = updateHiveSQLUsingHiveDriver(s);
if (response == null) {
throw new HoodieHiveSyncException("Failed in executing SQL, got null response: " + s);
}
if (response.getResponseCode() != 0) {
LOG.error(String.format("Failure in SQL response %s", response.toString()));
if (response.getException() != null) {
throw new HoodieHiveSyncException(
String.format("Failed in executing SQL %s", s), response.getException());
} else {
throw new HoodieHiveSyncException(String.format("Failed in executing SQL %s", s));
}
}
}
}
@@ -476,13 +491,58 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
}
public Option<String> getLastReplicatedTime(String tableName) {
// Get the last replicated time from the TBLPROPERTIES
try {
Table table = client.getTable(syncConfig.databaseName, tableName);
return Option.ofNullable(table.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP, null));
} catch (NoSuchObjectException e) {
LOG.warn("table " + syncConfig.databaseName + "." + tableName + " not found in HMS");
return Option.empty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last replicated time from the database", e);
}
}
public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
if (!activeTimeline.filterCompletedInstants().getInstants()
.anyMatch(i -> i.getTimestamp().equals(timeStamp))) {
throw new HoodieHiveSyncException(
"Not a valid completed timestamp " + timeStamp + " for table " + tableName);
}
try {
Table table = client.getTable(syncConfig.databaseName, tableName);
table.putToParameters(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
client.alter_table(syncConfig.databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException(
"Failed to update last replicated time to " + timeStamp + " for " + tableName, e);
}
}
public void deleteLastReplicatedTimeStamp(String tableName) {
try {
Table table = client.getTable(syncConfig.databaseName, tableName);
String timestamp = table.getParameters().remove(GLOBALLY_CONSISTENT_READ_TIMESTAMP);
client.alter_table(syncConfig.databaseName, tableName, table);
if (timestamp != null) {
LOG.info("deleted last replicated timestamp " + timestamp + " for table " + tableName);
}
} catch (NoSuchObjectException e) {
// this is ok; the table doesn't exist.
} catch (Exception e) {
throw new HoodieHiveSyncException(
"Failed to delete last replicated timestamp for " + tableName, e);
}
}
public void close() {
try {
if (connection != null) {
connection.close();
}
if (client != null) {
Hive.closeCurrent();
client.close();
client = null;
}
} catch (SQLException e) {
@@ -506,4 +566,4 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e);
}
}
}
}
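
The three methods added above read, advance, and clear the GLOBALLY_CONSISTENT_READ_TIMESTAMP table property in the metastore. A minimal usage sketch, assuming a HoodieHiveClient already built from a HiveSyncConfig, HiveConf, and FileSystem, and a hypothetical table "bar" with a completed instant "105":

HoodieHiveClient client = new HoodieHiveClient(syncConfig, hiveConf, fs);
try {
  // Read the currently stored replicated timestamp, if the property is set.
  Option<String> last = client.getLastReplicatedTime("bar");
  // Advance it; throws HoodieHiveSyncException unless "105" is a completed instant.
  client.updateLastReplicatedTimeStamp("bar", "105");
  // Roll back to "no replicated timestamp" by removing the table property entirely.
  client.deleteLastReplicatedTimeStamp("bar");
} finally {
  client.close();
}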

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.replication;
import com.beust.jcommander.Parameter;
import org.apache.hudi.hive.HiveSyncConfig;
public class GlobalHiveSyncConfig extends HiveSyncConfig {
@Parameter(names = {"--replicated-timestamp"}, description = "Add globally replicated timestamp to enable consistent reads across clusters")
public String globallyReplicatedTimeStamp;
public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) {
GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig();
newConfig.basePath = cfg.basePath;
newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
newConfig.databaseName = cfg.databaseName;
newConfig.hivePass = cfg.hivePass;
newConfig.hiveUser = cfg.hiveUser;
newConfig.partitionFields = cfg.partitionFields;
newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
newConfig.jdbcUrl = cfg.jdbcUrl;
newConfig.tableName = cfg.tableName;
newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
newConfig.supportTimestamp = cfg.supportTimestamp;
newConfig.decodePartition = cfg.decodePartition;
newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp;
return newConfig;
}
@Override
public String toString() {
return "GlobalHiveSyncConfig{" + super.toString()
+ " globallyReplicatedTimeStamp=" + globallyReplicatedTimeStamp + "}";
}
}

View File

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.replication;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
public class GlobalHiveSyncTool extends HiveSyncTool {
private static final Logger LOG = LogManager.getLogger(GlobalHiveSyncTool.class);
public GlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
super(cfg, configuration, fs);
}
@Override
public void syncHoodieTable() {
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
syncHoodieTable(snapshotTableName, false, false);
break;
case MERGE_ON_READ:
// sync a RO table for MOR
syncHoodieTable(roTableName.get(), false, true);
// sync a RT table for MOR
syncHoodieTable(snapshotTableName, true, false);
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
throw new InvalidTableException(hoodieHiveClient.getBasePath());
}
}
@Override
protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) {
super.syncHoodieTable(tableName, useRealtimeInputFormat, readAsOptimized);
if (((GlobalHiveSyncConfig)cfg).globallyReplicatedTimeStamp != null) {
hoodieHiveClient.updateLastReplicatedTimeStamp(tableName,
((GlobalHiveSyncConfig) cfg).globallyReplicatedTimeStamp);
}
LOG.info("Sync complete for " + tableName);
}
public void close() {
hoodieHiveClient.close();
}
public Map<String, Option<String>> getLastReplicatedTimeStampMap() {
Map<String, Option<String>> timeStampMap = new HashMap<>();
Option<String> timeStamp = hoodieHiveClient.getLastReplicatedTime(snapshotTableName);
timeStampMap.put(snapshotTableName, timeStamp);
if (HoodieTableType.MERGE_ON_READ.equals(hoodieHiveClient.getTableType())) {
Option<String> roTimeStamp = hoodieHiveClient.getLastReplicatedTime(roTableName.get());
timeStampMap.put(roTableName.get(), roTimeStamp);
}
return timeStampMap;
}
public void setLastReplicatedTimeStamp(Map<String, Option<String>> timeStampMap) {
for (String tableName : timeStampMap.keySet()) {
Option<String> timestamp = timeStampMap.get(tableName);
if (timestamp.isPresent()) {
hoodieHiveClient.updateLastReplicatedTimeStamp(tableName, timestamp.get());
LOG.info("updated timestamp for " + tableName + " to: " + timestamp.get());
} else {
hoodieHiveClient.deleteLastReplicatedTimeStamp(tableName);
LOG.info("deleted timestamp for " + tableName);
}
}
}
public static GlobalHiveSyncTool buildGlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf hiveConf) {
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
hiveConf.addResource(fs.getConf());
return new GlobalHiveSyncTool(cfg, hiveConf, fs);
}
}
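
Taken together, these methods let a caller snapshot the replicated timestamps before a sync and restore them if a later step fails, which is the rollback behaviour the commit message describes (and which ReplicationStateSync below formalizes). A hedged sketch; cfg and hiveConf are assumed to be configured elsewhere:

GlobalHiveSyncTool tool = GlobalHiveSyncTool.buildGlobalHiveSyncTool(cfg, hiveConf);
try {
  // Remember the timestamps currently stored on the synced table(s).
  Map<String, Option<String>> before = tool.getLastReplicatedTimeStampMap();
  try {
    // Sync and stamp each synced table with cfg.globallyReplicatedTimeStamp (when set).
    tool.syncHoodieTable();
  } catch (RuntimeException e) {
    // Best-effort rollback of the timestamp only; synced partitions are left as-is.
    tool.setLastReplicatedTimeStamp(before);
    throw e;
  }
} finally {
  tool.close();
}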

View File

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.replication;
/**
* An interface to allow syncing the Hudi table to all clusters.
*/
public interface HiveSyncGlobalCommit {
/**
* @return whether the commit succeeded on all the clusters.
*/
boolean commit();
/**
* @return whether the rollback succeeded on all the clusters.
*/
boolean rollback();
}

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.replication;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;
import org.apache.hudi.common.util.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
// TODO: stop extending HiveSyncConfig and take all the variables needed from config file
@Parameters(commandDescription = "A tool to sync a Hudi table to Hive across different clusters. Similar to HiveSyncTool, but it syncs to more "
+ "than one Hive cluster (currently a local and a remote cluster). The common timestamp that was synced is stored as a new table property. "
+ "This is most useful when consistent reads across different Hive clusters are required. If that is not a requirement, "
+ "it is better to run HiveSyncTool separately. "
+ "Note: "
+ "The tool tries to be transactional but does not guarantee it. If the sync fails midway on one cluster, it tries to roll back the committed "
+ "timestamp from the already successful syncs on the other clusters, but that rollback can also fail. "
+ "The tool only rolls back the timestamp, never the synced partitions.")
public class HiveSyncGlobalCommitConfig extends GlobalHiveSyncConfig {
private static final Logger LOG = LogManager.getLogger(HiveSyncGlobalCommitConfig.class);
public static final String LOCAL_HIVE_SITE_URI = "hivesyncglobal.local_hive_site_uri";
public static final String REMOTE_HIVE_SITE_URI = "hivesyncglobal.remote_hive_site_uri";
public static final String CONFIG_FILE_URI = "hivesyncglobal.config_file_uri";
public static final String REMOTE_BASE_PATH = "hivesyncglobal.remote_base_path";
public static final String LOCAL_BASE_PATH = "hivesyncglobal.local_base_path";
public static final String RETRY_ATTEMPTS = "hivesyncglobal.retry_attempts";
public static final String REMOTE_HIVE_SERVER_JDBC_URLS = "hivesyncglobal.remote_hs2_jdbc_urls";
public static final String LOCAL_HIVE_SERVER_JDBC_URLS = "hivesyncglobal.local_hs2_jdbc_urls";
@Parameter(names = {
"--config-xml-file"}, description = "path to the XML config file", required = true)
public String configFile;
public Properties properties = new Properties();
private boolean finalized = false;
public void load() throws IOException {
if (finalized) {
throw new RuntimeException("config has already been loaded");
}
finalized = true;
try (InputStream configStream = new FileInputStream(new File(configFile))) {
properties.loadFromXML(configStream);
}
if (StringUtils.isNullOrEmpty(globallyReplicatedTimeStamp)) {
throw new RuntimeException("globally replicated timestamp not set");
}
}
GlobalHiveSyncConfig mkGlobalHiveSyncConfig(boolean forRemote) {
GlobalHiveSyncConfig cfg = GlobalHiveSyncConfig.copy(this);
cfg.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
: properties.getProperty(LOCAL_BASE_PATH, cfg.basePath);
cfg.jdbcUrl = forRemote ? properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
: properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.jdbcUrl);
LOG.info("building hivesync config forRemote: " + forRemote + " " + cfg.jdbcUrl + " "
+ cfg.basePath);
return cfg;
}
@Override
public String toString() {
return "HiveSyncGlobalCommitConfig{ " + "configFile=" + configFile + ", properties="
+ properties + ", " + super.toString()
+ " }";
}
public void storeToXML(OutputStream configStream) throws IOException {
this.properties.storeToXML(configStream, "hivesync global config");
}
}
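
HiveSyncGlobalCommitTool consumes this config through --config-xml-file, a java.util.Properties XML file carrying the hivesyncglobal.* keys defined above. A minimal sketch that writes such a file; every URI, JDBC URL, and path below is a placeholder:

import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Properties;

public class WriteGlobalSyncConfig {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // hive-site.xml locations for the local and remote clusters (LOCAL_HIVE_SITE_URI / REMOTE_HIVE_SITE_URI).
    props.setProperty("hivesyncglobal.local_hive_site_uri", "file:///etc/hive/local/hive-site.xml");
    props.setProperty("hivesyncglobal.remote_hive_site_uri", "file:///etc/hive/remote/hive-site.xml");
    // HiveServer2 JDBC URLs per cluster (LOCAL/REMOTE_HIVE_SERVER_JDBC_URLS).
    props.setProperty("hivesyncglobal.local_hs2_jdbc_urls", "jdbc:hive2://local-hs2:10000");
    props.setProperty("hivesyncglobal.remote_hs2_jdbc_urls", "jdbc:hive2://remote-hs2:10000");
    // Base paths of the Hudi table on each cluster (LOCAL_BASE_PATH / REMOTE_BASE_PATH).
    props.setProperty("hivesyncglobal.local_base_path", "hdfs://local-nn/warehouse/foo/bar");
    props.setProperty("hivesyncglobal.remote_base_path", "hdfs://remote-nn/warehouse/foo/bar");
    // Write the Properties XML that --config-xml-file points at.
    try (OutputStream out = new FileOutputStream("/tmp/global_sync_config.xml")) {
      props.storeToXML(out, "hivesync global config");
    }
  }
}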

View File

@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.replication;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
import com.beust.jcommander.JCommander;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
public class HiveSyncGlobalCommitTool implements HiveSyncGlobalCommit, AutoCloseable {
private static final Logger LOG = LogManager.getLogger(HiveSyncGlobalCommitTool.class);
private final HiveSyncGlobalCommitConfig config;
private List<ReplicationStateSync> replicationStateSyncList;
private ReplicationStateSync getReplicatedState(boolean forRemote) {
HiveConf hiveConf = new HiveConf();
// we probably just need to set the metastore URIs
// TODO: figure out how to integrate this in production
// how to load balance between piper HMS, HS2
// if we have a list of URIs, we can do something similar to createHiveConf in reairsync
hiveConf.addResource(new Path(config.properties.getProperty(
forRemote ? REMOTE_HIVE_SITE_URI : LOCAL_HIVE_SITE_URI)));
// TODO: get clusterId as input parameters
ReplicationStateSync state = new ReplicationStateSync(config.mkGlobalHiveSyncConfig(forRemote),
hiveConf, forRemote ? "REMOTESYNC" : "LOCALSYNC");
return state;
}
@Override
public boolean commit() {
// TODO: add retry attempts
String name = Thread.currentThread().getName();
try {
for (ReplicationStateSync stateSync : replicationStateSyncList) {
Thread.currentThread().setName(stateSync.getClusterId());
LOG.info("starting sync for state " + stateSync);
stateSync.sync();
LOG.info("synced state " + stateSync);
}
} catch (Exception e) {
Thread.currentThread().setName(name);
LOG.error(String.format("Error while trying to commit replication state %s", e.getMessage()), e);
return false;
} finally {
Thread.currentThread().setName(name);
}
LOG.info("done syncing all tables, verifying the timestamps...");
ReplicationStateSync base = replicationStateSyncList.get(0);
boolean success = true;
LOG.info("expecting all timestamps to match: " + base);
for (int idx = 1; idx < replicationStateSyncList.size(); ++idx) {
ReplicationStateSync other = replicationStateSyncList.get(idx);
if (!base.replicationStateIsInSync(other)) {
LOG.error("the timestamp of other: " + other + " does not match base: " + base);
success = false;
}
}
return success;
}
@Override
public boolean rollback() {
for (ReplicationStateSync stateSync : replicationStateSyncList) {
stateSync.rollback();
}
return true;
}
public HiveSyncGlobalCommitTool(HiveSyncGlobalCommitConfig config) {
this.config = config;
this.replicationStateSyncList = new ArrayList<>(2);
this.replicationStateSyncList.add(getReplicatedState(false));
this.replicationStateSyncList.add(getReplicatedState(true));
}
private static HiveSyncGlobalCommitConfig getHiveSyncGlobalCommitConfig(String[] args)
throws IOException {
HiveSyncGlobalCommitConfig cfg = new HiveSyncGlobalCommitConfig();
JCommander cmd = new JCommander(cfg, null, args);
if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
cfg.load();
return cfg;
}
@Override
public void close() {
for (ReplicationStateSync stateSync : replicationStateSyncList) {
stateSync.close();
}
}
public static void main(String[] args) throws IOException, HoodieHiveSyncException {
final HiveSyncGlobalCommitConfig cfg = getHiveSyncGlobalCommitConfig(args);
try (final HiveSyncGlobalCommitTool globalCommitTool = new HiveSyncGlobalCommitTool(cfg)) {
boolean success = globalCommitTool.commit();
if (!success) {
if (!globalCommitTool.rollback()) {
throw new RuntimeException("not able to roll back failed commit");
}
}
} catch (Exception e) {
throw new HoodieHiveSyncException(
"not able to commit replicated timestamp", e);
}
}
}
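
For callers that embed the tool instead of going through main(), the flow is the same: load a config, commit, and roll back the timestamps if the commit fails. A hedged sketch; the config file path and timestamp are placeholders, and the remaining HiveSyncConfig fields (user, password, partition fields, etc.) are assumed to be set as in the tests:

HiveSyncGlobalCommitConfig cfg = new HiveSyncGlobalCommitConfig();
cfg.configFile = "/tmp/global_sync_config.xml";       // placeholder path
cfg.globallyReplicatedTimeStamp = "20210625085626";   // placeholder completed instant
cfg.databaseName = "foo";
cfg.tableName = "bar";
cfg.load();
try (HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(cfg)) {
  // Roll back the replicated timestamps if the commit fails on any cluster.
  if (!tool.commit() && !tool.rollback()) {
    throw new RuntimeException("not able to roll back failed commit");
  }
}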

View File

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.replication;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.common.util.Option;
public class ReplicationStateSync {
private GlobalHiveSyncTool globalHiveSyncTool;
private final GlobalHiveSyncConfig globalHiveSyncConfig;
private final HiveConf hiveConf;
private Map<String, Option<String>> replicatedTimeStampMap;
private Map<String, Option<String>> oldReplicatedTimeStampMap;
private final String clusterId;
ReplicationStateSync(GlobalHiveSyncConfig conf, HiveConf hiveConf, String uid) {
this.globalHiveSyncConfig = conf;
this.hiveConf = hiveConf;
initGlobalHiveSyncTool();
replicatedTimeStampMap = globalHiveSyncTool.getLastReplicatedTimeStampMap();
clusterId = uid;
}
private void initGlobalHiveSyncTool() {
globalHiveSyncTool = GlobalHiveSyncTool.buildGlobalHiveSyncTool(globalHiveSyncConfig, hiveConf);
}
public void sync() throws Exception {
// the cluster may be down by the time we reach here, so we refresh our replication
// state right before we set oldReplicatedTimeStampMap to narrow this window. This is a
// liveness check right before we start.
replicatedTimeStampMap = globalHiveSyncTool.getLastReplicatedTimeStampMap();
// The sync can fail midway and corrupt the table property, so we set
// oldReplicatedTimeStampMap before the sync starts so that we can attempt a rollback.
// This helps when the sync fails due to a bug in hive sync; if the cluster went down
// halfway through or before the sync, the rollback may also fail. That is acceptable,
// and we want to be alerted to such scenarios.
oldReplicatedTimeStampMap = replicatedTimeStampMap;
globalHiveSyncTool.syncHoodieTable();
replicatedTimeStampMap = globalHiveSyncTool.getLastReplicatedTimeStampMap();
}
public boolean rollback() {
if (oldReplicatedTimeStampMap != null) {
globalHiveSyncTool.setLastReplicatedTimeStamp(oldReplicatedTimeStampMap);
oldReplicatedTimeStampMap = null;
}
return true;
}
public boolean replicationStateIsInSync(ReplicationStateSync other) {
return globalHiveSyncTool.getLastReplicatedTimeStampMap()
.equals(other.globalHiveSyncTool.getLastReplicatedTimeStampMap());
}
@Override
public String toString() {
return "{ clusterId: " + clusterId + " replicatedState: " + replicatedTimeStampMap + " }";
}
public String getClusterId() {
return clusterId;
}
public void close() {
if (globalHiveSyncTool != null) {
globalHiveSyncTool.close();
globalHiveSyncTool = null;
}
}
}

View File

@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
import java.util.Collections;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
import org.apache.hudi.hive.testutils.TestCluster;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class TestHiveSyncGlobalCommitTool {
@RegisterExtension
public static TestCluster localCluster = new TestCluster();
@RegisterExtension
public static TestCluster remoteCluster = new TestCluster();
private static String DB_NAME = "foo";
private static String TBL_NAME = "bar";
private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
String commitTime, String dbName, String tblName) throws Exception {
HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
config.properties.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation());
config.properties.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation());
config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl());
config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl());
config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName));
config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName));
config.globallyReplicatedTimeStamp = commitTime;
config.hiveUser = System.getProperty("user.name");
config.hivePass = "";
config.databaseName = dbName;
config.tableName = tblName;
config.basePath = localCluster.tablePath(dbName, tblName);
config.assumeDatePartitioning = true;
config.usePreApacheInputFormat = false;
config.partitionFields = Collections.singletonList("datestr");
return config;
}
private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception {
Assertions.assertEquals(localCluster.getHMSClient()
.getTable(config.databaseName, config.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), remoteCluster.getHMSClient()
.getTable(config.databaseName, config.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), "compare replicated timestamps");
}
@BeforeEach
public void setUp() throws Exception {
localCluster.forceCreateDb(DB_NAME);
remoteCluster.forceCreateDb(DB_NAME);
localCluster.dfsCluster.getFileSystem().delete(new Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
remoteCluster.dfsCluster.getFileSystem().delete(new Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
}
@AfterEach
public void clear() throws Exception {
localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
}
@Test
public void testBasicGlobalCommit() throws Exception {
String commitTime = "100";
localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
// simulate drs
remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
Assertions.assertTrue(tool.commit());
compareEqualLastReplicatedTimeStamp(config);
}
@Test
public void testBasicRollback() throws Exception {
String commitTime = "100";
localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
// simulate drs
remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
Assertions.assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
// stop the remote cluster hive server to simulate cluster going down
remoteCluster.stopHiveServer2();
Assertions.assertFalse(tool.commit());
Assertions.assertEquals(commitTime, localCluster.getHMSClient()
.getTable(config.databaseName, config.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
Assertions.assertTrue(tool.rollback()); // do a rollback
Assertions.assertNotEquals(commitTime, localCluster.getHMSClient()
.getTable(config.databaseName, config.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
remoteCluster.startHiveServer2();
}
}

View File

@@ -18,22 +18,24 @@
package org.apache.hudi.hive;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
@@ -48,9 +50,12 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -68,12 +73,12 @@ public class TestHiveSyncTool {
}
@BeforeEach
public void setUp() throws IOException, InterruptedException {
public void setUp() throws Exception {
HiveTestUtil.setUp();
}
@AfterEach
public void teardown() throws IOException {
public void teardown() throws Exception {
HiveTestUtil.clear();
}
@@ -246,6 +251,7 @@ public class TestHiveSyncTool {
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
List<Partition> hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
//writtenPartitionsSince.add(newPartition.get(0));
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
@@ -769,6 +775,136 @@ public class TestHiveSyncTool {
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
}
private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String emptyCommitTime) throws Exception {
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 1,
"Hive Schema should match the table schema + partition field");
assertEquals(1, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), "Table partitions should match the number of partitions we wrote");
assertEquals(emptyCommitTime,
hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(), "The last commit that was synced should be updated in the TBLPROPERTIES");
// make sure correct schema is picked
Schema schema = SchemaTestUtil.getSimpleSchema();
for (Field field : schema.getFields()) {
assertEquals(field.schema().getType().getName(),
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get(field.name()).toLowerCase(),
String.format("Hive Schema Field %s was added", field));
}
assertEquals("string",
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("datestr").toLowerCase(), "Hive Schema Field datestr was added");
assertEquals(schema.getFields().size() + 1 + HoodieRecord.HOODIE_META_COLUMNS.size(),
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),"Hive Schema fields size");
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// create empty commit
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime, true);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
verifyOldParquetFileTest(hiveClient, emptyCommitTime);
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// evolve the schema
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "101";
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
// create empty commit
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(
hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
// now delete the evolved commit instant
Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ hiveClient.getActiveTimeline().getInstants()
.filter(inst -> inst.getTimestamp().equals(commitTime2))
.findFirst().get().getFileName());
assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
try {
tool.syncHoodieTable();
} catch (RuntimeException e) {
// we expect the table sync to fail
}
// table should not be synced yet
assertFalse(
hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist at all");
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, true);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// create empty commit
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime, true);
//HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(
hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
verifyOldParquetFileTest(hiveClient, emptyCommitTime);
// evolve the schema
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "301";
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
//HiveTestUtil.createCommitFileWithSchema(commitMetadata, "400", false); // create another empty commit
//HiveTestUtil.createCommitFile(commitMetadata, "400"); // create another empty commit
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HoodieHiveClient hiveClientLatest = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
// now delete the evolved commit instant
Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ hiveClientLatest.getActiveTimeline().getInstants()
.filter(inst -> inst.getTimestamp().equals(commitTime2))
.findFirst().get().getFileName());
assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
try {
tool.syncHoodieTable();
} catch (RuntimeException e) {
// we expect the table sync to fail
}
// old sync values should be left intact
verifyOldParquetFileTest(hiveClient, emptyCommitTime);
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testTypeConverter(boolean useJdbc) throws Exception {
@@ -807,5 +943,137 @@ public class TestHiveSyncTool {
.containsValue("BIGINT"), errorMsg);
hiveClient.updateHiveSQL(dropTableSql);
}
/*
private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String emptyCommitTime) throws Exception {
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 1,
"Hive Schema should match the table schema + partition field");
assertEquals(1,
hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals("The last commit that was synced should be updated in the TBLPROPERTIES", emptyCommitTime,
hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get());
// make sure correct schema is picked
Schema schema = SchemaTestUtil.getSimpleSchema();
for (Field field : schema.getFields()) {
assertEquals(String.format("Hive Schema Field %s was added", field), field.schema().getType().getName(),
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get(field.name()).toLowerCase());
}
assertEquals("Hive Schema Field datestr was added", "string",
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("datestr").toLowerCase());
assertEquals(schema.getFields().size() + 1,
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
"Hive Schema fields size");
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, false);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// create empty commit
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
verifyOldParquetFileTest(hiveClient, emptyCommitTime);
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, false);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// evolve the schema
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "101";
HiveTestUtil.addCOWPartitions(1, false, false, dateTime, commitTime2);
// create empty commit
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
// now delete the evolved commit instant
Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ hiveClient.getActiveTimeline().getInstants()
.filter(inst -> inst.getTimestamp().equals(commitTime2))
.findFirst().get().getFileName());
assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
try {
tool.syncHoodieTable();
} catch (RuntimeException e) {
// we expect the table sync to fail
}
// table should not be synced yet
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist at all");
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, false);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
// create empty commit
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
verifyOldParquetFileTest(hiveClient, emptyCommitTime);
// evolve the schema
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "301";
HiveTestUtil.addCOWPartitions(1, false, false, dateTime, commitTime2);
HiveTestUtil.createCommitFile(commitMetadata, "400"); // create another empty commit
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HoodieHiveClient hiveClientLatest = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
// now delete the evolved commit instant
Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ hiveClientLatest.getActiveTimeline().getInstants()
.filter(inst -> inst.getTimestamp().equals(commitTime2))
.findFirst().get().getFileName());
assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
try {
tool.syncHoodieTable();
} catch (RuntimeException e) {
// we expect the table sync to fail
}
// old sync values should be left intact
verifyOldParquetFileTest(hiveClient, emptyCommitTime);
}*/
}

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IHMSHandler;
@@ -78,6 +79,7 @@ public class HiveTestService {
private ExecutorService executorService;
private TServer tServer;
private HiveServer2 hiveServer;
private HiveConf serverConf;
public HiveTestService(Configuration hadoopConf) throws IOException {
this.workDir = Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath();
@@ -88,6 +90,14 @@ public class HiveTestService {
return hadoopConf;
}
public TServer getHiveMetaStore() {
return tServer;
}
public HiveConf getServerConf() {
return serverConf;
}
public HiveServer2 start() throws IOException {
Objects.requireNonNull(workDir, "The work dir must be set before starting cluster.");
@@ -102,10 +112,10 @@ public class HiveTestService {
FileIOUtils.deleteDirectory(file);
}
HiveConf serverConf = configureHive(hadoopConf, localHiveLocation);
serverConf = configureHive(hadoopConf, localHiveLocation);
executorService = Executors.newSingleThreadExecutor();
tServer = startMetaStore(bindIP, metastorePort, serverConf);
tServer = startMetaStore(bindIP, serverConf);
serverConf.set("hive.in.test", "true");
hiveServer = startHiveServer(serverConf);
@@ -116,7 +126,7 @@ public class HiveTestService {
} else {
serverHostname = bindIP;
}
if (!waitForServerUp(serverConf, serverHostname, metastorePort, CONNECTION_TIMEOUT)) {
if (!waitForServerUp(serverConf, serverHostname, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for startup of standalone server");
}
@@ -163,9 +173,17 @@ public class HiveTestService {
public HiveConf configureHive(Configuration conf, String localHiveLocation) throws IOException {
conf.set("hive.metastore.local", "false");
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + metastorePort);
int port = metastorePort;
if (conf.get(HiveConf.ConfVars.METASTORE_SERVER_PORT.varname, null) == null) {
conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
} else {
port = conf.getInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
}
if (conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, null) == null) {
conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
}
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + port);
conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
conf.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
// The following line to turn of SASL has no effect since HiveAuthFactory calls
// 'new HiveConf()'. This is fixed by https://issues.apache.org/jira/browse/HIVE-6657,
// in Hive 0.14.
@@ -191,8 +209,9 @@ public class HiveTestService {
return new HiveConf(conf, this.getClass());
}
private boolean waitForServerUp(HiveConf serverConf, String hostname, int port, int timeout) {
private boolean waitForServerUp(HiveConf serverConf, String hostname, int timeout) {
long start = System.currentTimeMillis();
int port = serverConf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
while (true) {
try {
new HiveMetaStoreClient(serverConf);
@@ -288,11 +307,12 @@ public class HiveTestService {
}
}
public TServer startMetaStore(String forceBindIP, int port, HiveConf conf) throws IOException {
public TServer startMetaStore(String forceBindIP, HiveConf conf) throws IOException {
try {
// Server will create new threads up to max as necessary. After an idle
// period, it will destroy threads to keep the number of threads in the
// pool to min.
int port = conf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);

View File

@@ -123,10 +123,10 @@ public class HiveTestUtil {
public static void clear() throws IOException {
fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), fileSystem);
for (String tableName : createdTablesSet) {
@@ -158,10 +158,10 @@ public class HiveTestUtil {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
boolean result = fileSystem.mkdirs(path);
checkResult(result);
@@ -173,15 +173,15 @@ public class HiveTestUtil {
}
public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException, InterruptedException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.MERGE_ON_READ)
.setTableName(hiveSyncConfig.tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
.setTableType(HoodieTableType.MERGE_ON_READ)
.setTableName(hiveSyncConfig.tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
boolean result = fileSystem.mkdirs(path);
checkResult(result);
@@ -189,25 +189,25 @@ public class HiveTestUtil {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, commitTime);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
useSchemaFromCommitMetadata);
useSchemaFromCommitMetadata);
createCompactionCommitFile(compactionMetadata, commitTime);
if (createDeltaCommit) {
// Write a delta commit
HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true,
useSchemaFromCommitMetadata);
useSchemaFromCommitMetadata);
createDeltaCommitFile(deltaMetadata, deltaCommitTime);
}
}
public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
@@ -215,7 +215,7 @@ public class HiveTestUtil {
}
public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
@@ -223,7 +223,7 @@ public class HiveTestUtil {
}
public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
useSchemaFromCommitMetadata, startFrom, instantTime);
@@ -233,7 +233,7 @@ public class HiveTestUtil {
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
useSchemaFromCommitMetadata);
useSchemaFromCommitMetadata);
createCompactionCommitFile(compactionMetadata, instantTime);
HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple,
useSchemaFromCommitMetadata);
@@ -241,7 +241,7 @@ public class HiveTestUtil {
}
private static HoodieCommitMetadata createLogFiles(Map<String, List<HoodieWriteStat>> partitionWriteStats,
boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
throws InterruptedException, IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (Entry<String, List<HoodieWriteStat>> wEntry : partitionWriteStats.entrySet()) {
@@ -261,7 +261,7 @@ public class HiveTestUtil {
}
private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
startFrom = startFrom.withTimeAtStartOfDay();
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -279,7 +279,7 @@ public class HiveTestUtil {
}
private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
@@ -354,7 +354,7 @@ public class HiveTestUtil {
}
private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, boolean isSimpleSchema,
boolean useSchemaFromCommitMetadata) throws IOException {
boolean useSchemaFromCommitMetadata) throws IOException {
if (useSchemaFromCommitMetadata) {
Schema dataSchema = getTestDataSchema(isSimpleSchema);
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, dataSchema.toString());
@@ -362,7 +362,7 @@ public class HiveTestUtil {
}
private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, String schema,
boolean useSchemaFromCommitMetadata) {
boolean useSchemaFromCommitMetadata) {
if (useSchemaFromCommitMetadata) {
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
}
@@ -374,7 +374,7 @@ public class HiveTestUtil {
}
}
private static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(instantTime));
@@ -383,6 +383,11 @@ public class HiveTestUtil {
fsout.close();
}
public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true);
createCommitFile(commitMetadata, instantTime);
}
private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String instantTime)
throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
@@ -406,4 +411,4 @@ public class HiveTestUtil {
public static Set<String> getCreatedTablesSet() {
return createdTablesSet;
}
}
}
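This hunk makes createCommitFile public and adds a createCommitFileWithSchema convenience. A minimal sketch of how a sync test might use them follows; it is not part of the commit, the class name and instant times are illustrative, and it assumes HiveTestUtil sits in org.apache.hudi.hive.testutils and that the table was already initialised through HiveTestUtil.createCOWTable.
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.junit.jupiter.api.Test;
public class ExtraCommitFilesSketch {
  @Test
  public void writeExtraCommits() throws Exception {
    // Assumes HiveTestUtil.createCOWTable(...) already initialised the table.
    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
    // New helper: embeds the simple test schema in the metadata, then writes
    // the commit file under the .hoodie metafolder for instant "101".
    HiveTestUtil.createCommitFileWithSchema(metadata, "101", true);
    // createCommitFile is now public, so a bare commit file can be written too.
    HiveTestUtil.createCommitFile(new HoodieCommitMetadata(), "102");
  }
}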

View File

@@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.testutils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hive.service.server.HiveServer2;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.runners.model.InitializationError;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.fail;
public class TestCluster implements BeforeAllCallback, AfterAllCallback,
BeforeEachCallback, AfterEachCallback {
private HdfsTestService hdfsTestService;
public HiveTestService hiveTestService;
private Configuration conf;
public HiveServer2 server2;
private static volatile int port = 9083;
public MiniDFSCluster dfsCluster;
DateTimeFormatter dtfOut;
public File hiveSiteXml;
private IMetaStoreClient client;
@Override
public void beforeAll(ExtensionContext context) throws Exception {
setup();
}
@Override
public void afterAll(ExtensionContext context) throws Exception {
shutDown();
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
}
public void setup() throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
conf = hdfsTestService.getHadoopConf();
conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, port++);
conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port++);
conf.setInt(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, port++);
hiveTestService = new HiveTestService(conf);
server2 = hiveTestService.start();
dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
hiveSiteXml = File.createTempFile("hive-site", ".xml");
hiveSiteXml.deleteOnExit();
try (OutputStream os = new FileOutputStream(hiveSiteXml)) {
hiveTestService.getServerConf().writeXml(os);
}
client = HiveMetaStoreClient.newSynchronizedClient(
RetryingMetaStoreClient.getProxy(hiveTestService.getServerConf(), true));
}
public Configuration getConf() {
return this.conf;
}
public String getHiveSiteXmlLocation() {
return hiveSiteXml.getAbsolutePath();
}
public IMetaStoreClient getHMSClient() {
return client;
}
public String getHiveJdBcUrl() {
return "jdbc:hive2://127.0.0.1:" + conf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname) + "";
}
public String tablePath(String dbName, String tableName) throws Exception {
return dbPath(dbName) + "/" + tableName;
}
private String dbPath(String dbName) throws Exception {
return dfsCluster.getFileSystem().getWorkingDirectory().toString() + "/" + dbName;
}
public void forceCreateDb(String dbName) throws Exception {
try {
getHMSClient().dropDatabase(dbName);
} catch (NoSuchObjectException e) {
System.out.println("db does not exist but its ok " + dbName);
}
Database db = new Database(dbName, "", dbPath(dbName), new HashMap<>());
getHMSClient().createDatabase(db);
}
public void createCOWTable(String commitTime, int numberOfPartitions, String dbName, String tableName)
throws Exception {
String tablePathStr = tablePath(dbName, tableName);
Path path = new Path(tablePathStr);
FileIOUtils.deleteDirectory(new File(path.toString()));
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(conf, path.toString());
boolean result = dfsCluster.getFileSystem().mkdirs(path);
if (!result) {
throw new InitializationError("cannot initialize table");
}
DateTime dateTime = DateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime, path.toString());
createCommitFile(commitMetadata, commitTime, path.toString());
}
private void createCommitFile(HoodieCommitMetadata commitMetadata, String commitTime, String basePath) throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(commitTime));
FSDataOutputStream fsout = dfsCluster.getFileSystem().create(fullPath, true);
fsout.write(bytes);
fsout.close();
}
private HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
DateTime startFrom, String commitTime, String basePath) throws IOException, URISyntaxException {
startFrom = startFrom.withTimeAtStartOfDay();
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (int i = 0; i < numberOfPartitions; i++) {
String partitionPath = dtfOut.print(startFrom);
Path partPath = new Path(basePath + "/" + partitionPath);
dfsCluster.getFileSystem().makeQualified(partPath);
dfsCluster.getFileSystem().mkdirs(partPath);
List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, commitTime);
startFrom = startFrom.minusDays(1);
writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
}
return commitMetadata;
}
private List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String commitTime)
throws IOException, URISyntaxException {
List<HoodieWriteStat> writeStats = new ArrayList<>();
for (int i = 0; i < 5; i++) {
// Create 5 files
String fileId = UUID.randomUUID().toString();
Path filePath = new Path(partPath.toString() + "/" + FSUtils
.makeDataFileName(commitTime, "1-0-1", fileId));
generateParquetData(filePath, isParquetSchemaSimple);
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(fileId);
writeStat.setPath(filePath.toString());
writeStats.add(writeStat);
}
return writeStats;
}
@SuppressWarnings({"unchecked", "deprecation"})
private void generateParquetData(Path filePath, boolean isParquetSchemaSimple)
throws IOException, URISyntaxException {
Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
BloomFilterTypeCode.SIMPLE.name());
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(parquetSchema, schema, filter);
ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024,
ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, dfsCluster.getFileSystem().getConf());
List<IndexedRecord> testRecords = (isParquetSchemaSimple ? SchemaTestUtil.generateTestRecords(0, 100)
: SchemaTestUtil.generateEvolvedTestRecords(100, 100));
testRecords.forEach(s -> {
try {
writer.write(s);
} catch (IOException e) {
fail("IOException while writing test records as parquet" + e.toString());
}
});
writer.close();
}
public HiveConf getHiveConf() {
return server2.getHiveConf();
}
public void stopHiveServer2() {
if (server2 != null) {
server2.stop();
server2 = null;
}
}
public void startHiveServer2() {
if (server2 == null) {
server2 = new HiveServer2();
server2.init(hiveTestService.getServerConf());
server2.start();
}
}
public void shutDown() {
stopHiveServer2();
client.close();
hiveTestService.getHiveMetaStore().stop();
hdfsTestService.stop();
}
}
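Because TestCluster implements the JUnit 5 lifecycle callbacks, it can also be registered as an extension rather than having setup()/shutDown() driven by hand as TestHiveSyncGlobalCommitTool does below. A minimal sketch under that assumption (the test class and names are illustrative, not part of this commit):
import org.apache.hudi.hive.testutils.TestCluster;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
public class SingleClusterSketch {
  // Registering the cluster as a static extension makes JUnit invoke
  // beforeAll()/afterAll(), which delegate to setup() and shutDown() above.
  @RegisterExtension
  public static TestCluster cluster = new TestCluster();
  @Test
  public void createsATable() throws Exception {
    cluster.forceCreateDb("example_db");
    cluster.createCOWTable("100", 2, "example_db", "example_tbl");
  }
}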

View File

@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.testutils;
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHiveSyncGlobalCommitTool {
TestCluster localCluster;
TestCluster remoteCluster;
private static String DB_NAME = "foo";
private static String TBL_NAME = "bar";
private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
String commitTime, String dbName, String tblName) throws Exception {
HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
config.properties.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation());
config.properties.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation());
config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl());
config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl());
config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName));
config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName));
config.globallyReplicatedTimeStamp = commitTime;
config.hiveUser = System.getProperty("user.name");
config.hivePass = "";
config.databaseName = dbName;
config.tableName = tblName;
config.basePath = localCluster.tablePath(dbName, tblName);
config.assumeDatePartitioning = true;
config.usePreApacheInputFormat = false;
config.partitionFields = Collections.singletonList("datestr");
return config;
}
private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception {
assertEquals(localCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
remoteCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
"compare replicated timestamps");
}
@BeforeEach
public void setUp() throws Exception {
localCluster = new TestCluster();
localCluster.setup();
remoteCluster = new TestCluster();
remoteCluster.setup();
localCluster.forceCreateDb(DB_NAME);
remoteCluster.forceCreateDb(DB_NAME);
localCluster.dfsCluster.getFileSystem().delete(new Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
remoteCluster.dfsCluster.getFileSystem().delete(new Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
}
@AfterEach
public void clear() throws Exception {
localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
localCluster.shutDown();
remoteCluster.shutDown();
}
@Test
public void testBasicGlobalCommit() throws Exception {
String commitTime = "100";
localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
// simulate drs
remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
assertTrue(tool.commit());
compareEqualLastReplicatedTimeStamp(config);
}
@Test
public void testBasicRollback() throws Exception {
String commitTime = "100";
localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
// simulate drs
remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
// stop the remote cluster hive server to simulate cluster going down
remoteCluster.stopHiveServer2();
assertFalse(tool.commit());
assertEquals(commitTime, localCluster.getHMSClient()
.getTable(config.databaseName, config.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
assertTrue(tool.rollback()); // do a rollback
assertNotEquals(commitTime, localCluster.getHMSClient()
.getTable(config.databaseName, config.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
remoteCluster.startHiveServer2();
}
}
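The rollback test above exercises the intended failure path: the commit across both clusters fails once the remote HiveServer2 is down, and the rollback clears the replicated timestamp only on the cluster that had already been updated. A minimal sketch of the same commit-then-rollback pattern from caller code, under the assumption that commit() and rollback() return booleans as used in the tests (the class and method names around the tool are illustrative):
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
public class GlobalCommitSketch {
  public static void syncOrRollback(HiveSyncGlobalCommitConfig config) throws Exception {
    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
    if (!tool.commit()) {
      // Rollback only clears the replicated timestamp; already-synced
      // partitions stay in place, and the rollback itself can also fail.
      if (!tool.rollback()) {
        throw new IllegalStateException("global commit and rollback both failed");
      }
    }
  }
}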