1
0

[HUDI-1841] Tweak the min max commits to keep when setting up cleaning retain commits for Flink (#2875)

This commit is contained in:
Danny Chan
2021-04-27 10:58:06 +08:00
committed by GitHub
parent 9bbb458e88
commit 5be3997f70
5 changed files with 112 additions and 11 deletions

View File

@@ -358,6 +358,18 @@ public class FlinkOptions {
.withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 10");
/**
 * Upper bound on the number of commits kept on the active timeline; once exceeded,
 * older commits are archived into a sequential log. Default: 30.
 */
public static final ConfigOption<Integer> ARCHIVE_MAX_COMMITS = ConfigOptions
    .key("archive.max_commits")
    .intType()
    .defaultValue(30)// default max 30 commits
    .withDescription("Max number of commits to keep before archiving older commits into a sequential log, default 30");

/**
 * Lower bound on the number of commits kept on the active timeline after archiving
 * runs. Default: 20.
 *
 * <p>NOTE(review): elsewhere in this change set this option is bumped automatically
 * when it does not exceed the cleaner's retained commits — presumably so incremental
 * pulls never miss instants; confirm against the cleaning setup logic.
 */
public static final ConfigOption<Integer> ARCHIVE_MIN_COMMITS = ConfigOptions
    .key("archive.min_commits")
    .intType()
    .defaultValue(20)// default min 20 commits
    .withDescription("Min number of commits to keep before archiving older commits into a sequential log, default 20");
// ------------------------------------------------------------------------
// Hive Sync Options
// ------------------------------------------------------------------------

View File

@@ -63,7 +63,8 @@ import java.util.stream.Collectors;
* <p><h2>Work Flow</h2>
*
* <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s,
* It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BUCKET_SIZE}
* It flushes(write) the records bucket when the bucket size exceeds the configured threshold {@link FlinkOptions#WRITE_BUCKET_SIZE}
* or the whole data buffer size exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE}
* or a Flink checkpoint starts. After a batch has been written successfully,
* the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
*
@@ -356,8 +357,13 @@ public class StreamWriteFunction<K, I, O>
/**
* Buffers the given record.
*
* <p>Flush the data bucket first if the bucket records size is greater than
* the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}.
* <p>Flush the data bucket first if one of the condition meets:
*
* <ul>
* <li>The bucket size is greater than the configured value {@link FlinkOptions#WRITE_BUCKET_SIZE}.</li>
* <li>Flush half of the data buckets if the whole buffer size
* exceeds the configured threshold {@link FlinkOptions#WRITE_BUFFER_SIZE}.</li>
* </ul>
*
* @param value HoodieRecord
*/
@@ -365,19 +371,26 @@ public class StreamWriteFunction<K, I, O>
boolean flushBuffer = detector.detect(value);
if (flushBuffer) {
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted(Comparator.comparingDouble(b -> b.tracer.totalSize))
.filter(b -> b.records.size() > 0)
.sorted(Comparator.comparingLong(b -> b.tracer.totalSize))
.collect(Collectors.toList());
// flush half number of buckets to avoid flushing too small buckets
// flush half bytes size of buckets to avoid flushing too small buckets
// which cause small files.
int numBucketsToFlush = (sortedBuckets.size() + 1) / 2;
LOG.info("Flush {} data buckets because the total buffer size [{} bytes] exceeds the threshold [{} bytes]",
numBucketsToFlush, detector.totalSize, detector.threshold);
for (int i = 0; i < numBucketsToFlush; i++) {
DataBucket bucket = sortedBuckets.get(i);
long totalSize = detector.totalSize;
long flushedBytes = 0;
for (DataBucket bucket : sortedBuckets) {
final long bucketSize = bucket.tracer.totalSize;
flushBucket(bucket);
detector.countDown(bucket.tracer.totalSize);
detector.countDown(bucketSize);
bucket.reset();
flushedBytes += bucketSize;
if (flushedBytes > detector.totalSize / 2) {
break;
}
}
LOG.info("Flush {} bytes data buckets because the total buffer size {} bytes exceeds the threshold {} bytes",
flushedBytes, totalSize, detector.threshold);
}
final String bucketID = getBucketID(value);
@@ -386,6 +399,7 @@ public class StreamWriteFunction<K, I, O>
boolean flushBucket = bucket.tracer.trace(detector.lastRecordSize);
if (flushBucket) {
flushBucket(bucket);
detector.countDown(bucket.tracer.totalSize);
bucket.reset();
}
bucket.records.add((HoodieRecord<?>) value);

View File

@@ -115,6 +115,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
// hoodie key about options
setupHoodieKeyOptions(conf, table);
// cleaning options
setupCleaningOptions(conf);
// infer avro schema from physical DDL schema
inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
}
@@ -152,6 +154,22 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
}
}
/**
 * Sets up the cleaning options from the table definition.
 *
 * <p>If the number of commits the cleaner retains is not smaller than the minimum
 * number of commits the archiver keeps, the archive bounds are raised so that the
 * active timeline always holds more instants than the cleaner retains.
 *
 * @param conf The table configuration, mutated in place
 */
private static void setupCleaningOptions(Configuration conf) {
  final int retainedCommits = conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS);
  final int archiveMinCommits = conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS);
  if (retainedCommits < archiveMinCommits) {
    // nothing to adjust: archiving already keeps more commits than the cleaner retains
    return;
  }
  final int adjustedMinCommits = retainedCommits + 10;
  LOG.info("Table option [{}] is reset to {} to be greater than {}={},\n"
      + "to avoid risk of missing data from few instants in incremental pull",
      FlinkOptions.ARCHIVE_MIN_COMMITS.key(), adjustedMinCommits,
      FlinkOptions.CLEAN_RETAIN_COMMITS.key(), retainedCommits);
  conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, adjustedMinCommits);
  conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, retainedCommits + 20);
}
/**
* Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
* if both options {@link FlinkOptions#READ_AVRO_SCHEMA_PATH} and

View File

@@ -203,6 +203,7 @@ public class StreamerUtil {
// override and hardcode to 20,
// actually Flink cleaning is always with parallelism 1 now
.withCleanerParallelism(20)
.archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
.build())
.withMemoryConfig(
HoodieMemoryConfig.newBuilder()

View File

@@ -130,6 +130,34 @@ public class TestHoodieTableFactory {
assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
}
@Test
void testSetupCleaningOptionsForSource() {
  // table with a simple primary key and a partition path column
  TableSchema schema1 = TableSchema.builder()
      .field("f0", DataTypes.INT().notNull())
      .field("f1", DataTypes.VARCHAR(20))
      .field("f2", DataTypes.TIMESTAMP(3))
      .primaryKey("f0")
      .build();

  // case 1: retained commits below the default min archive commits,
  // the archive options should keep their defaults
  this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11");
  MockContext context1 = MockContext.getInstance(this.conf, schema1, "f2");
  HoodieTableSource source1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(context1);
  Configuration adjusted1 = source1.getConf();
  assertThat(adjusted1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(20));
  assertThat(adjusted1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(30));

  // case 2: retained commits at/above the default min archive commits,
  // the archive options should be bumped to retain + 10 / retain + 20
  this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "25");
  MockContext context2 = MockContext.getInstance(this.conf, schema1, "f2");
  HoodieTableSource source2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(context2);
  Configuration adjusted2 = source2.getConf();
  assertThat(adjusted2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(35));
  assertThat(adjusted2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
}
@Test
void testInferAvroSchemaForSink() {
// infer the schema if not specified
@@ -186,6 +214,34 @@ public class TestHoodieTableFactory {
assertThat(conf3.get(FlinkOptions.KEYGEN_CLASS), is(NonpartitionedAvroKeyGenerator.class.getName()));
}
@Test
void testSetupCleaningOptionsForSink() {
  // table with a simple primary key and a partition path column
  TableSchema schema1 = TableSchema.builder()
      .field("f0", DataTypes.INT().notNull())
      .field("f1", DataTypes.VARCHAR(20))
      .field("f2", DataTypes.TIMESTAMP(3))
      .primaryKey("f0")
      .build();

  // case 1: retained commits below the default min archive commits,
  // the archive options should keep their defaults
  this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "11");
  MockContext context1 = MockContext.getInstance(this.conf, schema1, "f2");
  HoodieTableSink sink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context1);
  Configuration adjusted1 = sink1.getConf();
  assertThat(adjusted1.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(20));
  assertThat(adjusted1.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(30));

  // case 2: retained commits at/above the default min archive commits,
  // the archive options should be bumped to retain + 10 / retain + 20
  this.conf.setString(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "25");
  MockContext context2 = MockContext.getInstance(this.conf, schema1, "f2");
  HoodieTableSink sink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context2);
  Configuration adjusted2 = sink2.getConf();
  assertThat(adjusted2.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), is(35));
  assertThat(adjusted2.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS), is(45));
}
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------