From 0fb8556b0d9274aef650a46bb82a8cf495d4450b Mon Sep 17 00:00:00 2001 From: s-sanjay Date: Fri, 25 Jun 2021 08:56:26 +0530 Subject: [PATCH] Add ability to provide multi-region (global) data consistency across HMS in different regions (#2542) [global-hive-sync-tool] Add a global hive sync tool to sync hudi table across clusters. Add a way to rollback the replicated time stamp if we fail to sync or if we partly sync Co-authored-by: Jagmeet Bali --- .../bootstrap/index/BootstrapIndex.java | 6 +- .../common/table/HoodieTableMetaClient.java | 4 +- .../hudi/common/util/ReflectionUtils.java | 2 +- .../apache/hudi/hadoop/InputPathHandler.java | 9 +- .../hudi/hadoop/utils/HoodieHiveUtils.java | 8 +- .../hadoop/utils/HoodieInputFormatUtils.java | 1 + ...nsistentTimeStampFilteringInputFormat.java | 121 ++++++++ .../hadoop/TestHoodieParquetInputFormat.java | 4 +- .../hudi/hadoop/TestInputPathHandler.java | 17 ++ hudi-sync/hudi-hive-sync/pom.xml | 10 + .../run_hive_global_commit_tool.sh | 69 +++++ .../org/apache/hudi/hive/HiveSyncConfig.java | 1 + .../org/apache/hudi/hive/HiveSyncTool.java | 12 +- .../apache/hudi/hive/HoodieHiveClient.java | 80 ++++- .../replication/GlobalHiveSyncConfig.java | 54 ++++ .../hive/replication/GlobalHiveSyncTool.java | 105 +++++++ .../replication/HiveSyncGlobalCommit.java | 37 +++ .../HiveSyncGlobalCommitConfig.java | 98 ++++++ .../replication/HiveSyncGlobalCommitTool.java | 136 +++++++++ .../replication/ReplicationStateSync.java | 90 ++++++ .../hive/TestHiveSyncGlobalCommitTool.java | 128 ++++++++ .../apache/hudi/hive/TestHiveSyncTool.java | 286 +++++++++++++++++- .../hudi/hive/testutils/HiveTestService.java | 34 ++- .../hudi/hive/testutils/HiveTestUtil.java | 59 ++-- .../hudi/hive/testutils/TestCluster.java | 271 +++++++++++++++++ .../TestHiveSyncGlobalCommitTool.java | 133 ++++++++ .../hive-global-commit-config.xml | 27 ++ 27 files changed, 1731 insertions(+), 71 deletions(-) create mode 100644 
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java create mode 100755 hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommit.java create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java create mode 100644 hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java create mode 100644 hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java create mode 100644 hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java index 6aafeca53..abd3ac51a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java @@ -21,12 +21,12 @@ package org.apache.hudi.common.bootstrap.index; import org.apache.hudi.common.model.BootstrapFileMapping; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.table.HoodieTableMetaClient; +import 
org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ReflectionUtils; import java.io.Serializable; import java.util.List; import java.util.Map; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.ReflectionUtils; /** * Bootstrap Index Interface. @@ -161,6 +161,6 @@ public abstract class BootstrapIndex implements Serializable { public static BootstrapIndex getBootstrapIndex(HoodieTableMetaClient metaClient) { return ((BootstrapIndex)(ReflectionUtils.loadClass( - metaClient.getTableConfig().getBootstrapIndexClass(), metaClient))); + metaClient.getTableConfig().getBootstrapIndexClass(), new Class[]{HoodieTableMetaClient.class}, metaClient))); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index cf66f1674..3285a00ca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -98,8 +98,8 @@ public class HoodieTableMetaClient implements Serializable { private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, - ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, - String payloadClassName) { + ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, + String payloadClassName) { LOG.info("Loading HoodieTableMetaClient from " + basePath); this.consistencyGuardConfig = consistencyGuardConfig; this.hadoopConf = new SerializableConfiguration(conf); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index 57d11d0b2..2895f46db 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -87,7 +87,7 @@ public class ReflectionUtils { try { return getClass(clazz).getConstructor(constructorArgTypes).newInstance(constructorArgs); } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new HoodieException("Unable to instantiate class ", e); + throw new HoodieException("Unable to instantiate class " + clazz, e); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java index 0a5055a05..f7adf38e4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java @@ -39,6 +39,14 @@ import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaCl * InputPathHandler takes in a set of input paths and incremental tables list. Then, classifies the * input paths to incremental, snapshot paths and non-hoodie paths. This is then accessed later to * mutate the JobConf before processing incremental mode queries and snapshot queries. + * + * Note: We are adding jobConf of a mapreduce or spark job. The properties in the jobConf are two + * type: session properties and table properties from metastore. While session property is common + * for all the tables in a query the table properties are unique per table so there is no need to + * check if it belongs to the table for which the path handler is now instantiated. The jobConf has + * all table properties such as name, last modification time and so on which are unique to a table. + * This class is written in such a way that it can handle multiple tables and properties unique to + * a table but for table level property such check is not required. 
*/ public class InputPathHandler { @@ -63,7 +71,6 @@ public class InputPathHandler { /** * Takes in the original InputPaths and classifies each of them into incremental, snapshot and * non-hoodie InputPaths. The logic is as follows: - * * 1. Check if an inputPath starts with the same basepath as any of the metadata basepaths we know * 1a. If yes, this belongs to a Hoodie table that we already know about. Simply classify this * as incremental or snapshot - We can get the table name of this inputPath from the diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java index d9983bd65..d5d9d9cf5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java @@ -18,13 +18,14 @@ package org.apache.hudi.hadoop.utils; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.JobContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -73,6 +74,7 @@ public class HoodieHiveUtils { public static final int MAX_COMMIT_ALL = -1; public static final int DEFAULT_LEVELS_TO_BASEPATH = 3; public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode"); + public static final String GLOBALLY_CONSISTENT_READ_TIMESTAMP = "last_replication_timestamp"; public static boolean stopAtCompaction(JobContext job, String tableName) { String compactionPropName = 
String.format(HOODIE_STOP_AT_COMPACTION_PATTERN, tableName); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index b39ee349a..26fbdda29 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -442,6 +442,7 @@ public class HoodieInputFormatUtils { } HoodieTimeline timeline = HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), job, metaClient); + HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient -> FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline)); List filteredBaseFiles = new ArrayList<>(); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java new file mode 100644 index 000000000..50e4a6ed7 --- /dev/null +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; + +import org.apache.hadoop.fs.FileStatus; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestGloballyConsistentTimeStampFilteringInputFormat + extends TestHoodieParquetInputFormat { + + @BeforeEach + public void setUp() { + super.setUp(); + } + + @Test + public void testInputFormatLoad() throws IOException { + super.testInputFormatLoad(); + + // set filtering timestamp to 0 now the timeline wont have any commits. + InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "0"); + + Assertions.assertThrows(HoodieIOException.class, () -> inputFormat.getSplits(jobConf, 10)); + Assertions.assertThrows(HoodieIOException.class, () -> inputFormat.listStatus(jobConf)); + } + + @Test + public void testInputFormatUpdates() throws IOException { + super.testInputFormatUpdates(); + + // set the globally replicated timestamp to 199 so only 100 is read and update is ignored. 
+ InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "100"); + + FileStatus[] files = inputFormat.listStatus(jobConf); + assertEquals(10, files.length); + + ensureFilesInCommit("5 files have been updated to commit 200. but should get filtered out ", + files,"200", 0); + ensureFilesInCommit("We should see 10 files from commit 100 ", files, "100", 10); + } + + @Override + public void testIncrementalSimple() throws IOException { + // setting filtering timestamp to zero should not in any way alter the result of the test which + // pulls in zero files due to incremental ts being the actual commit time + jobConf.set(HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP, "0"); + super.testIncrementalSimple(); + } + + @Override + public void testIncrementalWithMultipleCommits() throws IOException { + super.testIncrementalWithMultipleCommits(); + + // set globally replicated timestamp to 400 so commits from 500, 600 does not show up + InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "400"); + InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtils.MAX_COMMIT_ALL); + + FileStatus[] files = inputFormat.listStatus(jobConf); + + assertEquals( + 5, files.length,"Pulling ALL commits from 100, should get us the 3 files from 400 commit, 1 file from 300 " + + "commit and 1 file from 200 commit"); + ensureFilesInCommit("Pulling 3 commits from 100, should get us the 3 files from 400 commit", + files, "400", 3); + ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 300 commit", + files, "300", 1); + ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files from 200 commit", + files, "200", 1); + + List commits = Arrays.asList("100", "200", "300", "400", "500", "600"); + for (int idx = 0; idx < commits.size(); ++idx) { + for (int jdx = 0; jdx < commits.size(); ++jdx) { + InputFormatTestUtil.setupIncremental(jobConf, commits.get(idx), HoodieHiveUtils.MAX_COMMIT_ALL); + 
InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, commits.get(jdx)); + + files = inputFormat.listStatus(jobConf); + + if (jdx <= idx) { + assertEquals(0, files.length,"all commits should be filtered"); + } else { + // only commits upto the timestamp is allowed + for (FileStatus file : files) { + String commitTs = FSUtils.getCommitTime(file.getPath().getName()); + assertTrue(commits.indexOf(commitTs) <= jdx); + assertTrue(commits.indexOf(commitTs) > idx); + } + } + } + } + } +} diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index c4fed987d..c45c61467 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -65,8 +65,8 @@ import static org.junit.jupiter.api.Assertions.fail; public class TestHoodieParquetInputFormat { - private HoodieParquetInputFormat inputFormat; - private JobConf jobConf; + protected HoodieParquetInputFormat inputFormat; + protected JobConf jobConf; private final HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET; private final String baseFileExtension = baseFileFormat.getFileExtension(); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java index 3a8b19744..0287318e4 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java @@ -23,10 +23,12 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import 
org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.jupiter.api.AfterAll; @@ -169,6 +171,21 @@ public class TestInputPathHandler { assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths)); } + @Test + public void testInputPathHandlerWithGloballyReplicatedTimeStamp() throws IOException { + JobConf jobConf = new JobConf(); + jobConf.set(HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP, "1"); + inputPathHandler = new InputPathHandler(dfs.getConf(), inputPaths.toArray( + new Path[inputPaths.size()]), incrementalTables); + List actualPaths = inputPathHandler.getGroupedIncrementalPaths().values().stream() + .flatMap(List::stream).collect(Collectors.toList()); + assertTrue(actualComparesToExpected(actualPaths, incrementalPaths)); + actualPaths = inputPathHandler.getSnapshotPaths(); + assertTrue(actualComparesToExpected(actualPaths, snapshotPaths)); + actualPaths = inputPathHandler.getNonHoodieInputPaths(); + assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths)); + } + private boolean actualComparesToExpected(List actualPaths, List expectedPaths) { if (actualPaths.size() != expectedPaths.size()) { return false; diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml index c44f785e4..fd6302895 100644 --- a/hudi-sync/hudi-hive-sync/pom.xml +++ b/hudi-sync/hudi-hive-sync/pom.xml @@ -30,6 +30,8 @@ ${project.parent.basedir} + + 7.6.0.v20120127 @@ -148,6 +150,14 @@ test + + + org.eclipse.jetty.aggregate + jetty-all + test + ${jetty.version} + + org.junit.jupiter junit-jupiter-api diff --git a/hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh b/hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh new file mode 100755 index 000000000..b7e6cd203 --- /dev/null 
+++ b/hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh @@ -0,0 +1,69 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# A tool to sync the hudi table to hive from different clusters. Similar to HiveSyncTool but syncs it to more +# than one hive cluster ( currently a local and remote cluster). The common timestamp that was synced is stored as a new table property +# This is most useful when we want to ensure that across different hive clusters we want ensure consistent reads. If that is not a requirement +# then it is better to run HiveSyncTool separately. +# Note: +# The tool tries to be transactional but does not guarantee it. If the sync fails midway in one cluster it will try to roll back the committed +# timestamp from already successful sync on other clusters but that can also fail. +# The tool does not roll back any synced partitions but only the timestamp. + +function error_exit { + echo "$1" >&2 ## Send message to stderr. Exclude >&2 if you don't want it that way. + exit "${2:-1}" ## Return a code specified by $2 or 1 by default. 
+} + +if [ -z "${HADOOP_HOME}" ]; then + error_exit "Please make sure the environment variable HADOOP_HOME is setup" +fi + +if [ -z "${HIVE_HOME}" ]; then + error_exit "Please make sure the environment variable HIVE_HOME is setup" +fi + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +#Ensure we pick the right jar even for hive11 builds +HUDI_HIVE_UBER_JAR=`ls -c $DIR/../packaging/hudi-hive-bundle/target/hudi-hive-*.jar | grep -v source | head -1` + +if [ -z "$HADOOP_CONF_DIR" ]; then + echo "setting hadoop conf dir" + HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop" +fi + +## Include only specific packages from HIVE_HOME/lib to avoid version mismatches +HIVE_EXEC=`ls ${HIVE_HOME}/lib/hive-exec-*.jar | tr '\n' ':'` +HIVE_SERVICE=`ls ${HIVE_HOME}/lib/hive-service-*.jar | grep -v rpc | tr '\n' ':'` +HIVE_METASTORE=`ls ${HIVE_HOME}/lib/hive-metastore-*.jar | tr '\n' ':'` +HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | tr '\n' ':'` +if [ -z "${HIVE_JDBC}" ]; then + HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' ':'` +fi +HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'` +HIVE_NUCLEUS=`ls ${HIVE_HOME}/lib/datanucleus*.jar | tr '\n' ':'` +HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON:$HIVE_NUCLEUS + +HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/* + +if ! 
[ -z "$HIVE_CONF_DIR" ]; then + error_exit "Don't set HIVE_CONF_DIR; use config xml file" +fi + +echo "Running Command : java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}lib/* org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool $@" +java -cp $HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}lib/* org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool "$@" diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index e4e796295..41c419d06 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -104,6 +104,7 @@ public class HiveSyncConfig implements Serializable { @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing") public Boolean decodePartition = false; + // enhance the similar function in child class public static HiveSyncConfig copy(HiveSyncConfig cfg) { HiveSyncConfig newConfig = new HiveSyncConfig(); newConfig.basePath = cfg.basePath; diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 0dbe97f9b..7264c8dff 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -58,10 +58,10 @@ public class HiveSyncTool extends AbstractSyncTool { public static final String SUFFIX_SNAPSHOT_TABLE = "_rt"; public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro"; - private final HiveSyncConfig cfg; - private HoodieHiveClient hoodieHiveClient = null; - private String snapshotTableName = null; - private Option roTableName = null; + protected final 
HiveSyncConfig cfg; + protected HoodieHiveClient hoodieHiveClient = null; + protected String snapshotTableName = null; + protected Option roTableName = null; public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { super(configuration.getAllProperties(), fs); @@ -127,8 +127,8 @@ public class HiveSyncTool extends AbstractSyncTool { } } } - - private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, + + protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) { LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath() + " of type " + hoodieHiveClient.getTableType()); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 3ae94dd6c..9d0214595 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -18,12 +18,6 @@ package org.apache.hudi.hive; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.StorageSchemes; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -32,15 +26,22 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.PartitionPathEncodeUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hudi.sync.common.AbstractSyncHoodieClient; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; @@ -60,6 +61,8 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP; + public class HoodieHiveClient extends AbstractSyncHoodieClient { private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync"; @@ -402,7 +405,19 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { closeQuietly(null, stmt); } } else { - updateHiveSQLUsingHiveDriver(s); + CommandProcessorResponse response = updateHiveSQLUsingHiveDriver(s); + if (response == null) { + throw new HoodieHiveSyncException("Failed in executing SQL null response" + s); + } + if (response.getResponseCode() != 0) { + LOG.error(String.format("Failure in SQL response %s", response.toString())); + if (response.getException() != null) { + throw new HoodieHiveSyncException( + String.format("Failed in executing SQL %s", s), response.getException()); + } else { + throw new HoodieHiveSyncException(String.format("Failed in executing SQL %s", s)); + } + } } } @@ -476,13 +491,58 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { } } + 
public Option getLastReplicatedTime(String tableName) { + // Get the last replicated time from the TBLproperties + try { + Table database = client.getTable(syncConfig.databaseName, tableName); + return Option.ofNullable(database.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP, null)); + } catch (NoSuchObjectException e) { + LOG.warn("the said table not found in hms " + syncConfig.databaseName + "." + tableName); + return Option.empty(); + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed to get the last replicated time from the database", e); + } + } + + public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) { + if (!activeTimeline.filterCompletedInstants().getInstants() + .anyMatch(i -> i.getTimestamp().equals(timeStamp))) { + throw new HoodieHiveSyncException( + "Not a valid completed timestamp " + timeStamp + " for table " + tableName); + } + try { + Table table = client.getTable(syncConfig.databaseName, tableName); + table.putToParameters(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp); + client.alter_table(syncConfig.databaseName, tableName, table); + } catch (Exception e) { + throw new HoodieHiveSyncException( + "Failed to update last replicated time to " + timeStamp + " for " + tableName, e); + } + } + + public void deleteLastReplicatedTimeStamp(String tableName) { + try { + Table table = client.getTable(syncConfig.databaseName, tableName); + String timestamp = table.getParameters().remove(GLOBALLY_CONSISTENT_READ_TIMESTAMP); + client.alter_table(syncConfig.databaseName, tableName, table); + if (timestamp != null) { + LOG.info("deleted last replicated timestamp " + timestamp + " for table " + tableName); + } + } catch (NoSuchObjectException e) { + // this is ok the table doesn't even exist. 
+ } catch (Exception e) { + throw new HoodieHiveSyncException( + "Failed to delete last replicated timestamp for " + tableName, e); + } + } + public void close() { try { if (connection != null) { connection.close(); } if (client != null) { - Hive.closeCurrent(); + client.close(); client = null; } } catch (SQLException e) { @@ -506,4 +566,4 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e); } } -} \ No newline at end of file +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java new file mode 100644 index 000000000..19074c800 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hive.replication; + +import com.beust.jcommander.Parameter; +import org.apache.hudi.hive.HiveSyncConfig; + +public class GlobalHiveSyncConfig extends HiveSyncConfig { + @Parameter(names = {"--replicated-timestamp"}, description = "Add globally replicated timestamp to enable consistent reads across clusters") + public String globallyReplicatedTimeStamp; + + public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) { + GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig(); + newConfig.basePath = cfg.basePath; + newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning; + newConfig.databaseName = cfg.databaseName; + newConfig.hivePass = cfg.hivePass; + newConfig.hiveUser = cfg.hiveUser; + newConfig.partitionFields = cfg.partitionFields; + newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass; + newConfig.jdbcUrl = cfg.jdbcUrl; + newConfig.tableName = cfg.tableName; + newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat; + newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata; + newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing; + newConfig.supportTimestamp = cfg.supportTimestamp; + newConfig.decodePartition = cfg.decodePartition; + newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp; + return newConfig; + } + + @Override + public String toString() { + return "GlobalHiveSyncConfig{" + super.toString() + + " globallyReplicatedTimeStamp=" + globallyReplicatedTimeStamp + "}"; + } + +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java new file mode 100644 index 000000000..19c23b701 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hive.replication;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hive.HiveSyncTool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.HashMap;
import java.util.Map;

/**
 * A {@link HiveSyncTool} that additionally records the globally replicated timestamp on
 * every synced table, so readers in different regions see a consistent snapshot.
 */
public class GlobalHiveSyncTool extends HiveSyncTool {

  // FIX: log under this class, not the parent HiveSyncTool.
  private static final Logger LOG = LogManager.getLogger(GlobalHiveSyncTool.class);

  public GlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
    super(cfg, configuration, fs);
  }

  @Override
  public void syncHoodieTable() {
    switch (hoodieHiveClient.getTableType()) {
      case COPY_ON_WRITE:
        syncHoodieTable(snapshotTableName, false, false);
        break;
      case MERGE_ON_READ:
        // sync a RO table for MOR
        syncHoodieTable(roTableName.get(), false, true);
        // sync a RT table for MOR
        syncHoodieTable(snapshotTableName, true, false);
        break;
      default:
        LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
        throw new InvalidTableException(hoodieHiveClient.getBasePath());
    }
  }

  @Override
  protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) {
    super.syncHoodieTable(tableName, useRealtimeInputFormat, readAsOptimized);
    // cast once instead of twice; cfg is always a GlobalHiveSyncConfig for this subclass
    String replicatedTimeStamp = ((GlobalHiveSyncConfig) cfg).globallyReplicatedTimeStamp;
    if (replicatedTimeStamp != null) {
      hoodieHiveClient.updateLastReplicatedTimeStamp(tableName, replicatedTimeStamp);
    }
    LOG.info("Sync complete for " + tableName);
  }

  public void close() {
    hoodieHiveClient.close();
  }

  /**
   * @return per-table map of the replicated timestamp currently stored in the metastore;
   *         contains the snapshot table and, for MOR tables, the read-optimized table too.
   */
  public Map<String, Option<String>> getLastReplicatedTimeStampMap() {
    Map<String, Option<String>> timeStampMap = new HashMap<>();
    timeStampMap.put(snapshotTableName, hoodieHiveClient.getLastReplicatedTime(snapshotTableName));
    if (HoodieTableType.MERGE_ON_READ.equals(hoodieHiveClient.getTableType())) {
      timeStampMap.put(roTableName.get(), hoodieHiveClient.getLastReplicatedTime(roTableName.get()));
    }
    return timeStampMap;
  }

  /**
   * Restores previously captured replicated timestamps (used by rollback). An empty
   * {@link Option} deletes the table property instead of updating it.
   */
  public void setLastReplicatedTimeStamp(Map<String, Option<String>> timeStampMap) {
    // iterate entries directly rather than keySet() + get() lookups
    for (Map.Entry<String, Option<String>> entry : timeStampMap.entrySet()) {
      String tableName = entry.getKey();
      Option<String> timestamp = entry.getValue();
      if (timestamp.isPresent()) {
        hoodieHiveClient.updateLastReplicatedTimeStamp(tableName, timestamp.get());
        LOG.info("updated timestamp for " + tableName + " to: " + timestamp.get());
      } else {
        hoodieHiveClient.deleteLastReplicatedTimeStamp(tableName);
        LOG.info("deleted timestamp for " + tableName);
      }
    }
  }

  /** Convenience factory that wires the filesystem config into the given HiveConf. */
  public static GlobalHiveSyncTool buildGlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf hiveConf) {
    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
    hiveConf.addResource(fs.getConf());
    return new GlobalHiveSyncTool(cfg, hiveConf, fs);
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hive.replication;

/**
 * An interface to allow syncing the Hudi table to all clusters, committing a common
 * replicated timestamp with a best-effort rollback if the commit only partly succeeds.
 */
public interface HiveSyncGlobalCommit {

  /**
   * Attempts to sync the table (and its replicated timestamp) to every cluster.
   *
   * @return whether the commit succeeded to all the clusters.
   */
  boolean commit();

  /**
   * Attempts to restore the previously committed timestamps on every cluster.
   *
   * @return boolean whether the rollback succeeded to all the clusters.
   */
  boolean rollback();
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hive.replication;

import org.apache.hudi.common.util.StringUtils;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;

// TODO: stop extending HiveSyncConfig and take all the variables needed from config file
@Parameters(commandDescription = "A tool to sync the hudi table to hive from different clusters. Similar to HiveSyncTool but syncs it to more"
    + "than one hive cluster ( currently a local and remote cluster). The common timestamp that was synced is stored as a new table property "
    + "This is most useful when we want to ensure that across different hive clusters we want ensure consistent reads. If that is not a requirement"
    + "then it is better to run HiveSyncTool separately."
    + "Note: "
    + " The tool tries to be transactional but does not guarantee it. If the sync fails midway in one cluster it will try to roll back the committed "
    + " timestamp from already successful sync on other clusters but that can also fail."
    + " The tool does not roll back any synced partitions but only the timestamp.")
public class HiveSyncGlobalCommitConfig extends GlobalHiveSyncConfig {

  private static final Logger LOG = LogManager.getLogger(HiveSyncGlobalCommitConfig.class);

  // Property keys understood in the XML config file. Declared final so the key names
  // cannot be reassigned at runtime (they were previously mutable public statics).
  public static final String LOCAL_HIVE_SITE_URI = "hivesyncglobal.local_hive_site_uri";
  public static final String REMOTE_HIVE_SITE_URI = "hivesyncglobal.remote_hive_site_uri";
  public static final String CONFIG_FILE_URI = "hivesyncglobal.config_file_uri";
  public static final String REMOTE_BASE_PATH = "hivesyncglobal.remote_base_path";
  public static final String LOCAL_BASE_PATH = "hivesyncglobal.local_base_path";
  public static final String RETRY_ATTEMPTS = "hivesyncglobal.retry_attempts";
  public static final String REMOTE_HIVE_SERVER_JDBC_URLS = "hivesyncglobal.remote_hs2_jdbc_urls";
  public static final String LOCAL_HIVE_SERVER_JDBC_URLS = "hivesyncglobal.local_hs2_jdbc_urls";

  @Parameter(names = {
      "--config-xml-file"}, description = "path to the config file in Hive", required = true)
  public String configFile;

  public Properties properties = new Properties();

  // Guards against loading (and thereby mutating) the config more than once.
  private boolean loaded = false;

  /**
   * Loads the XML properties file pointed to by {@link #configFile}. May only be called
   * once; the config is treated as immutable afterwards.
   *
   * @throws IOException if the config file cannot be read or parsed
   * @throws RuntimeException if called twice or the replicated timestamp is unset
   */
  public void load() throws IOException {
    if (loaded) {
      throw new RuntimeException("trying to modify finalized config");
    }
    loaded = true;
    try (InputStream configStream = new FileInputStream(new File(configFile))) {
      properties.loadFromXML(configStream);
    }
    if (StringUtils.isNullOrEmpty(globallyReplicatedTimeStamp)) {
      throw new RuntimeException("globally replicated timestamp not set");
    }
  }

  /**
   * Builds the per-cluster sync config, overriding base path and JDBC url with the local
   * or remote values from the properties file.
   *
   * @param forRemote whether to target the remote cluster's paths/urls
   */
  GlobalHiveSyncConfig mkGlobalHiveSyncConfig(boolean forRemote) {
    GlobalHiveSyncConfig cfg = GlobalHiveSyncConfig.copy(this);
    cfg.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
        : properties.getProperty(LOCAL_BASE_PATH, cfg.basePath);
    cfg.jdbcUrl = forRemote ? properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
        : properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.jdbcUrl);
    LOG.info("building hivesync config forRemote: " + forRemote + " " + cfg.jdbcUrl + " "
        + cfg.basePath);
    return cfg;
  }

  @Override
  public String toString() {
    return "HiveSyncGlobalCommitConfig{ " + "configFile=" + configFile + ", properties="
        + properties + ", " + super.toString()
        + " }";
  }

  /** Serializes the loaded properties back out as XML (useful for tests/tooling). */
  public void storeToXML(OutputStream configStream) throws IOException {
    this.properties.storeToXML(configStream, "hivesync global config");
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hive.replication;

import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;

import com.beust.jcommander.JCommander;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;

import org.apache.hudi.hive.HoodieHiveSyncException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Command-line tool that syncs a Hudi table to a local and a remote hive cluster and
 * commits a common replicated timestamp, rolling the timestamp back on partial failure.
 */
public class HiveSyncGlobalCommitTool implements HiveSyncGlobalCommit, AutoCloseable {

  private static final Logger LOG = LogManager.getLogger(HiveSyncGlobalCommitTool.class);
  private final HiveSyncGlobalCommitConfig config;
  // index 0 is always the local cluster, index 1 the remote one; final so the list
  // reference cannot be swapped out after construction
  private final List<ReplicationStateSync> replicationStateSyncList;

  public HiveSyncGlobalCommitTool(HiveSyncGlobalCommitConfig config) {
    this.config = config;
    this.replicationStateSyncList = new ArrayList<>(2);
    this.replicationStateSyncList.add(getReplicatedState(false));
    this.replicationStateSyncList.add(getReplicatedState(true));
  }

  /** Builds the replication state tracker for the local or remote cluster. */
  private ReplicationStateSync getReplicatedState(boolean forRemote) {
    HiveConf hiveConf = new HiveConf();
    // we probably just need to set the metastore URIs
    // TODO: figure out how to integrate this in production
    // how to load balance between piper HMS,HS2
    // if we have list of uris, we can do something similar to createHiveConf in reairsync
    hiveConf.addResource(new Path(config.properties.getProperty(
        forRemote ? REMOTE_HIVE_SITE_URI : LOCAL_HIVE_SITE_URI)));
    // TODO: get clusterId as input parameters
    return new ReplicationStateSync(config.mkGlobalHiveSyncConfig(forRemote),
        hiveConf, forRemote ? "REMOTESYNC" : "LOCALSYNC");
  }

  @Override
  public boolean commit() {
    // TODO: add retry attempts
    String name = Thread.currentThread().getName();
    try {
      for (ReplicationStateSync stateSync : replicationStateSyncList) {
        // rename the thread so per-cluster progress is attributable in the logs
        Thread.currentThread().setName(stateSync.getClusterId());
        LOG.info("starting sync for state " + stateSync);
        stateSync.sync();
        LOG.info("synced state " + stateSync);
      }
    } catch (Exception e) {
      LOG.error(String.format("Error while trying to commit replication state %s", e.getMessage()), e);
      return false;
    } finally {
      // always restore the original thread name, including on success
      Thread.currentThread().setName(name);
    }

    LOG.info("done syncing to all tables, verifying the timestamps...");
    ReplicationStateSync base = replicationStateSyncList.get(0);
    boolean success = true;
    LOG.info("expecting all timestamps to be similar to: " + base);
    for (int idx = 1; idx < replicationStateSyncList.size(); ++idx) {
      ReplicationStateSync other = replicationStateSyncList.get(idx);
      if (!base.replicationStateIsInSync(other)) {
        LOG.error("the timestamp of other : " + other + " is not matching with base: " + base);
        success = false;
      }
    }
    return success;
  }

  @Override
  public boolean rollback() {
    for (ReplicationStateSync stateSync : replicationStateSyncList) {
      stateSync.rollback();
    }
    return true;
  }

  /** Parses CLI args into a loaded config, printing usage and exiting on --help. */
  private static HiveSyncGlobalCommitConfig getHiveSyncGlobalCommitConfig(String[] args)
      throws IOException {
    HiveSyncGlobalCommitConfig cfg = new HiveSyncGlobalCommitConfig();
    JCommander cmd = new JCommander(cfg, null, args);
    if (cfg.help || args.length == 0) {
      cmd.usage();
      System.exit(1);
    }
    cfg.load();
    return cfg;
  }

  @Override
  public void close() {
    for (ReplicationStateSync stateSync : replicationStateSyncList) {
      stateSync.close();
    }
  }

  public static void main(String[] args) throws IOException, HoodieHiveSyncException {
    final HiveSyncGlobalCommitConfig cfg = getHiveSyncGlobalCommitConfig(args);
    try (final HiveSyncGlobalCommitTool globalCommitTool = new HiveSyncGlobalCommitTool(cfg)) {
      boolean success = globalCommitTool.commit();
      if (!success) {
        // best-effort: undo the timestamps already committed on other clusters
        if (!globalCommitTool.rollback()) {
          throw new RuntimeException("not able to rollback failed commit");
        }
      }
    } catch (Exception e) {
      throw new HoodieHiveSyncException(
          "not able to commit replicated timestamp", e);
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hive.replication;

import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.common.util.Option;

/**
 * Tracks the replicated-timestamp state of a single cluster: captures the pre-sync
 * timestamps so a failed global commit can be rolled back to them.
 */
public class ReplicationStateSync {

  private GlobalHiveSyncTool globalHiveSyncTool;
  private final GlobalHiveSyncConfig globalHiveSyncConfig;
  private final HiveConf hiveConf;
  // latest timestamps observed in this cluster's metastore
  private Map<String, Option<String>> replicatedTimeStampMap;
  // snapshot taken just before sync() mutates the metastore; null once rolled back
  private Map<String, Option<String>> oldReplicatedTimeStampMap;
  private final String clusterId;

  ReplicationStateSync(GlobalHiveSyncConfig conf, HiveConf hiveConf, String uid) {
    this.globalHiveSyncConfig = conf;
    this.hiveConf = hiveConf;
    initGlobalHiveSyncTool();
    this.replicatedTimeStampMap = globalHiveSyncTool.getLastReplicatedTimeStampMap();
    this.clusterId = uid;
  }

  private void initGlobalHiveSyncTool() {
    globalHiveSyncTool = GlobalHiveSyncTool.buildGlobalHiveSyncTool(globalHiveSyncConfig, hiveConf);
  }

  /**
   * Syncs the hoodie table to this cluster, capturing the pre-sync timestamps first so
   * {@link #rollback()} can restore them if a later cluster's sync fails.
   */
  public void sync() throws Exception {
    // Refresh right before syncing: the cluster may have gone down since construction,
    // so this read doubles as a liveness check and narrows the capture window.
    replicatedTimeStampMap = globalHiveSyncTool.getLastReplicatedTimeStampMap();
    // Capture the rollback point BEFORE syncing. A sync that dies midway can leave the
    // table property corrupt, and we still want to attempt a rollback in that case.
    // If the cluster itself went down, the rollback may fail too -- that is accepted,
    // and such scenarios should surface through alerting.
    oldReplicatedTimeStampMap = replicatedTimeStampMap;
    globalHiveSyncTool.syncHoodieTable();
    replicatedTimeStampMap = globalHiveSyncTool.getLastReplicatedTimeStampMap();
  }

  /**
   * Restores the timestamps captured before the last {@link #sync()}, if any.
   *
   * @return always {@code true}; failures propagate as exceptions from the sync tool
   */
  public boolean rollback() {
    if (oldReplicatedTimeStampMap == null) {
      // nothing captured, nothing to restore
      return true;
    }
    globalHiveSyncTool.setLastReplicatedTimeStamp(oldReplicatedTimeStampMap);
    oldReplicatedTimeStampMap = null;
    return true;
  }

  /** @return whether this cluster's stored timestamps match {@code other}'s. */
  public boolean replicationStateIsInSync(ReplicationStateSync other) {
    Map<String, Option<String>> mine = globalHiveSyncTool.getLastReplicatedTimeStampMap();
    Map<String, Option<String>> theirs = other.globalHiveSyncTool.getLastReplicatedTimeStampMap();
    return mine.equals(theirs);
  }

  @Override
  public String toString() {
    return "{ clusterId: " + clusterId + " replicatedState: " + replicatedTimeStampMap + " }";
  }

  public String getClusterId() {
    return clusterId;
  }

  public void close() {
    if (globalHiveSyncTool == null) {
      return;
    }
    globalHiveSyncTool.close();
    globalHiveSyncTool = null;
  }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hive;

import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;

import java.util.Collections;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
import org.apache.hudi.hive.testutils.TestCluster;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests the global commit tool against two embedded hive clusters (a "local" and a
 * simulated "remote" one).
 */
public class TestHiveSyncGlobalCommitTool {

  @RegisterExtension
  public static TestCluster localCluster = new TestCluster();
  @RegisterExtension
  public static TestCluster remoteCluster = new TestCluster();

  private static final String DB_NAME = "foo";
  private static final String TBL_NAME = "bar";

  /** Builds a commit config pointing at both test clusters for the given table. */
  private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
      String commitTime, String dbName, String tblName) throws Exception {
    HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
    config.properties.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation());
    config.properties.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation());
    config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl());
    config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl());
    config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName));
    config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName));
    config.globallyReplicatedTimeStamp = commitTime;
    config.hiveUser = System.getProperty("user.name");
    config.hivePass = "";
    config.databaseName = dbName;
    config.tableName = tblName;
    config.basePath = localCluster.tablePath(dbName, tblName);
    config.assumeDatePartitioning = true;
    config.usePreApacheInputFormat = false;
    config.partitionFields = Collections.singletonList("datestr");
    return config;
  }

  /** Asserts both clusters ended up with the same replicated timestamp. */
  private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception {
    Assertions.assertEquals(localCluster.getHMSClient()
        .getTable(config.databaseName, config.tableName).getParameters()
        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), remoteCluster.getHMSClient()
        .getTable(config.databaseName, config.tableName).getParameters()
        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), "compare replicated timestamps");
  }

  @BeforeEach
  public void setUp() throws Exception {
    localCluster.forceCreateDb(DB_NAME);
    remoteCluster.forceCreateDb(DB_NAME);
    localCluster.dfsCluster.getFileSystem().delete(new Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
    remoteCluster.dfsCluster.getFileSystem().delete(new Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
  }

  @AfterEach
  public void clear() throws Exception {
    localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
    remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
  }

  @Test
  public void testBasicGlobalCommit() throws Exception {
    String commitTime = "100";
    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    // simulate drs
    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
    // FIX: the tool is AutoCloseable; close it so metastore/jdbc connections don't leak
    try (HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config)) {
      Assertions.assertTrue(tool.commit());
    }
    compareEqualLastReplicatedTimeStamp(config);
  }

  @Test
  public void testBasicRollback() throws Exception {
    String commitTime = "100";
    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    // simulate drs
    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
    HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
    // FIX: close the tool deterministically via try-with-resources
    try (HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config)) {
      Assertions.assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
      Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
      // stop the remote cluster hive server to simulate cluster going down
      remoteCluster.stopHiveServer2();
      Assertions.assertFalse(tool.commit());
      Assertions.assertEquals(commitTime, localCluster.getHMSClient()
          .getTable(config.databaseName, config.tableName).getParameters()
          .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
      Assertions.assertTrue(tool.rollback()); // do a rollback
      Assertions.assertNotEquals(commitTime, localCluster.getHMSClient()
          .getTable(config.databaseName, config.tableName).getParameters()
          .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
      Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
    } finally {
      remoteCluster.startHiveServer2();
    }
  }
}
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 4324a64f7..e88c46a2d 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -18,22 +18,24 @@ package org.apache.hudi.hive; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.NetworkTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; +import org.apache.hudi.hive.testutils.HiveTestUtil; import org.apache.hudi.hive.util.ConfigUtils; +import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; -import org.apache.hudi.hive.testutils.HiveTestUtil; -import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -48,9 +50,12 @@ import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Stream; 
import static org.junit.jupiter.api.Assertions.assertEquals; @@ -68,12 +73,12 @@ public class TestHiveSyncTool { } @BeforeEach - public void setUp() throws IOException, InterruptedException { + public void setUp() throws Exception { HiveTestUtil.setUp(); } @AfterEach - public void teardown() throws IOException { + public void teardown() throws Exception { HiveTestUtil.clear(); } @@ -246,6 +251,7 @@ public class TestHiveSyncTool { hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); List hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName); List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty()); + //writtenPartitionsSince.add(newPartition.get(0)); List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType, @@ -769,6 +775,136 @@ public class TestHiveSyncTool { "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); } + private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String emptyCommitTime) throws Exception { + assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes"); + assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(), + hiveClient.getDataSchema().getColumns().size() + 1, + "Hive Schema should match the table schema + partition field"); + assertEquals(1, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),"Table partitions should match the number of partitions we wrote"); + assertEquals(emptyCommitTime, + hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),"The last commit that was sycned should be 
updated in the TBLPROPERTIES"); + + // make sure correct schema is picked + Schema schema = SchemaTestUtil.getSimpleSchema(); + for (Field field : schema.getFields()) { + assertEquals(field.schema().getType().getName(), + hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get(field.name()).toLowerCase(), + String.format("Hive Schema Field %s was added", field)); + } + assertEquals("string", + hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("datestr").toLowerCase(), "Hive Schema Field datestr was added"); + assertEquals(schema.getFields().size() + 1 + HoodieRecord.HOODIE_META_COLUMNS.size(), + hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),"Hive Schema fields size"); + } + + @ParameterizedTest + @MethodSource("useJdbc") + public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) throws Exception { + HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + final String commitTime = "100"; + HiveTestUtil.createCOWTable(commitTime, 1, true); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + // create empty commit + final String emptyCommitTime = "200"; + HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime, true); + HoodieHiveClient hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + + HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + + verifyOldParquetFileTest(hiveClient, emptyCommitTime); + } + + @ParameterizedTest + @MethodSource("useJdbc") + public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJdbc) throws Exception { + HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + final String commitTime = "100"; + 
HiveTestUtil.createCOWTable(commitTime, 1, true); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + + // evolve the schema + DateTime dateTime = DateTime.now().plusDays(6); + String commitTime2 = "101"; + HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2); + + // create empty commit + final String emptyCommitTime = "200"; + HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime); + + HoodieHiveClient hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + assertFalse( + hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + + HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + + // now delete the evolved commit instant + Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + hiveClient.getActiveTimeline().getInstants() + .filter(inst -> inst.getTimestamp().equals(commitTime2)) + .findFirst().get().getFileName()); + assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false)); + + try { + tool.syncHoodieTable(); + } catch (RuntimeException e) { + // we expect the table sync to fail + } + + // table should not be synced yet + assertFalse( + hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist at all"); + } + + @ParameterizedTest + @MethodSource("useJdbc") + public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean useJdbc) throws Exception { + HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + final String commitTime = "100"; + HiveTestUtil.createCOWTable(commitTime, 1, true); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + // create empty commit + final String emptyCommitTime = "200"; + 
HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime, true); + //HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime); + HoodieHiveClient hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + assertFalse( + hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + + HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + + verifyOldParquetFileTest(hiveClient, emptyCommitTime); + + // evolve the schema + DateTime dateTime = DateTime.now().plusDays(6); + String commitTime2 = "301"; + HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2); + //HiveTestUtil.createCommitFileWithSchema(commitMetadata, "400", false); // create another empty commit + //HiveTestUtil.createCommitFile(commitMetadata, "400"); // create another empty commit + + tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HoodieHiveClient hiveClientLatest = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + // now delete the evolved commit instant + Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + hiveClientLatest.getActiveTimeline().getInstants() + .filter(inst -> inst.getTimestamp().equals(commitTime2)) + .findFirst().get().getFileName()); + assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false)); + try { + tool.syncHoodieTable(); + } catch (RuntimeException e) { + // we expect the table sync to fail + } + + // old sync values should be left intact + verifyOldParquetFileTest(hiveClient, emptyCommitTime); + } + @ParameterizedTest @MethodSource("useJdbc") public void testTypeConverter(boolean useJdbc) throws Exception { @@ -807,5 
+943,137 @@ public class TestHiveSyncTool { .containsValue("BIGINT"), errorMsg); hiveClient.updateHiveSQL(dropTableSql); } + /* + private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String emptyCommitTime) throws Exception { + assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), + "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes"); + assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(), + hiveClient.getDataSchema().getColumns().size() + 1, + "Hive Schema should match the table schema + partition field"); + assertEquals(1, + hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(), + "Table partitions should match the number of partitions we wrote"); + assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", emptyCommitTime, + hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get()); + // make sure correct schema is picked + Schema schema = SchemaTestUtil.getSimpleSchema(); + for (Field field : schema.getFields()) { + assertEquals(String.format("Hive Schema Field %s was added", field), field.schema().getType().getName(), + hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get(field.name()).toLowerCase()); + } + assertEquals("Hive Schema Field datestr was added", "string", + hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("datestr").toLowerCase()); + assertEquals(schema.getFields().size() + 1, + hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(), + "Hive Schema fields size"); + } + + @ParameterizedTest + @MethodSource("useJdbc") + public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) throws Exception { + HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + final String commitTime = "100"; + HiveTestUtil.createCOWTable(commitTime, 1, false); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + // 
create empty commit + final String emptyCommitTime = "200"; + HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime); + HoodieHiveClient hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), + "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + + HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + + verifyOldParquetFileTest(hiveClient, emptyCommitTime); + } + + @ParameterizedTest + @MethodSource("useJdbc") + public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJdbc) throws Exception { + HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + final String commitTime = "100"; + HiveTestUtil.createCOWTable(commitTime, 1, false); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + + // evolve the schema + DateTime dateTime = DateTime.now().plusDays(6); + String commitTime2 = "101"; + HiveTestUtil.addCOWPartitions(1, false, false, dateTime, commitTime2); + + // create empty commit + final String emptyCommitTime = "200"; + HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime); + + HoodieHiveClient hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), + "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + + HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + + // now delete the evolved commit instant + Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + hiveClient.getActiveTimeline().getInstants() + .filter(inst -> 
inst.getTimestamp().equals(commitTime2)) + .findFirst().get().getFileName()); + assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false)); + + try { + tool.syncHoodieTable(); + } catch (RuntimeException e) { + // we expect the table sync to fail + } + + // table should not be synced yet + assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), + "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist at all"); + } + + @ParameterizedTest + @MethodSource("useJdbc") + public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean useJdbc) throws Exception { + HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + final String commitTime = "100"; + HiveTestUtil.createCOWTable(commitTime, 1, false); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + // create empty commit + final String emptyCommitTime = "200"; + HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime); + HoodieHiveClient hiveClient = + new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), + "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially"); + + HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + + verifyOldParquetFileTest(hiveClient, emptyCommitTime); + + // evolve the schema + DateTime dateTime = DateTime.now().plusDays(6); + String commitTime2 = "301"; + HiveTestUtil.addCOWPartitions(1, false, false, dateTime, commitTime2); + HiveTestUtil.createCommitFile(commitMetadata, "400"); // create another empty commit + + tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + HoodieHiveClient hiveClientLatest = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + // 
now delete the evolved commit instant + Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + hiveClientLatest.getActiveTimeline().getInstants() + .filter(inst -> inst.getTimestamp().equals(commitTime2)) + .findFirst().get().getFileName()); + assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false)); + + try { + tool.syncHoodieTable(); + } catch (RuntimeException e) { + // we expect the table sync to fail + } + + // old sync values should be left intact + verifyOldParquetFileTest(hiveClient, emptyCommitTime); + }*/ } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java index ac083ab07..66343bfd1 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.util.FileIOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IHMSHandler; @@ -78,6 +79,7 @@ public class HiveTestService { private ExecutorService executorService; private TServer tServer; private HiveServer2 hiveServer; + private HiveConf serverConf; public HiveTestService(Configuration hadoopConf) throws IOException { this.workDir = Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath(); @@ -88,6 +90,14 @@ public class HiveTestService { return hadoopConf; } + public TServer getHiveMetaStore() { + return tServer; + } + + public HiveConf getServerConf() { + return serverConf; + } + public HiveServer2 
start() throws IOException { Objects.requireNonNull(workDir, "The work dir must be set before starting cluster."); @@ -102,10 +112,10 @@ public class HiveTestService { FileIOUtils.deleteDirectory(file); } - HiveConf serverConf = configureHive(hadoopConf, localHiveLocation); + serverConf = configureHive(hadoopConf, localHiveLocation); executorService = Executors.newSingleThreadExecutor(); - tServer = startMetaStore(bindIP, metastorePort, serverConf); + tServer = startMetaStore(bindIP, serverConf); serverConf.set("hive.in.test", "true"); hiveServer = startHiveServer(serverConf); @@ -116,7 +126,7 @@ public class HiveTestService { } else { serverHostname = bindIP; } - if (!waitForServerUp(serverConf, serverHostname, metastorePort, CONNECTION_TIMEOUT)) { + if (!waitForServerUp(serverConf, serverHostname, CONNECTION_TIMEOUT)) { throw new IOException("Waiting for startup of standalone server"); } @@ -163,9 +173,17 @@ public class HiveTestService { public HiveConf configureHive(Configuration conf, String localHiveLocation) throws IOException { conf.set("hive.metastore.local", "false"); - conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + metastorePort); + int port = metastorePort; + if (conf.get(HiveConf.ConfVars.METASTORE_SERVER_PORT.varname, null) == null) { + conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort); + } else { + port = conf.getInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort); + } + if (conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, null) == null) { + conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort); + } + conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + port); conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP); - conf.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort); // The following line to turn of SASL has no effect since HiveAuthFactory calls // 'new HiveConf()'. 
This is fixed by https://issues.apache.org/jira/browse/HIVE-6657, // in Hive 0.14. @@ -191,8 +209,9 @@ public class HiveTestService { return new HiveConf(conf, this.getClass()); } - private boolean waitForServerUp(HiveConf serverConf, String hostname, int port, int timeout) { + private boolean waitForServerUp(HiveConf serverConf, String hostname, int timeout) { long start = System.currentTimeMillis(); + int port = serverConf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT); while (true) { try { new HiveMetaStoreClient(serverConf); @@ -288,11 +307,12 @@ public class HiveTestService { } } - public TServer startMetaStore(String forceBindIP, int port, HiveConf conf) throws IOException { + public TServer startMetaStore(String forceBindIP, HiveConf conf) throws IOException { try { // Server will create new threads up to max as necessary. After an idle // period, it will destory threads to keep the number of threads in the // pool to min. + int port = conf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT); int minWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS); int maxWorkerThreads = conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS); boolean tcpKeepAlive = conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE); diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index 1d6bfb442..46f95f616 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -123,10 +123,10 @@ public class HiveTestUtil { public static void clear() throws IOException { fileSystem.delete(new Path(hiveSyncConfig.basePath), true); HoodieTableMetaClient.withPropertyBuilder() - .setTableType(HoodieTableType.COPY_ON_WRITE) - .setTableName(hiveSyncConfig.tableName) - 
.setPayloadClass(HoodieAvroPayload.class) - .initTable(configuration, hiveSyncConfig.basePath); + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(hiveSyncConfig.tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(configuration, hiveSyncConfig.basePath); HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), fileSystem); for (String tableName : createdTablesSet) { @@ -158,10 +158,10 @@ public class HiveTestUtil { Path path = new Path(hiveSyncConfig.basePath); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); HoodieTableMetaClient.withPropertyBuilder() - .setTableType(HoodieTableType.COPY_ON_WRITE) - .setTableName(hiveSyncConfig.tableName) - .setPayloadClass(HoodieAvroPayload.class) - .initTable(configuration, hiveSyncConfig.basePath); + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(hiveSyncConfig.tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(configuration, hiveSyncConfig.basePath); boolean result = fileSystem.mkdirs(path); checkResult(result); @@ -173,15 +173,15 @@ public class HiveTestUtil { } public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions, - boolean createDeltaCommit, boolean useSchemaFromCommitMetadata) + boolean createDeltaCommit, boolean useSchemaFromCommitMetadata) throws IOException, URISyntaxException, InterruptedException { Path path = new Path(hiveSyncConfig.basePath); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); HoodieTableMetaClient.withPropertyBuilder() - .setTableType(HoodieTableType.MERGE_ON_READ) - .setTableName(hiveSyncConfig.tableName) - .setPayloadClass(HoodieAvroPayload.class) - .initTable(configuration, hiveSyncConfig.basePath); + .setTableType(HoodieTableType.MERGE_ON_READ) + .setTableName(hiveSyncConfig.tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(configuration, hiveSyncConfig.basePath); boolean result = fileSystem.mkdirs(path); 
checkResult(result); @@ -189,25 +189,25 @@ public class HiveTestUtil { HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, useSchemaFromCommitMetadata, dateTime, commitTime); createdTablesSet - .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE); + .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE); createdTablesSet .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE); HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata(); commitMetadata.getPartitionToWriteStats() .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l))); addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY), - useSchemaFromCommitMetadata); + useSchemaFromCommitMetadata); createCompactionCommitFile(compactionMetadata, commitTime); if (createDeltaCommit) { // Write a delta commit HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true, - useSchemaFromCommitMetadata); + useSchemaFromCommitMetadata); createDeltaCommitFile(deltaMetadata, deltaCommitTime); } } public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, - boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException { + boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException { HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime); createdTablesSet.add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName); @@ -215,7 +215,7 @@ public class HiveTestUtil { } public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple, - boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException { + boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException { HoodieCommitMetadata commitMetadata = createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); @@ -223,7 +223,7 @@ public class HiveTestUtil { } public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple, - boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime) + boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime) throws IOException, URISyntaxException, InterruptedException { HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime); @@ -233,7 +233,7 @@ public class HiveTestUtil { commitMetadata.getPartitionToWriteStats() .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l))); addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY), - useSchemaFromCommitMetadata); + useSchemaFromCommitMetadata); createCompactionCommitFile(compactionMetadata, instantTime); HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple, useSchemaFromCommitMetadata); @@ -241,7 +241,7 @@ public class HiveTestUtil { } private static HoodieCommitMetadata createLogFiles(Map> partitionWriteStats, - boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata) + boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata) throws 
InterruptedException, IOException, URISyntaxException { HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); for (Entry> wEntry : partitionWriteStats.entrySet()) { @@ -261,7 +261,7 @@ public class HiveTestUtil { } private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, - boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException { + boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException { startFrom = startFrom.withTimeAtStartOfDay(); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); @@ -279,7 +279,7 @@ public class HiveTestUtil { } private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple, - boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException { + boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException { HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath); fileSystem.makeQualified(partPath); @@ -354,7 +354,7 @@ public class HiveTestUtil { } private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, boolean isSimpleSchema, - boolean useSchemaFromCommitMetadata) throws IOException { + boolean useSchemaFromCommitMetadata) throws IOException { if (useSchemaFromCommitMetadata) { Schema dataSchema = getTestDataSchema(isSimpleSchema); commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, dataSchema.toString()); @@ -362,7 +362,7 @@ public class HiveTestUtil { } private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, String schema, - boolean useSchemaFromCommitMetadata) { + boolean useSchemaFromCommitMetadata) { if (useSchemaFromCommitMetadata) { commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
schema); } @@ -374,7 +374,7 @@ public class HiveTestUtil { } } - private static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException { + public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException { byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(instantTime)); @@ -383,6 +383,11 @@ public class HiveTestUtil { fsout.close(); } + public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException { + addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true); + createCommitFile(commitMetadata, instantTime); + } + private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException { byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); @@ -406,4 +411,4 @@ public class HiveTestUtil { public static Set getCreatedTablesSet() { return createdTablesSet; } -} +} \ No newline at end of file diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java new file mode 100644 index 000000000..6a077e10a --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.testutils; + +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import org.apache.hudi.common.util.FileIOUtils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hive.service.server.HiveServer2; +import 
org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.runners.model.InitializationError; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.fail; + +public class TestCluster implements BeforeAllCallback, AfterAllCallback, + BeforeEachCallback, AfterEachCallback { + private HdfsTestService hdfsTestService; + public HiveTestService hiveTestService; + private Configuration conf; + public HiveServer2 server2; + private static volatile int port = 9083; + public MiniDFSCluster dfsCluster; + DateTimeFormatter dtfOut; + public File hiveSiteXml; + private IMetaStoreClient client; + + @Override + public void beforeAll(ExtensionContext context) throws Exception { + setup(); + } + + @Override + public void afterAll(ExtensionContext context) throws Exception { + shutDown(); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + } + + public void setup() throws Exception { + hdfsTestService = new HdfsTestService(); + dfsCluster = hdfsTestService.start(true); + + conf = hdfsTestService.getHadoopConf(); + 
conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, port++); + conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port++); + conf.setInt(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, port++); + hiveTestService = new HiveTestService(conf); + server2 = hiveTestService.start(); + dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd"); + hiveSiteXml = File.createTempFile("hive-site", ".xml"); + hiveSiteXml.deleteOnExit(); + try (OutputStream os = new FileOutputStream(hiveSiteXml)) { + hiveTestService.getServerConf().writeXml(os); + } + client = HiveMetaStoreClient.newSynchronizedClient( + RetryingMetaStoreClient.getProxy(hiveTestService.getServerConf(), true)); + } + + public Configuration getConf() { + return this.conf; + } + + public String getHiveSiteXmlLocation() { + return hiveSiteXml.getAbsolutePath(); + } + + public IMetaStoreClient getHMSClient() { + return client; + } + + public String getHiveJdBcUrl() { + return "jdbc:hive2://127.0.0.1:" + conf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname) + ""; + } + + public String tablePath(String dbName, String tableName) throws Exception { + return dbPath(dbName) + "/" + tableName; + } + + private String dbPath(String dbName) throws Exception { + return dfsCluster.getFileSystem().getWorkingDirectory().toString() + "/" + dbName; + } + + public void forceCreateDb(String dbName) throws Exception { + try { + getHMSClient().dropDatabase(dbName); + } catch (NoSuchObjectException e) { + System.out.println("db does not exist but its ok " + dbName); + } + Database db = new Database(dbName, "", dbPath(dbName), new HashMap<>()); + getHMSClient().createDatabase(db); + } + + public void createCOWTable(String commitTime, int numberOfPartitions, String dbName, String tableName) + throws Exception { + String tablePathStr = tablePath(dbName, tableName); + Path path = new Path(tablePathStr); + FileIOUtils.deleteDirectory(new File(path.toString())); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.COPY_ON_WRITE) + 
.setTableName(tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(conf, path.toString()); + boolean result = dfsCluster.getFileSystem().mkdirs(path); + if (!result) { + throw new InitializationError("cannot initialize table"); + } + DateTime dateTime = DateTime.now(); + HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime, path.toString()); + createCommitFile(commitMetadata, commitTime, path.toString()); + } + + private void createCommitFile(HoodieCommitMetadata commitMetadata, String commitTime, String basePath) throws IOException { + byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); + Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + + HoodieTimeline.makeCommitFileName(commitTime)); + FSDataOutputStream fsout = dfsCluster.getFileSystem().create(fullPath, true); + fsout.write(bytes); + fsout.close(); + } + + private HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, + DateTime startFrom, String commitTime, String basePath) throws IOException, URISyntaxException { + startFrom = startFrom.withTimeAtStartOfDay(); + + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + for (int i = 0; i < numberOfPartitions; i++) { + String partitionPath = dtfOut.print(startFrom); + Path partPath = new Path(basePath + "/" + partitionPath); + dfsCluster.getFileSystem().makeQualified(partPath); + dfsCluster.getFileSystem().mkdirs(partPath); + List writeStats = createTestData(partPath, isParquetSchemaSimple, commitTime); + startFrom = startFrom.minusDays(1); + writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s)); + } + return commitMetadata; + } + + private List createTestData(Path partPath, boolean isParquetSchemaSimple, String commitTime) + throws IOException, URISyntaxException { + List writeStats = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + // Create 5 files + String 
fileId = UUID.randomUUID().toString(); + Path filePath = new Path(partPath.toString() + "/" + FSUtils + .makeDataFileName(commitTime, "1-0-1", fileId)); + generateParquetData(filePath, isParquetSchemaSimple); + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(fileId); + writeStat.setPath(filePath.toString()); + writeStats.add(writeStat); + } + return writeStats; + } + + @SuppressWarnings({"unchecked", "deprecation"}) + private void generateParquetData(Path filePath, boolean isParquetSchemaSimple) + throws IOException, URISyntaxException { + Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema()); + org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema); + BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1, + BloomFilterTypeCode.SIMPLE.name()); + HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(parquetSchema, schema, filter); + ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024, + ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED, + ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, dfsCluster.getFileSystem().getConf()); + + List testRecords = (isParquetSchemaSimple ? 
SchemaTestUtil.generateTestRecords(0, 100) + : SchemaTestUtil.generateEvolvedTestRecords(100, 100)); + testRecords.forEach(s -> { + try { + writer.write(s); + } catch (IOException e) { + fail("IOException while writing test records as parquet" + e.toString()); + } + }); + writer.close(); + } + + public HiveConf getHiveConf() { + return server2.getHiveConf(); + } + + public void stopHiveServer2() { + if (server2 != null) { + server2.stop(); + server2 = null; + } + } + + public void startHiveServer2() { + if (server2 == null) { + server2 = new HiveServer2(); + server2.init(hiveTestService.getServerConf()); + server2.start(); + } + } + + public void shutDown() { + stopHiveServer2(); + client.close(); + hiveTestService.getHiveMetaStore().stop(); + hdfsTestService.stop(); + } +} diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java new file mode 100644 index 000000000..980374e0b --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.testutils; + +import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig; +import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool; + +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP; +import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH; +import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS; +import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI; +import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH; +import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS; +import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHiveSyncGlobalCommitTool { + + TestCluster localCluster; + TestCluster remoteCluster; + + private static String DB_NAME = "foo"; + private static String TBL_NAME = "bar"; + + private HiveSyncGlobalCommitConfig getGlobalCommitConfig( + String commitTime, String dbName, String tblName) throws Exception { + HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig(); + config.properties.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation()); + config.properties.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation()); 
+ config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl()); + config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl()); + config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName)); + config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName)); + config.globallyReplicatedTimeStamp = commitTime; + config.hiveUser = System.getProperty("user.name"); + config.hivePass = ""; + config.databaseName = dbName; + config.tableName = tblName; + config.basePath = localCluster.tablePath(dbName, tblName); + config.assumeDatePartitioning = true; + config.usePreApacheInputFormat = false; + config.partitionFields = Collections.singletonList("datestr"); + return config; + } + + private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception { + assertEquals(localCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), + remoteCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), + "compare replicated timestamps"); + } + + @BeforeEach + public void setUp() throws Exception { + localCluster = new TestCluster(); + localCluster.setup(); + remoteCluster = new TestCluster(); + remoteCluster.setup(); + localCluster.forceCreateDb(DB_NAME); + remoteCluster.forceCreateDb(DB_NAME); + localCluster.dfsCluster.getFileSystem().delete(new Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true); + remoteCluster.dfsCluster.getFileSystem().delete(new Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true); + } + + @AfterEach + public void clear() throws Exception { + localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME); + remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME); + localCluster.shutDown(); + remoteCluster.shutDown(); + } + + @Test + public void testBasicGlobalCommit() throws 
Exception { + String commitTime = "100"; + localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME); + // simulate drs + remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME); + HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME); + HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config); + assertTrue(tool.commit()); + compareEqualLastReplicatedTimeStamp(config); + } + + @Test + public void testBasicRollback() throws Exception { + String commitTime = "100"; + localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME); + // simulate drs + remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME); + HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME); + HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config); + assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME)); + assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME)); + // stop the remote cluster hive server to simulate cluster going down + remoteCluster.stopHiveServer2(); + assertFalse(tool.commit()); + assertEquals(commitTime, localCluster.getHMSClient() + .getTable(config.databaseName, config.tableName).getParameters() + .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP)); + assertTrue(tool.rollback()); // do a rollback + assertNotEquals(commitTime, localCluster.getHMSClient() + .getTable(config.databaseName, config.tableName).getParameters() + .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP)); + assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME)); + remoteCluster.startHiveServer2(); + } +} diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml b/hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml new file mode 100644 index 000000000..e1a834743 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml @@ -0,0 +1,27 @@ + + + + 
+hi +1 +/home/hive-remote-site.xml +hdfs://hadoop-cluster2:9000/tmp/hudi_trips_cow +/home/hive/packaging/target/apache-hive-2.3.4-uber-51-SNAPSHOT-bin/apache-hive-2.3.4-uber-51-SNAPSHOT-bin/conf/hive-site.xml +jdbc:hive2://hadoop-cluster2:10000/default;transportMode=http;httpPath=hs2 +