From 2fe7a3a41f36f0dcbbc89e4dbe51b70f2de997db Mon Sep 17 00:00:00 2001
From: Yann Byron
Date: Thu, 10 Feb 2022 23:35:28 +0800
Subject: [PATCH] [HUDI-2610] pass the spark version when sync the table created by spark (#4758)

* [HUDI-2610] pass the spark version when sync the table created by spark

* [MINOR] sync spark version in DataSourceUtils#buildHiveSyncConfig
---
 .../src/main/java/org/apache/hudi/DataSourceUtils.java        | 4 ++++
 .../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 3 +++
 .../src/main/java/org/apache/hudi/hive/HiveSyncConfig.java    | 4 ++++
 .../src/main/java/org/apache/hudi/hive/HiveSyncTool.java      | 4 ++++
 4 files changed, 15 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 24afc013a..bad6c2d72 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -55,6 +55,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.hive.HiveExternalCatalog;
 import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
@@ -318,6 +319,9 @@ public class DataSourceUtils {
         (boolean) DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
         ? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
         props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())) : null;
+    if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION())) {
+      hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION());
+    }
     return hiveSyncConfig;
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 62002adf5..e2ba3d595 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -47,7 +47,9 @@ import org.apache.hudi.table.BulkInsertPartitioner
 
 import org.apache.log4j.LogManager
 
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.StructType
@@ -582,6 +584,7 @@ object HoodieSparkSqlWriter {
     hiveSyncConfig.syncMode = hoodieConfig.getString(HIVE_SYNC_MODE)
     hiveSyncConfig.serdeProperties = hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
     hiveSyncConfig.tableProperties = hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
+    hiveSyncConfig.sparkVersion = SPARK_VERSION
     hiveSyncConfig
   }
 
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 254e8ba53..50991852b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -129,6 +129,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
   public Boolean isConditionalSync = false;
 
+  @Parameter(names = {"--spark-version"}, description = "The spark version", required = false)
+  public String sparkVersion;
+
   // enhance the similar function in child class
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -155,6 +158,7 @@ public class HiveSyncConfig implements Serializable {
     newConfig.sparkSchemaLengthThreshold = cfg.sparkSchemaLengthThreshold;
     newConfig.withOperationField = cfg.withOperationField;
     newConfig.isConditionalSync = cfg.isConditionalSync;
+    newConfig.sparkVersion = cfg.sparkVersion;
     return newConfig;
   }
 
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index b37b28ed2..35200216e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.InvalidTableException;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
@@ -302,6 +303,9 @@ public class HiveSyncTool extends AbstractSyncTool {
 
     Map<String, String> sparkProperties = new HashMap<>();
     sparkProperties.put("spark.sql.sources.provider", "hudi");
+    if (!StringUtils.isNullOrEmpty(cfg.sparkVersion)) {
+      sparkProperties.put("spark.sql.create.version", cfg.sparkVersion);
+    }
     // Split the schema string to multi-parts according the schemaLengthThreshold size.
     String schemaString = Parquet2SparkSchemaUtils.convertToSparkSchemaJson(reOrderedType);
     int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;