[HUDI-1425] Performance loss with the additional hoodieRecords.isEmpty() in HoodieSparkSqlWriter#write (#2296)
This commit is contained in:
@@ -172,6 +172,11 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
||||
|
||||
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
|
||||
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
|
||||
// Skip the empty commit if not allowed
|
||||
if (!config.allowEmptyCommit() && stats.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
LOG.info("Committing " + instantTime + " action " + commitActionType);
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
HoodieTable table = createTable(config, hadoopConf);
|
||||
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
|
||||
|
||||
@@ -367,6 +367,12 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
.withDocumentation("When enabled, records in older schema are rewritten into newer schema during upsert,delete and background"
|
||||
+ " compaction,clustering operations.");
|
||||
|
||||
public static final ConfigProperty<Boolean> ALLOW_EMPTY_COMMIT = ConfigProperty
|
||||
.key("hoodie.allow.empty.commit")
|
||||
.defaultValue(true)
|
||||
.withDocumentation("Whether to allow generation of empty commits, even if no data was written in the commit. "
|
||||
+ "It's useful in cases where extra metadata needs to be published regardless e.g tracking source offsets when ingesting data");
|
||||
|
||||
private ConsistencyGuardConfig consistencyGuardConfig;
|
||||
|
||||
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
|
||||
@@ -1275,6 +1281,10 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
return getString(WRITE_META_KEY_PREFIXES_PROP);
|
||||
}
|
||||
|
||||
public boolean allowEmptyCommit() {
|
||||
return getBooleanOrDefault(ALLOW_EMPTY_COMMIT);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
|
||||
|
||||
@@ -132,6 +132,12 @@ public class HoodieConfig implements Serializable {
|
||||
return rawValue.map(v -> Boolean.parseBoolean(v.toString())).orElse(null);
|
||||
}
|
||||
|
||||
public <T> boolean getBooleanOrDefault(ConfigProperty<T> configProperty) {
|
||||
Option<Object> rawValue = getRawValue(configProperty);
|
||||
return rawValue.map(v -> Boolean.parseBoolean(v.toString()))
|
||||
.orElse((Boolean) configProperty.defaultValue());
|
||||
}
|
||||
|
||||
public <T> Long getLong(ConfigProperty<T> configProperty) {
|
||||
Option<Object> rawValue = getRawValue(configProperty);
|
||||
return rawValue.map(v -> Long.parseLong(v.toString())).orElse(null);
|
||||
|
||||
@@ -194,11 +194,6 @@ object HoodieSparkSqlWriter {
|
||||
} else {
|
||||
hoodieAllIncomingRecords
|
||||
}
|
||||
|
||||
if (hoodieRecords.isEmpty()) {
|
||||
log.info("new batch has no new records, skipping...")
|
||||
(true, common.util.Option.empty())
|
||||
}
|
||||
client.startCommitWithTime(instantTime, commitActionType)
|
||||
val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
|
||||
(writeResult, client)
|
||||
|
||||
@@ -574,7 +574,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
||||
fail("should fail when invalid PartitionKeyType is provided!")
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
assertTrue(e.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"))
|
||||
assertTrue(e.getCause.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -770,7 +770,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test def testSchemaNotEqualData(): Unit = {
|
||||
val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
|
||||
val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true)::
|
||||
@@ -785,11 +784,23 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
||||
.options(opts)
|
||||
.mode(SaveMode.Append)
|
||||
.save(basePath)
|
||||
|
||||
val recordsReadDF = spark.read.format("org.apache.hudi")
|
||||
.load(basePath + "/*/*")
|
||||
|
||||
val resultSchema = new StructType(recordsReadDF.schema.filter(p=> !p.name.startsWith("_hoodie")).toArray)
|
||||
assertEquals(resultSchema, schema1)
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = Array(true, false))
|
||||
def testWithEmptyInput(allowEmptyCommit: Boolean): Unit = {
|
||||
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(Seq.empty[String], 1))
|
||||
inputDF1.write.format("org.apache.hudi")
|
||||
.options(commonOpts)
|
||||
.option(DataSourceWriteOptions.OPERATION_OPT_KEY.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||
.option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), allowEmptyCommit.toString)
|
||||
.mode(SaveMode.Overwrite)
|
||||
.save(basePath)
|
||||
assertEquals(allowEmptyCommit, HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user