[HUDI-3271] Code optimization and clean up unused code in HoodieSparkSqlWriter (#4631)
@@ -69,8 +69,8 @@ object HoodieSparkSqlWriter {
 df: DataFrame,
 hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
 hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
-asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty,
-asyncClusteringTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
+asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
+asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty
 )
 : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
 SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
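Note that the signature change in this hunk is purely notational: in Scala, `A => Unit` is syntactic sugar for the `Function1[A, Unit]` trait, so the old and new parameter types are the same type. A minimal sketch of the equivalence, with an illustrative callback (not taken from the Hudi code):

object FunctionTypeSugar {
  def main(args: Array[String]): Unit = {
    // Verbose spelling of the type, as in the old signature.
    val verbose: Option[Function1[String, Unit]] = Option(msg => println(s"compaction: $msg"))
    // Sugared spelling, as in the new signature; the assignment compiles because the types are identical.
    val sugared: Option[String => Unit] = verbose
    sugared.foreach(notify => notify("triggered"))
  }
}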
@@ -199,8 +199,8 @@ object HoodieSparkSqlWriter {
 
 // Get list of partitions to delete
 val partitionsToDelete = if (parameters.containsKey(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key())) {
-val partitionColsToDelete = parameters.get(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).get.split(",")
-java.util.Arrays.asList(partitionColsToDelete:_*)
+val partitionColsToDelete = parameters(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key()).split(",")
+java.util.Arrays.asList(partitionColsToDelete: _*)
 } else {
 genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
 }
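This hunk replaces `parameters.get(key).get` with `parameters(key)`. On a Scala `Map`, the `apply` form already throws `NoSuchElementException` for a missing key, so unwrapping an intermediate `Option` with `.get` adds nothing; the branch only runs after the key's presence has been checked anyway. A minimal sketch with a hypothetical parameter map and key name (not the actual Hudi option key):

object MapApplySketch {
  def main(args: Array[String]): Unit = {
    val parameters = Map("partitions.to.delete" -> "2021/01,2021/02")
    // Equivalent results when the key is present; apply() is the shorter idiom.
    val viaGet   = parameters.get("partitions.to.delete").get.split(",")
    val viaApply = parameters("partitions.to.delete").split(",")
    assert(viaGet.sameElements(viaApply))
  }
}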
@@ -317,13 +317,13 @@ object HoodieSparkSqlWriter {
 if (FSUtils.isTableExists(basePath.toString, fs)) {
 val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
 val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null);
+latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null)
 }
 latestSchema
 }
 
-def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext : SparkContext, df: Dataset[Row],
-reconcileSchema: Boolean) : RDD[GenericRecord] = {
+def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext: SparkContext, df: Dataset[Row],
+reconcileSchema: Boolean): RDD[GenericRecord] = {
 val structName = s"${tblName}_record"
 val nameSpace = s"hoodie.${tblName}"
 sparkContext.getConf.registerKryoClasses(
@@ -345,7 +345,7 @@ object HoodieSparkSqlWriter {
 val sparkContext = sqlContext.sparkContext
 val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
 tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
+val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
 validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
 
 val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
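The `var tableConfig` to `val tableConfig` change above reflects that the binding is never reassigned in this method; declaring it `val` makes the compiler reject any accidental reassignment later. A minimal sketch, with an illustrative map standing in for the table config:

object ValOverVar {
  def main(args: Array[String]): Unit = {
    val tableConfig = Map("hoodie.table.name" -> "trips")
    // tableConfig = Map.empty  // would not compile: reassignment to val
    println(tableConfig("hoodie.table.name"))
  }
}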
@@ -364,7 +364,6 @@ object HoodieSparkSqlWriter {
 schema = HoodieAvroUtils.getNullSchema.toString
 }
 
-
 // Handle various save modes
 if (mode == SaveMode.Ignore && tableExists) {
 log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
@@ -441,7 +440,7 @@ object HoodieSparkSqlWriter {
 }
 val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key, schema.toString)
 val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
-val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
+val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
 val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
 if (userDefinedBulkInsertPartitionerOpt.isPresent) {
 userDefinedBulkInsertPartitionerOpt.get
@@ -453,7 +452,7 @@ object HoodieSparkSqlWriter {
 // Sort modes are not yet supported when meta fields are disabled
 new NonSortPartitionerWithRows()
 }
-val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted();
+val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted()
 parameters.updated(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, arePartitionRecordsSorted.toString)
 val isGlobalIndex = if (populateMetaFields) {
 SparkHoodieIndexFactory.isGlobalIndex(writeConfig)
@@ -472,7 +471,7 @@ object HoodieSparkSqlWriter {
 .options(params)
 .mode(SaveMode.Append)
 .save()
-} else if(HoodieSparkUtils.isSpark3) {
+} else if (HoodieSparkUtils.isSpark3) {
 hoodieDF.write.format("org.apache.hudi.spark3.internal")
 .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
 .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
@@ -622,7 +621,7 @@ object HoodieSparkSqlWriter {
 jsc: JavaSparkContext,
 tableInstantInfo: TableInstantInfo
 ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
-if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
+if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) {
 log.info("Proceeding to commit the write.")
 val metaMap = parameters.filter(kv =>
 kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key)))
@@ -687,7 +686,7 @@ object HoodieSparkSqlWriter {
 
 private def isAsyncCompactionEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
 tableConfig: HoodieTableConfig,
-parameters: Map[String, String], configuration: Configuration) : Boolean = {
+parameters: Map[String, String], configuration: Configuration): Boolean = {
 log.info(s"Config.inlineCompactionEnabled ? ${client.getConfig.inlineCompactionEnabled}")
 if (asyncCompactionTriggerFnDefined && !client.getConfig.inlineCompactionEnabled
 && parameters.get(ASYNC_COMPACT_ENABLE.key).exists(r => r.toBoolean)) {
@@ -698,7 +697,7 @@ object HoodieSparkSqlWriter {
 }
 
 private def isAsyncClusteringEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
-parameters: Map[String, String]) : Boolean = {
+parameters: Map[String, String]): Boolean = {
 log.info(s"Config.asyncClusteringEnabled ? ${client.getConfig.isAsyncClusteringEnabled}")
 asyncClusteringTriggerFnDefined && client.getConfig.isAsyncClusteringEnabled &&
 parameters.get(ASYNC_CLUSTERING_ENABLE.key).exists(r => r.toBoolean)