[HUDI-3692] MetadataFileSystemView includes compaction in timeline (#5110)
This commit is contained in:
@@ -21,6 +21,9 @@ package org.apache.hudi.table.functional;
|
||||
|
||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.config.HoodieMetadataConfig;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
@@ -43,12 +46,16 @@ import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Tag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
|
||||
import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
|
||||
@@ -56,6 +63,17 @@ import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
|
||||
@Tag("functional")
|
||||
public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFunctionalTestHarness {
|
||||
|
||||
private static Stream<Arguments> writeLogTest() {
|
||||
// enable metadata table, enable embedded time line server
|
||||
Object[][] data = new Object[][] {
|
||||
{true, true},
|
||||
{true, false},
|
||||
{false, true},
|
||||
{false, false}
|
||||
};
|
||||
return Stream.of(data).map(Arguments::of);
|
||||
}
|
||||
|
||||
private HoodieTestDataGenerator dataGen;
|
||||
private SparkRDDWriteClient client;
|
||||
private HoodieTableMetaClient metaClient;
|
||||
@@ -104,6 +122,44 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
|
||||
Assertions.assertEquals(300, readTableTotalRecordsNum());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("writeLogTest")
|
||||
public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
|
||||
.forTable("test-trip-table")
|
||||
.withPath(basePath())
|
||||
.withSchema(TRIP_EXAMPLE_SCHEMA)
|
||||
.withParallelism(2, 2)
|
||||
.withAutoCommit(true)
|
||||
.withEmbeddedTimelineServerEnabled(enableTimelineServer)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
|
||||
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
|
||||
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
|
||||
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
|
||||
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
|
||||
client = getHoodieWriteClient(config);
|
||||
|
||||
final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
|
||||
|
||||
// initialize 100 records
|
||||
client.upsert(writeRecords, client.startCommit());
|
||||
// update 100 records
|
||||
client.upsert(writeRecords, client.startCommit());
|
||||
// schedule compaction
|
||||
client.scheduleCompaction(Option.empty());
|
||||
// delete 50 records
|
||||
List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
|
||||
JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
|
||||
client.delete(deleteRecords, client.startCommit());
|
||||
// insert the same 100 records again
|
||||
client.upsert(writeRecords, client.startCommit());
|
||||
Assertions.assertEquals(100, readTableTotalRecordsNum());
|
||||
}
|
||||
|
||||
private long readTableTotalRecordsNum() {
|
||||
return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(),
|
||||
Arrays.stream(dataGen.getPartitionPaths()).map(p -> Paths.get(basePath(), p).toString()).collect(Collectors.toList()), basePath()).size();
|
||||
|
||||
@@ -162,7 +162,7 @@ public class FileSystemViewManager {
|
||||
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
||||
if (metadataConfig.enabled()) {
|
||||
ValidationUtils.checkArgument(metadataSupplier != null, "Metadata supplier is null. Cannot instantiate metadata file system view");
|
||||
return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
|
||||
return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(),
|
||||
metadataSupplier.get());
|
||||
}
|
||||
return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
|
||||
|
||||
Reference in New Issue
Block a user