From da2919a75f564be6c3d731a2c503959e416ebe71 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Wed, 6 Jan 2021 07:49:44 -0500 Subject: [PATCH] [HUDI-1383] Fixing sorting of partition vals for hive sync computation (#2402) --- .../apache/hudi/hive/HoodieHiveClient.java | 2 - .../apache/hudi/hive/TestHiveSyncTool.java | 66 +++++++++++++++---- .../hudi/hive/testutils/HiveTestUtil.java | 20 ++++++ 3 files changed, 75 insertions(+), 13 deletions(-) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 5c0c1287d..b6211671b 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -207,7 +207,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { Map paths = new HashMap<>(); for (Partition tablePartition : tablePartitions) { List hivePartitionValues = tablePartition.getValues(); - Collections.sort(hivePartitionValues); String fullTablePartitionPath = Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath(); paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath); @@ -219,7 +218,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient { String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); // Check if the partition values or if hdfs path is the same List storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); - Collections.sort(storagePartitionValues); if (!storagePartitionValues.isEmpty()) { String storageValue = String.join(", ", storagePartitionValues); if (!paths.containsKey(storageValue)) { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 1d8cbd853..8a1ea4f89 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -21,10 +21,10 @@ package org.apache.hudi.hive; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; -import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; -import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hudi.hive.testutils.HiveTestUtil; import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.parquet.schema.MessageType; @@ -56,7 +56,7 @@ public class TestHiveSyncTool { } private static Iterable useJdbcAndSchemaFromCommitMetadata() { - return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } }); + return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}}); } @BeforeEach @@ -347,7 +347,7 @@ public class TestHiveSyncTool { assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size(), - "Hive Schema should match the table schema + partition field"); + "Hive Schema should match the table schema + partition field"); } else { // The data generated and schema in the data file do not have metadata columns, so we need a separate check. 
assertEquals(hiveClient.getTableSchema(roTableName).size(), @@ -377,7 +377,7 @@ public class TestHiveSyncTool { assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size(), - "Hive Schema should match the evolved table schema + partition field"); + "Hive Schema should match the evolved table schema + partition field"); } else { // The data generated and schema in the data file do not have metadata columns, so we need a separate check. assertEquals(hiveClient.getTableSchema(roTableName).size(), @@ -418,7 +418,7 @@ public class TestHiveSyncTool { assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size() + HoodieRecord.HOODIE_META_COLUMNS.size(), - "Hive Schema should match the table schema + partition field"); + "Hive Schema should match the table schema + partition field"); } else { // The data generated and schema in the data file do not have metadata columns, so we need a separate check. assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), @@ -489,6 +489,50 @@ public class TestHiveSyncTool { "Table partitions should match the number of partitions we wrote"); assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), "The last commit that was sycned should be updated in the TBLPROPERTIES"); + + // HoodieHiveClient had a bug where partition vals were sorted + // and stored as keys in a map. The following tests this particular case. + // Now lets create partition "2010/01/02" and followed by "2010/02/01". 
+ String commitTime2 = "101"; + HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2); + + hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime)); + assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); + List hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); + assertEquals(1, partitionEvents.size(), "There should be only one partition event"); + assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must be of type ADD"); + + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + + // Sync should add the one partition + assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + "Table partitions should match the number of partitions we wrote"); + assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), + "The last commit that was synced should be 101"); + + // create partition "2010/02/01" and ensure sync works + String commitTime3 = "102"; + HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3); + HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName); + + hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + + tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + + assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), + "Table " + hiveSyncConfig.tableName + " should exist after sync completes"); + assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(), + hiveClient.getDataSchema().getColumns().size() + 3, + "Hive Schema should match the table schema + partition fields"); + assertEquals(7, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + "Table partitions should match the number of partitions we wrote"); + assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), + "The last commit that was synced should be updated in the TBLPROPERTIES"); + assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size()); } @ParameterizedTest @@ -507,17 +551,17 @@ HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), - "Table " + hiveSyncConfig.tableName + " should not exist initially"); + "Table " + hiveSyncConfig.tableName + " should not exist initially"); // Lets do the sync HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); tool.syncHoodieTable(); assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), - "Table " + hiveSyncConfig.tableName + " should exist after sync completes"); + "Table " + hiveSyncConfig.tableName + " should exist after sync completes"); assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(), - hiveClient.getDataSchema().getColumns().size(), - "Hive Schema should match the table schema,ignoring the partition fields"); 
+ hiveClient.getDataSchema().getColumns().size(), + "Hive Schema should match the table schema,ignoring the partition fields"); assertEquals(0, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), - "Table should not have partitions because of the NonPartitionedExtractor"); + "Table should not have partitions because of the NonPartitionedExtractor"); } @ParameterizedTest diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index d0d1b667a..09090532b 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -210,6 +210,14 @@ public class HiveTestUtil { createCommitFile(commitMetadata, instantTime); } + public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple, + boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException { + HoodieCommitMetadata commitMetadata = + createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime); + createdTablesSet.add(hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName); + createCommitFile(commitMetadata, instantTime); + } + public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime) throws IOException, URISyntaxException, InterruptedException { @@ -266,6 +274,18 @@ public class HiveTestUtil { return commitMetadata; } + private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple, + boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException { + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath); + fileSystem.makeQualified(partPath); + fileSystem.mkdirs(partPath); + List writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime); + writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s)); + addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata); + return commitMetadata; + } + private static List createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime) throws IOException, URISyntaxException { List writeStats = new ArrayList<>();