[HUDI-2472] Enabling metadata table for TestHoodieMergeOnReadTable and TestHoodieCompactor (#4023)
This commit is contained in:
committed by
GitHub
parent
459b34240b
commit
c8617d9390
@@ -34,13 +34,14 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.SyncableFileSystemView;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.testutils.Transformations;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
||||
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
|
||||
import org.apache.hudi.table.action.deltacommit.AbstractSparkDeltaCommitActionExecutor;
|
||||
import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
|
||||
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
|
||||
@@ -63,6 +64,7 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -190,11 +192,13 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Enable metadata virtual keys in this test once the feature HUDI-2593 is completed
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
@ValueSource(booleans = {true})
|
||||
public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws Exception {
|
||||
// insert 100 records
|
||||
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
|
||||
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true)
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
|
||||
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
|
||||
HoodieWriteConfig config = cfgBuilder.build();
|
||||
|
||||
@@ -208,37 +212,40 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
|
||||
|
||||
// Update all the 100 records
|
||||
newCommitTime = "101";
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
|
||||
JavaRDD<HoodieRecord> updatedRecordsRDD = jsc().parallelize(updatedRecords, 1);
|
||||
|
||||
HoodieReadClient readClient = new HoodieReadClient(context(), config);
|
||||
updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
|
||||
JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = readClient.tagLocation(updatedRecordsRDD);
|
||||
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
|
||||
|
||||
// Write them to corresponding avro logfiles
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
|
||||
HoodieSparkWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
|
||||
.withLogAppends(updatedRecords);
|
||||
// In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
|
||||
((SyncableFileSystemView) (table.getSliceView())).reset();
|
||||
|
||||
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(
|
||||
writeClient.getEngineContext().getHadoopConf().get(), config, writeClient.getEngineContext());
|
||||
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable
|
||||
.of(metaClient, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, metadataWriter);
|
||||
|
||||
Set<String> allPartitions = updatedRecords.stream()
|
||||
.map(record -> record.getPartitionPath())
|
||||
.collect(Collectors.groupingBy(partitionPath -> partitionPath))
|
||||
.keySet();
|
||||
assertEquals(allPartitions.size(), testTable.listAllBaseFiles().length);
|
||||
|
||||
// Verify that all data file has one log file
|
||||
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient, true);
|
||||
for (String partitionPath : dataGen.getPartitionPaths()) {
|
||||
List<FileSlice> groupedLogFiles =
|
||||
table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
|
||||
for (FileSlice fileSlice : groupedLogFiles) {
|
||||
assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
|
||||
assertEquals(1, fileSlice.getLogFiles().count(),
|
||||
"There should be 1 log file written for the latest data file - " + fileSlice);
|
||||
}
|
||||
}
|
||||
|
||||
// Mark 2nd delta-instant as completed
|
||||
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
|
||||
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
|
||||
metaClient.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
|
||||
|
||||
// Do a compaction
|
||||
String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
|
||||
JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) writeClient.compact(compactionInstantTime);
|
||||
|
||||
@@ -46,7 +46,6 @@ import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
@@ -57,9 +56,6 @@ import org.junit.jupiter.api.Test;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
@@ -163,7 +159,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
||||
// insert 100 records
|
||||
HoodieWriteConfig config = getConfigBuilder()
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
|
||||
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
|
||||
.build();
|
||||
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
|
||||
String newCommitTime = "100";
|
||||
@@ -176,19 +172,14 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
||||
// Update all the 100 records
|
||||
HoodieTable table = HoodieSparkTable.create(config, context);
|
||||
newCommitTime = "101";
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
|
||||
List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
|
||||
JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
|
||||
HoodieIndex index = new HoodieBloomIndex<>(config, SparkHoodieBloomIndexHelper.getInstance());
|
||||
updatedRecords = tagLocation(index, updatedRecordsRDD, table).collect();
|
||||
JavaRDD<HoodieRecord> updatedTaggedRecordsRDD = tagLocation(index, updatedRecordsRDD, table);
|
||||
|
||||
// Write them to corresponding avro logfiles. Also, set the state transition properly.
|
||||
HoodieSparkWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
|
||||
.withLogAppends(updatedRecords);
|
||||
metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
|
||||
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
|
||||
writeClient.commit(newCommitTime, jsc.emptyRDD(), Option.empty());
|
||||
writeClient.startCommitWithTime(newCommitTime);
|
||||
writeClient.upsertPreppedRecords(updatedTaggedRecordsRDD, newCommitTime).collect();
|
||||
metaClient.reloadActiveTimeline();
|
||||
|
||||
// Verify that all data file has one log file
|
||||
@@ -200,9 +191,6 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
||||
assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
|
||||
}
|
||||
}
|
||||
createDeltaCommit(basePath, newCommitTime);
|
||||
createRequestedDeltaCommit(basePath, newCommitTime);
|
||||
createInflightDeltaCommit(basePath, newCommitTime);
|
||||
|
||||
// Do a compaction
|
||||
table = HoodieSparkTable.create(config, context);
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
|
||||
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
@@ -39,12 +40,20 @@ import java.util.UUID;
|
||||
public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieSparkWriteableTestTable.class);
|
||||
|
||||
private HoodieSparkWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
|
||||
super(basePath, fs, metaClient, schema, filter);
|
||||
private HoodieSparkWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema,
|
||||
BloomFilter filter, HoodieTableMetadataWriter metadataWriter) {
|
||||
super(basePath, fs, metaClient, schema, filter, metadataWriter);
|
||||
}
|
||||
|
||||
public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
|
||||
return new HoodieSparkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter);
|
||||
return new HoodieSparkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(),
|
||||
metaClient, schema, filter, null);
|
||||
}
|
||||
|
||||
public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter,
|
||||
HoodieTableMetadataWriter metadataWriter) {
|
||||
return new HoodieSparkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(),
|
||||
metaClient, schema, filter, metadataWriter);
|
||||
}
|
||||
|
||||
public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) {
|
||||
@@ -53,6 +62,13 @@ public class HoodieSparkWriteableTestTable extends HoodieWriteableTestTable {
|
||||
return of(metaClient, schema, filter);
|
||||
}
|
||||
|
||||
public static HoodieSparkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema,
|
||||
HoodieTableMetadataWriter metadataWriter) {
|
||||
BloomFilter filter = BloomFilterFactory
|
||||
.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
|
||||
return of(metaClient, schema, filter, metadataWriter);
|
||||
}
|
||||
|
||||
public static HoodieSparkWriteableTestTable of(HoodieTable hoodieTable, Schema schema) {
|
||||
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
|
||||
return of(metaClient, schema);
|
||||
|
||||
Reference in New Issue
Block a user