[HUDI-995] Migrate HoodieTestUtils APIs to HoodieTestTable (#2112)
Remove the following APIs from HoodieTestUtils:
- HoodieTestUtils#createInflightCommitFiles
- HoodieTestUtils#getCommitFilePath
- HoodieTestUtils#doesCommitExist

and migrate their usages to HoodieTestTable in:
- hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
- hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
- hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java
- hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
- hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
This commit is contained in:
@@ -33,7 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||
import org.apache.hudi.common.testutils.RawTripTestPayload;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
@@ -53,20 +53,22 @@ import org.apache.hudi.table.MarkerFiles;
|
||||
import org.apache.hudi.table.action.commit.WriteHelper;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
import org.apache.hudi.testutils.HoodieWriteableTestTable;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@@ -102,6 +104,12 @@ import static org.mockito.Mockito.when;
|
||||
public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
|
||||
private HoodieTestTable testTable;
|
||||
|
||||
@BeforeEach
|
||||
public void setUpTestTable() {
|
||||
testTable = HoodieWriteableTestTable.of(metaClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Auto Commit behavior for HoodieWriteClient insert API.
|
||||
@@ -170,10 +178,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
JavaRDD<WriteStatus> result = insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, writeFn,
|
||||
isPrepped, false, numRecords);
|
||||
|
||||
assertFalse(HoodieTestUtils.doesCommitExist(basePath, newCommitTime),
|
||||
assertFalse(testTable.commitExists(newCommitTime),
|
||||
"If Autocommit is false, then commit should not be made automatically");
|
||||
assertTrue(client.commit(newCommitTime, result), "Commit should succeed");
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, newCommitTime),
|
||||
assertTrue(testTable.commitExists(newCommitTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
}
|
||||
}
|
||||
@@ -985,7 +993,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, instantTime);
|
||||
|
||||
assertTrue(client.commit(instantTime, result), "Commit should succeed");
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
assertTrue(testTable.commitExists(instantTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
|
||||
// Get parquet file paths from commit metadata
|
||||
@@ -998,16 +1006,14 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
Collection<String> commitPathNames = commitMetadata.getFileIdAndFullPaths(basePath).values();
|
||||
|
||||
// Read from commit file
|
||||
String filename = HoodieTestUtils.getCommitFilePath(basePath, instantTime);
|
||||
FileInputStream inputStream = new FileInputStream(filename);
|
||||
String everything = FileIOUtils.readAsUTFString(inputStream);
|
||||
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
|
||||
HashMap<String, String> paths = metadata.getFileIdAndFullPaths(basePath);
|
||||
inputStream.close();
|
||||
|
||||
// Compare values in both to make sure they are equal.
|
||||
for (String pathName : paths.values()) {
|
||||
assertTrue(commitPathNames.contains(pathName));
|
||||
try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime))) {
|
||||
String everything = FileIOUtils.readAsUTFString(inputStream);
|
||||
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
|
||||
HashMap<String, String> paths = metadata.getFileIdAndFullPaths(basePath);
|
||||
// Compare values in both to make sure they are equal.
|
||||
for (String pathName : paths.values()) {
|
||||
assertTrue(commitPathNames.contains(pathName));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1017,65 +1023,61 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
*/
|
||||
@Test
|
||||
public void testMetadataStatsOnCommit() throws Exception {
|
||||
|
||||
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
|
||||
HoodieWriteClient client = getHoodieWriteClient(cfg);
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
|
||||
|
||||
String instantTime = "000";
|
||||
client.startCommitWithTime(instantTime);
|
||||
String instantTime0 = "000";
|
||||
client.startCommitWithTime(instantTime0);
|
||||
|
||||
List<HoodieRecord> records = dataGen.generateInserts(instantTime, 200);
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
|
||||
List<HoodieRecord> records0 = dataGen.generateInserts(instantTime0, 200);
|
||||
JavaRDD<HoodieRecord> writeRecords0 = jsc.parallelize(records0, 1);
|
||||
JavaRDD<WriteStatus> result0 = client.bulkInsert(writeRecords0, instantTime0);
|
||||
|
||||
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, instantTime);
|
||||
|
||||
assertTrue(client.commit(instantTime, result), "Commit should succeed");
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
assertTrue(client.commit(instantTime0, result0), "Commit should succeed");
|
||||
assertTrue(testTable.commitExists(instantTime0),
|
||||
"After explicit commit, commit file should be created");
|
||||
|
||||
// Read from commit file
|
||||
String filename = HoodieTestUtils.getCommitFilePath(basePath, instantTime);
|
||||
FileInputStream inputStream = new FileInputStream(filename);
|
||||
String everything = FileIOUtils.readAsUTFString(inputStream);
|
||||
HoodieCommitMetadata metadata =
|
||||
HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class);
|
||||
int inserts = 0;
|
||||
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
|
||||
for (HoodieWriteStat stat : pstat.getValue()) {
|
||||
inserts += stat.getNumInserts();
|
||||
try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime0))) {
|
||||
String everything = FileIOUtils.readAsUTFString(inputStream);
|
||||
HoodieCommitMetadata metadata =
|
||||
HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
|
||||
int inserts = 0;
|
||||
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
|
||||
for (HoodieWriteStat stat : pstat.getValue()) {
|
||||
inserts += stat.getNumInserts();
|
||||
}
|
||||
}
|
||||
assertEquals(200, inserts);
|
||||
}
|
||||
assertEquals(200, inserts);
|
||||
|
||||
// Update + Inserts such that they just expand file1
|
||||
instantTime = "001";
|
||||
client.startCommitWithTime(instantTime);
|
||||
String instantTime1 = "001";
|
||||
client.startCommitWithTime(instantTime1);
|
||||
|
||||
records = dataGen.generateUpdates(instantTime, records);
|
||||
writeRecords = jsc.parallelize(records, 1);
|
||||
result = client.upsert(writeRecords, instantTime);
|
||||
List<HoodieRecord> records1 = dataGen.generateUpdates(instantTime1, records0);
|
||||
JavaRDD<HoodieRecord> writeRecords1 = jsc.parallelize(records1, 1);
|
||||
JavaRDD<WriteStatus> result1 = client.upsert(writeRecords1, instantTime1);
|
||||
|
||||
assertTrue(client.commit(instantTime, result), "Commit should succeed");
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
assertTrue(client.commit(instantTime1, result1), "Commit should succeed");
|
||||
assertTrue(testTable.commitExists(instantTime1),
|
||||
"After explicit commit, commit file should be created");
|
||||
|
||||
// Read from commit file
|
||||
filename = HoodieTestUtils.getCommitFilePath(basePath, instantTime);
|
||||
inputStream = new FileInputStream(filename);
|
||||
everything = FileIOUtils.readAsUTFString(inputStream);
|
||||
metadata = HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class);
|
||||
inserts = 0;
|
||||
int upserts = 0;
|
||||
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
|
||||
for (HoodieWriteStat stat : pstat.getValue()) {
|
||||
inserts += stat.getNumInserts();
|
||||
upserts += stat.getNumUpdateWrites();
|
||||
try (FSDataInputStream inputStream = fs.open(testTable.getCommitFilePath(instantTime1))) {
|
||||
String everything = FileIOUtils.readAsUTFString(inputStream);
|
||||
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(everything, HoodieCommitMetadata.class);
|
||||
int inserts = 0;
|
||||
int upserts = 0;
|
||||
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
|
||||
for (HoodieWriteStat stat : pstat.getValue()) {
|
||||
inserts += stat.getNumInserts();
|
||||
upserts += stat.getNumUpdateWrites();
|
||||
}
|
||||
}
|
||||
assertEquals(0, inserts);
|
||||
assertEquals(200, upserts);
|
||||
}
|
||||
assertEquals(0, inserts);
|
||||
assertEquals(200, upserts);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1095,13 +1097,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
metaClient.getFs().delete(result.getKey(), false);
|
||||
if (!enableOptimisticConsistencyGuard) {
|
||||
assertTrue(client.commit(instantTime, result.getRight()), "Commit should succeed");
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
assertTrue(testTable.commitExists(instantTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
// Marker directory must be removed
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
} else {
|
||||
// with optimistic, first client.commit should have succeeded.
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
assertTrue(testTable.commitExists(instantTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
// Marker directory must be removed
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
@@ -1124,13 +1126,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
if (!enableOptimisticConsistencyGuard) {
|
||||
// Rollback of this commit should succeed with FailSafeCG
|
||||
client.rollback(instantTime);
|
||||
assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
assertFalse(testTable.commitExists(instantTime),
|
||||
"After explicit rollback, commit file should not be present");
|
||||
// Marker directory must be removed after rollback
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
} else {
|
||||
// if optimistic CG is enabled, commit should have succeeded.
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
assertTrue(testTable.commitExists(instantTime),
|
||||
"With optimistic CG, first commit should succeed. commit file should be present");
|
||||
// Marker directory must be removed after rollback
|
||||
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(instantTime))));
|
||||
@@ -1145,7 +1147,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
} else {
|
||||
// rollback of a completed commit should succeed if using list based rollback
|
||||
client.rollback(instantTime);
|
||||
assertFalse(HoodieTestUtils.doesCommitExist(basePath, instantTime),
|
||||
assertFalse(testTable.commitExists(instantTime),
|
||||
"After explicit rollback, commit file should not be present");
|
||||
}
|
||||
}
|
||||
@@ -1216,7 +1218,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1);
|
||||
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, firstInstantTime);
|
||||
assertTrue(client.commit(firstInstantTime, result), "Commit should succeed");
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, firstInstantTime),
|
||||
assertTrue(testTable.commitExists(firstInstantTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
|
||||
// Check the entire dataset has all records still
|
||||
@@ -1235,7 +1237,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
JavaRDD<WriteStatus> inserts = client.bulkInsert(insertRecords, nextInstantTime);
|
||||
JavaRDD<WriteStatus> upserts = client.upsert(updateRecords, nextInstantTime);
|
||||
assertTrue(client.commit(nextInstantTime, inserts.union(upserts)), "Commit should succeed");
|
||||
assertTrue(HoodieTestUtils.doesCommitExist(basePath, firstInstantTime),
|
||||
assertTrue(testTable.commitExists(firstInstantTime),
|
||||
"After explicit commit, commit file should be created");
|
||||
int totalRecords = 2 * numRecords;
|
||||
assertEquals(totalRecords, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
|
||||
|
||||
Reference in New Issue
Block a user