From 773b3179834d6e0d2912a7a9d4b3b3873d0c2a21 Mon Sep 17 00:00:00 2001
From: ForwardXu
Date: Mon, 7 Feb 2022 22:28:13 +0800
Subject: [PATCH] [HUDI-2941] Show _hoodie_operation in spark sql results
 (#4649)

---
 .../org/apache/hudi/table/HoodieTable.java         |  4 +-
 .../table/action/compact/HoodieCompactor.java      |  4 +-
 .../common/table/TableSchemaResolver.java          | 35 ++++++++------
 .../apache/hudi/table/HoodieTableSource.java       |  4 +-
 .../hudi/source/TestStreamReadOperator.java        |  4 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      |  4 +-
 .../hudi/MergeOnReadSnapshotRelation.scala         |  4 +-
 .../spark/sql/hudi/TestInsertTable.scala           | 48 ++++++++++++++++++-
 .../sync/common/AbstractSyncHoodieClient.java      | 17 ++-----
 .../hudi/utilities/HoodieClusteringJob.java        |  4 +-
 10 files changed, 85 insertions(+), 43 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8c958e989..f19b6aa86 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -679,9 +679,9 @@ public abstract class HoodieTable implem
     Schema writerSchema;
     boolean isValid;
     try {
-      TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
+      TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
       writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields());
+      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchemaWithoutMetadataFields());
       isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 73e1413d9..a3b7a7fab 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -111,13 +111,13 @@ public abstract class HoodieCompactor im
     table.getMetaClient().reloadActiveTimeline();
     HoodieTableMetaClient metaClient = table.getMetaClient();
-    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
 
     // Here we firstly use the table schema as the reader schema to read
     // log file.That is because in the case of MergeInto, the config.getSchema may not
     // the same with the table schema.
     try {
-      Schema readerSchema = schemaUtil.getTableAvroSchema(false);
+      Schema readerSchema = schemaResolver.getTableAvroSchema(false);
       config.setSchema(readerSchema.toString());
     } catch (Exception e) {
       // If there is no commit in the table, just ignore the exception.
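Note on the two hunks above: both call sites now go through the renamed `TableSchemaResolver` without any extra flag. The snippet below is a minimal sketch of that call path for readers following along; it is not part of the patch, and the `basePath` argument plus the bare Hadoop `Configuration` are assumptions of the sketch.

```java
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;

public final class SchemaResolutionSketch {
  // Resolve the table schema the same way HoodieCompactor seeds config.setSchema(...)
  // above; basePath must point at an existing Hudi table (assumption for this sketch).
  public static Schema readerSchemaFor(String basePath) throws Exception {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setBasePath(basePath)
        .setConf(new Configuration())
        .build();
    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
    // false => exclude the _hoodie_* metadata columns from the resolved schema.
    return schemaResolver.getTableAvroSchema(false);
  }
}
```

Because detection of the operation field now happens inside the resolver (next file), callers like these no longer need to know whether `_hoodie_operation` exists.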
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index a70774896..0607601ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -21,14 +21,13 @@ package org.apache.hudi.common.table;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -42,10 +41,8 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.InvalidTableException;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -61,15 +58,11 @@ public class TableSchemaResolver {
   private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
 
   private final HoodieTableMetaClient metaClient;
-  private final boolean withOperationField;
+  private final boolean hasOperationField;
 
   public TableSchemaResolver(HoodieTableMetaClient metaClient) {
-    this(metaClient, false);
-  }
-
-  public TableSchemaResolver(HoodieTableMetaClient metaClient, boolean withOperationField) {
     this.metaClient = metaClient;
-    this.withOperationField = withOperationField;
+    this.hasOperationField = hasOperationField();
   }
 
   /**
@@ -122,7 +115,7 @@ public class TableSchemaResolver {
     }
   }
 
-  public Schema getTableAvroSchemaFromDataFile() throws Exception {
+  public Schema getTableAvroSchemaFromDataFile() {
     return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
   }
 
@@ -151,7 +144,7 @@ public class TableSchemaResolver {
     Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
     if (schemaFromTableConfig.isPresent()) {
       if (includeMetadataFields) {
-        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), withOperationField);
+        return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
       } else {
         return schemaFromTableConfig.get();
       }
@@ -176,7 +169,7 @@ public class TableSchemaResolver {
     }
     Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
     if (schemaFromTableConfig.isPresent()) {
-      Schema schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), withOperationField);
+      Schema schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
       return convertAvroSchemaToParquet(schema);
     }
     return getTableParquetSchemaFromDataFile();
@@ -244,7 +237,7 @@ public class TableSchemaResolver {
       Schema schema = new Schema.Parser().parse(existingSchemaStr);
 
       if (includeMetadataFields) {
-        schema = HoodieAvroUtils.addMetadataFields(schema, withOperationField);
+        schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField);
       }
       return Option.of(schema);
     } catch (Exception e) {
@@ -477,4 +470,18 @@ public class TableSchemaResolver {
     }
     return null;
   }
+
+  public boolean isHasOperationField() {
+    return hasOperationField;
+  }
+
+  private boolean hasOperationField() {
+    try {
+      Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
+      return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null;
+    } catch (Exception e) {
+      LOG.warn("Failed to read operation field from avro schema", e);
+      return false;
+    }
+  }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 259c2e40c..3efd1d561 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -452,8 +452,8 @@ public class HoodieTableSource implements
   @VisibleForTesting
   public Schema getTableAvroSchema() {
     try {
-      TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
-      return schemaUtil.getTableAvroSchema();
+      TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+      return schemaResolver.getTableAvroSchema();
     } catch (Throwable e) {
       // table exists but has no written data
       LOG.warn("Get table avro schema error, use schema from the DDL instead", e);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 911c68511..db45a7597 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -245,10 +245,10 @@ public class TestStreamReadOperator {
     final List<String> partitionKeys = Collections.singletonList("partition");
 
     // This input format is used to opening the emitted split.
-    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
    final Schema tableAvroSchema;
     try {
-      tableAvroSchema = schemaUtil.getTableAvroSchema();
+      tableAvroSchema = schemaResolver.getTableAvroSchema();
     } catch (Exception e) {
       throw new HoodieException("Get table avro schema error", e);
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 9b96abd09..01b480a11 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -74,8 +74,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
     optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp))
   log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
   private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
-  private val schemaUtil = new TableSchemaResolver(metaClient)
-  private val tableAvroSchema = schemaUtil.getTableAvroSchema
+  private val schemaResolver = new TableSchemaResolver(metaClient)
+  private val tableAvroSchema = schemaResolver.getTableAvroSchema
   private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
   private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
   private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 449d87b8a..2829b4bc1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -65,10 +65,10 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
   private val conf = sqlContext.sparkContext.hadoopConfiguration
   private val jobConf = new JobConf(conf)
   // use schema from latest metadata, if not present, read schema from the data file
-  private val schemaUtil = new TableSchemaResolver(metaClient)
+  private val schemaResolver = new TableSchemaResolver(metaClient)
   private lazy val tableAvroSchema = {
     try {
-      schemaUtil.getTableAvroSchema
+      schemaResolver.getTableAvroSchema
     } catch {
       case _: Throwable => // If there is no commit in the table, we cann't get the schema
         // with schemaUtil, use the userSchema instead.
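The core change in `TableSchemaResolver` above is that the resolver now inspects the table's data files itself to decide whether `_hoodie_operation` is present, instead of being handed a `withOperationField` flag by each caller, which is why the Flink source and the Spark relations now simply pass the meta client. Below is a rough sketch of how a caller can lean on that; the wrapper class and method names are illustrative only, and `metaClient` is assumed to be built for an existing table.

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;

public final class OperationFieldSketch {
  // isHasOperationField() returns the value detected when the resolver was constructed.
  public static boolean hasOperationColumn(HoodieTableMetaClient metaClient) {
    return new TableSchemaResolver(metaClient).isHasOperationField();
  }

  // The same probe done by hand against the data-file schema, mirroring the private
  // hasOperationField() helper added in the hunk above. Assumes the table already
  // has at least one data file; otherwise this throws.
  public static boolean probeOperationColumn(TableSchemaResolver schemaResolver) {
    Schema dataFileSchema = schemaResolver.getTableAvroSchemaFromDataFile();
    return dataFileSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null;
  }
}
```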
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 4d12d987f..b186381c2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -17,8 +17,12 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.spark.sql.SaveMode
 
 import java.io.File
 
@@ -582,8 +586,48 @@
     checkAnswer(s"select id, name, price, ts from $tableName")(
       Seq(1, "a1", 11.0, 1000)
     )
-    }
   }
 
+  test("Test For read operation's field") {
+    withTempDir { tmp => {
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      import spark.implicits._
+      val day = "2021-08-02"
+      val df = Seq((1, "a1", 10, 1000, day, 12)).toDF("id", "name", "value", "ts", "day", "hh")
+      // Write a table by spark dataframe.
+      df.write.format("hudi")
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+        .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
+        .option(RECORDKEY_FIELD.key, "id")
+        .option(PRECOMBINE_FIELD.key, "ts")
+        .option(PARTITIONPATH_FIELD.key, "day,hh")
+        .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+        .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+        .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+        .option(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key, "true")
+        .mode(SaveMode.Overwrite)
+        .save(tablePath)
+
+      val metaClient = HoodieTableMetaClient.builder()
+        .setBasePath(tablePath)
+        .setConf(spark.sessionState.newHadoopConf())
+        .build()
+
+      assertResult(true)(new TableSchemaResolver(metaClient).isHasOperationField)
+
+      spark.sql(
+        s"""
+           |create table $tableName using hudi
+           |location '${tablePath}'
+           |""".stripMargin)
+
+      // Note: spark sql batch write currently does not write actual content to the operation field
+      checkAnswer(s"select id, _hoodie_operation from $tableName")(
+        Seq(1, null)
+      )
+    }
+    }
+  }
 }
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 98b11f2f3..1815491f1 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.sync.common;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -29,9 +31,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -149,11 +148,7 @@ public abstract class AbstractSyncHoodieClient {
    */
   public MessageType getDataSchema() {
     try {
-      if (withOperationField) {
-        return new TableSchemaResolver(metaClient, true).getTableParquetSchema();
-      } else {
-        return new TableSchemaResolver(metaClient).getTableParquetSchema();
-      }
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read data schema", e);
     }
@@ -162,11 +157,7 @@ public abstract class AbstractSyncHoodieClient {
   public boolean isDropPartition() {
     try {
       Option<HoodieCommitMetadata> hoodieCommitMetadata;
-      if (withOperationField) {
-        hoodieCommitMetadata = new TableSchemaResolver(metaClient, true).getLatestCommitMetadata();
-      } else {
-        hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata();
-      }
+      hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata();
 
       if (hoodieCommitMetadata.isPresent()
           && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index a4ee8089f..b7345a146 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -189,11 +189,11 @@ public class HoodieClusteringJob {
   }
 
   private String getSchemaFromLatestInstant() throws Exception {
-    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
       throw new HoodieException("Cannot run clustering without any completed commits");
     }
-    Schema schema = schemaUtil.getTableAvroSchema(false);
+    Schema schema = schemaResolver.getTableAvroSchema(false);
     return schema.toString();
   }
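With the constructor flag gone, the sync client and the clustering job above end up building the resolver the same way. Below is a condensed sketch of those two call styles, assuming `metaClient` is already built for the target table; the wrapper class and method names are illustrative, not Hudi API.

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.parquet.schema.MessageType;

public final class ResolverCallSites {
  // Sync-style usage: fetch the Parquet schema for syncing to an external catalog;
  // no withOperationField flag is needed any more.
  public static MessageType parquetSchemaForSync(HoodieTableMetaClient metaClient) throws Exception {
    return new TableSchemaResolver(metaClient).getTableParquetSchema();
  }

  // Clustering-style usage: fetch the table's Avro schema without the _hoodie_*
  // metadata columns, as getSchemaFromLatestInstant() does above.
  public static String avroSchemaForClustering(HoodieTableMetaClient metaClient) throws Exception {
    Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(false);
    return schema.toString();
  }
}
```

Either way, whether `_hoodie_operation` appears in the resolved schema is now decided purely by what the table's data files contain, which is what lets the new Spark SQL test select `_hoodie_operation` without any extra configuration.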