From 968927801470953f137368cf146778a7f01aa63f Mon Sep 17 00:00:00 2001
From: Sagar Sumit
Date: Fri, 13 Aug 2021 22:31:26 +0530
Subject: [PATCH] [HUDI-1363] Provide option to drop partition columns (#3465)

Co-authored-by: Sivabalan Narayanan
---
 .../org/apache/hudi/avro/HoodieAvroUtils.java |  6 ++-
 .../org/apache/hudi/DataSourceOptions.scala   |  6 +++
 .../hudi/HoodieDatasetBulkInsertHelper.java   | 14 ++++--
 .../apache/hudi/HoodieSparkSqlWriter.scala    | 44 +++++++++++++++----
 .../org/apache/hudi/HoodieWriterUtils.scala   |  3 +-
 .../TestHoodieDatasetBulkInsertHelper.java    | 12 ++---
 .../hudi/functional/TestCOWDataSource.scala   | 22 ++++++++++
 7 files changed, 87 insertions(+), 20 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 239eed563..ef6ce13a3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -223,9 +223,13 @@ public class HoodieAvroUtils {
   }
 
   public static Schema removeMetadataFields(Schema schema) {
+    return removeFields(schema, HoodieRecord.HOODIE_META_COLUMNS);
+  }
+
+  public static Schema removeFields(Schema schema, List<String> fieldsToRemove) {
     List<Schema.Field> filteredFields = schema.getFields()
         .stream()
-        .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(field.name()))
+        .filter(field -> !fieldsToRemove.contains(field.name()))
         .map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
         .collect(Collectors.toList());
     Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index de7d37536..f04b1f74e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -539,6 +539,12 @@ object DataSourceWriteOptions {
     .defaultValue("io.confluent.kafka.serializers.KafkaAvroDeserializer")
     .sinceVersion("0.9.0")
     .withDocumentation("This class is used by kafka client to deserialize the records")
+
+  val DROP_PARTITION_COLUMNS: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.write.drop.partition.columns")
+    .defaultValue("false")
+    .withDocumentation("When set to true, will not write the partition columns into hudi. " +
" + + "By default, false.") } object DataSourceOptionsHelper { diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java index c9639851c..0ccd33d7d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java +++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java @@ -71,7 +71,7 @@ public class HoodieDatasetBulkInsertHelper { public static Dataset prepareHoodieDatasetForBulkInsert(SQLContext sqlContext, HoodieWriteConfig config, Dataset rows, String structName, String recordNamespace, BulkInsertPartitioner> bulkInsertPartitionerRows, - boolean isGlobalIndex) { + boolean isGlobalIndex, boolean dropPartitionColumns) { List originalFields = Arrays.stream(rows.schema().fields()).map(f -> new Column(f.name())).collect(Collectors.toList()); @@ -103,9 +103,17 @@ public class HoodieDatasetBulkInsertHelper { .withColumn(HoodieRecord.FILENAME_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType)); - Dataset dedupedDf = rowDatasetWithHoodieColumns; + Dataset processedDf = rowDatasetWithHoodieColumns; + if (dropPartitionColumns) { + String partitionColumns = String.join(",", keyGenerator.getPartitionPathFields()); + for (String partitionField: keyGenerator.getPartitionPathFields()) { + originalFields.remove(new Column(partitionField)); + } + processedDf = rowDatasetWithHoodieColumns.drop(partitionColumns); + } + Dataset dedupedDf = processedDf; if (config.shouldCombineBeforeInsert()) { - dedupedDf = SparkRowWriteHelper.newInstance().deduplicateRows(rowDatasetWithHoodieColumns, config.getPreCombineField(), isGlobalIndex); + dedupedDf = SparkRowWriteHelper.newInstance().deduplicateRows(processedDf, config.getPreCombineField(), isGlobalIndex); } List orderedFields = Stream.concat(HoodieRecord.HOODIE_META_COLUMNS.stream().map(Column::new), diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index e0b89f15c..f41df94db 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -118,11 +118,11 @@ object HoodieSparkSqlWriter { } else { // Handle various save modes handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs) + val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator) // Create the table if not present if (!tableExists) { val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP) val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP) - val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator) val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD) val populateMetaFields = parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean @@ -143,14 +143,14 @@ object HoodieSparkSqlWriter { } val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType) + val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS) // short-circuit 
      // scalastyle:off
      if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
        val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
-          basePath, path, instantTime, parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
-          HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean)
+          basePath, path, instantTime, partitionColumns)
        return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull,
          tableConfig)
      }
      // scalastyle:on
@@ -224,20 +224,22 @@ object HoodieSparkSqlWriter {
             parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
               HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
           val hoodieAllIncomingRecords = genericRecords.map(gr => {
+            val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
             val hoodieRecord = if (shouldCombine) {
               val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
                 .asInstanceOf[Comparable[_]]
-              DataSourceUtils.createHoodieRecord(gr,
+              DataSourceUtils.createHoodieRecord(processedRecord,
                 orderingVal, keyGenerator.getKey(gr),
                 hoodieConfig.getString(PAYLOAD_CLASS))
             } else {
-              DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS))
+              DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS))
             }
             hoodieRecord
           }).toJavaRDD()
 
+          val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema
           // Create a HoodieWriteClient & issue the write.
-          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
+          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path.get,
             tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT.key)
           )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -271,6 +273,23 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
+    val fieldsToRemove = new util.ArrayList[String]()
+    partitionParam.split(",").map(partitionField => partitionField.trim)
+      .filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
+    HoodieAvroUtils.removeFields(schema, fieldsToRemove)
+  }
+
+  def getProcessedRecord(partitionParam: String, record: GenericRecord,
+                         dropPartitionColumns: Boolean): GenericRecord = {
+    var processedRecord = record
+    if (dropPartitionColumns) {
+      val writeSchema = generateSchemaWithoutPartitionColumns(partitionParam, record.getSchema)
+      processedRecord = HoodieAvroUtils.rewriteRecord(record, writeSchema)
+    }
+    processedRecord
+  }
+
   /**
    * Checks if schema needs upgrade (if incoming record's write schema is old while table schema got evolved).
   *
@@ -379,14 +398,21 @@ object HoodieSparkSqlWriter {
                       basePath: Path,
                       path: Option[String],
                       instantTime: String,
-                      populateMetaFields: Boolean): (Boolean, common.util.Option[String]) = {
+                      partitionColumns: String): (Boolean, common.util.Option[String]) = {
     val sparkContext = sqlContext.sparkContext
+    val populateMetaFields = parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
+      HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean
+    val dropPartitionColumns =
+      parameters.getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key(), DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue()).toBoolean
     // register classes & schemas
     val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
     sparkContext.getConf.registerKryoClasses(
       Array(classOf[org.apache.avro.generic.GenericData],
         classOf[org.apache.avro.Schema]))
-    val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+    var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+    if (dropPartitionColumns) {
+      schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
+    }
     sparkContext.getConf.registerAvroSchemas(schema)
     log.info(s"Registered avro schema : ${schema.toString(true)}")
     if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
@@ -415,7 +441,7 @@ object HoodieSparkSqlWriter {
     }
     val hoodieDF = if (populateMetaFields) {
       HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
-        bulkInsertPartitionerRows, isGlobalIndex)
+        bulkInsertPartitionerRows, isGlobalIndex, dropPartitionColumns)
     } else {
       HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 5128ab13f..ef26ee55c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -76,7 +76,8 @@ object HoodieWriterUtils {
      INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
      ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue,
      ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue,
-      RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString
+      RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString,
+      DROP_PARTITION_COLUMNS.key -> DROP_PARTITION_COLUMNS.defaultValue
     ) ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 69836c70a..071a12cbc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -95,7 +95,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName",
"testStructName", - "testNamespace", new NonSortPartitionerWithRows(), false); + "testNamespace", new NonSortPartitionerWithRows(), false, false); StructType resultSchema = result.schema(); assertEquals(result.count(), 10); @@ -158,7 +158,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase { rows.addAll(updates); Dataset dataset = sqlContext.createDataFrame(rows, structType); Dataset result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", - "testNamespace", new NonSortPartitionerWithRows(), false); + "testNamespace", new NonSortPartitionerWithRows(), false, false); StructType resultSchema = result.schema(); assertEquals(result.count(), enablePreCombine ? 10 : 15); @@ -238,7 +238,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase { Dataset dataset = sqlContext.createDataFrame(rows, structType); try { HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", - "testNamespace", new NonSortPartitionerWithRows(), false); + "testNamespace", new NonSortPartitionerWithRows(), false, false); fail("Should have thrown exception"); } catch (Exception e) { // ignore @@ -249,7 +249,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase { dataset = sqlContext.createDataFrame(rows, structType); try { HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", - "testNamespace", new NonSortPartitionerWithRows(), false); + "testNamespace", new NonSortPartitionerWithRows(), false, false); fail("Should have thrown exception"); } catch (Exception e) { // ignore @@ -260,7 +260,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase { dataset = sqlContext.createDataFrame(rows, structType); try { HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", - "testNamespace", new NonSortPartitionerWithRows(), false); + "testNamespace", new NonSortPartitionerWithRows(), false, false); fail("Should have thrown exception"); } catch (Exception e) { // ignore @@ -271,7 +271,7 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase { dataset = sqlContext.createDataFrame(rows, structType); try { HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, config, dataset, "testStructName", - "testNamespace", new NonSortPartitionerWithRows(), false); + "testNamespace", new NonSortPartitionerWithRows(), false, false); fail("Should have thrown exception"); } catch (Exception e) { // ignore diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 04c6a10c2..da448b85a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -775,4 +775,26 @@ class TestCOWDataSource extends HoodieClientTestBase { val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray) assertEquals(resultSchema, schema1) } + + @ParameterizedTest @ValueSource(booleans = Array(true, false)) + def testCopyOnWriteWithDropPartitionColumns(enableDropPartitionColumns: Boolean) { + val resultContainPartitionColumn = 
+    assertEquals(enableDropPartitionColumns, !resultContainPartitionColumn)
+  }
+
+  def copyOnWriteTableSelect(enableDropPartitionColumns: Boolean): Boolean = {
+    val records1 = recordsToStrings(dataGen.generateInsertsContainsAllPartitions("000", 3)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key, enableDropPartitionColumns)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    snapshotDF1.registerTempTable("tmptable")
+    val result = spark.sql("select * from tmptable limit 1").collect()(0)
+    result.schema.contains(new StructField("partition", StringType, true))
+  }
 }
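
A minimal usage sketch of the new option follows. It is illustrative only: "spark", "inputDF", and "basePath" are assumed to be in scope, the table and field names are placeholders mirroring the test above, and the option key and its "false" default come from DataSourceOptions.scala in this patch.

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.spark.sql.SaveMode

    // With the option enabled, the partition column is dropped from the
    // written data files; its value remains encoded in the partition path
    // produced by the key generator.
    inputDF.write.format("org.apache.hudi")
      .option("hoodie.table.name", "test_tbl")                             // placeholder
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key")      // placeholder
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partition") // placeholder
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "timestamp")    // placeholder
      .option(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key, "true")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Reading the table back, the "partition" field is absent from the schema:
    val snapshotDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
    assert(!snapshotDF.schema.fieldNames.contains("partition"))

The option takes effect on both write paths touched by this patch: the row-based bulk-insert path (partition columns dropped from the Dataset before write) and the Avro record path (fields removed from the write schema and each record rewritten without them).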