diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index a88da49dc..5cef1df3e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -258,7 +258,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
     HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime);
     preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
-    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context,instantTime, partitions);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
     return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index fa0cbb5b6..b571efa06 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -46,7 +46,8 @@ public class CommitUtils {
    * For example, INSERT_OVERWRITE/INSERT_OVERWRITE_TABLE operations have REPLACE commit action type.
    */
   public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
-    if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+    if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE
+        || operation == WriteOperationType.DELETE_PARTITION) {
       return HoodieTimeline.REPLACE_COMMIT_ACTION;
     } else {
       return getCommitActionType(tableType);
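With the CommitUtils change above, DELETE_PARTITION joins INSERT_OVERWRITE and INSERT_OVERWRITE_TABLE in mapping to the replace action, so partition deletes land on the timeline as a replacecommit rather than a plain commit/deltacommit. A minimal sketch of the resulting mapping (the assertion style is illustrative only):

```scala
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.CommitUtils

// DELETE_PARTITION is routed to the replace action regardless of table type.
val action = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ)
assert(action == HoodieTimeline.REPLACE_COMMIT_ACTION) // "replacecommit"
```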
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 700af622e..07ba8a935 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -226,6 +226,11 @@ public class DataSourceUtils {
     return new HoodieWriteResult(client.delete(hoodieKeys, instantTime));
   }
 
+  public static HoodieWriteResult doDeletePartitionsOperation(SparkRDDWriteClient client, List<String> partitionsToDelete,
+                                                              String instantTime) {
+    return client.deletePartitions(partitionsToDelete, instantTime);
+  }
+
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
                                                 String payloadClass) throws IOException {
     HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
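The new DataSourceUtils helper is a thin passthrough to SparkRDDWriteClient.deletePartitions, which records the replaced file ids per partition in the returned HoodieWriteResult. A hedged usage sketch; the partition paths are hypothetical, and the commit is started first with the replace action, as the writer change below does:

```scala
import org.apache.hudi.DataSourceUtils
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.table.timeline.HoodieTimeline
import scala.collection.JavaConverters._

// Sketch: assumes `client` is an already-initialized write client for the target table.
def dropPartitions(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], instantTime: String) = {
  val partitionPaths = Seq("2021/04/01", "2021/04/02").asJava // hypothetical partition paths
  client.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION)
  DataSourceUtils.doDeletePartitionsOperation(client, partitionPaths, instantTime)
}
```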
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 0e4ad1791..9b60d9d0f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,8 +29,9 @@ import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
@@ -154,97 +155,110 @@ object HoodieSparkSqlWriter {
       }
     // scalastyle:on
 
+    val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
     val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =
-      if (operation != WriteOperationType.DELETE) {
-        // register classes & schemas
-        val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-        sparkContext.getConf.registerKryoClasses(
-          Array(classOf[org.apache.avro.generic.GenericData],
-            classOf[org.apache.avro.Schema]))
-        var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-        val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
-        if (reconcileSchema) {
-          schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
-        }
-        sparkContext.getConf.registerAvroSchemas(schema)
-        log.info(s"Registered avro schema : ${schema.toString(true)}")
+      operation match {
+        case WriteOperationType.DELETE => {
+          val genericRecords = registerKryoClassesAndGetGenericRecords(tblName, sparkContext, df, reconcileSchema)
+          // Convert to RDD[HoodieKey]
+          val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
 
-        // Convert to RDD[HoodieRecord]
-        val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
-          org.apache.hudi.common.util.Option.of(schema))
-        val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
-          operation.equals(WriteOperationType.UPSERT) ||
-          parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
-            HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
-        val hoodieAllIncomingRecords = genericRecords.map(gr => {
-          val hoodieRecord = if (shouldCombine) {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              hoodieConfig.getString(PAYLOAD_CLASS))
-          } else {
-            DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS))
+          if (!tableExists) {
+            throw new HoodieException(s"hoodie table at $basePath does not exist")
           }
-          hoodieRecord
-        }).toJavaRDD()
 
-        // Create a HoodieWriteClient & issue the write.
-        val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
-          tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)
-        )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
-
-        if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-          asyncCompactionTriggerFn.get.apply(client)
-        }
-
-        if (isAsyncClusteringEnabled(client, parameters)) {
-          asyncClusteringTriggerFn.get.apply(client)
-        }
-
-        val hoodieRecords =
-          if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
-            DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
-          } else {
-            hoodieAllIncomingRecords
-          }
-        client.startCommitWithTime(instantTime, commitActionType)
-        val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
-        (writeResult, client)
-      } else {
-        val structName = s"${tblName}_record"
-        val nameSpace = s"hoodie.${tblName}"
-        sparkContext.getConf.registerKryoClasses(
-          Array(classOf[org.apache.avro.generic.GenericData],
-            classOf[org.apache.avro.Schema]))
-
-        // Convert to RDD[HoodieKey]
-        val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace,
-          parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean)
-        val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
-
-        if (!tableExists) {
-          throw new HoodieException(s"hoodie table at $basePath does not exist")
-        }
-
-        // Create a HoodieWriteClient & issue the delete.
-        val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
+          // Create a HoodieWriteClient & issue the delete.
+          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
           null, path.get, tblName,
           mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)))
           .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
-        if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-          asyncCompactionTriggerFn.get.apply(client)
-        }
+          if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+            asyncCompactionTriggerFn.get.apply(client)
+          }
+          if (isAsyncClusteringEnabled(client, parameters)) {
+            asyncClusteringTriggerFn.get.apply(client)
+          }
 
-        if (isAsyncClusteringEnabled(client, parameters)) {
-          asyncClusteringTriggerFn.get.apply(client)
+          // Issue deletes
+          client.startCommitWithTime(instantTime, commitActionType)
+          val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
+          (writeStatuses, client)
         }
+        case WriteOperationType.DELETE_PARTITION => {
+          val genericRecords = registerKryoClassesAndGetGenericRecords(tblName, sparkContext, df, reconcileSchema)
+          if (!tableExists) {
+            throw new HoodieException(s"hoodie table at $basePath does not exist")
+          }
 
-        // Issue deletes
-        client.startCommitWithTime(instantTime, commitActionType)
-        val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
-        (writeStatuses, client)
+          // Get list of partitions to delete
+          val partitionsToDelete = genericRecords.map(gr => keyGenerator.getKey(gr).getPartitionPath).toJavaRDD().distinct().collect()
+          // Create a HoodieWriteClient & issue the delete.
+          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
+            null, path.get, tblName,
+            mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)))
+            .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+          // Issue delete partitions
+          client.startCommitWithTime(instantTime, commitActionType)
+          val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
+          (writeStatuses, client)
+        }
+        case _ => { // any other operation
+          // register classes & schemas
+          val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+          sparkContext.getConf.registerKryoClasses(
+            Array(classOf[org.apache.avro.generic.GenericData],
+              classOf[org.apache.avro.Schema]))
+          var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+          if (reconcileSchema) {
+            schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
+          }
+          sparkContext.getConf.registerAvroSchemas(schema)
+          log.info(s"Registered avro schema : ${schema.toString(true)}")
+
+          // Convert to RDD[HoodieRecord]
+          val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
+            org.apache.hudi.common.util.Option.of(schema))
+          val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
+            operation.equals(WriteOperationType.UPSERT) ||
+            parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
+              HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
+          val hoodieAllIncomingRecords = genericRecords.map(gr => {
+            val hoodieRecord = if (shouldCombine) {
+              val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
+                .asInstanceOf[Comparable[_]]
+              DataSourceUtils.createHoodieRecord(gr,
+                orderingVal, keyGenerator.getKey(gr),
+                hoodieConfig.getString(PAYLOAD_CLASS))
+            } else {
+              DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS))
+            }
+            hoodieRecord
+          }).toJavaRDD()
+
+          // Create a HoodieWriteClient & issue the write.
+          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
+            tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)
+          )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+
+          if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+            asyncCompactionTriggerFn.get.apply(client)
+          }
+
+          if (isAsyncClusteringEnabled(client, parameters)) {
+            asyncClusteringTriggerFn.get.apply(client)
+          }
+
+          val hoodieRecords =
+            if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
+              DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+            } else {
+              hoodieAllIncomingRecords
+            }
+          client.startCommitWithTime(instantTime, commitActionType)
+          val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
+          (writeResult, client)
+        }
       }
 
     // Check for errors and commit the write.
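Taken together, the new DELETE_PARTITION branch makes the operation reachable from the datasource path. A hedged end-to-end sketch mirroring the new test at the bottom of this diff (df and basePath are assumed to exist; df only needs enough columns for the key generator to derive partition paths):

```scala
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{DataFrame, SaveMode}

// Every partition referenced by a row of `df` is dropped in one replacecommit.
def deletePartitionsFor(df: DataFrame, basePath: String): Unit = {
  df.write.format("org.apache.hudi")
    .option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_foo_tbl") // hypothetical table name
    .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partition")
    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS.key, "org.apache.hudi.keygen.SimpleKeyGenerator")
    .option(DataSourceWriteOptions.OPERATION.key, WriteOperationType.DELETE_PARTITION.name())
    .mode(SaveMode.Append)
    .save(basePath)
}
```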
@@ -276,6 +290,16 @@ object HoodieSparkSqlWriter {
     latestSchema
   }
 
+  def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext : SparkContext, df: Dataset[Row],
+                                              reconcileSchema: Boolean) : RDD[GenericRecord] = {
+    val structName = s"${tblName}_record"
+    val nameSpace = s"hoodie.${tblName}"
+    sparkContext.getConf.registerKryoClasses(
+      Array(classOf[org.apache.avro.generic.GenericData],
+        classOf[org.apache.avro.Schema]))
+    HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema)
+  }
+
   def bootstrap(sqlContext: SQLContext,
                 mode: SaveMode,
                 parameters: Map[String, String],
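Before moving to the test changes, a hedged sketch of how the timeline effect could be verified on a live table: right after a DELETE_PARTITION write, the most recent instant should carry the replace action (spark and basePath are assumed to exist):

```scala
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline

val metaClient = HoodieTableMetaClient.builder()
  .setConf(spark.sparkContext.hadoopConfiguration)
  .setBasePath(basePath) // hypothetical base path of the written table
  .build()
// The delete_partition write should have completed as a replacecommit.
val lastInstant = metaClient.getActiveTimeline.lastInstant()
assert(lastInstant.isPresent && lastInstant.get.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION)
```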
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index 8ec0583ef..84b9c98b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -22,8 +22,9 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.HoodieConfig
-import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
@@ -36,7 +37,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{expr, lit}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
@@ -146,17 +147,12 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
   def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, path: java.nio.file.Path, populateMetaFields : Boolean = true) : Unit = {
     val hoodieFooTableName = "hoodie_foo_tbl"
     //create a new table
-    val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
-      HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
-      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      "hoodie.bulkinsert.shuffle.parallelism" -> "4",
-      DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
-      DataSourceWriteOptions.ENABLE_ROW_WRITER.key -> "true",
-      HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key() -> String.valueOf(populateMetaFields),
-      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-      HoodieWriteConfig.BULKINSERT_SORT_MODE.key() -> sortMode.name(),
-      DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+    val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+      .updated("hoodie.bulkinsert.shuffle.parallelism", "4")
+      .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
+      .updated(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields))
+      .updated(HoodieWriteConfig.BULKINSERT_SORT_MODE.key(), sortMode.name())
     val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
     // generate the inserts
@@ -168,7 +164,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     val toUpdateDataset = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, 40), structType)
     val updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset)
     val records = inserts.union(updates)
-    val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
 
     // write to Hudi
@@ -185,14 +180,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
     // fetch all records from parquet files generated from write to hudi
     val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
-
     if (!populateMetaFields) {
       List(0, 1, 2, 3, 4).foreach(i => assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(i))
         .filter(entry => !(entry.mkString(",").equals(""))).count()))
     }
     // remove metadata columns so that expected and actual DFs can be compared as is
-    val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+    val trimmedDf = dropMetaFields(actualDf)
     assert(df.except(trimmedDf).count() == 0)
   }
@@ -201,20 +193,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
       testBulkInsertWithSortMode(BulkInsertSortMode.NONE, path, false)
-
       // enabling meta fields back should throw exception
       val hoodieFooTableName = "hoodie_foo_tbl"
       //create a new table
-      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
-        HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
-        DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-        "hoodie.bulkinsert.shuffle.parallelism" -> "4",
-        DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
-        DataSourceWriteOptions.ENABLE_ROW_WRITER.key -> "true",
-        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-        HoodieWriteConfig.BULKINSERT_SORT_MODE.key() -> BulkInsertSortMode.NONE.name(),
-        DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+        .updated("hoodie.bulkinsert.shuffle.parallelism", "4")
+        .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+        .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
+        .updated(HoodieWriteConfig.BULKINSERT_SORT_MODE.key(), BulkInsertSortMode.NONE.name())
     val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
     // generate the inserts
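From here on, the suite replaces its repeated literal option Maps with getCommonParams (defined at the bottom of this diff) combined with Scala's immutable Map.updated, which returns a copy with one binding replaced and leaves the receiver untouched. A toy illustration of the idiom:

```scala
val base = Map("a" -> "1", "b" -> "2")
val tweaked = base.updated("b", "99").updated("c", "3")
assert(base("b") == "2")                            // base is unchanged
assert(tweaked("b") == "99" && tweaked("c") == "3") // overridden and extended copy
```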
@@ -239,20 +225,13 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     initSparkContext("test_append_mode")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
-      val hoodieFooTableName = "hoodie_foo_tbl"
-      //create a new table
-      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
-        HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
-        DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-        "hoodie.bulkinsert.shuffle.parallelism" -> "4",
-        DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
-        DataSourceWriteOptions.ENABLE_ROW_WRITER.key -> "true",
-        INSERT_DROP_DUPS.key -> "true",
-        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-        DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+        .updated("hoodie.bulkinsert.shuffle.parallelism", "4")
+        .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+        .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
+        .updated(INSERT_DROP_DUPS.key, "true")
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
       // generate the inserts
@@ -276,20 +255,13 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
-      val sqlContext = spark.sqlContext
       val sc = spark.sparkContext
       val hoodieFooTableName = "hoodie_foo_tbl"
-      //create a new table
-      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
-        HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
-        "hoodie.bulkinsert.shuffle.parallelism" -> "1",
-        DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
-        DataSourceWriteOptions.INSERT_DROP_DUPS.key -> "false",
-        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-        DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+        .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "false")
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
       // generate the inserts
@@ -312,12 +284,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
       // fetch all records from parquet files generated from write to hudi
       val actualDf = spark.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
-
-      // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
-
+      val trimmedDf = dropMetaFields(actualDf)
       assert(df.except(trimmedDf).count() == 0)
     } finally {
       spark.stop()
@@ -329,18 +297,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
-      val hoodieFooTableName = "hoodie_foo_tbl"
-
-      //create a new table
-      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
-        HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
-        "hoodie.bulkinsert.shuffle.parallelism" -> "4",
-        DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
-        DataSourceWriteOptions.ENABLE_ROW_WRITER.key -> "true",
-        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-        DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+      val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+        .updated("hoodie.bulkinsert.shuffle.parallelism", "4")
+        .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+        .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
       val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
@@ -364,12 +325,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
         // Fetch records from entire dataset
         val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
-
-        // remove metadata columns so that expected and actual DFs can be compared as is
-        val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-          .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-          .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
-
+        val trimmedDf = dropMetaFields(actualDf)
         // find total df (union from multiple rounds)
         totalExpectedDf = totalExpectedDf.union(df)
         // find mismatch between actual and expected df
@@ -444,12 +401,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       } else if (baseFileFormat.equalsIgnoreCase(HoodieFileFormat.ORC.name())) {
         actualDf = sqlContext.read.orc(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
       }
-
-      // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
-
+      val trimmedDf = dropMetaFields(actualDf)
       assert(df.except(trimmedDf).count() == 0)
     } finally {
       spark.stop()
@@ -464,7 +417,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     initSparkContext("test_bootstrap_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     val srcPath = java.nio.file.Files.createTempDirectory("hoodie_bootstrap_source_path")
-
     try {
       val hoodieFooTableName = "hoodie_foo_tbl"
       val sourceDF = TestBootstrap.generateTestRawTripDataset(Instant.now.toEpochMilli, 0, 100, Collections.emptyList(), sc,
@@ -498,7 +450,6 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
       // Verify that HoodieWriteClient is closed correctly
       verify(client, times(1)).close()
-
       // fetch all records from parquet files generated from write to hudi
       val actualDf = sqlContext.read.parquet(path.toAbsolutePath.toString)
       assert(actualDf.count == 100)
@@ -517,16 +468,9 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_schema_evol")
     try {
       val hoodieFooTableName = "hoodie_foo_tbl_schema_evolution_" + tableType
-      val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
-        HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
-        "hoodie.insert.shuffle.parallelism" -> "1",
-        "hoodie.upsert.shuffle.parallelism" -> "1",
-        DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
-        DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
-        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
-        DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator",
-        DataSourceWriteOptions.RECONCILE_SCHEMA.key -> "true"
-      )
+      //create a new table
+      val fooTableModifier = getCommonParams(path, hoodieFooTableName, tableType)
+        .updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "true")
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
 
       // generate the inserts
@@ -542,15 +486,11 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       assertEquals(10, snapshotDF1.count())
 
       // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf1 = snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
-
+      val trimmedDf1 = dropMetaFields(snapshotDF1)
       assert(df1.except(trimmedDf1).count() == 0)
 
       // issue updates so that log files are created for MOR table
-      val updates = DataSourceTestUtils.generateUpdates(records, 5);
-      val updatesSeq = convertRowListToSeq(updates)
+      val updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
       val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
 
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
@@ -559,10 +499,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       assertEquals(10, snapshotDF2.count())
 
       // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf2 = snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
-
+      val trimmedDf2 = dropMetaFields(snapshotDF2)
       // ensure 2nd batch of updates matches.
       assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
@@ -580,10 +517,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       assertEquals(15, snapshotDF3.count())
 
       // remove metadata columns so that expected and actual DFs can be compared as is
-      val trimmedDf3 = snapshotDF3.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
-        .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
-
+      val trimmedDf3 = dropMetaFields(snapshotDF3)
       // ensure 2nd batch of updates matches.
       assert(df3.intersect(trimmedDf3).except(df3).count() == 0)
 
@@ -597,10 +531,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         .load(path.toAbsolutePath.toString + "/*/*/*/*")
       assertEquals(25, snapshotDF4.count())
 
-      val tableMetaClient = HoodieTableMetaClient.builder()
-        .setConf(spark.sparkContext.hadoopConfiguration)
-        .setBasePath(path.toAbsolutePath.toString)
-        .build()
+      val tableMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration)
+        .setBasePath(path.toAbsolutePath.toString).build()
       val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchemaWithoutMetadataFields
       assertTrue(actualSchema != null)
       val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName)
@@ -747,6 +679,79 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
+  test("test delete partitions") {
+    initSparkContext("test_delete_partitions")
+    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_delete_partitions")
+    try {
+      val hoodieFooTableName = "hoodie_foo_tbl_delete_partitions"
+      val fooTableModifier = getCommonParams(path, hoodieFooTableName, HoodieTableType.COPY_ON_WRITE.name())
+      val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+      val schema = DataSourceTestUtils.getStructTypeExampleSchema
+      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+      val records = DataSourceTestUtils.generateRandomRows(10)
+      val recordsSeq = convertRowListToSeq(records)
+      val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+      // write to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+
+      val snapshotDF1 = spark.read.format("org.apache.hudi")
+        .load(path.toAbsolutePath.toString + "/*/*/*/*")
+      assertEquals(10, snapshotDF1.count())
+      // remove metadata columns so that expected and actual DFs can be compared as is
+      val trimmedDf1 = dropMetaFields(snapshotDF1)
+      assert(df1.except(trimmedDf1).count() == 0)
+
+      // issue updates so that log files are created for MOR table
+      var updatesSeq = convertRowListToSeq(DataSourceTestUtils.generateUpdates(records, 5))
+      var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
+      // write updates to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+      val snapshotDF2 = spark.read.format("org.apache.hudi")
+        .load(path.toAbsolutePath.toString + "/*/*/*/*")
+      assertEquals(10, snapshotDF2.count())
+
+      // remove metadata columns so that expected and actual DFs can be compared as is
+      val trimmedDf2 = dropMetaFields(snapshotDF2)
+      // ensure 2nd batch of updates matches.
+      assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
+
+      // delete partitions
+      val recordsToDelete = df1.filter(entry => {
+        val partitionPath : String = entry.getString(1)
+        partitionPath.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH) || partitionPath.equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)
+      })
+      val updatedParams = fooTableParams.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, updatedParams, recordsToDelete)
+
+      val snapshotDF3 = spark.read.format("org.apache.hudi")
+        .load(path.toAbsolutePath.toString + "/*/*/*/*")
+      assertEquals(0, snapshotDF3.filter(entry => {
+        val partitionPath = entry.getString(3)
+        !partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+      }).count())
+    } finally {
+      spark.stop()
+      FileUtils.deleteDirectory(path.toFile)
+    }
+  }
+
+  def dropMetaFields(df: Dataset[Row]) : Dataset[Row] = {
+    df.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+      .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+  }
+
+  def getCommonParams(path: java.nio.file.Path, hoodieFooTableName: String, tableType: String) : Map[String, String] = {
+    Map("path" -> path.toAbsolutePath.toString,
+      HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+      DataSourceWriteOptions.KEYGENERATOR_CLASS.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+  }
+
   test("test Non partition table with metatable support") {
     List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
       initSparkContext("testNonPartitionTableWithMetaTable")