[HUDI-1484] Escape the partition value in HiveSyncTool (#2363)
This commit is contained in:
@@ -21,6 +21,7 @@ import org.apache.hudi.DataSourceWriteOptions;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.HoodieDataSourceHelpers;
|
||||
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
|
||||
@@ -49,6 +50,8 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
|
||||
option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
|
||||
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
|
||||
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
|
||||
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName).
|
||||
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "true").
|
||||
mode(SaveMode.Overwrite).
|
||||
save("/user/hive/warehouse/stock_ticks_derived_mor");
|
||||
|
||||
@@ -79,6 +82,8 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
|
||||
option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
|
||||
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
|
||||
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
|
||||
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName).
|
||||
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "true").
|
||||
mode(SaveMode.Overwrite).
|
||||
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
|
||||
|
||||
|
||||
@@ -373,6 +373,8 @@ private[hudi] object HoodieSparkSqlWriter {
|
||||
hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
|
||||
hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
|
||||
hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
|
||||
hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
|
||||
DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean
|
||||
hiveSyncConfig
|
||||
}
|
||||
|
||||
|
||||
@@ -84,6 +84,9 @@ public class HiveSyncConfig implements Serializable {
|
||||
+ "Disabled by default for backward compatibility.")
|
||||
public Boolean supportTimestamp = false;
|
||||
|
||||
@Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
|
||||
public Boolean decodePartition = false;
|
||||
|
||||
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
|
||||
HiveSyncConfig newConfig = new HiveSyncConfig();
|
||||
newConfig.basePath = cfg.basePath;
|
||||
@@ -97,15 +100,30 @@ public class HiveSyncConfig implements Serializable {
|
||||
newConfig.tableName = cfg.tableName;
|
||||
newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
|
||||
newConfig.supportTimestamp = cfg.supportTimestamp;
|
||||
newConfig.decodePartition = cfg.decodePartition;
|
||||
return newConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HiveSyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
|
||||
+ ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
|
||||
+ ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
|
||||
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning + '\'' + ", supportTimestamp='" + supportTimestamp + '\''
|
||||
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat + ", useJdbc=" + useJdbc + ", help=" + help + '}';
|
||||
return "HiveSyncConfig{"
|
||||
+ "databaseName='" + databaseName + '\''
|
||||
+ ", tableName='" + tableName + '\''
|
||||
+ ", baseFileFormat='" + baseFileFormat + '\''
|
||||
+ ", hiveUser='" + hiveUser + '\''
|
||||
+ ", hivePass='" + hivePass + '\''
|
||||
+ ", jdbcUrl='" + jdbcUrl + '\''
|
||||
+ ", basePath='" + basePath + '\''
|
||||
+ ", partitionFields=" + partitionFields
|
||||
+ ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
|
||||
+ ", assumeDatePartitioning=" + assumeDatePartitioning
|
||||
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat
|
||||
+ ", useJdbc=" + useJdbc
|
||||
+ ", autoCreateDatabase=" + autoCreateDatabase
|
||||
+ ", skipROSuffix=" + skipROSuffix
|
||||
+ ", help=" + help
|
||||
+ ", supportTimestamp=" + supportTimestamp
|
||||
+ ", decodePartition=" + decodePartition
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,9 @@
|
||||
|
||||
package org.apache.hudi.hive;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLDecoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
import org.apache.hadoop.hive.metastore.api.MetaException;
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
@@ -162,7 +165,17 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
|
||||
+ ". Check partition strategy. ");
|
||||
List<String> partBuilder = new ArrayList<>();
|
||||
for (int i = 0; i < syncConfig.partitionFields.size(); i++) {
|
||||
partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValues.get(i) + "'");
|
||||
String partitionValue = partitionValues.get(i);
|
||||
// decode the partition before sync to hive to prevent multiple escapes of HIVE
|
||||
if (syncConfig.decodePartition) {
|
||||
try {
|
||||
// This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath
|
||||
partitionValue = URLDecoder.decode(partitionValue, StandardCharsets.UTF_8.toString());
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
throw new HoodieHiveSyncException("error in decode partition: " + partitionValue, e);
|
||||
}
|
||||
}
|
||||
partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValue + "'");
|
||||
}
|
||||
return String.join(",", partBuilder);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user