[HUDI-2610] pass the spark version when sync the table created by spark (#4758)
* [HUDI-2610] pass the spark version when sync the table created by spark * [MINOR] sync spark version in DataSourceUtils#buildHiveSyncConfig
This commit is contained in:
@@ -55,6 +55,7 @@ import org.apache.spark.api.java.JavaRDD;
|
|||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
|
import org.apache.spark.sql.hive.HiveExternalCatalog;
|
||||||
import org.apache.spark.sql.types.StructType;
|
import org.apache.spark.sql.types.StructType;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@@ -318,6 +319,9 @@ public class DataSourceUtils {
|
|||||||
(boolean) DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
|
(boolean) DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
|
||||||
? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
|
? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
|
||||||
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())) : null;
|
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())) : null;
|
||||||
|
if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION())) {
|
||||||
|
hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION());
|
||||||
|
}
|
||||||
return hiveSyncConfig;
|
return hiveSyncConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,7 +47,9 @@ import org.apache.hudi.table.BulkInsertPartitioner
|
|||||||
|
|
||||||
import org.apache.log4j.LogManager
|
import org.apache.log4j.LogManager
|
||||||
|
|
||||||
|
import org.apache.spark.SPARK_VERSION
|
||||||
import org.apache.spark.api.java.JavaSparkContext
|
import org.apache.spark.api.java.JavaSparkContext
|
||||||
|
import org.apache.spark.sql.hive.HiveExternalCatalog
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
|
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
|
||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
@@ -582,6 +584,7 @@ object HoodieSparkSqlWriter {
|
|||||||
hiveSyncConfig.syncMode = hoodieConfig.getString(HIVE_SYNC_MODE)
|
hiveSyncConfig.syncMode = hoodieConfig.getString(HIVE_SYNC_MODE)
|
||||||
hiveSyncConfig.serdeProperties = hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
|
hiveSyncConfig.serdeProperties = hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
|
||||||
hiveSyncConfig.tableProperties = hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
|
hiveSyncConfig.tableProperties = hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
|
||||||
|
hiveSyncConfig.sparkVersion = SPARK_VERSION
|
||||||
hiveSyncConfig
|
hiveSyncConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -129,6 +129,9 @@ public class HiveSyncConfig implements Serializable {
|
|||||||
@Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
|
@Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
|
||||||
public Boolean isConditionalSync = false;
|
public Boolean isConditionalSync = false;
|
||||||
|
|
||||||
|
@Parameter(names = {"--spark-version"}, description = "The spark version", required = false)
|
||||||
|
public String sparkVersion;
|
||||||
|
|
||||||
// enhance the similar function in child class
|
// enhance the similar function in child class
|
||||||
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
|
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
|
||||||
HiveSyncConfig newConfig = new HiveSyncConfig();
|
HiveSyncConfig newConfig = new HiveSyncConfig();
|
||||||
@@ -155,6 +158,7 @@ public class HiveSyncConfig implements Serializable {
|
|||||||
newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
|
newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
|
||||||
newConfig.withOperationField = cfg.withOperationField;
|
newConfig.withOperationField = cfg.withOperationField;
|
||||||
newConfig.isConditionalSync = cfg.isConditionalSync;
|
newConfig.isConditionalSync = cfg.isConditionalSync;
|
||||||
|
newConfig.sparkVersion = cfg.sparkVersion;
|
||||||
return newConfig;
|
return newConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import org.apache.hudi.common.fs.FSUtils;
|
|||||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.InvalidTableException;
|
import org.apache.hudi.exception.InvalidTableException;
|
||||||
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
|
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
|
||||||
@@ -302,6 +303,9 @@ public class HiveSyncTool extends AbstractSyncTool {
|
|||||||
|
|
||||||
Map<String, String> sparkProperties = new HashMap<>();
|
Map<String, String> sparkProperties = new HashMap<>();
|
||||||
sparkProperties.put("spark.sql.sources.provider", "hudi");
|
sparkProperties.put("spark.sql.sources.provider", "hudi");
|
||||||
|
if (!StringUtils.isNullOrEmpty(cfg.sparkVersion)) {
|
||||||
|
sparkProperties.put("spark.sql.create.version", cfg.sparkVersion);
|
||||||
|
}
|
||||||
// Split the schema string to multi-parts according the schemaLengthThreshold size.
|
// Split the schema string to multi-parts according the schemaLengthThreshold size.
|
||||||
String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType);
|
String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType);
|
||||||
int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
|
int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
|
||||||
|
|||||||
Reference in New Issue
Block a user