From a392e9ba46e4f91bdc520badcf8ed03ac8b49e85 Mon Sep 17 00:00:00 2001 From: Pratyaksh Sharma Date: Wed, 12 Jan 2022 08:22:07 +0530 Subject: [PATCH] [HUDI-485] Corrected the check for incremental sql (#2768) * [HUDI-485]: corrected the check for incremental sql * [HUDI-485]: added tests * code review comments addressed * [HUDI-485]: added happy flow test case --- .../apache/hudi/hive/TestHiveSyncTool.java | 2 +- .../hudi/hive/testutils/HiveTestUtil.java | 59 ++++---- hudi-utilities/pom.xml | 6 + .../hudi/utilities/HiveIncrementalPuller.java | 11 +- .../utilities/TestHiveIncrementalPuller.java | 129 +++++++++++++++++- 5 files changed, 177 insertions(+), 30 deletions(-) diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index c515c9dda..9a1012b64 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -996,7 +996,7 @@ public class TestHiveSyncTool { // create empty commit final String emptyCommitTime = "200"; - HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime); + HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime, hiveSyncConfig.basePath); HoodieHiveClient hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index 34158d4e7..4b92b252c 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -128,6 +128,14 @@ public class HiveTestUtil { clear(); } + public static void clearIncrementalPullSetup(String path1, String path2) throws IOException, HiveException, MetaException { + fileSystem.delete(new Path(path1), true); + if (path2 != null) { + fileSystem.delete(new Path(path2), true); + } + clear(); + } + public static void clear() throws IOException, HiveException, MetaException { fileSystem.delete(new Path(hiveSyncConfig.basePath), true); HoodieTableMetaClient.withPropertyBuilder() @@ -159,23 +167,28 @@ public class HiveTestUtil { } } - public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata) - throws IOException, URISyntaxException { - Path path = new Path(hiveSyncConfig.basePath); - FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); + public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata, + String basePath, String databaseName, String tableName) throws IOException, URISyntaxException { + Path path = new Path(basePath); + FileIOUtils.deleteDirectory(new File(basePath)); HoodieTableMetaClient.withPropertyBuilder() - .setTableType(HoodieTableType.COPY_ON_WRITE) - .setTableName(hiveSyncConfig.tableName) - .setPayloadClass(HoodieAvroPayload.class) - .initTable(configuration, hiveSyncConfig.basePath); + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(configuration, basePath); boolean result = fileSystem.mkdirs(path); checkResult(result); ZonedDateTime dateTime = ZonedDateTime.now(); HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, - useSchemaFromCommitMetadata, dateTime, instantTime); - createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); - createCommitFile(commitMetadata, instantTime); + useSchemaFromCommitMetadata, dateTime, instantTime, basePath); + createdTablesSet.add(databaseName + "." + tableName); + createCommitFile(commitMetadata, instantTime, basePath); + } + + public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata) + throws IOException, URISyntaxException { + createCOWTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata, hiveSyncConfig.basePath, hiveSyncConfig.databaseName, hiveSyncConfig.tableName); } public static void createReplaceCommit(String instantTime, String partitions, WriteOperationType type, boolean isParquetSchemaSimple, boolean useSchemaFromCommitMetadata) @@ -220,7 +233,7 @@ public class HiveTestUtil { writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s)); commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema.toString()); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); - createCommitFile(commitMetadata, instantTime); + createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath); } public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions, @@ -238,7 +251,7 @@ public class HiveTestUtil { checkResult(result); ZonedDateTime dateTime = ZonedDateTime.now(); HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, - useSchemaFromCommitMetadata, dateTime, commitTime); + useSchemaFromCommitMetadata, dateTime, commitTime, hiveSyncConfig.basePath); createdTablesSet .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE); createdTablesSet @@ -260,9 +273,9 @@ public class HiveTestUtil { public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException { HoodieCommitMetadata commitMetadata = - createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime); + createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime, hiveSyncConfig.basePath); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); - createCommitFile(commitMetadata, instantTime); + createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath); } public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple, @@ -270,14 +283,14 @@ public class HiveTestUtil { HoodieCommitMetadata commitMetadata = createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); - createCommitFile(commitMetadata, instantTime); + createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath); } public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String deltaCommitTime) throws IOException, URISyntaxException, InterruptedException { HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple, - useSchemaFromCommitMetadata, startFrom, instantTime); + useSchemaFromCommitMetadata, startFrom, instantTime, hiveSyncConfig.basePath); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE); HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata(); @@ -312,13 +325,13 @@ public class HiveTestUtil { } private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, - boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException { + boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String basePath) throws IOException, URISyntaxException { startFrom = startFrom.truncatedTo(ChronoUnit.DAYS); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); for (int i = 0; i < numberOfPartitions; i++) { String partitionPath = startFrom.format(dtfOut); - Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath); + Path partPath = new Path(basePath + "/" + partitionPath); fileSystem.makeQualified(partPath); fileSystem.mkdirs(partPath); List writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime); @@ -382,7 +395,7 @@ public class HiveTestUtil { } private static void generateParquetDataWithSchema(Path filePath, Schema schema) - throws IOException, URISyntaxException { + throws IOException { org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema); BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1, BloomFilterTypeCode.SIMPLE.name()); @@ -446,9 +459,9 @@ public class HiveTestUtil { } } - public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException { + public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime, String basePath) throws IOException { byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); - Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(instantTime)); FSDataOutputStream fsout = fileSystem.create(fullPath, true); fsout.write(bytes); @@ -466,7 +479,7 @@ public class HiveTestUtil { public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException { addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true); - createCommitFile(commitMetadata, instantTime); + createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath); } private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 2e68039c1..e7fb36e13 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -261,6 +261,12 @@ org.antlr stringtemplate 4.0.2 + + + org.antlr + antlr-runtime + + diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java index e5d630df5..2e66a2275 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java @@ -52,7 +52,8 @@ import java.util.stream.Collectors; /** * Utility to pull data after a given commit, based on the supplied HiveQL and save the delta as another hive temporary - * table. + * table. This temporary table can be further read using {@link org.apache.hudi.utilities.sources.HiveIncrPullSource} and the changes can + * be applied to the target table. *

* Current Limitations: *

@@ -149,7 +150,7 @@ public class HiveIncrementalPuller { String tempDbTable = config.tmpDb + "." + config.targetTable + "__" + config.sourceTable; String tempDbTablePath = config.hoodieTmpDir + "/" + config.targetTable + "__" + config.sourceTable + "/" + lastCommitTime; - executeStatement("drop table " + tempDbTable, stmt); + executeStatement("drop table if exists " + tempDbTable, stmt); deleteHDFSPath(fs, tempDbTablePath); if (!ensureTempPathExists(fs, lastCommitTime)) { throw new IllegalStateException("Could not create target path at " @@ -188,12 +189,12 @@ public class HiveIncrementalPuller { throw new HoodieIncrementalPullSQLException( "Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable); } - if (!incrementalSQL.contains("`_hoodie_commit_time` > '%targetBasePath'")) { + if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) { LOG.error("Incremental SQL : " + incrementalSQL - + " does not contain `_hoodie_commit_time` > '%targetBasePath'. Please add " + + " does not contain `_hoodie_commit_time` > '%s'. Please add " + "this clause for incremental to work properly."); throw new HoodieIncrementalPullSQLException( - "Incremental SQL does not have clause `_hoodie_commit_time` > '%targetBasePath', which " + "Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which " + "means its not pulling incrementally"); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java index c60991619..a2f70e041 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java @@ -18,18 +18,45 @@ package org.apache.hudi.utilities; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HiveSyncTool; +import org.apache.hudi.hive.HoodieHiveClient; +import org.apache.hudi.hive.testutils.HiveTestUtil; +import org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.Instant; + +import static org.apache.hudi.hive.testutils.HiveTestUtil.fileSystem; +import static org.apache.hudi.hive.testutils.HiveTestUtil.hiveSyncConfig; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestHiveIncrementalPuller { private HiveIncrementalPuller.Config config; + private String targetBasePath = null; @BeforeEach - public void setup() { + public void setup() throws HiveException, IOException, InterruptedException, MetaException { config = new HiveIncrementalPuller.Config(); + HiveTestUtil.setUp(); + } + + @AfterEach + public void teardown() throws Exception { + HiveTestUtil.clearIncrementalPullSetup(config.hoodieTmpDir, targetBasePath); } @Test @@ -41,4 +68,104 @@ public class TestHiveIncrementalPuller { } + private HiveIncrementalPuller.Config getHivePullerConfig(String incrementalSql) throws IOException { + config.hiveJDBCUrl = hiveSyncConfig.jdbcUrl; + config.hiveUsername = hiveSyncConfig.hiveUser; + config.hivePassword = hiveSyncConfig.hivePass; + config.hoodieTmpDir = Files.createTempDirectory("hivePullerTest").toUri().toString(); + config.sourceDb = hiveSyncConfig.databaseName; + config.sourceTable = hiveSyncConfig.tableName; + config.targetDb = "tgtdb"; + config.targetTable = "test2"; + config.tmpDb = "tmp_db"; + config.fromCommitTime = "100"; + createIncrementalSqlFile(incrementalSql, config); + return config; + } + + private void createIncrementalSqlFile(String text, HiveIncrementalPuller.Config cfg) throws IOException { + java.nio.file.Path path = Paths.get(cfg.hoodieTmpDir + "/incremental_pull.txt"); + Files.createDirectories(path.getParent()); + Files.createFile(path); + try (FileWriter fr = new FileWriter(new File(path.toUri()))) { + fr.write(text); + } catch (Exception e) { + // no-op + } + cfg.incrementalSQLFile = path.toString(); + } + + private void createSourceTable() throws IOException, URISyntaxException { + String instantTime = "101"; + HiveTestUtil.createCOWTable(instantTime, 5, true); + hiveSyncConfig.syncMode = "jdbc"; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + tool.syncHoodieTable(); + } + + private void createTargetTable() throws IOException, URISyntaxException { + String instantTime = "100"; + targetBasePath = Files.createTempDirectory("hivesynctest1" + Instant.now().toEpochMilli()).toUri().toString(); + HiveTestUtil.createCOWTable(instantTime, 5, true, + targetBasePath, "tgtdb", "test2"); + HiveSyncTool tool = new HiveSyncTool(getTargetHiveSyncConfig(targetBasePath), HiveTestUtil.getHiveConf(), fileSystem); + tool.syncHoodieTable(); + } + + private HiveSyncConfig getTargetHiveSyncConfig(String basePath) { + HiveSyncConfig config = HiveSyncConfig.copy(hiveSyncConfig); + config.databaseName = "tgtdb"; + config.tableName = "test2"; + config.basePath = basePath; + config.batchSyncNum = 3; + config.syncMode = "jdbc"; + return config; + } + + private HiveSyncConfig getAssertionSyncConfig(String databaseName) { + HiveSyncConfig config = HiveSyncConfig.copy(hiveSyncConfig); + config.databaseName = databaseName; + return config; + } + + private void createTables() throws IOException, URISyntaxException { + createSourceTable(); + createTargetTable(); + } + + @Test + public void testPullerWithoutIncrementalClause() throws IOException, URISyntaxException { + createTables(); + HiveIncrementalPuller puller = new HiveIncrementalPuller(getHivePullerConfig( + "select name from testdb.test1")); + Exception e = assertThrows(HoodieIncrementalPullSQLException.class, puller::saveDelta, + "Should fail when incremental clause not provided!"); + assertTrue(e.getMessage().contains("Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which means its not pulling incrementally")); + } + + @Test + public void testPullerWithoutSourceInSql() throws IOException, URISyntaxException { + createTables(); + HiveIncrementalPuller puller = new HiveIncrementalPuller(getHivePullerConfig( + "select name from tgtdb.test2 where `_hoodie_commit_time` > '%s'")); + Exception e = assertThrows(HoodieIncrementalPullSQLException.class, puller::saveDelta, + "Should fail when source db and table names not provided!"); + assertTrue(e.getMessage().contains("Incremental SQL does not have testdb.test1")); + } + + @Test + public void testPuller() throws IOException, URISyntaxException { + createTables(); + HiveIncrementalPuller.Config cfg = getHivePullerConfig("select name from testdb.test1 where `_hoodie_commit_time` > '%s'"); + HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem); + hiveClient.createDatabase(cfg.tmpDb); + HiveIncrementalPuller puller = new HiveIncrementalPuller(cfg); + puller.saveDelta(); + HiveSyncConfig assertingConfig = getAssertionSyncConfig(cfg.tmpDb); + HoodieHiveClient assertingClient = new HoodieHiveClient(assertingConfig, HiveTestUtil.getHiveConf(), fileSystem); + String tmpTable = cfg.targetTable + "__" + cfg.sourceTable; + assertTrue(assertingClient.doesTableExist(tmpTable)); + } + }