From bbadac7de1bb57300ca7e796ebb401fdbb66a0f8 Mon Sep 17 00:00:00 2001 From: pengzhiwei Date: Thu, 29 Jul 2021 12:30:18 +0800 Subject: [PATCH] [HUDI-1425] Performance loss with the additional hoodieRecords.isEmpty() in HoodieSparkSqlWriter#write (#2296) --- .../hudi/client/AbstractHoodieWriteClient.java | 5 +++++ .../apache/hudi/config/HoodieWriteConfig.java | 10 ++++++++++ .../apache/hudi/common/config/HoodieConfig.java | 6 ++++++ .../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 ----- .../hudi/functional/TestCOWDataSource.scala | 17 ++++++++++++++--- 5 files changed, 35 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 6470cfa93..8c48f73b1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -172,6 +172,11 @@ public abstract class AbstractHoodieWriteClient stats, Option> extraMetadata, String commitActionType, Map> partitionToReplaceFileIds) { + // Skip the empty commit if not allowed + if (!config.allowEmptyCommit() && stats.isEmpty()) { + return true; + } + LOG.info("Committing " + instantTime + " action " + commitActionType); // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = createTable(config, hadoopConf); HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 212b0197a..6bb57da87 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -367,6 +367,12 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("When enabled, records in older schema are rewritten into newer schema during upsert,delete and background" + " compaction,clustering operations."); + public static final ConfigProperty ALLOW_EMPTY_COMMIT = ConfigProperty + .key("hoodie.allow.empty.commit") + .defaultValue(true) + .withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. " + + "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data"); + private ConsistencyGuardConfig consistencyGuardConfig; // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled @@ -1275,6 +1281,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getString(WRITE_META_KEY_PREFIXES_PROP); } + public boolean allowEmptyCommit() { + return getBooleanOrDefault(ALLOW_EMPTY_COMMIT); + } + public static class Builder { protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java index 97c6f49ec..1f646aa8d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java @@ -132,6 +132,12 @@ public class HoodieConfig implements Serializable { return rawValue.map(v -> Boolean.parseBoolean(v.toString())).orElse(null); } + public boolean getBooleanOrDefault(ConfigProperty configProperty) { + Option rawValue = getRawValue(configProperty); + return rawValue.map(v -> Boolean.parseBoolean(v.toString())) + .orElse((Boolean) configProperty.defaultValue()); + } + public Long getLong(ConfigProperty configProperty) { Option rawValue = getRawValue(configProperty); return rawValue.map(v -> Long.parseLong(v.toString())).orElse(null); diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 409f5c3a6..b2a8a0e0b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -194,11 +194,6 @@ object HoodieSparkSqlWriter { } else { hoodieAllIncomingRecords } - - if (hoodieRecords.isEmpty()) { - log.info("new batch has no new records, skipping...") - (true, common.util.Option.empty()) - } client.startCommitWithTime(instantTime, commitActionType) val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation) (writeResult, client) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index fa5479c7b..3a1165a62 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -574,7 +574,7 @@ class TestCOWDataSource extends HoodieClientTestBase { fail("should fail when invalid PartitionKeyType is provided!") } catch { case e: Exception => - assertTrue(e.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY")) + assertTrue(e.getCause.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY")) } } @@ -770,7 +770,6 @@ class TestCOWDataSource extends HoodieClientTestBase { } } - @Test def testSchemaNotEqualData(): Unit = { val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true") val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true):: @@ -785,11 +784,23 @@ class TestCOWDataSource extends HoodieClientTestBase { .options(opts) .mode(SaveMode.Append) .save(basePath) - val recordsReadDF = spark.read.format("org.apache.hudi") .load(basePath + "/*/*") val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray) assertEquals(resultSchema, schema1) } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testWithEmptyInput(allowEmptyCommit: Boolean): Unit = { + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(Seq.empty[String], 1)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), allowEmptyCommit.toString) + .mode(SaveMode.Overwrite) + .save(basePath) + assertEquals(allowEmptyCommit, HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + } }