[HUDI-1775] Add option for compaction parallelism (#2785)
This commit is contained in:
@@ -71,6 +71,15 @@ public class FlinkOptions {
|
|||||||
.withDescription("The default partition name in case the dynamic partition"
|
.withDescription("The default partition name in case the dynamic partition"
|
||||||
+ " column value is null/empty string");
|
+ " column value is null/empty string");
|
||||||
|
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// Index Options
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
public static final ConfigOption<Boolean> INDEX_BOOTSTRAP_ENABLED = ConfigOptions
|
||||||
|
.key("index.bootstrap.enabled")
|
||||||
|
.booleanType()
|
||||||
|
.defaultValue(false)
|
||||||
|
.withDescription("Whether to bootstrap the index state from existing hoodie table, default false");
|
||||||
|
|
||||||
// ------------------------------------------------------------------------
|
// ------------------------------------------------------------------------
|
||||||
// Read Options
|
// Read Options
|
||||||
// ------------------------------------------------------------------------
|
// ------------------------------------------------------------------------
|
||||||
@@ -255,8 +264,14 @@ public class FlinkOptions {
|
|||||||
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
|
public static final ConfigOption<Double> WRITE_BATCH_SIZE = ConfigOptions
|
||||||
.key("write.batch.size.MB")
|
.key("write.batch.size.MB")
|
||||||
.doubleType()
|
.doubleType()
|
||||||
.defaultValue(2D) // 2MB
|
.defaultValue(64D) // 64MB
|
||||||
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem");
|
.withDescription("Batch buffer size in MB to flush data into the underneath filesystem, default 64MB");
|
||||||
|
|
||||||
|
public static final ConfigOption<Integer> WRITE_LOG_BLOCK_SIZE = ConfigOptions
|
||||||
|
.key("write.log_block.size.MB")
|
||||||
|
.intType()
|
||||||
|
.defaultValue(128)
|
||||||
|
.withDescription("Max log block size in MB for log file, default 128MB");
|
||||||
|
|
||||||
// ------------------------------------------------------------------------
|
// ------------------------------------------------------------------------
|
||||||
// Compaction Options
|
// Compaction Options
|
||||||
@@ -268,6 +283,12 @@ public class FlinkOptions {
|
|||||||
.defaultValue(true) // default true for MOR write
|
.defaultValue(true) // default true for MOR write
|
||||||
.withDescription("Async Compaction, enabled by default for MOR");
|
.withDescription("Async Compaction, enabled by default for MOR");
|
||||||
|
|
||||||
|
public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
|
||||||
|
.key("compaction.tasks")
|
||||||
|
.intType()
|
||||||
|
.defaultValue(10) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.5 (assumes two commits generate one bucket)
|
||||||
|
.withDescription("Parallelism of tasks that do actual compaction, default is 10");
|
||||||
|
|
||||||
public static final String NUM_COMMITS = "num_commits";
|
public static final String NUM_COMMITS = "num_commits";
|
||||||
public static final String TIME_ELAPSED = "time_elapsed";
|
public static final String TIME_ELAPSED = "time_elapsed";
|
||||||
public static final String NUM_AND_TIME = "num_and_time";
|
public static final String NUM_AND_TIME = "num_and_time";
|
||||||
|
|||||||
@@ -31,7 +31,6 @@ import org.apache.hudi.common.util.ParquetUtils;
|
|||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
|
||||||
import org.apache.hudi.index.HoodieIndexUtils;
|
import org.apache.hudi.index.HoodieIndexUtils;
|
||||||
import org.apache.hudi.table.HoodieTable;
|
import org.apache.hudi.table.HoodieTable;
|
||||||
import org.apache.hudi.table.action.commit.BucketInfo;
|
import org.apache.hudi.table.action.commit.BucketInfo;
|
||||||
@@ -103,6 +102,8 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
|
|
||||||
private final boolean isChangingRecords;
|
private final boolean isChangingRecords;
|
||||||
|
|
||||||
|
private final boolean bootstrapIndex;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* State to book-keep which partition is loaded into the index state {@code indexState}.
|
* State to book-keep which partition is loaded into the index state {@code indexState}.
|
||||||
*/
|
*/
|
||||||
@@ -112,6 +113,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.isChangingRecords = WriteOperationType.isChangingRecords(
|
this.isChangingRecords = WriteOperationType.isChangingRecords(
|
||||||
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
|
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
|
||||||
|
this.bootstrapIndex = conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -143,9 +145,11 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
TypeInformation.of(HoodieKey.class),
|
TypeInformation.of(HoodieKey.class),
|
||||||
TypeInformation.of(HoodieRecordLocation.class));
|
TypeInformation.of(HoodieRecordLocation.class));
|
||||||
indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
|
indexState = context.getKeyedStateStore().getMapState(indexStateDesc);
|
||||||
MapStateDescriptor<String, Integer> partitionLoadStateDesc =
|
if (bootstrapIndex) {
|
||||||
new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
|
MapStateDescriptor<String, Integer> partitionLoadStateDesc =
|
||||||
partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
|
new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
|
||||||
|
partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@@ -159,7 +163,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
final BucketInfo bucketInfo;
|
final BucketInfo bucketInfo;
|
||||||
final HoodieRecordLocation location;
|
final HoodieRecordLocation location;
|
||||||
|
|
||||||
if (!partitionLoadState.contains(hoodieKey.getPartitionPath())) {
|
// The dataset may be huge, thus the processing would block for long,
|
||||||
|
// disabled by default.
|
||||||
|
if (bootstrapIndex && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
|
||||||
// If the partition records are never loaded, load the records first.
|
// If the partition records are never loaded, load the records first.
|
||||||
loadRecords(hoodieKey.getPartitionPath());
|
loadRecords(hoodieKey.getPartitionPath());
|
||||||
}
|
}
|
||||||
@@ -205,6 +211,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
* @throws Exception when error occurs for state update
|
* @throws Exception when error occurs for state update
|
||||||
*/
|
*/
|
||||||
private void loadRecords(String partitionPath) throws Exception {
|
private void loadRecords(String partitionPath) throws Exception {
|
||||||
|
LOG.info("Start loading records under partition {} into the index state", partitionPath);
|
||||||
HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
|
HoodieTable<?, ?, ?, ?> hoodieTable = bucketAssigner.getTable();
|
||||||
List<HoodieBaseFile> latestBaseFiles =
|
List<HoodieBaseFile> latestBaseFiles =
|
||||||
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
|
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionPath, hoodieTable);
|
||||||
@@ -212,8 +219,16 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
|
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
|
||||||
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
|
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
|
||||||
for (HoodieBaseFile baseFile : latestBaseFiles) {
|
for (HoodieBaseFile baseFile : latestBaseFiles) {
|
||||||
List<HoodieKey> hoodieKeys =
|
final List<HoodieKey> hoodieKeys;
|
||||||
ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
|
try {
|
||||||
|
hoodieKeys =
|
||||||
|
ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, new Path(baseFile.getPath()));
|
||||||
|
} catch (Exception e) {
|
||||||
|
// in case there was some empty parquet file when the pipeline
|
||||||
|
// crushes exceptionally.
|
||||||
|
LOG.error("Error when loading record keys from file: {}", baseFile);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
hoodieKeys.forEach(hoodieKey -> {
|
hoodieKeys.forEach(hoodieKey -> {
|
||||||
try {
|
try {
|
||||||
// Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
|
// Reference: org.apache.flink.streaming.api.datastream.KeyedStream,
|
||||||
@@ -224,12 +239,13 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
|
|||||||
this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
|
this.indexState.put(hoodieKey, new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieIOException("Error when load record keys from file: " + baseFile);
|
LOG.error("Error when putting record keys into the state from file: {}", baseFile);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
// Mark the partition path as loaded.
|
// Mark the partition path as loaded.
|
||||||
partitionLoadState.put(partitionPath, 0);
|
partitionLoadState.put(partitionPath, 0);
|
||||||
|
LOG.info("Finish loading records under partition {} into the index state", partitionPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning {
|
|||||||
.transform("compact_task",
|
.transform("compact_task",
|
||||||
TypeInformation.of(CompactionCommitEvent.class),
|
TypeInformation.of(CompactionCommitEvent.class),
|
||||||
new KeyedProcessOperator<>(new CompactFunction(conf)))
|
new KeyedProcessOperator<>(new CompactFunction(conf)))
|
||||||
|
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
|
||||||
.addSink(new CompactionCommitSink(conf))
|
.addSink(new CompactionCommitSink(conf))
|
||||||
.name("compact_commit")
|
.name("compact_commit")
|
||||||
.setParallelism(1); // compaction commit should be singleton
|
.setParallelism(1); // compaction commit should be singleton
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
|
|||||||
import org.apache.hudi.common.util.TablePathUtils;
|
import org.apache.hudi.common.util.TablePathUtils;
|
||||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||||
import org.apache.hudi.config.HoodieMemoryConfig;
|
import org.apache.hudi.config.HoodieMemoryConfig;
|
||||||
|
import org.apache.hudi.config.HoodieStorageConfig;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
@@ -210,6 +211,9 @@ public class StreamerUtil {
|
|||||||
conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L
|
conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L
|
||||||
).build())
|
).build())
|
||||||
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
|
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
|
||||||
|
.withStorageConfig(HoodieStorageConfig.newBuilder()
|
||||||
|
.logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024)
|
||||||
|
.build())
|
||||||
.withAutoCommit(false)
|
.withAutoCommit(false)
|
||||||
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
|
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
|
||||||
|
|
||||||
|
|||||||
@@ -449,6 +449,10 @@ public class TestWriteCopyOnWrite {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIndexStateBootstrap() throws Exception {
|
public void testIndexStateBootstrap() throws Exception {
|
||||||
|
// reset the config option
|
||||||
|
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
|
||||||
|
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
|
||||||
|
|
||||||
// open the function and ingest data
|
// open the function and ingest data
|
||||||
funcWrapper.openFunction();
|
funcWrapper.openFunction();
|
||||||
for (RowData rowData : TestData.DATA_SET_INSERT) {
|
for (RowData rowData : TestData.DATA_SET_INSERT) {
|
||||||
|
|||||||
@@ -169,6 +169,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
|||||||
options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
|
options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
|
||||||
options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
|
options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
|
||||||
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
|
options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
|
||||||
|
options.put(FlinkOptions.COMPACTION_TASKS.key(), "1");
|
||||||
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
|
||||||
streamTableEnv.executeSql(hoodieTableDDL);
|
streamTableEnv.executeSql(hoodieTableDDL);
|
||||||
String insertInto = "insert into t1 select * from source";
|
String insertInto = "insert into t1 select * from source";
|
||||||
@@ -180,7 +181,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testStreamWriteWithCleaning() throws InterruptedException {
|
void testStreamWriteWithCleaning() {
|
||||||
// create filesystem table named source
|
// create filesystem table named source
|
||||||
|
|
||||||
// the source generates 4 commits but the cleaning task
|
// the source generates 4 commits but the cleaning task
|
||||||
|
|||||||
Reference in New Issue
Block a user