[HUDI-1800] Exclude file slices in pending compaction when performing small file sizing (#2902)
Co-authored-by: Ryan Pifer <ryanpife@amazon.com>
@@ -44,8 +44,8 @@ import java.util.stream.Collectors;
  */
 public class SparkUpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>> extends UpsertPartitioner<T> {
 
-  SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEngineContext context, HoodieTable table,
+  public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEngineContext context, HoodieTable table,
                                     HoodieWriteConfig config) {
     super(profile, context, table, config);
   }
 
@@ -79,10 +79,10 @@ public class SparkUpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>>
           allSmallFileSlices.add(smallFileSlice.get());
         }
       } else {
-        // If we can index log files, we can add more inserts to log files for fileIds including those under
-        // pending compaction.
+        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
+        // pending compaction
         List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
                 .collect(Collectors.toList());
         for (FileSlice fileSlice : allFileSlices) {
           if (isSmallFile(fileSlice)) {
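The behavioral change in the hunk above is the final boolean argument to getLatestFileSlicesBeforeOrOn; read together with the rewritten comment, it appears to control whether file slices under a pending compaction are returned, so passing false keeps those file groups out of small-file sizing. A restatement of the new call, with that assumption spelled out in comments:

    // Assumption (from the updated comment): the third argument toggles inclusion of
    // file slices in pending compaction, so `false` excludes such file groups from
    // the small-file candidates used to pack additional inserts.
    List<FileSlice> allFileSlices =
        table.getSliceView()
            .getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
            .collect(Collectors.toList());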
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -25,15 +26,20 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.CompactionTestUtils;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieHBaseIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.avro.Schema;
@@ -52,6 +58,7 @@ import java.util.Map;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
@@ -306,6 +313,83 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
   }
 
+  @Test
+  public void testUpsertPartitionerWithSmallFileHandlingWithInflightCompactionWithCanIndexLogFiles() throws Exception {
+    // Note this is used because it is same partition path used in CompactionTestUtils.createCompactionPlan()
+    final String testPartitionPath = DEFAULT_PARTITION_PATHS[0];
+
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.HBASE)
+            .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder().build())
+            .build())
+        .build();
+
+    // This will generate initial commits and create a compaction plan which includes file groups created as part of this
+    HoodieCompactionPlan plan = CompactionTestUtils.createCompactionPlan(metaClient, "001", "002", 1, true, false);
+    FileCreateUtils.createRequestedCompactionCommit(basePath, "002", plan);
+    // Simulate one more commit so that inflight compaction is considered when building file groups in file system view
+    //
+    FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "2", 1);
+    FileCreateUtils.createCommit(basePath, "003");
+
+    // Partitioner will attempt to assign inserts to file groups including base file created by inflight compaction
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("004", 100);
+    WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
+
+    HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
+    SparkUpsertDeltaCommitPartitioner partitioner = new SparkUpsertDeltaCommitPartitioner(profile, context, table, config);
+
+    assertEquals(1, partitioner.numPartitions(), "Should have 1 partitions");
+    assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
+        "Bucket 0 is UPDATE");
+    assertEquals("2", partitioner.getBucketInfo(0).fileIdPrefix,
+        "Should be assigned to only file id not pending compaction which is 2");
+  }
+
+  @Test
+  public void testUpsertPartitionerWithSmallFileHandlingWithCanIndexLogFiles() throws Exception {
+    // Note this is used because it is same partition path used in CompactionTestUtils.createCompactionPlan()
+    final String testPartitionPath = DEFAULT_PARTITION_PATHS[0];
+
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(1024).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.HBASE)
+            .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder().build())
+            .build())
+        .build();
+
+    // Create file group with only one log file
+    FileCreateUtils.createLogFile(basePath, testPartitionPath, "001", "fg1", 1);
+    FileCreateUtils.createDeltaCommit(basePath, "001");
+    // Create another file group size set to max parquet file size so should not be considered during small file sizing
+    FileCreateUtils.createBaseFile(basePath, testPartitionPath, "002", "fg2", 1024);
+    FileCreateUtils.createCommit(basePath, "002");
+    FileCreateUtils.createLogFile(basePath, testPartitionPath, "003", "fg2", 1);
+    FileCreateUtils.createDeltaCommit(basePath, "003");
+
+    // Partitioner will attempt to assign inserts to file groups including base file created by inflight compaction
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    // Default estimated record size will be 1024 based on last file group created. Only 1 record can be added to small file
+    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("004", 1);
+    WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
+
+    HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
+    SparkUpsertDeltaCommitPartitioner partitioner = new SparkUpsertDeltaCommitPartitioner(profile, context, table, config);
+
+    assertEquals(1, partitioner.numPartitions(), "Should have 1 partitions");
+    assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
+        "Bucket 0 should be UPDATE");
+    assertEquals("fg1", partitioner.getBucketInfo(0).fileIdPrefix,
+        "Insert should be assigned to fg1");
+  }
+
   private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
     // Prepare the AvroParquetIO
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString());
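Both new tests configure an HBase index; the likely reason (inferred here, not stated in the diff) is that this index type can index log files, which is the condition that sends SparkUpsertDeltaCommitPartitioner into the else branch changed earlier in this commit. The configuration the tests share, repeated for reference:

    // HBASE index type is assumed to be chosen so that the index reports it can index
    // log files, exercising the log-file small-file sizing path touched by this PR.
    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024).build())
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.HBASE)
            .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder().build())
            .build())
        .build();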
@@ -21,6 +21,7 @@ package org.apache.hudi.common.testutils;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.fs.FSUtils;
@@ -50,6 +51,7 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCompactionPlan;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanMetadata;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanerPlan;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeRequestedReplaceMetadata;
@@ -177,6 +179,10 @@ public class FileCreateUtils {
     }
   }
 
+  public static void createRequestedCompactionCommit(String basePath, String instantTime, HoodieCompactionPlan requestedCompactionPlan) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION, serializeCompactionPlan(requestedCompactionPlan).get());
+  }
+
   public static void createCleanFile(String basePath, String instantTime, HoodieCleanMetadata metadata) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.CLEAN_EXTENSION, serializeCleanMetadata(metadata).get());
   }
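The new helper is exercised by the first test added above; a minimal usage sketch, with the instant times and plan arguments taken verbatim from that test:

    // Build a compaction plan over instants "001"/"002" and register it as a requested
    // compaction at instant "002", so the file system view treats the planned file
    // groups as pending compaction.
    HoodieCompactionPlan plan = CompactionTestUtils.createCompactionPlan(metaClient, "001", "002", 1, true, false);
    FileCreateUtils.createRequestedCompactionCommit(basePath, "002", plan);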