From 6786581c4842e47e1a8a8e942f54003dc151c7c6 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Fri, 9 Apr 2021 13:46:19 +0800
Subject: [PATCH] [HUDI-1775] Add option for compaction parallelism (#2785)

---
 .../hudi/configuration/FlinkOptions.java       | 25 +++++++++++++--
 .../partitioner/BucketAssignFunction.java      | 32 ++++++++++++++-----
 .../apache/hudi/table/HoodieTableSink.java     |  1 +
 .../org/apache/hudi/util/StreamerUtil.java     |  4 +++
 .../hudi/sink/TestWriteCopyOnWrite.java        |  4 +++
 .../hudi/table/HoodieDataSourceITCase.java     |  3 +-
 6 files changed, 58 insertions(+), 11 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b66be2b1e..b120714b3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -71,6 +71,15 @@ public class FlinkOptions {
       .withDescription("The default partition name in case the dynamic partition"
           + " column value is null/empty string");
 
+  // ------------------------------------------------------------------------
+  //  Index Options
+  // ------------------------------------------------------------------------
+  public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
+      .key("index.bootstrap.enabled")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether to bootstrap the index state from the existing hoodie table, default false");
+
   // ------------------------------------------------------------------------
   //  Read Options
   // ------------------------------------------------------------------------
@@ -255,8 +264,14 @@ public class FlinkOptions {
   public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
       .key("write.batch.size.MB")
       .doubleType()
-      .defaultValue(2D) // 2MB
-      .withDescription("Batch buffer size in MB to flush data into the underneath filesystem");
+      .defaultValue(64D) // 64MB
+      .withDescription("Batch buffer size in MB to flush data into the underlying filesystem, default 64MB");
+
+  public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions
+      .key("write.log_block.size.MB")
+      .intType()
+      .defaultValue(128)
+      .withDescription("Max log block size in MB for the log file, default 128MB");
 
   // ------------------------------------------------------------------------
   //  Compaction Options
   // ------------------------------------------------------------------------
@@ -268,6 +283,12 @@ public class FlinkOptions {
       .defaultValue(true) // default true for MOR write
       .withDescription("Async Compaction, enabled by default for MOR");
 
+  public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
+      .key("compaction.tasks")
+      .intType()
+      .defaultValue(10) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.5 (assumes two commits generate one bucket)
+      .withDescription("Parallelism of tasks that do actual compaction, default is 10");
+
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
   public static final String NUM_AND_TIME = "num_and_time";
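
The default of 10 for compaction.tasks follows the formula in the comment: with the defaults of 4 write tasks and 5 delta commits per compaction declared elsewhere in FlinkOptions, 4 * 5 * 0.5 = 10. For illustration only (not part of the patch), a minimal sketch of setting the new options programmatically; the table path is a placeholder and FlinkOptions.PATH is the pre-existing path option:

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.configuration.FlinkOptions;

public class CompactionOptionsExample {

  // Build a sink configuration using the options this patch introduces.
  public static Configuration sinkConf() {
    Configuration conf = new Configuration();
    // Placeholder path; FlinkOptions.PATH is the existing table path option.
    conf.setString(FlinkOptions.PATH, "/tmp/hoodie_table");
    // Opt in to index state bootstrap (off by default, see the option above).
    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
    // Max log block size in MB, same as the option default.
    conf.setInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE, 128);
    // E.g. 8 write tasks * 5 delta commits * 0.5 = 20 compaction tasks.
    conf.setInteger(FlinkOptions.COMPACTION_TASKS, 20);
    return conf;
  }
}

The same keys can also be passed per table through the SQL WITH clause, as the ITCase change at the end of this patch does.
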
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 9c23259fc..7e017ccc8 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BucketInfo;
@@ -103,6 +102,8 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
 
   private final boolean isChangingRecords;
 
+  private final boolean bootstrapIndex;
+
   /**
    * State to book-keep which partition is loaded into the index state {@code indexState}.
    */
@@ -112,6 +113,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     this.conf = conf;
     this.isChangingRecords = WriteOperationType.isChangingRecords(
         WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
+    this.bootstrapIndex = conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
   }
 
   @Override
@@ -143,9 +145,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
         TypeInformation.of(HoodieKey.class),
         TypeInformation.of(HoodieRecordLocation.class));
     indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
-    MapStateDescriptor<String, Integer> partitionLoadStateDesc =
-        new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
-    partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
+    if (bootstrapIndex) {
+      MapStateDescriptor<String, Integer> partitionLoadStateDesc =
+          new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
+      partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -159,7 +163,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     final BucketInfo bucketInfo;
     final HoodieRecordLocation location;
 
-    if (!partitionLoadState.contains(hoodieKey.getPartitionPath())) {
+    // The dataset may be huge, so loading it into the index state could block
+    // the task for a long time; that is why the bootstrap is disabled by default.
+    if (bootstrapIndex && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
       // If the partition records are never loaded, load the records first.
       loadRecords(hoodieKey.getPartitionPath());
     }
@@ -205,6 +211,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
    * @throws Exception when error occurs for state update
    */
   private void loadRecords(String partitionPath) throws Exception {
+    LOG.info("Start loading records under partition {} into the index state", partitionPath);
     HoodieTable hoodieTable = bucketAssigner.getTable();
     List<HoodieBaseFile> latestBaseFiles =
         HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
@@ -212,8 +219,16 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
     final int taskID = getRuntimeContext().getIndexOfThisSubtask();
     for (HoodieBaseFile baseFile : latestBaseFiles) {
      final List<HoodieKey> hoodieKeys;
      try {
        hoodieKeys =
            ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
      } catch (Exception e) {
        // skip the base file if its record keys cannot be read, e.g. an empty
        // parquet file left over when the pipeline crashed exceptionally.
        LOG.error("Error when loading record keys from file: {}", baseFile);
        continue;
      }
       hoodieKeys.forEach(hoodieKey -> {
         try {
           // Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
@@ -224,12 +239,13 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
             this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
           }
         } catch (Exception e) {
-          throw new HoodieIOException("Error when load record keys from file: " + baseFile);
+          LOG.error("Error when putting record keys into the state from file: {}", baseFile);
         }
       });
     }
     // Mark the partition path as loaded.
     partitionLoadState.put(partitionPath, 0);
+    LOG.info("Finished loading records under partition {} into the index state", partitionPath);
   }
 
   @VisibleForTesting
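
The KeyedStream reference in loadRecords points at how each subtask decides which bootstrapped keys it owns: Flink routes a key to an operator subtask through its key group, so a subtask only puts into indexState the keys that would be routed to it. A sketch of that ownership check, assuming Flink's KeyGroupRangeAssignment utility; the wrapper class and method names are illustrative, not from the patch:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public final class KeyOwnership {

  private KeyOwnership() {
  }

  // True if Flink would route this key to the subtask with index taskID, so the
  // bootstrapped index state lands on the same subtask that later processes the key.
  public static boolean ownedByTask(Object key, int maxParallelism, int parallelism, int taskID) {
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism) == taskID;
  }
}
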
+ LOG.error("Error when loading record keys from file: {}", baseFile); + continue; + } hoodieKeys.forEach(hoodieKey -> { try { // Reference: org.apache.flink.streaming.api.datastream.KeyedStream, @@ -224,12 +239,13 @@ public class BucketAssignFunction> this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId())); } } catch (Exception e) { - throw new HoodieIOException("Error when load record keys from file: " + baseFile); + LOG.error("Error when putting record keys into the state from file: {}", baseFile); } }); } // Mark the partition path as loaded. partitionLoadState.put(partitionPath, 0); + LOG.info("Finish loading records under partition {} into the index state", partitionPath); } @VisibleForTesting diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index a568a3f5c..b52e0caed 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -91,6 +91,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning { .transform("compact_task", TypeInformation.of(CompactionCommitEvent.class), new KeyedProcessOperator<>(new CompactFunction(conf))) + .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS)) .addSink(new CompactionCommitSink(conf)) .name("compact_commit") .setParallelism(1); // compaction commit should be singleton diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 8fed30b7a..3cc5d561d 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.TablePathUtils; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieMemoryConfig; +import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; @@ -210,6 +211,9 @@ public class StreamerUtil { conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L ).build()) .forTable(conf.getString(FlinkOptions.TABLE_NAME)) + .withStorageConfig(HoodieStorageConfig.newBuilder() + .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) + .build()) .withAutoCommit(false) .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf))); diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 2384f7ef9..f373ab86d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -449,6 +449,10 @@ public class TestWriteCopyOnWrite { @Test public void testIndexStateBootstrap() throws Exception { + // reset the config option + conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + // open the function and ingest data funcWrapper.openFunction(); for (RowData rowData : TestData.DATA_SET_INSERT) { diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index ffb985915..56cbb5543 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -169,6 +169,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
     options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
     options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
+    options.put(FlinkOptions.COMPACTION_TASKS.key(), "1");
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
@@ -180,7 +181,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   }
 
   @Test
-  void testStreamWriteWithCleaning() throws InterruptedException {
+  void testStreamWriteWithCleaning() {
     // create filesystem table named source
     // the source generates 4 commits but the cleaning task