From 5be3997f70415e1752a0b5214f9398880fc8fd1f Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 27 Apr 2021 10:58:06 +0800 Subject: [PATCH] [HUDI-1841] Tweak the min max commits to keep when setting up cleaning retain commits for Flink (#2875) --- .../hudi/configuration/FlinkOptions.java | 12 ++++ .../apache/hudi/sink/StreamWriteFunction.java | 36 ++++++++---- .../apache/hudi/table/HoodieTableFactory.java | 18 ++++++ .../org/apache/hudi/util/StreamerUtil.java | 1 + .../hudi/table/TestHoodieTableFactory.java | 56 +++++++++++++++++++ 5 files changed, 112 insertions(+), 11 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index dd20f22e4..d132d5322 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -358,6 +358,18 @@ public class FlinkOptions { .withDescription("Number of commits to retain. 
So data will be retained for num_of_commits * time_between_commits (scheduled).\n" + "This also directly translates into how much you can incrementally pull on this table, default 10"); + public static final ConfigOption ARCHIVE_MAX_COMMITS = ConfigOptions + .key("archive.max_commits") + .intType() + .defaultValue(30)// default max 30 commits + .withDescription("Max number of commits to keep before archiving older commits into a sequential log, default 30"); + + public static final ConfigOption ARCHIVE_MIN_COMMITS = ConfigOptions + .key("archive.min_commits") + .intType() + .defaultValue(20)// default min 20 commits + .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 20"); + // ------------------------------------------------------------------------ // Hive Sync Options // ------------------------------------------------------------------------ diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 36bd0edae..d316ee11d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -63,7 +63,8 @@ import java.util.stream.Collectors; *

Work Flow

* *

The function firstly buffers the data as a batch of {@link HoodieRecord}s, - * It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BUCKET_SIZE} + * It flushes(write) the records bucket when the bucket size exceeds the configured threshold {@link FlinkOptions#WRITE_BUCKET_SIZE} + * or the whole data buffer size exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE} * or a Flink checkpoint starts. After a batch has been written successfully, * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write. * @@ -356,8 +357,13 @@ public class StreamWriteFunction /** * Buffers the given record. * - *

Flush the data bucket first if the bucket records size is greater than - * the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}. + *

Flush the data bucket first if one of the following conditions is met:
+   *
+   * <ul>
+   *   <li>The bucket size is greater than the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}.</li>
+   *   <li>Flush half of the data buckets if the whole buffer size
+   *   exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE}.</li>
+   * </ul>
* * @param value HoodieRecord */ @@ -365,19 +371,26 @@ public class StreamWriteFunction boolean flushBuffer = detector.detect(value); if (flushBuffer) { List sortedBuckets = this.buckets.values().stream() - .sorted(Comparator.comparingDouble(b -> b.tracer.totalSize)) + .filter(b -> b.records.size() > 0) + .sorted(Comparator.comparingLong(b -> b.tracer.totalSize)) .collect(Collectors.toList()); - // flush half number of buckets to avoid flushing too small buckets + // flush half bytes size of buckets to avoid flushing too small buckets // which cause small files. - int numBucketsToFlush = (sortedBuckets.size() + 1) / 2; - LOG.info("Flush {} data buckets because the total buffer size [{} bytes] exceeds the threshold [{} bytes]", - numBucketsToFlush, detector.totalSize, detector.threshold); - for (int i = 0; i < numBucketsToFlush; i++) { - DataBucket bucket = sortedBuckets.get(i); + long totalSize = detector.totalSize; + long flushedBytes = 0; + for (DataBucket bucket : sortedBuckets) { + final long bucketSize = bucket.tracer.totalSize; flushBucket(bucket); - detector.countDown(bucket.tracer.totalSize); + detector.countDown(bucketSize); bucket.reset(); + + flushedBytes += bucketSize; + if (flushedBytes > detector.totalSize / 2) { + break; + } } + LOG.info("Flush {} bytes data buckets because the total buffer size {} bytes exceeds the threshold {} bytes", + flushedBytes, totalSize, detector.threshold); } final String bucketID = getBucketID(value); @@ -386,6 +399,7 @@ public class StreamWriteFunction boolean flushBucket = bucket.tracer.trace(detector.lastRecordSize); if (flushBucket) { flushBucket(bucket); + detector.countDown(bucket.tracer.totalSize); bucket.reset(); } bucket.records.add((HoodieRecord) value); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index abdfdcbb3..1566aa6f7 100644 --- 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -115,6 +115,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab conf.setString(FlinkOptions.TABLE_NAME.key(), tableName); // hoodie key about options setupHoodieKeyOptions(conf, table); + // cleaning options + setupCleaningOptions(conf); // infer avro schema from physical DDL schema inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType()); } @@ -152,6 +154,22 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab } } + /** + * Sets up the cleaning options from the table definition. + */ + private static void setupCleaningOptions(Configuration conf) { + int commitsToRetain = conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS); + int minCommitsToKeep = conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS); + if (commitsToRetain >= minCommitsToKeep) { + LOG.info("Table option [{}] is reset to {} to be greater than {}={},\n" + + "to avoid risk of missing data from few instants in incremental pull", + FlinkOptions.ARCHIVE_MIN_COMMITS.key(), commitsToRetain + 10, + FlinkOptions.CLEAN_RETAIN_COMMITS.key(), commitsToRetain); + conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10); + conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20); + } + } + /** * Inferences the deserialization Avro schema from the table schema (e.g. 
the DDL) * if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index b28b604f0..d08d68e62 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -203,6 +203,7 @@ public class StreamerUtil { // override and hardcode to 20, // actually Flink cleaning is always with parallelism 1 now .withCleanerParallelism(20) + .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)) .build()) .withMemoryConfig( HoodieMemoryConfig.newBuilder() diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index 1f2059e2c..b7f4429f5 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -130,6 +130,34 @@ public class TestHoodieTableFactory { assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName())); } + @Test + void testSetupCleaningOptionsForSource() { + // definition with simple primary key and partition path + TableSchema schema1 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + // set up new retains commits that is less than min archive commits + this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11"); + + final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1); + final Configuration conf1 = tableSource1.getConf(); + 
assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(20)); + assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(30)); + + // set up new retains commits that is greater than min archive commits + this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "25"); + + final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2); + final Configuration conf2 = tableSource2.getConf(); + assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(35)); + assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45)); + } + @Test void testInferAvroSchemaForSink() { // infer the schema if not specified @@ -186,6 +214,34 @@ public class TestHoodieTableFactory { assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName())); } + @Test + void testSetupCleaningOptionsForSink() { + // definition with simple primary key and partition path + TableSchema schema1 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + // set up new retains commits that is less than min archive commits + this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11"); + + final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1); + final Configuration conf1 = tableSink1.getConf(); + assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(20)); + assertThat(conf1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(30)); + + // set up new retains commits that is greater than min archive commits + this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "25"); + + final MockContext sinkContext2 = 
MockContext.getInstance(this.conf, schema1, "f2"); + final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2); + final Configuration conf2 = tableSink2.getConf(); + assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(35)); + assertThat(conf2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45)); + } + // ------------------------------------------------------------------------- // Inner Class // -------------------------------------------------------------------------