From 043eb564c2c7e4578237b5d0f2bbad8276db51f4 Mon Sep 17 00:00:00 2001 From: Yajun Luo Date: Mon, 15 Jun 2020 19:02:03 +0800 Subject: [PATCH] [HUDI-1003] Handle partitions correctly for syncing hudi non-partitioned table to hive (#1720) --- .../org/apache/hudi/hive/HiveSyncTool.java | 6 ++++ .../apache/hudi/hive/TestHiveSyncTool.java | 29 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 273635cd0..c3849d779 100644 --- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -38,6 +38,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -64,6 +65,11 @@ public class HiveSyncTool { public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs); this.cfg = cfg; + // Set partitionFields to empty, when the NonPartitionedExtractor is used + if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) { + LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used"); + cfg.partitionFields = new ArrayList<>(); + } switch (hoodieHiveClient.getTableType()) { case COPY_ON_WRITE: this.snapshotTableName = cfg.tableName; diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index bf383c41e..cab26e069 100644 --- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -457,6 +457,35 @@ public class TestHiveSyncTool { "The last 
commit that was sycned should be updated in the TBLPROPERTIES"); } + @ParameterizedTest + @MethodSource("useJdbc") + public void testNonPartitionedSync(boolean useJdbc) throws Exception { + HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + String instantTime = "100"; + HiveTestUtil.createCOWTable(instantTime, 5, true); + + HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig); + // Set partition value extractor to NonPartitionedExtractor + hiveSyncConfig.partitionValueExtractorClass = NonPartitionedExtractor.class.getCanonicalName(); + hiveSyncConfig.tableName = "non_partitioned"; + hiveSyncConfig.partitionFields = Arrays.asList("year", "month", "day"); + HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); + + HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName), + "Table " + hiveSyncConfig.tableName + " should not exist initially"); + // Lets do the sync + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), + "Table " + hiveSyncConfig.tableName + " should exist after sync completes"); + assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(), + hiveClient.getDataSchema().getColumns().size(), + "Hive Schema should match the table schema,ignoring the partition fields"); + assertEquals(0, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + "Table should not have partitions because of the NonPartitionedExtractor"); + } + @ParameterizedTest @MethodSource("useJdbc") public void testReadSchemaForMOR(boolean useJdbc) throws Exception {