[HUDI-2116] Support batch synchronization of partition data to the Hive metastore to avoid OOM problems (#3209)
This commit is contained in:
@@ -462,6 +462,11 @@ object DataSourceWriteOptions {
|
||||
.defaultValue(false)
|
||||
.withDocumentation("Whether to sync the table as managed table.")
|
||||
|
||||
val HIVE_BATCH_SYNC_PARTITION_NUM: ConfigProperty[Int] = ConfigProperty
|
||||
.key("hoodie.datasource.hive_sync.batch_num")
|
||||
.defaultValue(1000)
|
||||
.withDocumentation("The number of partitions one batch when synchronous partitions to hive.")
|
||||
|
||||
// Async Compaction - Enabled by default for MOR
|
||||
val ASYNC_COMPACT_ENABLE_OPT_KEY: ConfigProperty[String] = ConfigProperty
|
||||
.key("hoodie.datasource.compaction.async.enable")
|
||||
|
||||
@@ -428,6 +428,7 @@ object HoodieSparkSqlWriter {
|
||||
hiveSyncConfig.supportTimestamp = hoodieConfig.getStringOrDefault(HIVE_SUPPORT_TIMESTAMP).toBoolean
|
||||
hiveSyncConfig.autoCreateDatabase = hoodieConfig.getStringOrDefault(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).toBoolean
|
||||
hiveSyncConfig.decodePartition = hoodieConfig.getStringOrDefault(URL_ENCODE_PARTITIONING_OPT_KEY).toBoolean
|
||||
hiveSyncConfig.batchSyncNum = hoodieConfig.getStringOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM).toInt
|
||||
|
||||
val syncAsDtaSourceTable = hoodieConfig.getStringOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean
|
||||
if (syncAsDtaSourceTable) {
|
||||
|
||||
@@ -107,6 +107,9 @@ public class HiveSyncConfig implements Serializable {
|
||||
@Parameter(names = {"--managed-table"}, description = "Create a managed table")
|
||||
public Boolean createManagedTable = false;
|
||||
|
||||
@Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
|
||||
public Integer batchSyncNum = 1000;
|
||||
|
||||
// enhance the similar function in child class
|
||||
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
|
||||
HiveSyncConfig newConfig = new HiveSyncConfig();
|
||||
@@ -127,6 +130,7 @@ public class HiveSyncConfig implements Serializable {
|
||||
newConfig.tableProperties = cfg.tableProperties;
|
||||
newConfig.serdeProperties = cfg.serdeProperties;
|
||||
newConfig.createManagedTable = cfg.createManagedTable;
|
||||
newConfig.batchSyncNum = cfg.batchSyncNum;
|
||||
return newConfig;
|
||||
}
|
||||
|
||||
|
||||
@@ -140,8 +140,8 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
|
||||
return;
|
||||
}
|
||||
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
|
||||
String sql = constructAddPartitions(tableName, partitionsToAdd);
|
||||
updateHiveSQL(sql);
|
||||
List<String> sqls = constructAddPartitions(tableName, partitionsToAdd);
|
||||
sqls.stream().forEach(sql -> updateHiveSQL(sql));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -180,18 +180,36 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
|
||||
}
|
||||
}
|
||||
|
||||
private String constructAddPartitions(String tableName, List<String> partitions) {
|
||||
private StringBuilder getAlterTablePrefix(String tableName) {
|
||||
StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
|
||||
alterSQL.append(HIVE_ESCAPE_CHARACTER).append(syncConfig.databaseName)
|
||||
.append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
|
||||
.append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
|
||||
for (String partition : partitions) {
|
||||
String partitionClause = getPartitionClause(partition);
|
||||
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
|
||||
return alterSQL;
|
||||
}
|
||||
|
||||
private List<String> constructAddPartitions(String tableName, List<String> partitions) {
|
||||
if (syncConfig.batchSyncNum <= 0) {
|
||||
throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
|
||||
}
|
||||
List<String> result = new ArrayList<>();
|
||||
int batchSyncPartitionNum = syncConfig.batchSyncNum;
|
||||
StringBuilder alterSQL = getAlterTablePrefix(tableName);
|
||||
for (int i = 0; i < partitions.size(); i++) {
|
||||
String partitionClause = getPartitionClause(partitions.get(i));
|
||||
String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partitions.get(i)).toString();
|
||||
alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath)
|
||||
.append("' ");
|
||||
if ((i + 1) % batchSyncPartitionNum == 0) {
|
||||
result.add(alterSQL.toString());
|
||||
alterSQL = getAlterTablePrefix(tableName);
|
||||
}
|
||||
}
|
||||
return alterSQL.toString();
|
||||
// add left partitions to result
|
||||
if (partitions.size() % batchSyncPartitionNum != 0) {
|
||||
result.add(alterSQL.toString());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -41,6 +41,7 @@ public class GlobalHiveSyncConfig extends HiveSyncConfig {
|
||||
newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
|
||||
newConfig.supportTimestamp = cfg.supportTimestamp;
|
||||
newConfig.decodePartition = cfg.decodePartition;
|
||||
newConfig.batchSyncNum = cfg.batchSyncNum;
|
||||
newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp;
|
||||
return newConfig;
|
||||
}
|
||||
|
||||
@@ -89,6 +89,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
|
||||
public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
|
||||
String instantTime = "100";
|
||||
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
|
||||
HoodieHiveClient hiveClient =
|
||||
@@ -160,6 +161,7 @@ public class TestHiveSyncTool {
|
||||
public void testSyncCOWTableWithProperties(boolean useJdbc,
|
||||
boolean useSchemaFromCommitMetadata) throws Exception {
|
||||
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
|
||||
Map<String, String> serdeProperties = new HashMap<String, String>() {
|
||||
{
|
||||
put("path", hiveSyncConfig.basePath);
|
||||
@@ -214,6 +216,7 @@ public class TestHiveSyncTool {
|
||||
public void testSyncMORTableWithProperties(boolean useJdbc,
|
||||
boolean useSchemaFromCommitMetadata) throws Exception {
|
||||
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
|
||||
Map<String, String> serdeProperties = new HashMap<String, String>() {
|
||||
{
|
||||
put("path", hiveSyncConfig.basePath);
|
||||
@@ -312,6 +315,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbc")
|
||||
public void testSyncIncremental(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
String commitTime1 = "100";
|
||||
HiveTestUtil.createCOWTable(commitTime1, 5, true);
|
||||
HoodieHiveClient hiveClient =
|
||||
@@ -351,6 +355,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbc")
|
||||
public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
String commitTime1 = "100";
|
||||
HiveTestUtil.createCOWTable(commitTime1, 5, true);
|
||||
HoodieHiveClient hiveClient =
|
||||
@@ -388,6 +393,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbcAndSchemaFromCommitMetadata")
|
||||
public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
String instantTime = "100";
|
||||
String deltaCommitTime = "101";
|
||||
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
|
||||
@@ -454,6 +460,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbcAndSchemaFromCommitMetadata")
|
||||
public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
String instantTime = "100";
|
||||
String deltaCommitTime = "101";
|
||||
String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
||||
@@ -524,6 +531,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbc")
|
||||
public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
String instantTime = "100";
|
||||
HiveTestUtil.createCOWTable(instantTime, 5, true);
|
||||
|
||||
@@ -598,6 +606,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbc")
|
||||
public void testNonPartitionedSync(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
String instantTime = "100";
|
||||
HiveTestUtil.createCOWTable(instantTime, 5, true);
|
||||
|
||||
@@ -627,6 +636,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbc")
|
||||
public void testReadSchemaForMOR(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
String commitTime = "100";
|
||||
String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
||||
HiveTestUtil.createMORTable(commitTime, "", 5, false, true);
|
||||
@@ -675,6 +685,7 @@ public class TestHiveSyncTool {
|
||||
@Test
|
||||
public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = true;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
String instantTime = "100";
|
||||
HiveTestUtil.createCOWTable(instantTime, 5, false);
|
||||
HoodieHiveClient hiveClient =
|
||||
@@ -720,6 +731,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbc")
|
||||
public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
final String commitTime = "100";
|
||||
HiveTestUtil.createCOWTable(commitTime, 1, true);
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
@@ -740,6 +752,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbc")
|
||||
public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
final String commitTime = "100";
|
||||
HiveTestUtil.createCOWTable(commitTime, 1, true);
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
@@ -782,6 +795,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbc")
|
||||
public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
final String commitTime = "100";
|
||||
HiveTestUtil.createCOWTable(commitTime, 1, true);
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
@@ -828,6 +842,7 @@ public class TestHiveSyncTool {
|
||||
@MethodSource("useJdbc")
|
||||
public void testTypeConverter(boolean useJdbc) throws Exception {
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
|
||||
HiveTestUtil.createCOWTable("100", 5, true);
|
||||
HoodieHiveClient hiveClient =
|
||||
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
|
||||
Reference in New Issue
Block a user