diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
index 48a0ff082..a2bcbf5d2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
@@ -44,8 +44,8 @@ import java.util.stream.Collectors;
  */
 public class SparkUpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>> extends UpsertPartitioner<T> {
 
-  SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEngineContext context, HoodieTable table,
-                                    HoodieWriteConfig config) {
+  public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEngineContext context, HoodieTable table,
+                                           HoodieWriteConfig config) {
     super(profile, context, table, config);
   }
 
@@ -79,10 +79,10 @@ public class SparkUpsertDeltaCommitPartitioner<T extends HoodieRecordPayload<T>
           allSmallFileSlices.add(smallFileSlice.get());
         }
       } else {
-        // If we can index log files, we can add more inserts to log files for fileIds including those under
-        // pending compaction.
+        // If we can index log files, we can add more inserts to log files for fileIds NOT including those under
+        // pending compaction
         List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), false)
                 .collect(Collectors.toList());
         for (FileSlice fileSlice : allFileSlices) {
           if (isSmallFile(fileSlice)) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index f40a97c0b..c495ff600 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -25,15 +26,20 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.CompactionTestUtils;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieHBaseIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.avro.Schema;
@@ -52,6 +58,7 @@ import java.util.Map;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
@@ -306,6 +313,83 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
     assertInsertBuckets(weights, cumulativeWeights, insertBuckets);
   }
 
+  @Test
+  public void testUpsertPartitionerWithSmallFileHandlingWithInflightCompactionWithCanIndexLogFiles() throws Exception {
+    // Note: this is used because it is the same partition path used in CompactionTestUtils.createCompactionPlan()
+    final String testPartitionPath = DEFAULT_PARTITION_PATHS[0];
+
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.HBASE)
+            .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder().build())
+            .build())
+        .build();
+
+    // This will generate the initial commits and create a compaction plan which includes the file groups created as part of them
+    HoodieCompactionPlan plan = CompactionTestUtils.createCompactionPlan(metaClient, "001", "002", 1, true, false);
+    FileCreateUtils.createRequestedCompactionCommit(basePath, "002", plan);
+    // Simulate one more commit so that the inflight compaction is considered when building file groups in the file system view
+    //
+    FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "2", 1);
+    FileCreateUtils.createCommit(basePath, "003");
+
+    // Partitioner will attempt to assign inserts to file groups, including the base file created by the inflight compaction
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("004", 100);
+    WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
+
+    HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
+    SparkUpsertDeltaCommitPartitioner partitioner = new SparkUpsertDeltaCommitPartitioner(profile, context, table, config);
+
+    assertEquals(1, partitioner.numPartitions(), "Should have 1 partition");
+    assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
+        "Bucket 0 should be UPDATE");
+    assertEquals("2", partitioner.getBucketInfo(0).fileIdPrefix,
+        "Should be assigned to the only file id not pending compaction, which is 2");
+  }
+
+  @Test
+  public void testUpsertPartitionerWithSmallFileHandlingWithCanIndexLogFiles() throws Exception {
+    // Note: this is used because it is the same partition path used in CompactionTestUtils.createCompactionPlan()
+    final String testPartitionPath = DEFAULT_PARTITION_PATHS[0];
+
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(1024).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.HBASE)
+            .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder().build())
+            .build())
+        .build();
+
+    // Create a file group with only one log file
+    FileCreateUtils.createLogFile(basePath, testPartitionPath, "001", "fg1", 1);
+    FileCreateUtils.createDeltaCommit(basePath, "001");
+    // Create another file group whose size is set to the max parquet file size, so it should not be considered during small file sizing
+    FileCreateUtils.createBaseFile(basePath, testPartitionPath, "002", "fg2", 1024);
+    FileCreateUtils.createCommit(basePath, "002");
+    FileCreateUtils.createLogFile(basePath, testPartitionPath, "003", "fg2", 1);
+    FileCreateUtils.createDeltaCommit(basePath, "003");
+
+    // Partitioner will assign the inserts to the small file group fg1, which has only a log file
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    // Default estimated record size will be 1024 based on the last file group created, so only 1 record can be added to the small file
+    List<HoodieRecord> insertRecords = dataGenerator.generateInserts("004", 1);
+    WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
+
+    HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
+    SparkUpsertDeltaCommitPartitioner partitioner = new SparkUpsertDeltaCommitPartitioner(profile, context, table, config);
+
+    assertEquals(1, partitioner.numPartitions(), "Should have 1 partition");
+    assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
+        "Bucket 0 should be UPDATE");
+    assertEquals("fg1", partitioner.getBucketInfo(0).fileIdPrefix,
+        "Insert should be assigned to fg1");
+  }
+
   private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() {
     // Prepare the AvroParquetIO
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString());
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index b7754b0c6..211bb8fab 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.testutils;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.fs.FSUtils;
@@ -50,6 +51,7 @@ import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCompactionPlan;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanMetadata;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanerPlan;
 import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeRequestedReplaceMetadata;
@@ -177,6 +179,10 @@ public class FileCreateUtils {
     }
   }
 
+  public static void createRequestedCompactionCommit(String basePath, String instantTime, HoodieCompactionPlan requestedCompactionPlan) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMPACTION_EXTENSION, serializeCompactionPlan(requestedCompactionPlan).get());
+  }
+
   public static void createCleanFile(String basePath, String instantTime, HoodieCleanMetadata metadata) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.CLEAN_EXTENSION, serializeCleanMetadata(metadata).get());
   }
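
Reviewer note: the sketch below is illustrative only and is not part of the patch. It uses hypothetical stand-in types (FileSliceStub, pendingCompactionFileIds) rather than Hudi's real file-system view, to model why small-file candidates must exclude file groups under pending compaction when the index can index log files. Flipping the last argument of getLatestFileSlicesBeforeOrOn from true to false is what enforces that exclusion, and the new tests assert it through the expected fileIdPrefix.

// Illustrative sketch only (not Hudi code): models the filtering the fix introduces.
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class SmallFileCandidateSketch {

  // Hypothetical stand-in for a Hudi FileSlice: just a file id and a total size.
  static final class FileSliceStub {
    final String fileId;
    final long totalSizeBytes;

    FileSliceStub(String fileId, long totalSizeBytes) {
      this.fileId = fileId;
      this.totalSizeBytes = totalSizeBytes;
    }
  }

  public static void main(String[] args) {
    long smallFileLimit = 1024;                          // mirrors compactionSmallFileSize(1024) in the tests
    Set<String> pendingCompactionFileIds = Set.of("1");  // file group "1" has a compaction plan against it

    List<FileSliceStub> latestSlices = Arrays.asList(
        new FileSliceStub("1", 100),   // small, but under pending compaction: must be skipped
        new FileSliceStub("2", 100));  // small and not under compaction: valid insert target

    // Analogue of getLatestFileSlicesBeforeOrOn(..., /* includeFileSlicesInPendingCompaction = */ false)
    // followed by the isSmallFile(...) check in SparkUpsertDeltaCommitPartitioner.
    List<String> candidates = latestSlices.stream()
        .filter(slice -> !pendingCompactionFileIds.contains(slice.fileId))
        .filter(slice -> slice.totalSizeBytes < smallFileLimit)
        .map(slice -> slice.fileId)
        .collect(Collectors.toList());

    // With the fix, only "2" remains, matching the test's fileIdPrefix assertion.
    System.out.println(candidates); // prints [2]
  }
}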