diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index a93331ee2..79c858e06 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -44,12 +44,12 @@ trait SparkAdapter extends Serializable { /** * Convert a AliasIdentifier to TableIdentifier. */ - def toTableIdentify(aliasId: AliasIdentifier): TableIdentifier + def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier /** * Convert a UnresolvedRelation to TableIdentifier. */ - def toTableIdentify(relation: UnresolvedRelation): TableIdentifier + def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier /** * Create Join logical plan. diff --git a/hudi-spark-datasource/README.md b/hudi-spark-datasource/README.md new file mode 100644 index 000000000..50a88ac8d --- /dev/null +++ b/hudi-spark-datasource/README.md @@ -0,0 +1,38 @@ + + +# Description of the relationship between each module + +This repo contains the code that integrates Hudi with Spark. The repo is split into the following modules: + +`hudi-spark` +`hudi-spark2` +`hudi-spark3` +`hudi-spark3.1.x` +`hudi-spark2-common` +`hudi-spark3-common` +`hudi-spark-common` + +* hudi-spark is the module that contains the code that both spark2 & spark3 versions would share; it also contains the antlr4 +file that supports spark sql on spark 2.x versions. +* hudi-spark2 is the module that contains the code that is compatible with spark 2.x versions. +* hudi-spark3 is the module that contains the code that is compatible with spark 3.2.0 (and above) versions. +* hudi-spark3.1.x is the module that contains the code that is compatible with spark3.1.x and spark3.0.x versions. 
+* hudi-spark2-common is the module that contains the code that would be reused between spark2.x versions; right now the module +has no classes since hudi only supports the spark 2.4.4 version, and it acts as a placeholder when packaging the hudi-spark-bundle module. +* hudi-spark3-common is the module that contains the code that would be reused between spark3.x versions. +* hudi-spark-common is the module that contains the code that would be reused between spark2.x and spark3.x versions. diff --git a/hudi-spark-datasource/hudi-spark-common/pom.xml b/hudi-spark-datasource/hudi-spark-common/pom.xml index caa218a70..790fd50b6 100644 --- a/hudi-spark-datasource/hudi-spark-common/pom.xml +++ b/hudi-spark-datasource/hudi-spark-common/pom.xml @@ -170,11 +170,53 @@ ${project.version} + + + org.apache.spark + spark-core_${scala.binary.version} + + + javax.servlet + * + + + org.apache.spark spark-sql_${scala.binary.version} + + org.apache.spark + spark-hive_${scala.binary.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + tests + test + + + org.apache.spark + spark-core_${scala.binary.version} + tests + test + + + org.apache.spark + spark-catalyst_${scala.binary.version} + tests + test + + + + + org.apache.spark + spark-avro_${scala.binary.version} + provided + + org.apache.hudi diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java rename to hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/SparkRowWriteHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/SparkRowWriteHelper.java similarity index 100% rename 
from hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/SparkRowWriteHelper.java rename to hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/SparkRowWriteHelper.java diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java rename to hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/async/SparkStreamingAsyncClusteringService.java diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java rename to hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/IExpressionEvaluator.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/sql/IExpressionEvaluator.java similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/IExpressionEvaluator.java rename to hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/sql/IExpressionEvaluator.java diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/sql/InsertMode.java similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/sql/InsertMode.java rename to 
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/sql/InsertMode.java diff --git a/hudi-spark-datasource/hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister rename to hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 9b437f545..a57691f9f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -177,7 +177,7 @@ class DefaultSource extends RelationProvider outputMode) } - override def shortName(): String = "hudi" + override def shortName(): String = "hudi_v1" private def getBaseFileOnlyView(useHoodieFileIndex: Boolean, sqlContext: SQLContext, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index 572a97e9d..a6c63660e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory} import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr -import org.apache.spark.sql.hudi.HoodieSqlUtils +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import 
org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{AnalysisException, Column, SparkSession} @@ -87,7 +87,7 @@ case class HoodieFileIndex( private val tableType = metaClient.getTableType private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) - .map(HoodieSqlUtils.formatQueryInstant) + .map(HoodieSqlCommonUtils.formatQueryInstant) /** * Get all completeCommits. diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala similarity index 92% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index ffe2c9282..226fb01f4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -119,7 +119,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } private def logFileIterator(split: HoodieMergeOnReadFileSplit, - config: Configuration): Iterator[InternalRow] = + config: Configuration): Iterator[InternalRow] = new Iterator[InternalRow] with Closeable { private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) @@ -168,8 +168,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit, - baseFileIterator: Iterator[InternalRow], - config: Configuration): Iterator[InternalRow] = + baseFileIterator: Iterator[InternalRow], + config: Configuration): Iterator[InternalRow] = new Iterator[InternalRow] with Closeable 
{ private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) @@ -225,8 +225,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, } private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit, - baseFileIterator: Iterator[InternalRow], - config: Configuration): Iterator[InternalRow] = + baseFileIterator: Iterator[InternalRow], + config: Configuration): Iterator[InternalRow] = new Iterator[InternalRow] with Closeable { private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) @@ -350,23 +350,23 @@ private object HoodieMergeOnReadRDD { def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = { val fs = FSUtils.getFs(split.tablePath, config) HoodieMergedLogRecordScanner.newBuilder() - .withFileSystem(fs) - .withBasePath(split.tablePath) - .withLogFilePaths(split.logPaths.get.asJava) - .withReaderSchema(logSchema) - .withLatestInstantTime(split.latestCommit) - .withReadBlocksLazily( - Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, - HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean) - .getOrElse(false)) - .withReverseReader(false) - .withBufferSize( - config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, - HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) - .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes) - .withSpillableMapBasePath( - config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, - HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) - .build() + .withFileSystem(fs) + .withBasePath(split.tablePath) + .withLogFilePaths(split.logPaths.get.asJava) + .withReaderSchema(logSchema) + .withLatestInstantTime(split.latestCommit) + .withReadBlocksLazily( + 
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, + HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean) + .getOrElse(false)) + .withReverseReader(false) + .withBufferSize( + config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, + HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) + .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes) + .withSpillableMapBasePath( + config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, + HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) + .build() } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 5414b4d75..f321cdf15 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -177,7 +177,7 @@ object HoodieSparkSqlWriter { val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path, tblName, mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) - .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] + .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) { asyncCompactionTriggerFn.get.apply(client) @@ -489,9 +489,9 @@ object HoodieSparkSqlWriter { val syncHiveSuccess = if (hiveSyncEnabled || metaSyncEnabled) { metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema) - } else { - true - } + } else { + true + } (syncHiveSuccess, 
common.util.Option.ofNullable(instantTime)) } @@ -621,7 +621,7 @@ object HoodieSparkSqlWriter { tableConfig: HoodieTableConfig, jsc: JavaSparkContext, tableInstantInfo: TableInstantInfo - ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = { + ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = { if(writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).isEmpty()) { log.info("Proceeding to commit the write.") val metaMap = parameters.filter(kv => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index a1d857c94..b30e57310 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.hudi.HoodieSqlUtils +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.StructType @@ -99,7 +99,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, override def needConversion: Boolean = false private val specifiedQueryInstant = optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) - .map(HoodieSqlUtils.formatQueryInstant) + .map(HoodieSqlCommonUtils.formatQueryInstant) override def buildScan(requiredColumns: 
Array[String], filters: Array[Filter]): RDD[Row] = { log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/package.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/package.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/package.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/package.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala similarity index 91% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala index 
8a43d8762..6087e3979 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala @@ -30,8 +30,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} -import org.apache.spark.sql.hudi.HoodieSqlUtils._ +import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils} +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.types.{StructField, StructType} import java.util.{Locale, Properties} @@ -62,7 +62,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten * hoodie table's location. * if create managed hoodie table, use `catalog.defaultTablePath`. */ - val tableLocation: String = HoodieSqlUtils.getTableLocation(table, spark) + val tableLocation: String = HoodieSqlCommonUtils.getTableLocation(table, spark) /** * A flag to whether the hoodie table exists. 
@@ -124,7 +124,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten /** * The schema without hoodie meta fields */ - lazy val tableSchemaWithoutMetaFields: StructType = HoodieSqlUtils.removeMetaFields(tableSchema) + lazy val tableSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(tableSchema) /** * The schema of data fields @@ -136,7 +136,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten /** * The schema of data fields not including hoodie meta fields */ - lazy val dataSchemaWithoutMetaFields: StructType = HoodieSqlUtils.removeMetaFields(dataSchema) + lazy val dataSchemaWithoutMetaFields: StructType = HoodieSqlCommonUtils.removeMetaFields(dataSchema) /** * The schema of partition fields @@ -146,7 +146,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten /** * All the partition paths */ - def getAllPartitionPaths: Seq[String] = HoodieSqlUtils.getAllPartitionPaths(spark, table) + def getAllPartitionPaths: Seq[String] = HoodieSqlCommonUtils.getAllPartitionPaths(spark, table) /** * Check if table is a partitioned table @@ -213,7 +213,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten case (CatalogTableType.MANAGED, true) => throw new AnalysisException(s"Can not create the managed table('$catalogTableName')" + - s". The associated location('$tableLocation') already exists.") + s". 
The associated location('$tableLocation') already exists.") } HoodieOptionConfig.validateTable(spark, finalSchema, HoodieOptionConfig.mappingTableConfigToSqlOption(tableConfigs)) @@ -234,17 +234,17 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten val allPartitionPaths = getAllPartitionPaths if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) { extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = - originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) + originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) } else { extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = - String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table)) + String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table)) } if (originTableConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)) { extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = - originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) + originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) } else { extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = - String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table)) + String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table)) } } else { extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true" @@ -253,8 +253,8 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) { extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = - HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator( - originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) + HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator( + originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) } else { extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = 
classOf[ComplexKeyGenerator].getCanonicalName } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/trees/HoodieLeafLike.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/trees/HoodieLeafLike.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/trees/HoodieLeafLike.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/trees/HoodieLeafLike.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hive/HiveClientUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hive/HiveClientUtils.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hive/HiveClientUtils.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hive/HiveClientUtils.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index bc9f14978..e3388e221 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hudi import org.apache.hudi.DataSourceWriteOptions -import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} +import org.apache.hudi.common.model.DefaultHoodieRecordPayload import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.util.ValidationUtils import org.apache.spark.sql.SparkSession diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala new file mode 100644 index 000000000..b5b75a604 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator} +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport} +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{Column, DataFrame, SparkSession} + +import java.net.URI +import java.text.SimpleDateFormat +import java.util.{Locale, Properties} +import scala.collection.JavaConverters._ +import scala.collection.immutable.Map + +object HoodieSqlCommonUtils extends SparkAdapterSupport { + // NOTE: {@code SimpleDataFormat} is NOT thread-safe + // TODO replace w/ DateTimeFormatter + private val defaultDateFormat = + ThreadLocal.withInitial(new java.util.function.Supplier[SimpleDateFormat] { + override def get() = new SimpleDateFormat("yyyy-MM-dd") + }) + + def isHoodieTable(table: CatalogTable): Boolean = { + table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi" + } + + def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = { + val 
table = spark.sessionState.catalog.getTableMetadata(tableId) + isHoodieTable(table) + } + + def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = { + tripAlias(table) match { + case LogicalRelation(_, _, Some(tbl), _) => isHoodieTable(tbl) + case relation: UnresolvedRelation => + isHoodieTable(sparkAdapter.toTableIdentifier(relation), spark) + case _=> false + } + } + + def getTableIdentifier(table: LogicalPlan): TableIdentifier = { + table match { + case SubqueryAlias(name, _) => sparkAdapter.toTableIdentifier(name) + case _ => throw new IllegalArgumentException(s"Illegal table: $table") + } + } + + def getTableSqlSchema(metaClient: HoodieTableMetaClient, + includeMetadataFields: Boolean = false): Option[StructType] = { + val schemaResolver = new TableSchemaResolver(metaClient) + val avroSchema = try Some(schemaResolver.getTableAvroSchema(includeMetadataFields)) + catch { + case _: Throwable => None + } + avroSchema.map(AvroConversionUtils.convertAvroSchemaToStructType) + } + + def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = { + val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) + val metadataConfig = { + val properties = new Properties() + properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++ table.properties).asJava) + HoodieMetadataConfig.newBuilder.fromProperties(properties).build() + } + FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala + } + + /** + * This method is used to be compatible with the old non-hive-styled partition table. + * By default we enable the "hoodie.datasource.write.hive_style_partitioning" + * when writing data to hudi table by spark sql. + * If the existing table is a non-hive-styled partitioned table, we should + * disable the "hoodie.datasource.write.hive_style_partitioning" when + * merging or updating the table. 
Or else, we will get an incorrect merge result + * as the partition path mismatch. + */ + def isHiveStyledPartitioning(partitionPaths: Seq[String], table: CatalogTable): Boolean = { + if (table.partitionColumnNames.nonEmpty) { + val isHiveStylePartitionPath = (path: String) => { + val fragments = path.split("/") + if (fragments.size != table.partitionColumnNames.size) { + false + } else { + fragments.zip(table.partitionColumnNames).forall { + case (pathFragment, partitionColumn) => pathFragment.startsWith(s"$partitionColumn=") + } + } + } + partitionPaths.forall(isHiveStylePartitionPath) + } else { + true + } + } + + /** + * Determine whether URL encoding is enabled + */ + def isUrlEncodeEnabled(partitionPaths: Seq[String], table: CatalogTable): Boolean = { + if (table.partitionColumnNames.nonEmpty) { + partitionPaths.forall(partitionPath => partitionPath.split("/").length == table.partitionColumnNames.size) + } else { + false + } + } + + private def tripAlias(plan: LogicalPlan): LogicalPlan = { + plan match { + case SubqueryAlias(_, relation: LogicalPlan) => + tripAlias(relation) + case other => + other + } + } + + /** + * Add the hoodie meta fields to the schema. + * @param schema + * @return + */ + def addMetaFields(schema: StructType): StructType = { + val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala + // filter the meta field to avoid duplicate field. + val dataFields = schema.fields.filterNot(f => metaFields.contains(f.name)) + val fields = metaFields.map(StructField(_, StringType)) ++ dataFields + StructType(fields) + } + + private lazy val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet + + /** + * Remove the meta fields from the schema. 
+ * @param schema + * @return + */ + def removeMetaFields(schema: StructType): StructType = { + StructType(schema.fields.filterNot(f => isMetaField(f.name))) + } + + def isMetaField(name: String): Boolean = { + metaFields.contains(name) + } + + def removeMetaFields(df: DataFrame): DataFrame = { + val withoutMetaColumns = df.logicalPlan.output + .filterNot(attr => isMetaField(attr.name)) + .map(new Column(_)) + if (withoutMetaColumns.length != df.logicalPlan.output.size) { + df.select(withoutMetaColumns: _*) + } else { + df + } + } + + def removeMetaFields(attrs: Seq[Attribute]): Seq[Attribute] = { + attrs.filterNot(attr => isMetaField(attr.name)) + } + + /** + * Get the table location. + * @param tableId + * @param spark + * @return + */ + def getTableLocation(tableId: TableIdentifier, spark: SparkSession): String = { + val table = spark.sessionState.catalog.getTableMetadata(tableId) + getTableLocation(table, spark) + } + + def getTableLocation(table: CatalogTable, sparkSession: SparkSession): String = { + val uri = if (table.tableType == CatalogTableType.MANAGED && isHoodieTable(table)) { + Some(sparkSession.sessionState.catalog.defaultTablePath(table.identifier)) + } else { + table.storage.locationUri + } + val conf = sparkSession.sessionState.newHadoopConf() + uri.map(makePathQualified(_, conf)) + .map(removePlaceHolder) + .getOrElse(throw new IllegalArgumentException(s"Missing location for ${table.identifier}")) + } + + private def removePlaceHolder(path: String): String = { + if (path == null || path.length == 0) { + path + } else if (path.endsWith("-__PLACEHOLDER__")) { + path.substring(0, path.length() - 16) + } else { + path + } + } + + def makePathQualified(path: URI, hadoopConf: Configuration): String = { + val hadoopPath = new Path(path) + val fs = hadoopPath.getFileSystem(hadoopConf) + fs.makeQualified(hadoopPath).toUri.toString + } + + /** + * Check if the hoodie.properties exists in the table path. 
+ */ + def tableExistsInPath(tablePath: String, conf: Configuration): Boolean = { + val basePath = new Path(tablePath) + val fs = basePath.getFileSystem(conf) + val metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME) + fs.exists(metaPath) + } + + /** + * Split the expression to a sub expression seq by the AND operation. + * @param expression + * @return + */ + def splitByAnd(expression: Expression): Seq[Expression] = { + expression match { + case And(left, right) => + splitByAnd(left) ++ splitByAnd(right) + case exp => Seq(exp) + } + } + + /** + * Append the spark config and table options to the baseConfig. + */ + def withSparkConf(spark: SparkSession, options: Map[String, String]) + (baseConfig: Map[String, String] = Map.empty): Map[String, String] = { + baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority + (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options)) + .filterKeys(_.startsWith("hoodie.")) + } + + def isEnableHive(sparkSession: SparkSession): Boolean = + "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) + + /** + * Convert different query instant time format to the commit time format. + * Currently we support three kinds of instant time format for time travel query: + * 1、yyyy-MM-dd HH:mm:ss + * 2、yyyy-MM-dd + * This will convert to 'yyyyMMdd000000'. 
+ * 3、yyyyMMddHHmmss + */ + def formatQueryInstant(queryInstant: String): String = { + val instantLength = queryInstant.length + if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS] + HoodieInstantTimeGenerator.getInstantForDateString(queryInstant) + } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH + || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS] + HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format + queryInstant + } else if (instantLength == 10) { // for yyyy-MM-dd + HoodieActiveTimeline.formatDate(defaultDateFormat.get().parse(queryInstant)) + } else { + throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant," + + s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss.SSS' or 'yyyy-MM-dd' or 'yyyyMMddHHmmssSSS'") + } + } + + def formatName(sparkSession: SparkSession, name: String): String = { + if (sparkSession.sessionState.conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) + } + + /** + * Check if this is an empty table path. 
+ */ + def isEmptyPath(tablePath: String, conf: Configuration): Boolean = { + val basePath = new Path(tablePath) + val fs = basePath.getFileSystem(conf) + if (fs.exists(basePath)) { + fs.listStatus(basePath).isEmpty + } else { + true + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala index c6c08da1e..c6c79f431 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala @@ -26,12 +26,10 @@ import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstan import org.apache.hudi.common.util.{CommitUtils, Option} import org.apache.hudi.table.HoodieSparkTable import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils} - import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier 
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.util.SchemaUtils diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala index b69c686f9..9a965e2a9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.hudi.command import org.apache.avro.Schema - import org.apache.hudi.AvroConversionUtils import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} @@ -27,7 +26,6 @@ import org.apache.hudi.exception.HoodieException import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.{StructField, StructType} import scala.util.control.NonFatal diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala index 21f16275d..b2b8911e3 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala @@ -25,13 +25,12 @@ import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter} - import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.hudi.HoodieSqlUtils._ +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} case class AlterHoodieTableDropPartitionCommand( @@ -40,7 +39,7 @@ case class AlterHoodieTableDropPartitionCommand( ifExists : Boolean, purge : Boolean, retainData : Boolean) -extends HoodieLeafRunnableCommand { + extends HoodieLeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}" diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala similarity index 96% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index 2608f9383..d84973f90 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -25,14 +25,12 @@ import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.hadoop.HoodieParquetInputFormat import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils - import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.hive.HiveClientUtils import org.apache.spark.sql.hive.HiveExternalCatalog._ -import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} -import org.apache.spark.sql.hudi.HoodieSqlUtils._ +import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils} 
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SparkSession} @@ -124,8 +122,8 @@ object CreateHoodieTableCommand { table.storage.compressed, storageProperties + ("path" -> path)) - val tablName = HoodieSqlUtils.formatName(sparkSession, table.identifier.table) - val newDatabaseName = HoodieSqlUtils.formatName(sparkSession, table.identifier.database + val tablName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table) + val newDatabaseName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.database .getOrElse(catalog.getCurrentDatabase)) val newTableIdentifier = table.identifier diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieLeafRunnableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieLeafRunnableCommand.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieLeafRunnableCommand.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/HoodieLeafRunnableCommand.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala index 6a3eff8b4..f7511f5b5 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.StringType case class ShowHoodieTablePartitionsCommand( tableIdentifier: TableIdentifier, specOpt: Option[TablePartitionSpec]) -extends HoodieLeafRunnableCommand { + extends HoodieLeafRunnableCommand { override val output: Seq[Attribute] = { AttributeReference("partition", StringType, nullable = false)() :: Nil diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala similarity index 98% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala index e44b838fe..e43923804 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala @@ -104,7 +104,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) } } val timestampFormat = PartitionPathEncodeUtils.escapePathName( - SqlKeyGenerator.timestampTimeFormat.print(timeMs)) + SqlKeyGenerator.timestampTimeFormat.print(timeMs)) if (isHiveStyle) s"$hiveStylePrefix$timestampFormat" else timestampFormat case _ => partitionValue } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala 
similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala new file mode 100644 index 000000000..2619d1d9f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/ValidateDuplicateKeyPayload.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, IndexedRecord} +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord} +import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.exception.HoodieDuplicateKeyException + + +import java.util.Properties + +/** + * Validate the duplicate key for insert statement without enabling the INSERT_DROP_DUPS_OPT + * config. + */ +class ValidateDuplicateKeyPayload(record: GenericRecord, orderingVal: Comparable[_]) + extends DefaultHoodieRecordPayload(record, orderingVal) { + + def this(record: HOption[GenericRecord]) { + this(if (record.isPresent) record.get else null, 0) + } + + override def combineAndGetUpdateValue(currentValue: IndexedRecord, + schema: Schema, properties: Properties): HOption[IndexedRecord] = { + val key = currentValue.asInstanceOf[GenericRecord].get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString + throw new HoodieDuplicateKeyException(key) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionCodeGen.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionCodeGen.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionCodeGen.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionCodeGen.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala rename to 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala diff --git a/hudi-spark-datasource/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml index e20f4ad48..534691cf0 100644 --- a/hudi-spark-datasource/hudi-spark/pom.xml +++ b/hudi-spark-datasource/hudi-spark/pom.xml @@ -242,6 +242,18 @@ + + org.apache.hudi + ${hudi.spark.common.module} + ${project.version} + + + org.apache.hudi + * 
+ + + + log4j diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala index a18a17f44..9cc95e6f9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.parser.HoodieCommonSqlParser * The Hoodie SparkSessionExtension for extending the syntax and add the rules. */ class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit) - with SparkAdapterSupport{ + with SparkAdapterSupport { override def apply(extensions: SparkSessionExtensions): Unit = { extensions.injectParser { (session, parser) => diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index 3b6436ee2..a198d0e00 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -17,235 +17,14 @@ package org.apache.spark.sql.hudi -import scala.collection.JavaConverters._ -import java.net.URI -import java.util.{Date, Locale, Properties} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path - -import org.apache.hudi.{AvroConversionUtils, SparkAdapterSupport} -import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.common.config.DFSPropertiesConfiguration -import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.HoodieRecord -import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver} -import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator} - -import org.apache.spark.sql.{Column, DataFrame, SparkSession} +import org.apache.hudi.SparkAdapterSupport import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, SubqueryAlias} -import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType} - -import java.text.SimpleDateFormat - -import scala.collection.immutable.Map +import org.apache.spark.sql.catalyst.expressions.{And, Cast, Expression, Literal} +import org.apache.spark.sql.catalyst.plans.logical.{MergeIntoTable, SubqueryAlias} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{DataType, NullType} object HoodieSqlUtils extends SparkAdapterSupport { - // NOTE: {@code SimpleDataFormat} is NOT thread-safe - // TODO replace w/ DateTimeFormatter - private val defaultDateFormat = - ThreadLocal.withInitial(new java.util.function.Supplier[SimpleDateFormat] { - override def get() = new SimpleDateFormat("yyyy-MM-dd") - }) - - def isHoodieTable(table: CatalogTable): Boolean = { - table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi" - } - - def isHoodieTable(tableId: TableIdentifier, spark: SparkSession): Boolean = { - val table = spark.sessionState.catalog.getTableMetadata(tableId) - isHoodieTable(table) - } - - def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = { - tripAlias(table) match { - case LogicalRelation(_, _, 
Some(tbl), _) => isHoodieTable(tbl) - case relation: UnresolvedRelation => - isHoodieTable(sparkAdapter.toTableIdentify(relation), spark) - case _=> false - } - } - - def getTableIdentify(table: LogicalPlan): TableIdentifier = { - table match { - case SubqueryAlias(name, _) => sparkAdapter.toTableIdentify(name) - case _ => throw new IllegalArgumentException(s"Illegal table: $table") - } - } - - def getTableSqlSchema(metaClient: HoodieTableMetaClient, - includeMetadataFields: Boolean = false): Option[StructType] = { - val schemaResolver = new TableSchemaResolver(metaClient) - val avroSchema = try Some(schemaResolver.getTableAvroSchema(includeMetadataFields)) - catch { - case _: Throwable => None - } - avroSchema.map(AvroConversionUtils.convertAvroSchemaToStructType) - } - - def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = { - val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) - val metadataConfig = { - val properties = new Properties() - properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++ table.properties).asJava) - HoodieMetadataConfig.newBuilder.fromProperties(properties).build() - } - FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala - } - - /** - * This method is used to compatible with the old non-hive-styled partition table. - * By default we enable the "hoodie.datasource.write.hive_style_partitioning" - * when writing data to hudi table by spark sql by default. - * If the exist table is a non-hive-styled partitioned table, we should - * disable the "hoodie.datasource.write.hive_style_partitioning" when - * merge or update the table. Or else, we will get an incorrect merge result - * as the partition path mismatch. 
- */ - def isHiveStyledPartitioning(partitionPaths: Seq[String], table: CatalogTable): Boolean = { - if (table.partitionColumnNames.nonEmpty) { - val isHiveStylePartitionPath = (path: String) => { - val fragments = path.split("/") - if (fragments.size != table.partitionColumnNames.size) { - false - } else { - fragments.zip(table.partitionColumnNames).forall { - case (pathFragment, partitionColumn) => pathFragment.startsWith(s"$partitionColumn=") - } - } - } - partitionPaths.forall(isHiveStylePartitionPath) - } else { - true - } - } - - /** - * Determine whether URL encoding is enabled - */ - def isUrlEncodeEnabled(partitionPaths: Seq[String], table: CatalogTable): Boolean = { - if (table.partitionColumnNames.nonEmpty) { - partitionPaths.forall(partitionPath => partitionPath.split("/").length == table.partitionColumnNames.size) - } else { - false - } - } - - private def tripAlias(plan: LogicalPlan): LogicalPlan = { - plan match { - case SubqueryAlias(_, relation: LogicalPlan) => - tripAlias(relation) - case other => - other - } - } - - /** - * Add the hoodie meta fields to the schema. - * @param schema - * @return - */ - def addMetaFields(schema: StructType): StructType = { - val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala - // filter the meta field to avoid duplicate field. - val dataFields = schema.fields.filterNot(f => metaFields.contains(f.name)) - val fields = metaFields.map(StructField(_, StringType)) ++ dataFields - StructType(fields) - } - - private lazy val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet - - /** - * Remove the meta fields from the schema. 
- * @param schema - * @return - */ - def removeMetaFields(schema: StructType): StructType = { - StructType(schema.fields.filterNot(f => isMetaField(f.name))) - } - - def isMetaField(name: String): Boolean = { - metaFields.contains(name) - } - - def removeMetaFields(df: DataFrame): DataFrame = { - val withoutMetaColumns = df.logicalPlan.output - .filterNot(attr => isMetaField(attr.name)) - .map(new Column(_)) - if (withoutMetaColumns.length != df.logicalPlan.output.size) { - df.select(withoutMetaColumns: _*) - } else { - df - } - } - - def removeMetaFields(attrs: Seq[Attribute]): Seq[Attribute] = { - attrs.filterNot(attr => isMetaField(attr.name)) - } - - /** - * Get the table location. - * @param tableId - * @param spark - * @return - */ - def getTableLocation(tableId: TableIdentifier, spark: SparkSession): String = { - val table = spark.sessionState.catalog.getTableMetadata(tableId) - getTableLocation(table, spark) - } - - def getTableLocation(table: CatalogTable, sparkSession: SparkSession): String = { - val uri = if (table.tableType == CatalogTableType.MANAGED && isHoodieTable(table)) { - Some(sparkSession.sessionState.catalog.defaultTablePath(table.identifier)) - } else { - table.storage.locationUri - } - val conf = sparkSession.sessionState.newHadoopConf() - uri.map(makePathQualified(_, conf)) - .map(removePlaceHolder) - .getOrElse(throw new IllegalArgumentException(s"Missing location for ${table.identifier}")) - } - - private def removePlaceHolder(path: String): String = { - if (path == null || path.length == 0) { - path - } else if (path.endsWith("-__PLACEHOLDER__")) { - path.substring(0, path.length() - 16) - } else { - path - } - } - - def makePathQualified(path: URI, hadoopConf: Configuration): String = { - val hadoopPath = new Path(path) - val fs = hadoopPath.getFileSystem(hadoopConf) - fs.makeQualified(hadoopPath).toUri.toString - } - - /** - * Check if the hoodie.properties exists in the table path. 
- */ - def tableExistsInPath(tablePath: String, conf: Configuration): Boolean = { - val basePath = new Path(tablePath) - val fs = basePath.getFileSystem(conf) - val metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME) - fs.exists(metaPath) - } - - def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = { - child match { - case Literal(nul, NullType) => Literal(nul, dataType) - case _ => if (child.dataType != dataType) - Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child - } - } /** * Get the TableIdentifier of the target table in MergeInto. @@ -256,7 +35,7 @@ object HoodieSqlUtils extends SparkAdapterSupport { case SubqueryAlias(tableId, _) => tableId case plan => throw new IllegalArgumentException(s"Illegal plan $plan in target") } - sparkAdapter.toTableIdentify(aliaId) + sparkAdapter.toTableIdentifier(aliaId) } /** @@ -272,57 +51,11 @@ object HoodieSqlUtils extends SparkAdapterSupport { } } - /** - * Append the spark config and table options to the baseConfig. - */ - def withSparkConf(spark: SparkSession, options: Map[String, String]) - (baseConfig: Map[String, String] = Map.empty): Map[String, String] = { - baseConfig ++ DFSPropertiesConfiguration.getGlobalProps.asScala ++ // Table options has the highest priority - (spark.sessionState.conf.getAllConfs ++ HoodieOptionConfig.mappingSqlOptionToHoodieParam(options)) - .filterKeys(_.startsWith("hoodie.")) - } - - def isEnableHive(sparkSession: SparkSession): Boolean = - "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) - - /** - * Convert different query instant time format to the commit time format. - * Currently we support three kinds of instant time format for time travel query: - * 1、yyyy-MM-dd HH:mm:ss - * 2、yyyy-MM-dd - * This will convert to 'yyyyMMdd000000'. 
- * 3、yyyyMMddHHmmss - */ - def formatQueryInstant(queryInstant: String): String = { - val instantLength = queryInstant.length - if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS] - HoodieInstantTimeGenerator.getInstantForDateString(queryInstant) - } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH - || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS] - HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format - queryInstant - } else if (instantLength == 10) { // for yyyy-MM-dd - HoodieActiveTimeline.formatDate(defaultDateFormat.get().parse(queryInstant)) - } else { - throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant," - + s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss.SSS' or 'yyyy-MM-dd' or 'yyyyMMddHHmmssSSS'") - } - } - - def formatName(sparkSession: SparkSession, name: String): String = { - if (sparkSession.sessionState.conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) - } - - /** - * Check if this is a empty table path. 
- */ - def isEmptyPath(tablePath: String, conf: Configuration): Boolean = { - val basePath = new Path(tablePath) - val fs = basePath.getFileSystem(conf) - if (fs.exists(basePath)) { - fs.listStatus(basePath).isEmpty - } else { - true + def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = { + child match { + case Literal(nul, NullType) => Literal(nul, dataType) + case _ => if (child.dataType != dataType) + Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child } } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 31af71994..762f23b55 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -21,7 +21,6 @@ import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport} import org.apache.hudi.DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient - import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.Inner @@ -29,9 +28,10 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{getTableIdentifier, getTableLocation, isHoodieTable, removeMetaFields, tableExistsInPath} import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.hudi.command._ 
-import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} +import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils, HoodieSqlUtils} import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{AnalysisException, SparkSession} @@ -91,7 +91,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan] // Convert to CompactionHoodieTableCommand case CompactionTable(table, operation, options) if table.resolved && isHoodieTable(table, sparkSession) => - val tableId = getTableIdentify(table) + val tableId = getTableIdentifier(table) val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId) CompactionHoodieTableCommand(catalogTable, operation, options) // Convert to CompactionHoodiePathCommand @@ -100,7 +100,7 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan] // Convert to CompactionShowOnTable case CompactionShowOnTable(table, limit) if isHoodieTable(table, sparkSession) => - val tableId = getTableIdentify(table) + val tableId = getTableIdentifier(table) val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId) CompactionShowHoodieTableCommand(catalogTable, limit) // Convert to CompactionShowHoodiePathCommand @@ -161,8 +161,8 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi val resolvedCondition = condition.map(resolveExpressionFrom(resolvedSource)(_)) val resolvedAssignments = if (isInsertOrUpdateStar(assignments)) { // assignments is empty means insert * or update set * - val resolvedSourceOutput = resolvedSource.output.filter(attr => !HoodieSqlUtils.isMetaField(attr.name)) - val targetOutput = target.output.filter(attr => !HoodieSqlUtils.isMetaField(attr.name)) + val resolvedSourceOutput = resolvedSource.output.filter(attr => !HoodieSqlCommonUtils.isMetaField(attr.name)) + val targetOutput = target.output.filter(attr => !HoodieSqlCommonUtils.isMetaField(attr.name)) val 
resolvedSourceColumnNames = resolvedSourceOutput.map(_.name) if(targetOutput.filter(attr => resolvedSourceColumnNames.exists(resolver(_, attr.name))).equals(targetOutput)){ @@ -182,7 +182,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi // For Spark3.2, InsertStarAction/UpdateStarAction's assignments will contain the meta fields. val withoutMetaAttrs = assignments.filterNot{ assignment => if (assignment.key.isInstanceOf[Attribute]) { - HoodieSqlUtils.isMetaField(assignment.key.asInstanceOf[Attribute].name) + HoodieSqlCommonUtils.isMetaField(assignment.key.asInstanceOf[Attribute].name) } else { false } @@ -333,7 +333,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi .setBasePath(tablePath) .setConf(sparkSession.sessionState.newHadoopConf()) .build() - val tableSchema = HoodieSqlUtils.getTableSqlSchema(metaClient) + val tableSchema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient) if (tableSchema.isDefined && tableDesc.schema.isEmpty) { // Fill the schema with the schema from the table c.copy(tableDesc.copy(schema = tableSchema.get)) @@ -343,9 +343,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi } else { c } - } else { - c - } + } else { + c + } case p => p } @@ -438,15 +438,15 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic // Rewrite the AlterTableAddColumnsCommand to AlterHoodieTableAddColumnsCommand case AlterTableAddColumnsCommand(tableId, colsToAdd) if isHoodieTable(tableId, sparkSession) => - AlterHoodieTableAddColumnsCommand(tableId, colsToAdd) + AlterHoodieTableAddColumnsCommand(tableId, colsToAdd) // Rewrite the AlterTableRenameCommand to AlterHoodieTableRenameCommand case AlterTableRenameCommand(oldName, newName, isView) if !isView && isHoodieTable(oldName, sparkSession) => - new AlterHoodieTableRenameCommand(oldName, newName, isView) + new AlterHoodieTableRenameCommand(oldName, newName, isView) // Rewrite 
the AlterTableChangeColumnCommand to AlterHoodieTableChangeColumnCommand case AlterTableChangeColumnCommand(tableName, columnName, newColumn) if isHoodieTable(tableName, sparkSession) => - AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn) + AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn) // SPARK-34238: the definition of ShowPartitionsCommand has been changed in Spark3.2. // Match the class type instead of call the `unapply` method. case s: ShowPartitionsCommand diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala index be6658477..1363fb939 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodiePathCommand.scala @@ -17,21 +17,20 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.{DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils} import org.apache.hudi.client.WriteStatus import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.{HoodieTimer, Option => HOption} import org.apache.hudi.exception.HoodieException - +import org.apache.hudi.{DataSourceUtils, DataSourceWriteOptions, HoodieWriterUtils} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.{CompactionOperation, LogicalPlan} -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation 
import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} -import org.apache.spark.sql.hudi.HoodieSqlUtils +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.{Row, SparkSession} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -50,7 +49,7 @@ case class CompactionHoodiePathCommand(path: String, val schemaStr = schemaUtil.getTableAvroSchemaWithoutMetadataFields.toString val parameters = HoodieWriterUtils.parametersWithWriteDefaults( - HoodieSqlUtils.withSparkConf(sparkSession, Map.empty)( + HoodieSqlCommonUtils.withSparkConf(sparkSession, Map.empty)( Map( DataSourceWriteOptions.TABLE_TYPE.key() -> HoodieTableType.MERGE_ON_READ.name() ) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala index 27fb6e779..2c89ed8c9 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionHoodieTableCommand.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.hudi.command -import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.CompactionOperation.{CompactionOperation, RUN, SCHEDULE} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.{Row, SparkSession} case 
class CompactionHoodieTableCommand(table: CatalogTable, operation: CompactionOperation, instantTimestamp: Option[Long]) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala index 7502bf762..a9176164f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CompactionShowHoodieTableCommand.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.hudi.command -import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation import org.apache.spark.sql.types.{IntegerType, StringType} +import org.apache.spark.sql.{Row, SparkSession} case class CompactionShowHoodieTableCommand(table: CatalogTable, limit: Int) extends HoodieLeafRunnableCommand { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index 2790ea97c..572013981 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -28,7 +28,8 @@ import org.apache.spark.sql.{Row, SaveMode, SparkSession} import 
org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.hudi.HoodieSqlUtils +import org.apache.spark.sql.execution.command.DataWritingCommand +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import scala.collection.JavaConverters._ @@ -71,7 +72,7 @@ case class CreateHoodieTableAsSelectCommand( val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableWithSchema) val tablePath = hoodieCatalogTable.tableLocation val hadoopConf = sparkSession.sessionState.newHadoopConf() - assert(HoodieSqlUtils.isEmptyPath(tablePath, hadoopConf), + assert(HoodieSqlCommonUtils.isEmptyPath(tablePath, hadoopConf), s"Path '$tablePath' should be empty for CTAS") // Execute the insert query diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala index a77acf066..b51663df7 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala @@ -22,11 +22,10 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} - import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable -import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan} -import org.apache.spark.sql.hudi.HoodieSqlUtils._ +import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import 
org.apache.spark.sql.types.StructType case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends HoodieLeafRunnableCommand @@ -34,7 +33,7 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie private val table = deleteTable.table - private val tableId = getTableIdentify(table) + private val tableId = getTableIdentifier(table) override def run(sparkSession: SparkSession): Seq[Row] = { logInfo(s"start execute delete command for $tableId") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala index aa9d9b812..954f08ce6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.hive.HiveClientUtils -import org.apache.spark.sql.hudi.HoodieSqlUtils.isEnableHive +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive import scala.util.control.NonFatal diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 37d30c813..560c7e17a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -17,32 +17,25 @@ package 
org.apache.spark.sql.hudi.command -import org.apache.avro.Schema -import org.apache.avro.generic.{GenericRecord, IndexedRecord} - import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload} -import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME -import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.hudi.sql.InsertMode import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter} - import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.hudi.HoodieSqlUtils._ +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ +import org.apache.spark.sql.hudi.HoodieSqlUtils.castIfNeeded import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} -import java.util.Properties - import scala.collection.JavaConverters._ /** @@ -173,7 +166,7 @@ object InsertIntoHoodieTableCommand extends Logging { } else { // insert static partitions targetPartitionSchema.fields.map(f => { val staticPartitionValue = staticPartitionValues.getOrElse(f.name, - s"Missing static partition value for: ${f.name}") + s"Missing static partition value for: ${f.name}") val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf) Alias(castAttr, f.name)() }) @@ -290,21 +283,3 @@ object 
InsertIntoHoodieTableCommand extends Logging { } } } - -/** - * Validate the duplicate key for insert statement without enable the INSERT_DROP_DUPS_OPT - * config. - */ -class ValidateDuplicateKeyPayload(record: GenericRecord, orderingVal: Comparable[_]) - extends DefaultHoodieRecordPayload(record, orderingVal) { - - def this(record: HOption[GenericRecord]) { - this(if (record.isPresent) record.get else null, 0) - } - - override def combineAndGetUpdateValue(currentValue: IndexedRecord, - schema: Schema, properties: Properties): HOption[IndexedRecord] = { - val key = currentValue.asInstanceOf[GenericRecord].get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString - throw new HoodieDuplicateKeyException(key) - } -} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 2d36c6c31..b3ba034d8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -24,18 +24,18 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.ddl.HiveSyncMode -import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils, SparkAdapterSupport} - +import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.hudi.HoodieSqlUtils._ +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ +import org.apache.spark.sql.hudi.HoodieSqlUtils.{castIfNeeded, getMergeIntoTargetTableId} +import org.apache.spark.sql.hudi.SerDeUtils import org.apache.spark.sql.hudi.command.payload.ExpressionPayload import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ -import org.apache.spark.sql.hudi.SerDeUtils import org.apache.spark.sql.types.{BooleanType, StructType} import java.util.Base64 diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala index 0ff7ffb45..512e9a18b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala @@ -24,14 +24,14 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.ddl.HiveSyncMode - import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression} -import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan, UpdateTable} -import org.apache.spark.sql.hudi.HoodieSqlUtils._ +import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable} +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ +import org.apache.spark.sql.hudi.HoodieSqlUtils.castIfNeeded import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StructField, 
StructType} +import org.apache.spark.sql.types.StructField import scala.collection.JavaConverters._ @@ -39,7 +39,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends HoodieLeaf with SparkAdapterSupport { private val table = updateTable.table - private val tableId = getTableIdentify(table) + private val tableId = getTableIdentifier(table) override def run(sparkSession: SparkSession): Seq[Row] = { logInfo(s"start execute update command for $tableId") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 9996f23c7..f420b296e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -17,8 +17,6 @@ package org.apache.hudi.functional -import java.util.Properties - import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala index e2521047f..9a74d23c2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTable.scala @@ -65,7 +65,7 @@ class TestAlterTable extends TestHoodieSqlBase { spark.sql(s"alter table $newTableName add columns(ext0 string)") val table = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(newTableName)) assertResult(Seq("id", "name", "price", "ts", "ext0")) { - HoodieSqlUtils.removeMetaFields(table.schema).fields.map(_.name) + 
HoodieSqlCommonUtils.removeMetaFields(table.schema).fields.map(_.name) } checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")( Seq(1, "a1", 10.0, 1000, null) diff --git a/hudi-spark-datasource/hudi-spark2-common/pom.xml b/hudi-spark-datasource/hudi-spark2-common/pom.xml new file mode 100644 index 000000000..403c2fe1e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark2-common/pom.xml @@ -0,0 +1,19 @@ + + + + hudi-spark-datasource + org.apache.hudi + 0.11.0-SNAPSHOT + + 4.0.0 + + hudi-spark2-common + + + 8 + 8 + + + \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 000000000..673594302 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,19 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +org.apache.hudi.Spark2DefaultSource \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2DefaultSource.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2DefaultSource.scala new file mode 100644 index 000000000..6f42eb777 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2DefaultSource.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.spark.sql.sources._ + +/** + * Hoodie Spark Datasource, for reading and writing hoodie tables + * + */ +class Spark2DefaultSource extends DefaultSource with DataSourceRegister { + override def shortName(): String = "hudi" +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index d47e7fbb4..bf1cd2448 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark2RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe -import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} @@ -31,6 +30,7 @@ import org.apache.spark.sql.execution.datasources.{Spark2ParsePartitionUtil, Spa import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.{Row, SparkSession} /** * The adapter for spark2. 
@@ -41,11 +41,11 @@ class Spark2Adapter extends SparkAdapter { new Spark2RowSerDe(encoder) } - override def toTableIdentify(aliasId: AliasIdentifier): TableIdentifier = { + override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = { TableIdentifier(aliasId.identifier, aliasId.database) } - override def toTableIdentify(relation: UnresolvedRelation): TableIdentifier = { + override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = { relation.tableIdentifier } @@ -58,7 +58,7 @@ class Spark2Adapter extends SparkAdapter { } override def getInsertIntoChildren(plan: LogicalPlan): - Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { + Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)] = { plan match { case InsertIntoTable(table, partition, query, overwrite, ifPartitionNotExists) => Some((table, partition, query, overwrite, ifPartitionNotExists)) diff --git a/hudi-spark-datasource/hudi-spark3-common/pom.xml b/hudi-spark-datasource/hudi-spark3-common/pom.xml new file mode 100644 index 000000000..affa98737 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3-common/pom.xml @@ -0,0 +1,247 @@ + + + + + hudi-spark-datasource + org.apache.hudi + 0.11.0-SNAPSHOT + + 4.0.0 + + hudi-spark3-common + + + ${project.parent.parent.basedir} + 8 + 8 + + + + + + src/main/resources + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + + false + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + 1.8 + 1.8 + + + + org.apache.maven.plugins + 
maven-compiler-plugin + + + compile + + compile + + + + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skip.hudi-spark3.unit.tests} + + + + org.apache.rat + apache-rat-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + org.jacoco + jacoco-maven-plugin + + + + + + + org.scala-lang + scala-library + ${scala12.version} + + + + org.apache.spark + spark-sql_2.12 + ${spark3.version} + true + + + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml.spark3.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${fasterxml.spark3.version} + + + com.fasterxml.jackson.core + jackson-core + ${fasterxml.spark3.version} + + + + org.apache.hudi + hudi-spark-client + ${project.version} + + + org.apache.hudi + hudi-spark-common_${scala.binary.version} + ${project.version} + + + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-spark-client + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-spark-common_${scala.binary.version} + ${project.version} + tests + test-jar + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-params + test + + + + \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java rename to hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java rename to hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java rename to hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriterFactory.java diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java rename to hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWrite.java diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java similarity index 100% rename from 
hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java rename to hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalBatchWriteBuilder.java diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java rename to hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieDataSourceInternalTable.java diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java rename to hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieWriterCommitMessage.java diff --git a/hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java similarity index 100% rename from hudi-spark-datasource/hudi-spark3/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java rename to hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowSerDe.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/hudi/Spark3RowSerDe.scala similarity index 100% 
rename from hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowSerDe.scala rename to hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/hudi/Spark3RowSerDe.scala diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala similarity index 85% rename from hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala rename to hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala index 7e806f740..61fcc9634 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark3RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.spark3.internal.ReflectUtil - -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} @@ -30,7 +29,9 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ -import org.apache.spark.sql.execution.datasources.{Spark3ParsePartitionUtil, SparkParsePartitionUtil} +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.datasources.{LogicalRelation, 
Spark3ParsePartitionUtil, SparkParsePartitionUtil} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.internal.SQLConf @@ -43,10 +44,10 @@ class Spark3Adapter extends SparkAdapter { new Spark3RowSerDe(encoder) } - override def toTableIdentify(aliasId: AliasIdentifier): TableIdentifier = { + override def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier = { aliasId match { case AliasIdentifier(name, Seq(database)) => - TableIdentifier(name, Some(database)) + TableIdentifier(name, Some(database)) case AliasIdentifier(name, Seq(_, database)) => TableIdentifier(name, Some(database)) case AliasIdentifier(name, Seq()) => @@ -55,7 +56,7 @@ class Spark3Adapter extends SparkAdapter { } } - override def toTableIdentify(relation: UnresolvedRelation): TableIdentifier = { + override def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier = { relation.multipartIdentifier.asTableIdentifier } @@ -78,7 +79,7 @@ class Spark3Adapter extends SparkAdapter { } override def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]], - query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = { + query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan = { ReflectUtil.createInsertInto(table, partition, Seq.empty[String], query, overwrite, ifPartitionNotExists) } diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala rename to hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala index d993b9803..1d23a84bc 100644 --- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala @@ -130,7 +130,7 @@ class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil { // i.e. currentPath.getParent == null. For the example of "/table/a=1/", // the top level dir is "/table". finished = - (maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null + (maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null if (!finished) { // For the above example, currentPath will be "/table/". diff --git a/hudi-spark-datasource/hudi-spark3.1.x/pom.xml b/hudi-spark-datasource/hudi-spark3.1.x/pom.xml new file mode 100644 index 000000000..f6d9f7d55 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.1.x/pom.xml @@ -0,0 +1,242 @@ + + + + + hudi-spark-datasource + org.apache.hudi + 0.11.0-SNAPSHOT + + 4.0.0 + + hudi-spark3.1.x_2.12 + 0.11.0-SNAPSHOT + + hudi-spark3.1.x_2.12 + jar + + + ${project.parent.parent.basedir} + + + + + + src/main/resources + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + + false + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + ${project.build.directory}/lib + true + true + true + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skip.hudi-spark3.unit.tests} + + 
+ + org.apache.rat + apache-rat-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + org.jacoco + jacoco-maven-plugin + + + + + + + org.scala-lang + scala-library + ${scala12.version} + + + + org.apache.spark + spark-sql_2.12 + ${spark3.version} + true + + + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml.spark3.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${fasterxml.spark3.version} + + + com.fasterxml.jackson.core + jackson-core + ${fasterxml.spark3.version} + + + + org.apache.hudi + hudi-spark-client + ${project.version} + + + org.apache.hudi + hudi-spark-common_${scala.binary.version} + ${project.version} + + + org.apache.hudi + hudi-spark3-common + ${project.version} + + + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-spark-client + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-spark-common_${scala.binary.version} + ${project.version} + tests + test-jar + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-params + test + + + + diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark3.1.x/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 000000000..8fbcd663c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,19 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +org.apache.hudi.Spark3xDefaultSource \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/hudi/Spark3xDefaultSource.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/hudi/Spark3xDefaultSource.scala new file mode 100644 index 000000000..6f941dabc --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/hudi/Spark3xDefaultSource.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.spark.sql.sources.DataSourceRegister + +class Spark3xDefaultSource extends DefaultSource with DataSourceRegister { + override def shortName(): String = "hudi" +} diff --git a/hudi-spark-datasource/hudi-spark3/pom.xml b/hudi-spark-datasource/hudi-spark3/pom.xml index ca09d8359..d8dba8384 100644 --- a/hudi-spark-datasource/hudi-spark3/pom.xml +++ b/hudi-spark-datasource/hudi-spark3/pom.xml @@ -187,6 +187,11 @@ hudi-spark-common_${scala.binary.version} ${project.version} + + org.apache.hudi + hudi-spark3-common + ${project.version} + diff --git a/hudi-spark-datasource/hudi-spark3/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/hudi-spark-datasource/hudi-spark3/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 000000000..33ab03f55 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1,19 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +org.apache.hudi.Spark3DefaultSource \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala new file mode 100644 index 000000000..b55379087 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3DefaultSource.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import org.apache.spark.sql.sources.DataSourceRegister + +class Spark3DefaultSource extends DefaultSource with DataSourceRegister { + override def shortName(): String = "hudi" +} diff --git a/hudi-spark-datasource/pom.xml b/hudi-spark-datasource/pom.xml index ae7cb8e0b..7aaf39865 100644 --- a/hudi-spark-datasource/pom.xml +++ b/hudi-spark-datasource/pom.xml @@ -35,5 +35,9 @@ hudi-spark hudi-spark2 hudi-spark3 + hudi-spark3.1.x + hudi-spark3-common + hudi-spark2-common + diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 9955b1c5d..39510537b 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -141,6 +141,12 @@ ${project.version} + + org.apache.hudi + ${hudi.spark.common.module} + ${project.version} + + org.apache.kafka diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index d90f6acb0..c136909a5 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -74,6 +74,7 @@ org.apache.hudi:hudi-utilities_${scala.binary.version} org.apache.hudi:hudi-spark_${scala.binary.version} org.apache.hudi:${hudi.spark.module}_${scala.binary.version} + org.apache.hudi:${hudi.spark.common.module} org.apache.hudi:hudi-hive-sync org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr @@ -350,6 +351,12 @@ ${project.version} + + org.apache.hudi + ${hudi.spark.common.module} + ${project.version} + + org.apache.hadoop hadoop-hdfs diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index e497af60c..a877d10a5 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -60,6 +60,9 @@ META-INF/LICENSE target/classes/META-INF/LICENSE + + META-INF/services/org.apache.spark.sql.sources.DataSourceRegister + @@ -69,6 +72,7 @@ org.apache.hudi:hudi-spark-common_${scala.binary.version} org.apache.hudi:hudi-spark_${scala.binary.version} 
org.apache.hudi:${hudi.spark.module}_${scala.binary.version} + org.apache.hudi:${hudi.spark.common.module} org.apache.hudi:hudi-hive-sync org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr @@ -249,6 +253,11 @@ ${hudi.spark.module}_${scala.binary.version} ${project.version} + + org.apache.hudi + ${hudi.spark.common.module} + ${project.version} + org.apache.hudi hudi-timeline-service diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index ea9d368a2..f063f1b4d 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -93,6 +93,7 @@ org.apache.hudi:hudi-spark-common_${scala.binary.version} org.apache.hudi:hudi-spark_${scala.binary.version} org.apache.hudi:${hudi.spark.module}_${scala.binary.version} + org.apache.hudi:${hudi.spark.common.module} org.apache.hudi:hudi-hive-sync org.apache.hudi:hudi-sync-common org.apache.hudi:hudi-hadoop-mr @@ -275,6 +276,11 @@ ${hudi.spark.module}_${scala.binary.version} ${project.version} + + org.apache.hudi + ${hudi.spark.common.module} + ${project.version} + org.apache.hudi hudi-utilities_${scala.binary.version} diff --git a/pom.xml b/pom.xml index bfb3723d5..2778885e8 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,7 @@ 2.4.4 3.2.0 hudi-spark2 + hudi-spark2-common 1.8.2 2.11.12 2.12.10 @@ -1582,6 +1583,7 @@ ${scala12.version} 2.12 hudi-spark3 + hudi-spark3-common 3.1.0 2.4.1 1.12.1 @@ -1607,7 +1609,8 @@ ${spark3.version} ${scala12.version} 2.12 - hudi-spark3 + hudi-spark3.1.x + hudi-spark3-common 3.1.0 2.4.1 ${fasterxml.spark3.version}