From 8547f11752bca1462f61c1042ef7b44bc7078e51 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=91=A3=E5=8F=AF=E4=BC=A6?=
Date: Fri, 21 Jan 2022 07:49:04 +0800
Subject: [PATCH] [HUDI-3271] Code optimization and clean up unused code in
 HoodieSparkSqlWriter (#4631)

---
 .../apache/hudi/HoodieSparkSqlWriter.scala | 37 +++++++++----------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f321cdf15..5dcf03d3e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -69,8 +69,8 @@ object HoodieSparkSqlWriter {
             df: DataFrame,
             hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
             hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
-            asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty,
-            asyncClusteringTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
+            asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
+            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty
            )
   : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
     SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
@@ -199,8 +199,8 @@ object HoodieSparkSqlWriter {
 
           // Get list of partitions to delete
           val partitionsToDelete = if (parameters.containsKey(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
-            val partitionColsToDelete = parameters.get(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).get.split(",")
-            java.util.Arrays.asList(partitionColsToDelete:_*)
+            val partitionColsToDelete = parameters(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).split(",")
+            java.util.Arrays.asList(partitionColsToDelete: _*)
           } else {
             genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
           }
@@ -287,7 +287,7 @@ object HoodieSparkSqlWriter {
   }
 
   def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
-    val fieldsToRemove = new java.util.ArrayList[String]()
+    val fieldsToRemove = new java.util.ArrayList[String]()
     partitionParam.split(",").map(partitionField => partitionField.trim)
       .filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
     HoodieAvroUtils.removeFields(schema, fieldsToRemove)
@@ -317,13 +317,13 @@ object HoodieSparkSqlWriter {
     if (FSUtils.isTableExists(basePath.toString, fs)) {
       val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
       val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-      latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null);
+      latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null)
     }
     latestSchema
   }
 
-  def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext : SparkContext, df: Dataset[Row],
-                                              reconcileSchema: Boolean) : RDD[GenericRecord] = {
+  def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext: SparkContext, df: Dataset[Row],
+                                              reconcileSchema: Boolean): RDD[GenericRecord] = {
     val structName = s"${tblName}_record"
     val nameSpace = s"hoodie.${tblName}"
     sparkContext.getConf.registerKryoClasses(
@@ -345,7 +345,7 @@ object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-    var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
+    val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
     validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
 
     val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
@@ -364,7 +364,6 @@ object HoodieSparkSqlWriter {
       schema = HoodieAvroUtils.getNullSchema.toString
     }
 
-
     // Handle various save modes
     if (mode == SaveMode.Ignore && tableExists) {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
@@ -441,9 +440,9 @@ object HoodieSparkSqlWriter {
       }
     val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key, schema.toString)
     val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
-    val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
+    val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
      val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
-      if (userDefinedBulkInsertPartitionerOpt.isPresent) {
+      if (userDefinedBulkInsertPartitionerOpt.isPresent) {
        userDefinedBulkInsertPartitionerOpt.get
       }
       else {
@@ -453,7 +452,7 @@ object HoodieSparkSqlWriter {
       // Sort modes are not yet supported when meta fields are disabled
       new NonSortPartitionerWithRows()
     }
-    val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted();
+    val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted()
     parameters.updated(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, arePartitionRecordsSorted.toString)
     val isGlobalIndex = if (populateMetaFields) {
       SparkHoodieIndexFactory.isGlobalIndex(writeConfig)
     }
@@ -472,7 +471,7 @@ object HoodieSparkSqlWriter {
         .options(params)
         .mode(SaveMode.Append)
         .save()
-    } else if(HoodieSparkUtils.isSpark3) {
+    } else if (HoodieSparkUtils.isSpark3) {
       hoodieDF.write.format("org.apache.hudi.spark3.internal")
         .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
         .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
@@ -515,7 +514,7 @@ object HoodieSparkSqlWriter {
     if (operation != WriteOperationType.DELETE) {
       if (mode == SaveMode.ErrorIfExists && tableExists) {
         throw new HoodieException(s"hoodie table at $tablePath already exists.")
-      } else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
+      } else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
         // When user set operation as INSERT_OVERWRITE_TABLE,
         // overwrite will use INSERT_OVERWRITE_TABLE operator in doWriteOperation
         log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
@@ -576,7 +575,7 @@ object HoodieSparkSqlWriter {
     val hiveSyncEnabled = hoodieConfig.getStringOrDefault(HIVE_SYNC_ENABLED).toBoolean
     var metaSyncEnabled = hoodieConfig.getStringOrDefault(META_SYNC_ENABLED).toBoolean
     var syncClientToolClassSet = scala.collection.mutable.Set[String]()
-    hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS_NAME).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)
+    hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS_NAME).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)
 
     // for backward compatibility
     if (hiveSyncEnabled) {
@@ -622,7 +621,7 @@ object HoodieSparkSqlWriter {
                           jsc: JavaSparkContext,
                           tableInstantInfo: TableInstantInfo
                          ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
-    if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
+    if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
       log.info("Proceeding to commit the write.")
       val metaMap = parameters.filter(kv => kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key)))
@@ -687,7 +686,7 @@ object HoodieSparkSqlWriter {
 
   private def isAsyncCompactionEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
                                        tableConfig: HoodieTableConfig,
-                                       parameters: Map[String, String], configuration: Configuration) : Boolean = {
+                                       parameters: Map[String, String], configuration: Configuration): Boolean = {
     log.info(s"Config.inlineCompactionEnabled ? ${client.getConfig.inlineCompactionEnabled}")
     if (asyncCompactionTriggerFnDefined && !client.getConfig.inlineCompactionEnabled
       && parameters.get(ASYNC_COMPACT_ENABLE.key).exists(r => r.toBoolean)) {
@@ -698,7 +697,7 @@ object HoodieSparkSqlWriter {
 
   private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
-                                       parameters: Map[String, String]) : Boolean = {
+                                       parameters: Map[String, String]): Boolean = {
     log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
     asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled &&
       parameters.get(ASYNC_CLUSTERING_ENABLE.key).exists(r => r.toBoolean)
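
The two refactoring idioms that recur throughout the diff are illustrated by the following standalone Scala sketch. It is not part of the patch and uses simplified stand-in types and a placeholder key rather than Hudi's SparkRDDWriteClient and DataSourceWriteOptions constants: `A => Unit` is syntactic sugar for `Function1[A, Unit]`, and `map(key)` (i.e. `map.apply(key)`) replaces the redundant `map.get(key).get` when the key is already known to be present.

// Standalone illustration only -- simplified types, not Hudi code.
object WriterRefactorSketch {

  // Idiom 1: `String => Unit` and `Function1[String, Unit]` are the same type,
  // so the shorter function-arrow form is preferred for callback parameters.
  def runCallback(cb: Option[String => Unit]): Unit = cb.foreach(f => f("compaction triggered"))

  // Idiom 2: for a key already checked with `contains`, `params(key)` is equivalent
  // to `params.get(key).get` (both throw NoSuchElementException if the key is missing).
  def partitionsToDelete(params: Map[String, String]): java.util.List[String] = {
    val key = "partitions.to.delete" // placeholder, not the real Hudi config key
    if (params.contains(key)) {
      val partitionCols = params(key).split(",")
      java.util.Arrays.asList(partitionCols: _*)
    } else {
      java.util.Collections.emptyList[String]()
    }
  }

  def main(args: Array[String]): Unit = {
    runCallback(Some(msg => println(msg)))
    println(partitionsToDelete(Map("partitions.to.delete" -> "2021/01/01,2021/01/02")))
  }
}

Both rewrites are behavior-preserving; the second also avoids the intermediate Option produced by `get(key)`.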