diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 8a68d3e33..57cd48edc 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -462,6 +462,11 @@ object DataSourceWriteOptions {
     .defaultValue(false)
     .withDocumentation("Whether to sync the table as managed table.")
 
+  val HIVE_BATCH_SYNC_PARTITION_NUM: ConfigProperty[Int] = ConfigProperty
+    .key("hoodie.datasource.hive_sync.batch_num")
+    .defaultValue(1000)
+    .withDocumentation("The number of partitions to sync to hive in one batch.")
+
   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.compaction.async.enable")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index cecb9dec7..aa835ff52 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -428,6 +428,7 @@ object HoodieSparkSqlWriter {
     hiveSyncConfig.supportTimestamp = hoodieConfig.getStringOrDefault(HIVE_SUPPORT_TIMESTAMP).toBoolean
     hiveSyncConfig.autoCreateDatabase = hoodieConfig.getStringOrDefault(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).toBoolean
     hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean
+    hiveSyncConfig.batchSyncNum = hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt
 
     val syncAsDataSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
     if (syncAsDataSourceTable) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 3d4cc5b20..09c3e7b35 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -107,6 +107,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--batch-sync-num"}, description = "Create a managed table")
   public Boolean createManagedTable = false;
 
+  @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions to sync to hive in one batch")
+  public Integer batchSyncNum = 1000;
+
   // enhance the similar function in child class
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -127,6 +130,7 @@ public class HiveSyncConfig implements Serializable {
     newConfig.tableProperties = cfg.tableProperties;
     newConfig.serdeProperties = cfg.serdeProperties;
     newConfig.createManagedTable = cfg.createManagedTable;
+    newConfig.batchSyncNum = cfg.batchSyncNum;
     return newConfig;
   }
 
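Note: on the Spark datasource write path, the new batch size is read from the `hoodie.datasource.hive_sync.batch_num` option and copied onto `HiveSyncConfig.batchSyncNum` as shown above. A minimal Scala sketch of how a writer would opt in; the DataFrame `df`, the table name, and the record-key/partition-path values are illustrative placeholders, and only the `batch_num` key comes from this patch:

    // Hypothetical write; adjust table name, keys and basePath to your setup.
    df.write.format("hudi")
      .option("hoodie.table.name", "hudi_trips")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
      .option("hoodie.datasource.hive_sync.enable", "true")
      .option("hoodie.datasource.hive_sync.batch_num", "500") // cap each ALTER TABLE statement at 500 partitions
      .mode("append")
      .save(basePath)
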
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 9d0214595..ad219e4f9 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -140,8 +140,8 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
       return;
     }
     LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
-    String sql = constructAddPartitions(tableName, partitionsToAdd);
-    updateHiveSQL(sql);
+    List<String> sqls = constructAddPartitions(tableName, partitionsToAdd);
+    sqls.stream().forEach(sql -> updateHiveSQL(sql));
   }
 
   /**
@@ -180,18 +180,36 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
     }
   }
 
-  private String constructAddPartitions(String tableName, List<String> partitions) {
+  private StringBuilder getAlterTablePrefix(String tableName) {
     StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
     alterSQL.append(HIVE_ESCAPE_CHARACTER).append(syncConfig.databaseName)
         .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
         .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
-    for (String partition : partitions) {
-      String partitionClause = getPartitionClause(partition);
-      String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+    return alterSQL;
+  }
+
+  private List<String> constructAddPartitions(String tableName, List<String> partitions) {
+    if (syncConfig.batchSyncNum <= 0) {
+      throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, please check your parameter");
+    }
+    List<String> result = new ArrayList<>();
+    int batchSyncPartitionNum = syncConfig.batchSyncNum;
+    StringBuilder alterSQL = getAlterTablePrefix(tableName);
+    for (int i = 0; i < partitions.size(); i++) {
+      String partitionClause = getPartitionClause(partitions.get(i));
+      String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partitions.get(i)).toString();
       alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath)
           .append("' ");
+      if ((i + 1) % batchSyncPartitionNum == 0) {
+        result.add(alterSQL.toString());
+        alterSQL = getAlterTablePrefix(tableName);
+      }
     }
-    return alterSQL.toString();
+    // add the remaining partitions to the result
+    if (partitions.size() % batchSyncPartitionNum != 0) {
+      result.add(alterSQL.toString());
+    }
+    return result;
   }
 
   /**
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
index 19074c800..fd7dbd81b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
@@ -41,6 +41,7 @@ public class GlobalHiveSyncConfig extends HiveSyncConfig {
     newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
     newConfig.supportTimestamp = cfg.supportTimestamp;
     newConfig.decodePartition = cfg.decodePartition;
+    newConfig.batchSyncNum = cfg.batchSyncNum;
     newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp;
     return newConfig;
   }
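Note: with this change, `constructAddPartitions` emits one `ALTER TABLE ... ADD IF NOT EXISTS` statement per `batchSyncNum` partitions instead of a single statement for all of them, so each metastore call stays bounded. For example, 2500 new partitions with the default `batchSyncNum = 1000` yield three statements covering 1000, 1000 and 500 partitions. A rough Scala sketch of the splitting behaviour only; the single `ds` partition column and the literal base path are made-up placeholders, whereas the real code builds the clause with `getPartitionClause` and `FSUtils.getPartitionPath`:

    // Illustrative sketch of the batching, not the patch's Java implementation.
    def addPartitionSqls(tableName: String, partitions: Seq[String], batchSyncNum: Int): Seq[String] = {
      require(batchSyncNum > 0, "batch-sync-num must be greater than 0")
      partitions.grouped(batchSyncNum).map { batch =>
        val clauses = batch.map(p => s" PARTITION (`ds`='$p') LOCATION '/base/path/$p' ")
        s"ALTER TABLE `db`.`$tableName` ADD IF NOT EXISTS " + clauses.mkString
      }.toSeq
    }
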
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; String instantTime = "100"; HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata); HoodieHiveClient hiveClient = @@ -160,6 +161,7 @@ public class TestHiveSyncTool { public void testSyncCOWTableWithProperties(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { { put("path", hiveSyncConfig.basePath); @@ -214,6 +216,7 @@ public class TestHiveSyncTool { public void testSyncMORTableWithProperties(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 3; Map serdeProperties = new HashMap() { { put("path", hiveSyncConfig.basePath); @@ -312,6 +315,7 @@ public class TestHiveSyncTool { @MethodSource("useJdbc") public void testSyncIncremental(boolean useJdbc) throws Exception { HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String commitTime1 = "100"; HiveTestUtil.createCOWTable(commitTime1, 5, true); HoodieHiveClient hiveClient = @@ -351,6 +355,7 @@ public class TestHiveSyncTool { @MethodSource("useJdbc") public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception { HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String commitTime1 = "100"; HiveTestUtil.createCOWTable(commitTime1, 5, true); HoodieHiveClient hiveClient = @@ -388,6 +393,7 @@ public class TestHiveSyncTool { @MethodSource("useJdbcAndSchemaFromCommitMetadata") public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String instantTime = "100"; String deltaCommitTime = "101"; HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, @@ -454,6 +460,7 @@ public class TestHiveSyncTool { @MethodSource("useJdbcAndSchemaFromCommitMetadata") public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String instantTime = "100"; String deltaCommitTime = "101"; String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; @@ -524,6 +531,7 @@ public class TestHiveSyncTool { @MethodSource("useJdbc") public void testMultiPartitionKeySync(boolean useJdbc) throws Exception { HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String instantTime = "100"; HiveTestUtil.createCOWTable(instantTime, 5, true); @@ -598,6 +606,7 @@ public class TestHiveSyncTool { @MethodSource("useJdbc") public void testNonPartitionedSync(boolean useJdbc) throws Exception { HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + HiveTestUtil.hiveSyncConfig.batchSyncNum = 2; String instantTime = "100"; HiveTestUtil.createCOWTable(instantTime, 5, true); @@ -627,6 +636,7 @@ public class TestHiveSyncTool { @MethodSource("useJdbc") public void testReadSchemaForMOR(boolean useJdbc) throws Exception { HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc; + 
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
     String commitTime = "100";
     String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
     HiveTestUtil.createMORTable(commitTime, "", 5, false, true);
@@ -675,6 +685,7 @@ public class TestHiveSyncTool {
   @Test
   public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException {
     HiveTestUtil.hiveSyncConfig.useJdbc = true;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
     String instantTime = "100";
     HiveTestUtil.createCOWTable(instantTime, 5, false);
     HoodieHiveClient hiveClient =
@@ -720,6 +731,7 @@ public class TestHiveSyncTool {
   @MethodSource("useJdbc")
   public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) throws Exception {
     HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
     final String commitTime = "100";
     HiveTestUtil.createCOWTable(commitTime, 1, true);
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -740,6 +752,7 @@ public class TestHiveSyncTool {
   @MethodSource("useJdbc")
   public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJdbc) throws Exception {
     HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
     final String commitTime = "100";
     HiveTestUtil.createCOWTable(commitTime, 1, true);
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -782,6 +795,7 @@ public class TestHiveSyncTool {
   @MethodSource("useJdbc")
   public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean useJdbc) throws Exception {
     HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
     final String commitTime = "100";
     HiveTestUtil.createCOWTable(commitTime, 1, true);
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -828,6 +842,7 @@ public class TestHiveSyncTool {
   @MethodSource("useJdbc")
   public void testTypeConverter(boolean useJdbc) throws Exception {
     HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
     HiveTestUtil.createCOWTable("100", 5, true);
     HoodieHiveClient hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig,
         HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
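Note: the tests above exercise small batch sizes (2 and 3) so that a handful of partitions already spans several ALTER TABLE statements. The same knob reaches the standalone sync path through the new `--batch-sync-num` flag on `HiveSyncConfig`. A minimal Scala sketch of programmatic use, assuming this Hudi version's `HiveSyncTool(HiveSyncConfig, HiveConf, FileSystem)` constructor and existing `hiveConf`/`fs` handles; check your release's API before relying on it:

    // Sketch only: the constructor signature and the hiveConf/fs handles are assumptions.
    val cfg = new HiveSyncConfig()
    cfg.databaseName = "default"
    cfg.tableName = "hudi_trips"
    cfg.basePath = basePath
    cfg.batchSyncNum = 500 // must be > 0, otherwise constructAddPartitions throws HoodieHiveSyncException
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable()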