From 6808559b018366b4bc6d47b40dbbe362f48f65d7 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Fri, 3 Apr 2020 16:23:05 -0700 Subject: [PATCH] [HUDI-717] Fixed usage of HiveDriver for DDL statements. (#1416) When using HiveDriver mode in HudiHiveClient, Hive 2.x DDL operations like ALTER PARTITION may fail. This is because Hive 2.x doesn't like `db`.`table_name` for operations. In this fix, we set the name of the database in the SessionState create for the Driver. --- .../apache/hudi/hive/HoodieHiveClient.java | 4 +- .../apache/hudi/hive/TestHiveSyncTool.java | 91 ++++++++++++++++++- .../java/org/apache/hudi/hive/TestUtil.java | 16 ++-- 3 files changed, 101 insertions(+), 10 deletions(-) diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 1bfbe20ad..55a496885 100644 --- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -198,7 +198,8 @@ public class HoodieHiveClient { for (String partition : partitions) { String partitionClause = getPartitionClause(partition); Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition); - String fullPartitionPath = partitionPath.toUri().getScheme().equals(StorageSchemes.HDFS.getScheme()) + String partitionScheme = partitionPath.toUri().getScheme(); + String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) ? 
FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString(); String changePartition = alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'"; @@ -505,6 +506,7 @@ public class HoodieHiveClient { try { final long startTime = System.currentTimeMillis(); ss = SessionState.start(configuration); + ss.setCurrentDatabase(syncConfig.databaseName); hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration); final long endTime = System.currentTimeMillis(); LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime))); diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index f8042192e..449c7f343 100644 --- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -168,6 +168,47 @@ public class TestHiveSyncTool { hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get()); + + // Adding of new partitions + List newPartition = Arrays.asList("2050/01/01"); + hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList()); + assertEquals("No new partition should be added", 5, + hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); + hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition); + assertEquals("New partition should be added", 6, + hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); + + // Update partitions + hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList()); + assertEquals("Partition count should remain the same", 6, + 
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); + hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition); + assertEquals("Partition count should remain the same", 6, + hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); + + // Alter partitions + // Manually change a hive partition location to check if the sync will detect + // it and generate a partition update event for it. + hiveClient.updateHiveSQL("ALTER TABLE `" + TestUtil.hiveSyncConfig.tableName + + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'"); + + hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + List hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName); + List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty()); + writtenPartitionsSince.add(newPartition.get(0)); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); + assertEquals("There should be only one partition event", 1, partitionEvents.size()); + assertEquals("The one partition event must be of type UPDATE", PartitionEventType.UPDATE, + partitionEvents.iterator().next().eventType); + + tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + tool.syncHoodieTable(); + // Sync should update the changed partition to correct path + List tablePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName); + assertEquals("The one partition we wrote should be added to hive", 6, tablePartitions.size()); + assertEquals("The last commit that was synced should be 100", instantTime, + hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get()); + } @Test @@ -250,7 +291,7 @@ public class TestHiveSyncTool { TestUtil.hiveSyncConfig.useJdbc = this.useJdbc; String instantTime = "100"; String deltaCommitTime = "101"; - 
TestUtil.createMORTable(instantTime, deltaCommitTime, 5); + TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true); String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE; HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); @@ -294,7 +335,7 @@ public class TestHiveSyncTool { String instantTime = "100"; String deltaCommitTime = "101"; String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; - TestUtil.createMORTable(instantTime, deltaCommitTime, 5); + TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true); HoodieHiveClient hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); @@ -363,4 +404,50 @@ public class TestHiveSyncTool { assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get()); } + + @Test + public void testReadSchemaForMOR() throws Exception { + TestUtil.hiveSyncConfig.useJdbc = this.useJdbc; + String commitTime = "100"; + String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; + TestUtil.createMORTable(commitTime, "", 5, false); + HoodieHiveClient hiveClientRT = + new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + + assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + + " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName)); + + // Lets do the sync + HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + tool.syncHoodieTable(); + + assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + + " should exist after sync completes", hiveClientRT.doesTableExist(snapshotTableName)); + + // 
Schema being read from compacted base files + assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(), + SchemaTestUtil.getSimpleSchema().getFields().size() + 1); + assertEquals("Table partitions should match the number of partitions we wrote", 5, + hiveClientRT.scanTablePartitions(snapshotTableName).size()); + + // Now lets create more partitions and these are the only ones which need to be synced + DateTime dateTime = DateTime.now().plusDays(6); + String commitTime2 = "102"; + String deltaCommitTime2 = "103"; + + TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2); + // Lets do the sync + tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + tool.syncHoodieTable(); + hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + + // Schema being read from the log files + assertEquals("Hive Schema should match the evolved table schema + partition field", + hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1); + // Sync should add the one partition + assertEquals("The 1 partition we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size()); + assertEquals("The last commit that was synced should be 103", deltaCommitTime2, + hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get()); + } + } diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java index df5d9eaa1..3c0f55155 100644 --- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java +++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java @@ -107,7 +107,6 @@ public class TestUtil { hiveSyncConfig = new HiveSyncConfig(); hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/"; - hiveSyncConfig.databaseName = 
"hdrone_test"; hiveSyncConfig.hiveUser = ""; hiveSyncConfig.hivePass = ""; hiveSyncConfig.databaseName = "testdb"; @@ -167,7 +166,8 @@ public class TestUtil { createCommitFile(commitMetadata, instantTime); } - static void createMORTable(String instantTime, String deltaCommitTime, int numberOfPartitions) + static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions, + boolean createDeltaCommit) throws IOException, InitializationError, URISyntaxException, InterruptedException { Path path = new Path(hiveSyncConfig.basePath); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); @@ -177,17 +177,19 @@ public class TestUtil { boolean result = fileSystem.mkdirs(path); checkResult(result); DateTime dateTime = DateTime.now(); - HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, instantTime); + HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet .add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE); HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata(); commitMetadata.getPartitionToWriteStats() .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l))); - createCompactionCommitFile(compactionMetadata, instantTime); - // Write a delta commit - HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true); - createDeltaCommitFile(deltaMetadata, deltaCommitTime); + createCompactionCommitFile(compactionMetadata, commitTime); + if (createDeltaCommit) { + // Write a delta commit + HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true); + createDeltaCommitFile(deltaMetadata, deltaCommitTime); + } } static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,