1
0

[HUDI-485] Corrected the check for incremental sql (#2768)

* [HUDI-485]: corrected the check for incremental sql

* [HUDI-485]: added tests

* code review comments addressed

* [HUDI-485]: added happy flow test case
This commit is contained in:
Pratyaksh Sharma
2022-01-12 08:22:07 +05:30
committed by GitHub
parent 6cdcd89afa
commit a392e9ba46
5 changed files with 177 additions and 30 deletions

View File

@@ -996,7 +996,7 @@ public class TestHiveSyncTool {
// create empty commit // create empty commit
final String emptyCommitTime = "200"; final String emptyCommitTime = "200";
HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime); HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime, hiveSyncConfig.basePath);
HoodieHiveClient hiveClient = HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);

View File

@@ -128,6 +128,14 @@ public class HiveTestUtil {
clear(); clear();
} }
public static void clearIncrementalPullSetup(String path1, String path2) throws IOException, HiveException, MetaException {
fileSystem.delete(new Path(path1), true);
if (path2 != null) {
fileSystem.delete(new Path(path2), true);
}
clear();
}
public static void clear() throws IOException, HiveException, MetaException { public static void clear() throws IOException, HiveException, MetaException {
fileSystem.delete(new Path(hiveSyncConfig.basePath), true); fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
HoodieTableMetaClient.withPropertyBuilder() HoodieTableMetaClient.withPropertyBuilder()
@@ -159,23 +167,28 @@ public class HiveTestUtil {
} }
} }
public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata) public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata,
throws IOException, URISyntaxException { String basePath, String databaseName, String tableName) throws IOException, URISyntaxException {
Path path = new Path(hiveSyncConfig.basePath); Path path = new Path(basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); FileIOUtils.deleteDirectory(new File(basePath));
HoodieTableMetaClient.withPropertyBuilder() HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE) .setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName) .setTableName(tableName)
.setPayloadClass(HoodieAvroPayload.class) .setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath); .initTable(configuration, basePath);
boolean result = fileSystem.mkdirs(path); boolean result = fileSystem.mkdirs(path);
checkResult(result); checkResult(result);
ZonedDateTime dateTime = ZonedDateTime.now(); ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, instantTime); useSchemaFromCommitMetadata, dateTime, instantTime, basePath);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(databaseName + "." + tableName);
createCommitFile(commitMetadata, instantTime); createCommitFile(commitMetadata, instantTime, basePath);
}
public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException {
createCOWTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata, hiveSyncConfig.basePath, hiveSyncConfig.databaseName, hiveSyncConfig.tableName);
} }
public static void createReplaceCommit(String instantTime, String partitions, WriteOperationType type, boolean isParquetSchemaSimple, boolean useSchemaFromCommitMetadata) public static void createReplaceCommit(String instantTime, String partitions, WriteOperationType type, boolean isParquetSchemaSimple, boolean useSchemaFromCommitMetadata)
@@ -220,7 +233,7 @@ public class HiveTestUtil {
writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s)); writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema.toString()); commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema.toString());
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime); createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
} }
public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions, public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
@@ -238,7 +251,7 @@ public class HiveTestUtil {
checkResult(result); checkResult(result);
ZonedDateTime dateTime = ZonedDateTime.now(); ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, commitTime); useSchemaFromCommitMetadata, dateTime, commitTime, hiveSyncConfig.basePath);
createdTablesSet createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE); .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet createdTablesSet
@@ -260,9 +273,9 @@ public class HiveTestUtil {
public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException { boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime); createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime, hiveSyncConfig.basePath);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime); createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
} }
public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple, public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
@@ -270,14 +283,14 @@ public class HiveTestUtil {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata commitMetadata =
createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime); createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime); createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
} }
public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple, public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String deltaCommitTime) boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple, HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
useSchemaFromCommitMetadata, startFrom, instantTime); useSchemaFromCommitMetadata, startFrom, instantTime, hiveSyncConfig.basePath);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE); createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata(); HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
@@ -312,13 +325,13 @@ public class HiveTestUtil {
} }
private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException { boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String basePath) throws IOException, URISyntaxException {
startFrom = startFrom.truncatedTo(ChronoUnit.DAYS); startFrom = startFrom.truncatedTo(ChronoUnit.DAYS);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (int i = 0; i < numberOfPartitions; i++) { for (int i = 0; i < numberOfPartitions; i++) {
String partitionPath = startFrom.format(dtfOut); String partitionPath = startFrom.format(dtfOut);
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath); Path partPath = new Path(basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath); fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath); fileSystem.mkdirs(partPath);
List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime); List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime);
@@ -382,7 +395,7 @@ public class HiveTestUtil {
} }
private static void generateParquetDataWithSchema(Path filePath, Schema schema) private static void generateParquetDataWithSchema(Path filePath, Schema schema)
throws IOException, URISyntaxException { throws IOException {
org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema); org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1, BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
BloomFilterTypeCode.SIMPLE.name()); BloomFilterTypeCode.SIMPLE.name());
@@ -446,9 +459,9 @@ public class HiveTestUtil {
} }
} }
public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException { public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime, String basePath) throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(instantTime)); + HoodieTimeline.makeCommitFileName(instantTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true); FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes); fsout.write(bytes);
@@ -466,7 +479,7 @@ public class HiveTestUtil {
public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException { public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true); addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true);
createCommitFile(commitMetadata, instantTime); createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
} }
private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String instantTime)

View File

@@ -261,6 +261,12 @@
<groupId>org.antlr</groupId> <groupId>org.antlr</groupId>
<artifactId>stringtemplate</artifactId> <artifactId>stringtemplate</artifactId>
<version>4.0.2</version> <version>4.0.2</version>
<exclusions>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>

View File

@@ -52,7 +52,8 @@ import java.util.stream.Collectors;
/** /**
* Utility to pull data after a given commit, based on the supplied HiveQL and save the delta as another hive temporary * Utility to pull data after a given commit, based on the supplied HiveQL and save the delta as another hive temporary
* table. * table. This temporary table can be further read using {@link org.apache.hudi.utilities.sources.HiveIncrPullSource} and the changes can
* be applied to the target table.
* <p> * <p>
* Current Limitations: * Current Limitations:
* <p> * <p>
@@ -149,7 +150,7 @@ public class HiveIncrementalPuller {
String tempDbTable = config.tmpDb + "." + config.targetTable + "__" + config.sourceTable; String tempDbTable = config.tmpDb + "." + config.targetTable + "__" + config.sourceTable;
String tempDbTablePath = String tempDbTablePath =
config.hoodieTmpDir + "/" + config.targetTable + "__" + config.sourceTable + "/" + lastCommitTime; config.hoodieTmpDir + "/" + config.targetTable + "__" + config.sourceTable + "/" + lastCommitTime;
executeStatement("drop table " + tempDbTable, stmt); executeStatement("drop table if exists " + tempDbTable, stmt);
deleteHDFSPath(fs, tempDbTablePath); deleteHDFSPath(fs, tempDbTablePath);
if (!ensureTempPathExists(fs, lastCommitTime)) { if (!ensureTempPathExists(fs, lastCommitTime)) {
throw new IllegalStateException("Could not create target path at " throw new IllegalStateException("Could not create target path at "
@@ -188,12 +189,12 @@ public class HiveIncrementalPuller {
throw new HoodieIncrementalPullSQLException( throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable); "Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable);
} }
if (!incrementalSQL.contains("`_hoodie_commit_time` > '%targetBasePath'")) { if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) {
LOG.error("Incremental SQL : " + incrementalSQL LOG.error("Incremental SQL : " + incrementalSQL
+ " does not contain `_hoodie_commit_time` > '%targetBasePath'. Please add " + " does not contain `_hoodie_commit_time` > '%s'. Please add "
+ "this clause for incremental to work properly."); + "this clause for incremental to work properly.");
throw new HoodieIncrementalPullSQLException( throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have clause `_hoodie_commit_time` > '%targetBasePath', which " "Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which "
+ "means its not pulling incrementally"); + "means its not pulling incrementally");
} }

View File

@@ -18,18 +18,45 @@
package org.apache.hudi.utilities; package org.apache.hudi.utilities;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;
import static org.apache.hudi.hive.testutils.HiveTestUtil.fileSystem;
import static org.apache.hudi.hive.testutils.HiveTestUtil.hiveSyncConfig;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHiveIncrementalPuller { public class TestHiveIncrementalPuller {
private HiveIncrementalPuller.Config config; private HiveIncrementalPuller.Config config;
private String targetBasePath = null;
@BeforeEach @BeforeEach
public void setup() { public void setup() throws HiveException, IOException, InterruptedException, MetaException {
config = new HiveIncrementalPuller.Config(); config = new HiveIncrementalPuller.Config();
HiveTestUtil.setUp();
}
@AfterEach
public void teardown() throws Exception {
HiveTestUtil.clearIncrementalPullSetup(config.hoodieTmpDir, targetBasePath);
} }
@Test @Test
@@ -41,4 +68,104 @@ public class TestHiveIncrementalPuller {
} }
private HiveIncrementalPuller.Config getHivePullerConfig(String incrementalSql) throws IOException {
config.hiveJDBCUrl = hiveSyncConfig.jdbcUrl;
config.hiveUsername = hiveSyncConfig.hiveUser;
config.hivePassword = hiveSyncConfig.hivePass;
config.hoodieTmpDir = Files.createTempDirectory("hivePullerTest").toUri().toString();
config.sourceDb = hiveSyncConfig.databaseName;
config.sourceTable = hiveSyncConfig.tableName;
config.targetDb = "tgtdb";
config.targetTable = "test2";
config.tmpDb = "tmp_db";
config.fromCommitTime = "100";
createIncrementalSqlFile(incrementalSql, config);
return config;
}
private void createIncrementalSqlFile(String text, HiveIncrementalPuller.Config cfg) throws IOException {
java.nio.file.Path path = Paths.get(cfg.hoodieTmpDir + "/incremental_pull.txt");
Files.createDirectories(path.getParent());
Files.createFile(path);
try (FileWriter fr = new FileWriter(new File(path.toUri()))) {
fr.write(text);
} catch (Exception e) {
// no-op
}
cfg.incrementalSQLFile = path.toString();
}
private void createSourceTable() throws IOException, URISyntaxException {
String instantTime = "101";
HiveTestUtil.createCOWTable(instantTime, 5, true);
hiveSyncConfig.syncMode = "jdbc";
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
}
private void createTargetTable() throws IOException, URISyntaxException {
String instantTime = "100";
targetBasePath = Files.createTempDirectory("hivesynctest1" + Instant.now().toEpochMilli()).toUri().toString();
HiveTestUtil.createCOWTable(instantTime, 5, true,
targetBasePath, "tgtdb", "test2");
HiveSyncTool tool = new HiveSyncTool(getTargetHiveSyncConfig(targetBasePath), HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
}
private HiveSyncConfig getTargetHiveSyncConfig(String basePath) {
HiveSyncConfig config = HiveSyncConfig.copy(hiveSyncConfig);
config.databaseName = "tgtdb";
config.tableName = "test2";
config.basePath = basePath;
config.batchSyncNum = 3;
config.syncMode = "jdbc";
return config;
}
private HiveSyncConfig getAssertionSyncConfig(String databaseName) {
HiveSyncConfig config = HiveSyncConfig.copy(hiveSyncConfig);
config.databaseName = databaseName;
return config;
}
private void createTables() throws IOException, URISyntaxException {
createSourceTable();
createTargetTable();
}
@Test
public void testPullerWithoutIncrementalClause() throws IOException, URISyntaxException {
createTables();
HiveIncrementalPuller puller = new HiveIncrementalPuller(getHivePullerConfig(
"select name from testdb.test1"));
Exception e = assertThrows(HoodieIncrementalPullSQLException.class, puller::saveDelta,
"Should fail when incremental clause not provided!");
assertTrue(e.getMessage().contains("Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which means its not pulling incrementally"));
}
@Test
public void testPullerWithoutSourceInSql() throws IOException, URISyntaxException {
createTables();
HiveIncrementalPuller puller = new HiveIncrementalPuller(getHivePullerConfig(
"select name from tgtdb.test2 where `_hoodie_commit_time` > '%s'"));
Exception e = assertThrows(HoodieIncrementalPullSQLException.class, puller::saveDelta,
"Should fail when source db and table names not provided!");
assertTrue(e.getMessage().contains("Incremental SQL does not have testdb.test1"));
}
@Test
public void testPuller() throws IOException, URISyntaxException {
createTables();
HiveIncrementalPuller.Config cfg = getHivePullerConfig("select name from testdb.test1 where `_hoodie_commit_time` > '%s'");
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
hiveClient.createDatabase(cfg.tmpDb);
HiveIncrementalPuller puller = new HiveIncrementalPuller(cfg);
puller.saveDelta();
HiveSyncConfig assertingConfig = getAssertionSyncConfig(cfg.tmpDb);
HoodieHiveClient assertingClient = new HoodieHiveClient(assertingConfig, HiveTestUtil.getHiveConf(), fileSystem);
String tmpTable = cfg.targetTable + "__" + cfg.sourceTable;
assertTrue(assertingClient.doesTableExist(tmpTable));
}
} }