1
0

[HUDI-995] Migrate HoodieTestUtils APIs to HoodieTestTable (#2094)

Migrate deprecated APIs in HoodieTestUtils to HoodieTestTable for test classes
- TestClientRollback
- TestCopyOnWriteRollbackActionExecutor

Use FileCreateUtils APIs in CompactionTestUtils.

Then remove unused deprecated APIs after migration.
This commit is contained in:
Raymond Xu
2020-09-19 02:55:24 -07:00
committed by GitHub
parent 73e5b4c7bb
commit 7c45894f43
7 changed files with 241 additions and 233 deletions

View File

@@ -20,7 +20,6 @@ package org.apache.hudi.common.testutils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -33,11 +32,12 @@ import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -49,6 +49,10 @@ import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
import static org.apache.hudi.common.testutils.FileCreateUtils.createBaseFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createLogFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -152,40 +156,36 @@ public class CompactionTestUtils {
.transitionCompactionRequestedToInflight(new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime));
}
public static HoodieCompactionPlan createCompactionPlan(HoodieTableMetaClient metaClient, String instantId,
String compactionInstantId, int numFileIds, boolean createDataFile, boolean deltaCommitsAfterCompactionRequests) {
public static HoodieCompactionPlan createCompactionPlan(HoodieTableMetaClient metaClient, String instantTime,
String compactionInstantTime, int numFileIds, boolean createDataFile, boolean deltaCommitsAfterCompactionRequests) {
List<HoodieCompactionOperation> ops = IntStream.range(0, numFileIds).boxed().map(idx -> {
try {
String fileId = UUID.randomUUID().toString();
final String basePath = metaClient.getBasePath();
final String partition = DEFAULT_PARTITION_PATHS[0];
final String fileId = UUID.randomUUID().toString();
if (createDataFile) {
HoodieTestUtils.createDataFile(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], instantId, fileId);
createBaseFile(basePath, partition, instantTime, fileId);
}
HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Option.of(1));
HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Option.of(2));
FileSlice slice = new FileSlice(DEFAULT_PARTITION_PATHS[0], instantId, fileId);
createLogFile(basePath, partition, instantTime, fileId, 1);
createLogFile(basePath, partition, instantTime, fileId, 2);
FileSlice slice = new FileSlice(partition, instantTime, fileId);
if (createDataFile) {
slice.setBaseFile(new TestHoodieBaseFile(metaClient.getBasePath() + "/" + DEFAULT_PARTITION_PATHS[0] + "/"
+ FSUtils.makeDataFileName(instantId, TEST_WRITE_TOKEN, fileId)));
slice.setBaseFile(new DummyHoodieBaseFile(Paths.get(basePath, partition,
baseFileName(instantTime, fileId)).toString()));
}
String logFilePath1 = HoodieTestUtils.getLogFilePath(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Option.of(1));
String logFilePath2 = HoodieTestUtils.getLogFilePath(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
instantId, fileId, Option.of(2));
String logFilePath1 = Paths.get(basePath, partition, logFileName(instantTime, fileId, 1)).toString();
String logFilePath2 = Paths.get(basePath, partition, logFileName(instantTime, fileId, 2)).toString();
slice.addLogFile(new HoodieLogFile(new Path(logFilePath1)));
slice.addLogFile(new HoodieLogFile(new Path(logFilePath2)));
HoodieCompactionOperation op =
CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], slice, Option.empty());
CompactionUtils.buildFromFileSlice(partition, slice, Option.empty());
if (deltaCommitsAfterCompactionRequests) {
HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
compactionInstantId, fileId, Option.of(1));
HoodieTestUtils.createNewLogFile(metaClient.getFs(), metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0],
compactionInstantId, fileId, Option.of(2));
createLogFile(basePath, partition, compactionInstantTime, fileId, 1);
createLogFile(basePath, partition, compactionInstantTime, fileId, 2);
}
return op;
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
} catch (Exception e) {
throw new HoodieException(e.getMessage(), e);
}
}).collect(Collectors.toList());
return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new HashMap<>(),
@@ -195,11 +195,11 @@ public class CompactionTestUtils {
/**
* The hoodie data file for testing.
*/
public static class TestHoodieBaseFile extends HoodieBaseFile {
public static class DummyHoodieBaseFile extends HoodieBaseFile {
private final String path;
public TestHoodieBaseFile(String path) {
public DummyHoodieBaseFile(String path) {
super(path);
this.path = path;
}

View File

@@ -19,9 +19,6 @@
package org.apache.hudi.common.testutils;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericFixed;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.fs.FSUtils;
@@ -37,9 +34,12 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -157,6 +157,10 @@ public class HoodieTestDataGenerator {
numKeysBySchema = new HashMap<>();
}
/**
* @implNote {@link HoodieTestDataGenerator} is supposed to just generate records with schemas. Leave HoodieTable files (metafile, basefile, logfile, etc) to {@link HoodieTestTable}.
* @deprecated Use {@link HoodieTestTable#withPartitionMetaFiles(java.lang.String...)} instead.
*/
public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths, String basePath) {
for (String partitionPath : partitionPaths) {
new HoodiePartitionMetadata(fs, "000", new Path(basePath), new Path(basePath, partitionPath)).trySave(0);

View File

@@ -218,6 +218,13 @@ public class HoodieTestTable {
return partitionFileIdMap;
}
public HoodieTestTable withBaseFilesInPartitions(Map<String, String> partitionAndFileId) throws Exception {
for (Map.Entry<String, String> pair : partitionAndFileId.entrySet()) {
withBaseFilesInPartition(pair.getKey(), pair.getValue());
}
return this;
}
public HoodieTestTable withBaseFilesInPartition(String partition, String... fileIds) throws Exception {
for (String f : fileIds) {
FileCreateUtils.createBaseFile(basePath, partition, currentInstantTime, f);
@@ -248,6 +255,30 @@ public class HoodieTestTable {
return this;
}
public boolean inflightCommitsExist(String... instantTime) {
return Arrays.stream(instantTime).allMatch(this::inflightCommitExists);
}
public boolean inflightCommitExists(String instantTime) {
try {
return fs.exists(getInflightCommitFilePath(instantTime));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}
}
public boolean commitsExist(String... instantTime) {
return Arrays.stream(instantTime).allMatch(this::commitExists);
}
public boolean commitExists(String instantTime) {
try {
return fs.exists(getCommitFilePath(instantTime));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}
}
public boolean baseFilesExist(Map<String, String> partitionAndFileId, String instantTime) {
return partitionAndFileId.entrySet().stream().allMatch(entry -> {
String partition = entry.getKey();
@@ -268,16 +299,16 @@ public class HoodieTestTable {
}
}
public Path getPartitionPath(String partition) {
return new Path(Paths.get(basePath, partition).toUri());
public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) {
return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v));
}
public String getBaseFileNameById(String fileId) {
return baseFileName(currentInstantTime, fileId);
}
public Path getBaseFilePath(String partition, String fileId) {
return new Path(Paths.get(basePath, partition, getBaseFileNameById(fileId)).toUri());
public boolean logFileExists(String partition, String instantTime, String fileId, int version) {
try {
return fs.exists(new Path(Paths.get(basePath, partition, logFileName(instantTime, fileId, version)).toString()));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}
}
public Path getInflightCommitFilePath(String instantTime) {
@@ -292,16 +323,16 @@ public class HoodieTestTable {
return new Path(Paths.get(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME, instantTime + HoodieTimeline.REQUESTED_COMPACTION_EXTENSION).toUri());
}
public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) {
return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v));
public Path getPartitionPath(String partition) {
return new Path(Paths.get(basePath, partition).toUri());
}
public boolean logFileExists(String partition, String instantTime, String fileId, int version) {
try {
return fs.exists(new Path(Paths.get(basePath, partition, logFileName(instantTime, fileId, version)).toString()));
} catch (IOException e) {
throw new HoodieTestTableException(e);
}
public Path getBaseFilePath(String partition, String fileId) {
return new Path(Paths.get(basePath, partition, getBaseFileNameById(fileId)).toUri());
}
public String getBaseFileNameById(String fileId) {
return baseFileName(currentInstantTime, fileId);
}
public List<FileStatus> listAllFiles(String partitionPath) throws IOException {

View File

@@ -260,31 +260,10 @@ public class HoodieTestUtils {
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static String getDataFilePath(String basePath, String partitionPath, String instantTime, String fileID) {
return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID);
}
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static String getLogFilePath(String basePath, String partitionPath, String instantTime, String fileID,
Option<Integer> version) {
return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", instantTime,
version.orElse(DEFAULT_LOG_VERSION), HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
}
public static String getCommitFilePath(String basePath, String instantTime) {
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION;
}
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static boolean doesDataFileExist(String basePath, String partitionPath, String instantTime,
String fileID) {
return new File(getDataFilePath(basePath, partitionPath, instantTime, fileID)).exists();
}
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
@@ -294,15 +273,6 @@ public class HoodieTestUtils {
.exists();
}
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static boolean doesInflightExist(String basePath, String instantTime) {
return new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_EXTENSION)
.exists();
}
public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath,
String instantTime, Configuration configuration)
throws IOException {

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.compaction.CompactionPlanMigrator;
import org.apache.hudi.common.testutils.CompactionTestUtils.TestHoodieBaseFile;
import org.apache.hudi.common.testutils.CompactionTestUtils.DummyHoodieBaseFile;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.collection.Pair;
@@ -106,7 +106,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
// File Slice with data-file but no log files
FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1");
noLogFileSlice.setBaseFile(new TestHoodieBaseFile("/tmp/noLog_1_000.parquet"));
noLogFileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000.parquet"));
op = CompactionUtils.buildFromFileSlice(DEFAULT_PARTITION_PATHS[0], noLogFileSlice, Option.of(metricsCaptureFn));
testFileSliceCompactionOpEquality(noLogFileSlice, op, DEFAULT_PARTITION_PATHS[0],
LATEST_COMPACTION_METADATA_VERSION);
@@ -122,7 +122,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
// File Slice with data-file and log files present
FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
fileSlice.setBaseFile(new TestHoodieBaseFile("/tmp/noLog_1_000.parquet"));
fileSlice.setBaseFile(new DummyHoodieBaseFile("/tmp/noLog_1_000.parquet"));
fileSlice.addLogFile(
new HoodieLogFile(new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
fileSlice.addLogFile(
@@ -138,13 +138,13 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
Path fullPartitionPath = new Path(new Path(metaClient.getBasePath()), DEFAULT_PARTITION_PATHS[0]);
FileSlice emptyFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "empty1");
FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
fileSlice.setBaseFile(new TestHoodieBaseFile(fullPartitionPath.toString() + "/data1_1_000.parquet"));
fileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/data1_1_000.parquet"));
fileSlice.addLogFile(new HoodieLogFile(
new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))));
fileSlice.addLogFile(new HoodieLogFile(
new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN)))));
FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noLog1");
noLogFileSlice.setBaseFile(new TestHoodieBaseFile(fullPartitionPath.toString() + "/noLog_1_000.parquet"));
noLogFileSlice.setBaseFile(new DummyHoodieBaseFile(fullPartitionPath.toString() + "/noLog_1_000.parquet"));
FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0], "000", "noData1");
noDataFileSlice.addLogFile(new HoodieLogFile(
new Path(fullPartitionPath, new Path(FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN)))));