[HUDI-781] Introduce HoodieTestTable for test preparation (#1997)
This commit is contained in:
@@ -21,13 +21,14 @@ package org.apache.hudi.cli.commands;
|
||||
import org.apache.hudi.cli.HoodieCLI;
|
||||
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTableVersion;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
@@ -77,11 +78,11 @@ public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest {
|
||||
// generate commit and marker files for inflight commit 101
|
||||
for (String commitTime : Arrays.asList(commitTime2)) {
|
||||
HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1");
|
||||
HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1");
|
||||
FileCreateUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, "file-1", IOType.MERGE);
|
||||
HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2");
|
||||
HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2");
|
||||
FileCreateUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, commitTime, "file-2", IOType.MERGE);
|
||||
HoodieTestUtils.createDataFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3");
|
||||
HoodieClientTestUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3");
|
||||
FileCreateUtils.createMarkerFile(tablePath, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, commitTime, "file-3", IOType.MERGE);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,7 +99,7 @@ public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest {
|
||||
|
||||
// verify marker files for inflight commit exists
|
||||
for (String partitionPath : HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) {
|
||||
assertEquals(1, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101"));
|
||||
assertEquals(1, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE));
|
||||
}
|
||||
|
||||
SparkMain.upgradeOrDowngradeTable(jsc, tablePath, HoodieTableVersion.ZERO.name());
|
||||
@@ -112,7 +113,7 @@ public class TestUpgradeDowngradeCommand extends AbstractShellIntegrationTest {
|
||||
|
||||
// verify marker files are non existant
|
||||
for (String partitionPath : HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS) {
|
||||
assertEquals(0, HoodieClientTestUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101"));
|
||||
assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
|
||||
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
|
||||
|
||||
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.util.DefaultSizeEstimator;
|
||||
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||
import org.apache.hudi.common.util.HoodieTimer;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.util.HoodieTimer;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
@@ -33,13 +34,13 @@ import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.io.storage.HoodieFileWriter;
|
||||
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.MarkerFiles;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.table.MarkerFiles;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
|
||||
@@ -18,26 +18,27 @@
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.io.IOType;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@@ -18,10 +18,10 @@
|
||||
|
||||
package org.apache.hudi.table.action.rollback;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
|
||||
@@ -29,19 +29,21 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.exception.HoodieRollbackException;
|
||||
import org.apache.hudi.io.IOType;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.MarkerFiles;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
||||
/**
|
||||
* Performs rollback using marker files generated during the write..
|
||||
*/
|
||||
|
||||
@@ -21,13 +21,13 @@ package org.apache.hudi.table.upgrade;
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieRollbackException;
|
||||
import org.apache.hudi.io.IOType;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.MarkerFiles;
|
||||
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
@@ -47,7 +48,6 @@ import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.exception.HoodieRollbackException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||
import org.apache.hudi.io.IOType;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.MarkerFiles;
|
||||
import org.apache.hudi.table.action.commit.WriteHelper;
|
||||
@@ -80,6 +80,7 @@ import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
|
||||
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH;
|
||||
@@ -506,8 +507,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
|
||||
// verify one basefile per partition
|
||||
String[] fullExpectedPartitionPaths = getFullPartitionPaths(expectedPartitionPathRecKeyPairs.stream().map(Pair::getLeft).toArray(String[]::new));
|
||||
Map<String, Integer> baseFileCounts = getBaseFileCounts(fullExpectedPartitionPaths);
|
||||
for (Map.Entry<String, Integer> entry : baseFileCounts.entrySet()) {
|
||||
Map<String, Long> baseFileCounts = getBaseFileCountsForPaths(basePath, fs, fullExpectedPartitionPaths);
|
||||
for (Map.Entry<String, Long> entry : baseFileCounts.entrySet()) {
|
||||
assertEquals(1, entry.getValue());
|
||||
}
|
||||
assertTrue(baseFileCounts.entrySet().stream().allMatch(entry -> entry.getValue() == 1));
|
||||
@@ -532,9 +533,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
|
||||
// verify that there are more than 1 basefiles per partition
|
||||
// we can't guarantee randomness in partitions where records are distributed. So, verify atleast one partition has more than 1 basefile.
|
||||
baseFileCounts = getBaseFileCounts(fullPartitionPaths);
|
||||
baseFileCounts = getBaseFileCountsForPaths(basePath, fs, fullPartitionPaths);
|
||||
assertTrue(baseFileCounts.entrySet().stream().filter(entry -> entry.getValue() > 1).count() >= 1,
|
||||
"Atleast one partition should have more than 1 base file after 2nd batch of writes");
|
||||
"At least one partition should have more than 1 base file after 2nd batch of writes");
|
||||
|
||||
// Write 3 (upserts to records from batch 1 with diff partition path)
|
||||
newCommitTime = "003";
|
||||
@@ -605,10 +606,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
return fullPartitionPaths;
|
||||
}
|
||||
|
||||
private Map<String, Integer> getBaseFileCounts(String[] fullPartitionPaths) {
|
||||
return HoodieClientTestUtils.getBaseFileCountForPaths(basePath, fs, fullPartitionPaths);
|
||||
}
|
||||
|
||||
private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set<Pair<String, String>> expectedPartitionPathRecKeyPairs,
|
||||
List<Pair<String, String>> actualPartitionPathRecKeyPairs) {
|
||||
// verify all partitionpath, record key matches
|
||||
|
||||
@@ -40,6 +40,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
@@ -50,8 +51,8 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigra
|
||||
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanMigrator;
|
||||
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||
import org.apache.hudi.common.testutils.FileSystemTestUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.common.util.CleanerUtils;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
@@ -64,17 +65,21 @@ import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.table.action.clean.CleanPlanner;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@@ -500,137 +505,102 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
return new ArrayList<>(cleanStatMap.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by versions for COW table.
|
||||
*/
|
||||
@Test
|
||||
public void testKeepLatestFileVersions() throws IOException {
|
||||
testKeepLatestFileVersions(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by version logic for COW table with Bootstrap source file clean enable.
|
||||
*/
|
||||
@Test
|
||||
public void testBootstrapSourceFileCleanWithKeepLatestFileVersions() throws IOException {
|
||||
testKeepLatestFileVersions(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by versions logic.
|
||||
*/
|
||||
public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws IOException {
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {false, true})
|
||||
public void testKeepLatestFileVersions(Boolean enableBootstrapSourceClean) throws Exception {
|
||||
HoodieWriteConfig config =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
|
||||
.build();
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||
String p0 = "2020/01/01";
|
||||
String p1 = "2020/01/02";
|
||||
Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
|
||||
|
||||
// make 1 commit, with 1 file per partition
|
||||
HoodieTestUtils.createCommitFiles(basePath, "00000000000001");
|
||||
|
||||
Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
|
||||
|
||||
String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
|
||||
String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId()
|
||||
: UUID.randomUUID().toString();
|
||||
String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
|
||||
String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId()
|
||||
: UUID.randomUUID().toString();
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert
|
||||
testTable.addCommit("00000000000001").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
|
||||
assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
|
||||
file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
|
||||
file1P1C0));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
|
||||
assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
|
||||
|
||||
// make next commit, with 1 insert & 1 update per partition
|
||||
HoodieTestUtils.createCommitFiles(basePath, "00000000000002");
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
String file2P0C1 =
|
||||
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert
|
||||
String file2P1C1 =
|
||||
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert
|
||||
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update
|
||||
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update
|
||||
Map<String, String> partitionAndFileId002 = testTable.addCommit("00000000000002")
|
||||
.withUpdates(p0, file1P0C0)
|
||||
.withUpdates(p1, file1P1C0)
|
||||
.withInserts(p0, p1);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
|
||||
// enableBootstrapSourceClean would delete the bootstrap base file as the same time
|
||||
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
|
||||
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0);
|
||||
assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
|
||||
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
|
||||
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
|
||||
if (enableBootstrapSourceClean) {
|
||||
HoodieFileStatus fstatus =
|
||||
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus();
|
||||
bootstrapMapping.get(p0).get(0).getBoostrapFileStatus();
|
||||
// This ensures full path is recorded in metadata.
|
||||
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
|
||||
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
|
||||
+ " but did not contain " + fstatus.getPath().getUri());
|
||||
assertFalse(new File(bootstrapMapping.get(
|
||||
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
|
||||
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
|
||||
p0).get(0).getBoostrapFileStatus().getPath().getUri())));
|
||||
}
|
||||
cleanStat = getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH);
|
||||
cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
|
||||
String file2P0C1 = partitionAndFileId002.get(p0);
|
||||
String file2P1C1 = partitionAndFileId002.get(p1);
|
||||
assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1));
|
||||
assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0));
|
||||
assertFalse(testTable.fileExists(p1, "00000000000001", file1P1C0));
|
||||
assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
|
||||
+ (cleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
|
||||
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
|
||||
file2P0C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
|
||||
file2P1C1));
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
|
||||
file1P0C0));
|
||||
if (enableBootstrapSourceClean) {
|
||||
HoodieFileStatus fstatus =
|
||||
bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus();
|
||||
bootstrapMapping.get(p1).get(0).getBoostrapFileStatus();
|
||||
// This ensures full path is recorded in metadata.
|
||||
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
|
||||
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
|
||||
+ " but did not contain " + fstatus.getPath().getUri());
|
||||
assertFalse(new File(bootstrapMapping.get(
|
||||
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
|
||||
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
|
||||
p1).get(0).getBoostrapFileStatus().getPath().getUri())));
|
||||
}
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
|
||||
"00000000000001", file1P1C0));
|
||||
|
||||
// make next commit, with 2 updates to existing files, and 1 insert
|
||||
HoodieTestUtils.createCommitFiles(basePath, "00000000000003");
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update
|
||||
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update
|
||||
String file3P0C2 =
|
||||
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003");
|
||||
|
||||
String file3P0C2 = testTable.addCommit("00000000000003")
|
||||
.withUpdates(p0, file1P0C0, file2P0C1)
|
||||
.withInserts(p0, "00000000000003").get(p0);
|
||||
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
|
||||
assertEquals(2,
|
||||
getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
|
||||
getCleanStat(hoodieCleanStatsThree, p0)
|
||||
.getSuccessDeleteFiles().size(), "Must clean two files");
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
|
||||
file1P0C0));
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
|
||||
file2P0C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
|
||||
file3P0C2));
|
||||
assertFalse(testTable.fileExists(p0, "00000000000002", file1P0C0));
|
||||
assertFalse(testTable.fileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
|
||||
|
||||
// No cleaning on partially written file, with no commit.
|
||||
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file3P0C2); // update
|
||||
testTable.forCommit("00000000000004").withUpdates(p0, file3P0C2);
|
||||
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
|
||||
assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
|
||||
file3P0C2));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files.
|
||||
*/
|
||||
@Test
|
||||
public void testKeepLatestFileVersionsMOR() throws IOException {
|
||||
public void testKeepLatestFileVersionsMOR() throws Exception {
|
||||
|
||||
HoodieWriteConfig config =
|
||||
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||
@@ -638,35 +608,29 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
|
||||
.build();
|
||||
|
||||
HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
|
||||
HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||
String p0 = "2020/01/01";
|
||||
|
||||
// Make 3 files, one base file and 2 log files associated with base file
|
||||
String file1P0 =
|
||||
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
|
||||
String file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath,
|
||||
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.empty());
|
||||
HoodieTestUtils.createNewLogFile(fs, basePath,
|
||||
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.of(2));
|
||||
// make 1 compaction commit
|
||||
HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000");
|
||||
String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0);
|
||||
testTable.forDeltaCommit("000")
|
||||
.withLogFile(p0, file1P0, 1)
|
||||
.withLogFile(p0, file1P0, 2);
|
||||
|
||||
// Make 4 files, one base file and 3 log files associated with base file
|
||||
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0);
|
||||
file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
||||
"001", file1P0, Option.of(3));
|
||||
// make 1 compaction commit
|
||||
HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001");
|
||||
// Make 2 files, one base file and 1 log files associated with base file
|
||||
testTable.addDeltaCommit("001")
|
||||
.withUpdates(p0, file1P0)
|
||||
.withLogFile(p0, file1P0, 3);
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
|
||||
assertEquals(3,
|
||||
getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
|
||||
getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles()
|
||||
.size(), "Must clean three files, one parquet and 2 log files");
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
|
||||
file1P0));
|
||||
assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
|
||||
file2P0L0, Option.empty()));
|
||||
assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
|
||||
file2P0L0, Option.of(2)));
|
||||
assertFalse(testTable.fileExists(p0, "000", file1P0));
|
||||
assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
|
||||
assertTrue(testTable.fileExists(p0, "001", file1P0));
|
||||
assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -835,43 +799,21 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by commit logic for COW table.
|
||||
*/
|
||||
@Test
|
||||
public void testKeepLatestCommits() throws IOException {
|
||||
testKeepLatestCommits(false, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by commit logic for COW table. Here the operations are simulated
|
||||
* such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds.
|
||||
*/
|
||||
@Test
|
||||
public void testKeepLatestCommitsWithFailureRetry() throws IOException {
|
||||
testKeepLatestCommits(true, false, false);
|
||||
private static Stream<Arguments> argumentsForTestKeepLatestCommits() {
|
||||
return Stream.of(
|
||||
Arguments.of(false, false, false),
|
||||
Arguments.of(true, false, false),
|
||||
Arguments.of(false, true, false),
|
||||
Arguments.of(false, false, true)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by commit logic for COW table.
|
||||
*/
|
||||
@Test
|
||||
public void testKeepLatestCommitsIncrMode() throws IOException {
|
||||
testKeepLatestCommits(false, true, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by commit logic for COW table with Bootstrap source file clean enable.
|
||||
*/
|
||||
@Test
|
||||
public void testBootstrapSourceFileCleanWithKeepLatestCommits() throws IOException {
|
||||
testKeepLatestCommits(false, false, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test HoodieTable.clean() Cleaning by commit logic for COW table.
|
||||
*/
|
||||
private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws IOException {
|
||||
@ParameterizedTest
|
||||
@MethodSource("argumentsForTestKeepLatestCommits")
|
||||
public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withIncrementalCleaningMode(enableIncrementalClean)
|
||||
@@ -879,25 +821,23 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
|
||||
.build();
|
||||
|
||||
Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData() : null;
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||
String p0 = "2020/01/01";
|
||||
String p1 = "2020/01/02";
|
||||
Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
|
||||
|
||||
// make 1 commit, with 1 file per partition
|
||||
HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000001");
|
||||
|
||||
String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getFileId()
|
||||
String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId()
|
||||
: UUID.randomUUID().toString();
|
||||
String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).get(0).getFileId()
|
||||
String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId()
|
||||
: UUID.randomUUID().toString();
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001", file1P0C0); // insert
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001", file1P1C0); // insert
|
||||
testTable.addInflightCommit("00000000000001").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0);
|
||||
|
||||
HoodieCommitMetadata commitMetadata = generateCommitMetadata(
|
||||
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
|
||||
{
|
||||
put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
|
||||
put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0));
|
||||
put(p0, CollectionUtils.createImmutableList(file1P0C0));
|
||||
put(p1, CollectionUtils.createImmutableList(file1P1C0));
|
||||
}
|
||||
})
|
||||
);
|
||||
@@ -909,29 +849,18 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
|
||||
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
|
||||
file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
|
||||
file1P1C0));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
|
||||
assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
|
||||
|
||||
// make next commit, with 1 insert & 1 update per partition
|
||||
HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000002");
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
String file2P0C1 =
|
||||
HoodieTestUtils
|
||||
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002"); // insert
|
||||
String file2P1C1 =
|
||||
HoodieTestUtils
|
||||
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002"); // insert
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002", file1P0C0); // update
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002", file1P1C0); // update
|
||||
Map<String, String> partitionAndFileId002 = testTable.addInflightCommit("00000000000002").withInserts(p0, p1);
|
||||
String file2P0C1 = partitionAndFileId002.get(p0);
|
||||
String file2P1C1 = partitionAndFileId002.get(p1);
|
||||
testTable.forCommit("00000000000002").withUpdates(p0, file1P0C0).withUpdates(p1, file1P1C0);
|
||||
commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
|
||||
{
|
||||
put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
|
||||
put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
|
||||
put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
|
||||
put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
|
||||
}
|
||||
});
|
||||
metaClient.getActiveTimeline().saveAsComplete(
|
||||
@@ -939,28 +868,18 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
|
||||
assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
|
||||
file2P0C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000002",
|
||||
file2P1C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
|
||||
file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "00000000000001",
|
||||
file1P1C0));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.fileExists(p1, "00000000000002", file2P1C1));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
|
||||
assertTrue(testTable.fileExists(p1, "00000000000001", file1P1C0));
|
||||
|
||||
// make next commit, with 2 updates to existing files, and 1 insert
|
||||
HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000003");
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file1P0C0); // update
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003", file2P0C1); // update
|
||||
String file3P0C2 =
|
||||
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003");
|
||||
|
||||
String file3P0C2 = testTable.addInflightCommit("00000000000003")
|
||||
.withUpdates(p0, file1P0C0)
|
||||
.withUpdates(p0, file2P0C1)
|
||||
.withInserts(p0).get(p0);
|
||||
commitMetadata = generateCommitMetadata(CollectionUtils
|
||||
.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
||||
.createImmutableMap(p0,
|
||||
CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
|
||||
metaClient.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000003"),
|
||||
@@ -969,57 +888,41 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
|
||||
assertEquals(0, hoodieCleanStatsThree.size(),
|
||||
"Must not clean any file. We have to keep 1 version before the latest commit time to keep");
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
|
||||
file1P0C0));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000001", file1P0C0));
|
||||
|
||||
// make next commit, with 2 updates to existing files, and 1 insert
|
||||
HoodieTestUtils.createInflightCommitFiles(basePath, "00000000000004");
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file1P0C0); // update
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004", file2P0C1); // update
|
||||
String file4P0C3 =
|
||||
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004");
|
||||
String file4P0C3 = testTable.addInflightCommit("00000000000004")
|
||||
.withUpdates(p0, file1P0C0)
|
||||
.withUpdates(p0, file2P0C1)
|
||||
.withInserts(p0).get(p0);
|
||||
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
|
||||
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
|
||||
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
|
||||
metaClient.getActiveTimeline().saveAsComplete(
|
||||
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000004"),
|
||||
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
|
||||
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry);
|
||||
// enableBootstrapSourceClean would delete the bootstrap base file as the same time
|
||||
HoodieCleanStat partitionCleanStat =
|
||||
getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
|
||||
HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, p0);
|
||||
|
||||
assertEquals(enableBootstrapSourceClean ? 2 : 1, partitionCleanStat.getSuccessDeleteFiles().size()
|
||||
+ (partitionCleanStat.getSuccessDeleteBootstrapBaseFiles() == null ? 0
|
||||
: partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least one old file");
|
||||
|
||||
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000001",
|
||||
file1P0C0));
|
||||
assertFalse(testTable.fileExists(p0, "00000000000001", file1P0C0));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000003", file1P0C0));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000003", file2P0C1));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000003", file3P0C2));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000004", file4P0C3));
|
||||
if (enableBootstrapSourceClean) {
|
||||
assertFalse(new File(bootstrapMapping.get(
|
||||
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).get(0).getBoostrapFileStatus().getPath().getUri()).exists());
|
||||
assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
|
||||
p0).get(0).getBoostrapFileStatus().getPath().getUri())));
|
||||
}
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
|
||||
file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
|
||||
file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
|
||||
file2P0C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
|
||||
file2P0C1));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000003",
|
||||
file3P0C2));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000004",
|
||||
file4P0C3));
|
||||
|
||||
// No cleaning on partially written file, with no commit.
|
||||
HoodieTestUtils
|
||||
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000005", file3P0C2); // update
|
||||
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
|
||||
testTable.forCommit("00000000000005").withUpdates(p0, file3P0C2);
|
||||
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0,
|
||||
CollectionUtils.createImmutableList(file3P0C2)));
|
||||
metaClient.getActiveTimeline().createNewInstant(
|
||||
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"));
|
||||
@@ -1027,13 +930,11 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "00000000000005"),
|
||||
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
|
||||
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
|
||||
assertEquals(0,
|
||||
cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files");
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
|
||||
file1P0C0));
|
||||
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "00000000000002",
|
||||
file2P0C1));
|
||||
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0);
|
||||
assertNull(cleanStat, "Must not clean any files");
|
||||
assertTrue(testTable.fileExists(p0, "00000000000002", file1P0C0));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000002", file2P0C1));
|
||||
assertTrue(testTable.fileExists(p0, "00000000000005", file3P0C2));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1041,7 +942,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
* @return Partition to BootstrapFileMapping Map
|
||||
* @throws IOException
|
||||
*/
|
||||
private Map<String, List<BootstrapFileMapping>> generateBootstrapIndexAndSourceData() throws IOException {
|
||||
private Map<String, List<BootstrapFileMapping>> generateBootstrapIndexAndSourceData(String... partitions) throws IOException {
|
||||
// create bootstrap source data path
|
||||
java.nio.file.Path sourcePath = tempDir.resolve("data");
|
||||
java.nio.file.Files.createDirectories(sourcePath);
|
||||
@@ -1052,7 +953,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
|
||||
// generate bootstrap index
|
||||
Map<String, List<BootstrapFileMapping>> bootstrapMapping = TestBootstrapIndex.generateBootstrapIndex(metaClient, sourcePath.toString(),
|
||||
new String[] {HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH}, 1);
|
||||
partitions, 1);
|
||||
|
||||
for (Map.Entry<String, List<BootstrapFileMapping>> entry : bootstrapMapping.entrySet()) {
|
||||
new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs();
|
||||
@@ -1065,28 +966,29 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
* Test Cleaning functionality of table.rollback() API.
|
||||
*/
|
||||
@Test
|
||||
public void testCleanMarkerDataFilesOnRollback() throws IOException {
|
||||
List<String> markerFiles = createMarkerFiles("000", 10);
|
||||
assertEquals(10, markerFiles.size(), "Some marker files are created.");
|
||||
assertEquals(markerFiles.size(), getTotalTempFiles(), "Some marker files are created.");
|
||||
public void testCleanMarkerDataFilesOnRollback() throws Exception {
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient)
|
||||
.addRequestedCommit("000")
|
||||
.withMarkerFiles("default", 10, IOType.MERGE);
|
||||
final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size();
|
||||
assertEquals(10, numTempFilesBefore, "Some marker files are created.");
|
||||
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
|
||||
table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED,
|
||||
HoodieTimeline.COMMIT_ACTION, "000"));
|
||||
table.getActiveTimeline().transitionRequestedToInflight(
|
||||
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
|
||||
metaClient.reloadActiveTimeline();
|
||||
table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
|
||||
assertEquals(0, getTotalTempFiles(), "All temp files are deleted.");
|
||||
final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size();
|
||||
assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Test CLeaner Stat when there are no partition paths.
|
||||
*/
|
||||
@Test
|
||||
public void testCleaningWithZeroPartitionPaths() throws IOException {
|
||||
public void testCleaningWithZeroPartitionPaths() throws Exception {
|
||||
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
|
||||
@@ -1095,7 +997,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
// Make a commit, although there are no partitionPaths.
|
||||
// Example use-case of this is when a client wants to create a table
|
||||
// with just some commit metadata, but no data/partitionPaths.
|
||||
HoodieTestUtils.createCommitFiles(basePath, "000");
|
||||
HoodieTestTable.of(metaClient).addCommit("000");
|
||||
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
|
||||
@@ -1294,33 +1196,6 @@ public class TestCleaner extends HoodieClientTestBase {
|
||||
"Correct number of files under compaction deleted");
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to create temporary data files.
|
||||
*
|
||||
* @param instantTime Commit Timestamp
|
||||
* @param numFiles Number for files to be generated
|
||||
* @return generated files
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
private List<String> createMarkerFiles(String instantTime, int numFiles) throws IOException {
|
||||
List<String> files = new ArrayList<>();
|
||||
for (int i = 0; i < numFiles; i++) {
|
||||
files.add(HoodieClientTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime));
|
||||
}
|
||||
return files;
|
||||
}
|
||||
|
||||
/***
|
||||
* Helper method to return temporary files count.
|
||||
*
|
||||
* @return Number of temporary files found
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
private int getTotalTempFiles() throws IOException {
|
||||
return FileSystemTestUtils.listRecursive(fs, new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME))
|
||||
.size();
|
||||
}
|
||||
|
||||
private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient,
|
||||
List<String> paths) {
|
||||
Predicate<String> roFilePredicate =
|
||||
|
||||
@@ -22,8 +22,8 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
|
||||
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
|
||||
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
|
||||
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||
import org.apache.hudi.testutils.HoodieClientTestHarness;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
@@ -66,9 +66,9 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
@ParameterizedTest
|
||||
@MethodSource("consistencyGuardType")
|
||||
public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3");
|
||||
|
||||
ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000);
|
||||
ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName())
|
||||
@@ -88,7 +88,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
|
||||
@Test
|
||||
public void testCheckFailingAppearFailSafe() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
|
||||
@@ -98,7 +98,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
|
||||
@Test
|
||||
public void testCheckFailingAppearTimedWait() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
||||
@@ -106,7 +106,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
|
||||
@Test
|
||||
public void testCheckFailingAppearsFailSafe() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
|
||||
@@ -115,14 +115,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
|
||||
@Test
|
||||
public void testCheckFailingAppearsTimedWait() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFailingDisappearFailSafe() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
|
||||
@@ -132,7 +132,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
|
||||
@Test
|
||||
public void testCheckFailingDisappearTimedWait() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
|
||||
.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
|
||||
@@ -140,8 +140,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
|
||||
@Test
|
||||
public void testCheckFailingDisappearsFailSafe() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
assertThrows(TimeoutException.class, () -> {
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
|
||||
@@ -150,8 +150,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
|
||||
|
||||
@Test
|
||||
public void testCheckFailingDisappearsTimedWait() throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1");
|
||||
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
|
||||
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
|
||||
}
|
||||
|
||||
@@ -18,17 +18,17 @@
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.testutils.FileSystemTestUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.testutils.FileSystemTestUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
|
||||
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.io.IOType;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.testutils.FileCreateUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
@@ -34,7 +35,6 @@ import org.apache.hudi.table.HoodieCopyOnWriteTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.WorkloadProfile;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -73,8 +73,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
|
||||
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
|
||||
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
|
||||
|
||||
HoodieClientTestUtils.fakeCommit(basePath, "001");
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
|
||||
FileCreateUtils.createCommit(basePath, "001");
|
||||
FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
|
||||
|
||||
@@ -193,7 +193,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
|
||||
.insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build();
|
||||
|
||||
HoodieClientTestUtils.fakeCommit(basePath, "001");
|
||||
FileCreateUtils.createCommit(basePath, "001");
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
|
||||
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
|
||||
|
||||
@@ -53,6 +53,9 @@ import org.junit.jupiter.api.Test;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
@@ -166,7 +169,9 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
|
||||
assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
|
||||
}
|
||||
}
|
||||
HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime);
|
||||
createDeltaCommit(basePath, newCommitTime);
|
||||
createRequestedDeltaCommit(basePath, newCommitTime);
|
||||
createInflightDeltaCommit(basePath, newCommitTime);
|
||||
|
||||
// Do a compaction
|
||||
table = HoodieTable.create(config, hadoopConf);
|
||||
|
||||
@@ -20,16 +20,14 @@ package org.apache.hudi.table.action.rollback;
|
||||
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.testutils.FileSystemTestUtils;
|
||||
import org.apache.hudi.io.IOType;
|
||||
import org.apache.hudi.common.testutils.HoodieTestTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
import org.apache.hudi.testutils.HoodieClientTestUtils;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -55,38 +53,20 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
|
||||
cleanupResources();
|
||||
}
|
||||
|
||||
private void givenCommit0(boolean isDeltaCommit) throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partA", "000", "f2");
|
||||
if (isDeltaCommit) {
|
||||
HoodieClientTestUtils.fakeDeltaCommit(basePath, "000");
|
||||
} else {
|
||||
HoodieClientTestUtils.fakeCommit(basePath, "000");
|
||||
}
|
||||
}
|
||||
|
||||
private void givenInflightCommit1(boolean isDeltaCommit) throws Exception {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partB", "001", "f1");
|
||||
HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f1", IOType.CREATE);
|
||||
|
||||
HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f3", IOType.CREATE);
|
||||
|
||||
if (isDeltaCommit) {
|
||||
HoodieClientTestUtils.fakeLogFile(basePath, "partA", "001", "f2", 0);
|
||||
HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.APPEND);
|
||||
HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f4", IOType.APPEND);
|
||||
HoodieClientTestUtils.fakeInflightDeltaCommit(basePath, "001");
|
||||
} else {
|
||||
HoodieClientTestUtils.fakeDataFile(basePath, "partA", "001", "f2");
|
||||
HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.MERGE);
|
||||
HoodieClientTestUtils.fakeInFlightCommit(basePath, "001");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyOnWriteRollback() throws Exception {
|
||||
// given: wrote some base files and corresponding markers
|
||||
givenCommit0(false);
|
||||
givenInflightCommit1(false);
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||
String f0 = testTable.addRequestedCommit("000")
|
||||
.withInserts("partA").get("partA");
|
||||
String f1 = testTable.addCommit("001")
|
||||
.withUpdates("partA", f0)
|
||||
.withInserts("partB").get("partB");
|
||||
String f2 = "f2";
|
||||
testTable.forCommit("001")
|
||||
.withMarkerFile("partA", f0, IOType.MERGE)
|
||||
.withMarkerFile("partB", f1, IOType.CREATE)
|
||||
.withMarkerFile("partA", f2, IOType.CREATE);
|
||||
|
||||
// when
|
||||
List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
|
||||
@@ -95,8 +75,8 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
|
||||
// then: ensure files are deleted correctly, non-existent files reported as failed deletes
|
||||
assertEquals(2, stats.size());
|
||||
|
||||
List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
|
||||
List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
|
||||
List<FileStatus> partAFiles = testTable.listAllFiles("partA");
|
||||
List<FileStatus> partBFiles = testTable.listAllFiles("partB");
|
||||
|
||||
assertEquals(0, partBFiles.size());
|
||||
assertEquals(1, partAFiles.size());
|
||||
@@ -107,8 +87,19 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
|
||||
@Test
|
||||
public void testMergeOnReadRollback() throws Exception {
|
||||
// given: wrote some base + log files and corresponding markers
|
||||
givenCommit0(true);
|
||||
givenInflightCommit1(true);
|
||||
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
|
||||
String f2 = testTable.addRequestedDeltaCommit("000")
|
||||
.withInserts("partA").get("partA");
|
||||
String f1 = testTable.addDeltaCommit("001")
|
||||
.withLogFile("partA", f2)
|
||||
.withInserts("partB").get("partB");
|
||||
String f3 = "f3";
|
||||
String f4 = "f4";
|
||||
testTable.forDeltaCommit("001")
|
||||
.withMarkerFile("partB", f1, IOType.CREATE)
|
||||
.withMarkerFile("partA", f3, IOType.CREATE)
|
||||
.withMarkerFile("partA", f2, IOType.APPEND)
|
||||
.withMarkerFile("partB", f4, IOType.APPEND);
|
||||
|
||||
// when
|
||||
List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
|
||||
@@ -117,12 +108,12 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
|
||||
// then: ensure files are deleted, rollback block is appended (even if append does not exist)
|
||||
assertEquals(2, stats.size());
|
||||
// will have the log file
|
||||
List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
|
||||
List<FileStatus> partBFiles = testTable.listAllFiles("partB");
|
||||
assertEquals(1, partBFiles.size());
|
||||
assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
|
||||
assertTrue(partBFiles.get(0).getLen() > 0);
|
||||
|
||||
List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
|
||||
List<FileStatus> partAFiles = testTable.listAllFiles("partA");
|
||||
assertEquals(3, partAFiles.size());
|
||||
assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
|
||||
assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
|
||||
|
||||
@@ -38,7 +38,6 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.io.IOType;
|
||||
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
|
||||
import org.apache.hudi.io.storage.HoodieParquetWriter;
|
||||
|
||||
@@ -57,14 +56,11 @@ import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@@ -74,57 +70,6 @@ import java.util.stream.Collectors;
|
||||
public class HoodieClientTestUtils {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class);
|
||||
public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
|
||||
|
||||
private static void fakeMetaFile(String basePath, String instantTime, String suffix) throws IOException {
|
||||
String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME;
|
||||
new File(parentPath).mkdirs();
|
||||
new File(parentPath + "/" + instantTime + suffix).createNewFile();
|
||||
}
|
||||
|
||||
public static void fakeCommit(String basePath, String instantTime) throws IOException {
|
||||
fakeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void fakeDeltaCommit(String basePath, String instantTime) throws IOException {
|
||||
fakeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void fakeInflightDeltaCommit(String basePath, String instantTime) throws IOException {
|
||||
fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void fakeInFlightCommit(String basePath, String instantTime) throws IOException {
|
||||
fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId)
|
||||
throws Exception {
|
||||
fakeDataFile(basePath, partitionPath, instantTime, fileId, 0);
|
||||
}
|
||||
|
||||
public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length)
|
||||
throws Exception {
|
||||
String parentPath = String.format("%s/%s", basePath, partitionPath);
|
||||
new File(parentPath).mkdirs();
|
||||
String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
|
||||
new File(path).createNewFile();
|
||||
new RandomAccessFile(path, "rw").setLength(length);
|
||||
}
|
||||
|
||||
public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version)
|
||||
throws Exception {
|
||||
fakeLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
|
||||
}
|
||||
|
||||
public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length)
|
||||
throws Exception {
|
||||
String parentPath = String.format("%s/%s", basePath, partitionPath);
|
||||
new File(parentPath).mkdirs();
|
||||
String path = String.format("%s/%s", parentPath, FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1"));
|
||||
new File(path).createNewFile();
|
||||
new RandomAccessFile(path, "rw").setLength(length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Spark config for this test.
|
||||
@@ -153,8 +98,8 @@ public class HoodieClientTestUtils {
|
||||
return HoodieReadClient.addHoodieSupport(sparkConf);
|
||||
}
|
||||
|
||||
public static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline,
|
||||
List<HoodieInstant> commitsToReturn) throws IOException {
|
||||
private static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline,
|
||||
List<HoodieInstant> commitsToReturn) throws IOException {
|
||||
HashMap<String, String> fileIdToFullPath = new HashMap<>();
|
||||
for (HoodieInstant commit : commitsToReturn) {
|
||||
HoodieCommitMetadata metadata =
|
||||
@@ -226,25 +171,8 @@ public class HoodieClientTestUtils {
|
||||
}
|
||||
|
||||
/**
|
||||
* Find total basefiles for passed in paths.
|
||||
* TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
|
||||
*/
|
||||
public static Map<String, Integer> getBaseFileCountForPaths(String basePath, FileSystem fs,
|
||||
String... paths) {
|
||||
Map<String, Integer> toReturn = new HashMap<>();
|
||||
try {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
|
||||
for (String path : paths) {
|
||||
BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
|
||||
metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
|
||||
List<HoodieBaseFile> latestFiles = fileSystemView.getLatestBaseFiles().collect(Collectors.toList());
|
||||
toReturn.put(path, latestFiles.size());
|
||||
}
|
||||
return toReturn;
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Error reading hoodie table as a dataframe", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static String writeParquetFile(String basePath, String partitionPath, String filename,
|
||||
List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {
|
||||
|
||||
@@ -278,6 +206,9 @@ public class HoodieClientTestUtils {
|
||||
return filename;
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
|
||||
*/
|
||||
public static String writeParquetFile(String basePath, String partitionPath, List<HoodieRecord> records,
|
||||
Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException {
|
||||
Thread.sleep(1000);
|
||||
@@ -289,49 +220,4 @@ public class HoodieClientTestUtils {
|
||||
createCommitTime);
|
||||
}
|
||||
|
||||
public static String createNewMarkerFile(String basePath, String partitionPath, String instantTime)
|
||||
throws IOException {
|
||||
return createMarkerFile(basePath, partitionPath, instantTime);
|
||||
}
|
||||
|
||||
public static String createMarkerFile(String basePath, String partitionPath, String instantTime)
|
||||
throws IOException {
|
||||
return createMarkerFile(basePath, partitionPath, instantTime, UUID.randomUUID().toString(), IOType.MERGE);
|
||||
}
|
||||
|
||||
public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType)
|
||||
throws IOException {
|
||||
String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + instantTime + "/" + partitionPath + "/";
|
||||
new File(folderPath).mkdirs();
|
||||
String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, DEFAULT_WRITE_TOKEN, instantTime,
|
||||
HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType);
|
||||
File f = new File(folderPath + markerFileName);
|
||||
f.createNewFile();
|
||||
return f.getAbsolutePath();
|
||||
}
|
||||
|
||||
public static void createMarkerFile(String basePath, String partitionPath, String instantTime, String dataFileName) throws IOException {
|
||||
createTempFolderForMarkerFiles(basePath);
|
||||
String folderPath = getTempFolderName(basePath);
|
||||
// create dir for this instant
|
||||
new File(folderPath + "/" + instantTime + "/" + partitionPath).mkdirs();
|
||||
new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" + dataFileName + ".marker.MERGE").createNewFile();
|
||||
}
|
||||
|
||||
public static int getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime) {
|
||||
String folderPath = getTempFolderName(basePath);
|
||||
File markerDir = new File(folderPath + "/" + instantTime + "/" + partitionPath);
|
||||
if (markerDir.exists()) {
|
||||
return markerDir.listFiles((dir, name) -> name.contains(".marker.MERGE")).length;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public static void createTempFolderForMarkerFiles(String basePath) {
|
||||
new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs();
|
||||
}
|
||||
|
||||
public static String getTempFolderName(String basePath) {
|
||||
return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,16 +7,17 @@
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.io;
|
||||
package org.apache.hudi.common.model;
|
||||
|
||||
/**
|
||||
* Types of lower level I/O operations done on each file slice.
|
||||
@@ -0,0 +1,147 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.common.testutils;
|
||||
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class FileCreateUtils {
|
||||
|
||||
private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException {
|
||||
Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
|
||||
Files.createDirectories(parentPath);
|
||||
Path metaFilePath = parentPath.resolve(instantTime + suffix);
|
||||
if (Files.notExists(metaFilePath)) {
|
||||
Files.createFile(metaFilePath);
|
||||
}
|
||||
}
|
||||
|
||||
public static void createCommit(String basePath, String instantTime) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void createRequestedCommit(String basePath, String instantTime) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void createInflightCommit(String basePath, String instantTime) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void createDeltaCommit(String basePath, String instantTime) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void createRequestedDeltaCommit(String basePath, String instantTime) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void createInflightDeltaCommit(String basePath, String instantTime) throws IOException {
|
||||
createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
|
||||
}
|
||||
|
||||
public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId)
|
||||
throws Exception {
|
||||
createDataFile(basePath, partitionPath, instantTime, fileId, 0);
|
||||
}
|
||||
|
||||
public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length)
|
||||
throws Exception {
|
||||
Path parentPath = Paths.get(basePath, partitionPath);
|
||||
Files.createDirectories(parentPath);
|
||||
Path dataFilePath = parentPath.resolve(FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
|
||||
if (Files.notExists(dataFilePath)) {
|
||||
Files.createFile(dataFilePath);
|
||||
}
|
||||
new RandomAccessFile(dataFilePath.toFile(), "rw").setLength(length);
|
||||
}
|
||||
|
||||
public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version)
|
||||
throws Exception {
|
||||
createLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
|
||||
}
|
||||
|
||||
public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length)
|
||||
throws Exception {
|
||||
Path parentPath = Paths.get(basePath, partitionPath);
|
||||
Files.createDirectories(parentPath);
|
||||
Path logFilePath = parentPath.resolve(FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1"));
|
||||
if (Files.notExists(logFilePath)) {
|
||||
Files.createFile(logFilePath);
|
||||
}
|
||||
new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length);
|
||||
}
|
||||
|
||||
public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType)
|
||||
throws IOException {
|
||||
Path folderPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
|
||||
Files.createDirectories(folderPath);
|
||||
String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, "1-0-1", instantTime,
|
||||
HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType);
|
||||
Path markerFilePath = folderPath.resolve(markerFileName);
|
||||
if (Files.notExists(markerFilePath)) {
|
||||
Files.createFile(markerFilePath);
|
||||
}
|
||||
return markerFilePath.toAbsolutePath().toString();
|
||||
}
|
||||
|
||||
public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException {
|
||||
Path markerDir = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
|
||||
if (Files.notExists(markerDir)) {
|
||||
return 0;
|
||||
}
|
||||
return Files.list(markerDir).filter(p -> p.getFileName().toString()
|
||||
.endsWith(String.format("%s.%s", HoodieTableMetaClient.MARKER_EXTN, ioType))).count();
|
||||
}
|
||||
|
||||
/**
|
||||
* Find total basefiles for passed in paths.
|
||||
*/
|
||||
public static Map<String, Long> getBaseFileCountsForPaths(String basePath, FileSystem fs, String... paths) {
|
||||
Map<String, Long> toReturn = new HashMap<>();
|
||||
try {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
|
||||
for (String path : paths) {
|
||||
TableFileSystemView.BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
|
||||
metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new org.apache.hadoop.fs.Path(path)));
|
||||
toReturn.put(path, fileSystemView.getLatestBaseFiles().count());
|
||||
}
|
||||
return toReturn;
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Error reading hoodie table as a dataframe", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,232 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.common.testutils;
|
||||
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.common.model.IOType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
|
||||
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
|
||||
|
||||
public class HoodieTestTable {
|
||||
|
||||
private final String basePath;
|
||||
private final FileSystem fs;
|
||||
private HoodieTableMetaClient metaClient;
|
||||
private String currentInstantTime;
|
||||
|
||||
private HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) {
|
||||
ValidationUtils.checkArgument(Objects.equals(basePath, metaClient.getBasePath()));
|
||||
ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs()));
|
||||
this.basePath = basePath;
|
||||
this.fs = fs;
|
||||
this.metaClient = metaClient;
|
||||
}
|
||||
|
||||
public static HoodieTestTable of(HoodieTableMetaClient metaClient) {
|
||||
return new HoodieTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient);
|
||||
}
|
||||
|
||||
public HoodieTestTable addRequestedCommit(String instantTime) throws Exception {
|
||||
createRequestedCommit(basePath, instantTime);
|
||||
currentInstantTime = instantTime;
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws Exception {
|
||||
createRequestedDeltaCommit(basePath, instantTime);
|
||||
currentInstantTime = instantTime;
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable addInflightCommit(String instantTime) throws Exception {
|
||||
createRequestedCommit(basePath, instantTime);
|
||||
createInflightCommit(basePath, instantTime);
|
||||
currentInstantTime = instantTime;
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable addInflightDeltaCommit(String instantTime) throws Exception {
|
||||
createRequestedDeltaCommit(basePath, instantTime);
|
||||
createInflightDeltaCommit(basePath, instantTime);
|
||||
currentInstantTime = instantTime;
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable addCommit(String instantTime) throws Exception {
|
||||
createRequestedCommit(basePath, instantTime);
|
||||
createInflightCommit(basePath, instantTime);
|
||||
createCommit(basePath, instantTime);
|
||||
currentInstantTime = instantTime;
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable addDeltaCommit(String instantTime) throws Exception {
|
||||
createRequestedDeltaCommit(basePath, instantTime);
|
||||
createInflightDeltaCommit(basePath, instantTime);
|
||||
createDeltaCommit(basePath, instantTime);
|
||||
currentInstantTime = instantTime;
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable forCommit(String instantTime) {
|
||||
currentInstantTime = instantTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable forDeltaCommit(String instantTime) {
|
||||
currentInstantTime = instantTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException {
|
||||
return withMarkerFile(partitionPath, UUID.randomUUID().toString(), ioType);
|
||||
}
|
||||
|
||||
public HoodieTestTable withMarkerFile(String partitionPath, String fileId, IOType ioType) throws IOException {
|
||||
createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType);
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieTestTable withMarkerFiles(String partitionPath, int num, IOType ioType) throws IOException {
|
||||
String[] fileIds = IntStream.range(0, num).mapToObj(i -> UUID.randomUUID().toString()).toArray(String[]::new);
|
||||
return withMarkerFiles(partitionPath, fileIds, ioType);
|
||||
}
|
||||
|
||||
public HoodieTestTable withMarkerFiles(String partitionPath, String[] fileIds, IOType ioType) throws IOException {
|
||||
for (String fileId : fileIds) {
|
||||
createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert one base file to each of the given distinct partitions.
|
||||
*
|
||||
* @return A {@link Map} of partition and its newly inserted file's id.
|
||||
*/
|
||||
public Map<String, String> withInserts(String... partitions) throws Exception {
|
||||
Map<String, String> partitionFileIdMap = new HashMap<>();
|
||||
for (String p : partitions) {
|
||||
String fileId = UUID.randomUUID().toString();
|
||||
FileCreateUtils.createDataFile(basePath, p, currentInstantTime, fileId);
|
||||
partitionFileIdMap.put(p, fileId);
|
||||
}
|
||||
return partitionFileIdMap;
|
||||
}
|
||||
|
||||
public HoodieTestTable withUpdates(String partition, String... fileIds) throws Exception {
|
||||
for (String f : fileIds) {
|
||||
FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, f);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public String withLogFile(String partitionPath) throws Exception {
|
||||
String fileId = UUID.randomUUID().toString();
|
||||
withLogFile(partitionPath, fileId);
|
||||
return fileId;
|
||||
}
|
||||
|
||||
public HoodieTestTable withLogFile(String partitionPath, String fileId) throws Exception {
|
||||
return withLogFile(partitionPath, fileId, 0);
|
||||
}
|
||||
|
||||
public HoodieTestTable withLogFile(String partitionPath, String fileId, int version) throws Exception {
|
||||
FileCreateUtils.createLogFile(basePath, partitionPath, currentInstantTime, fileId, version);
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean filesExist(Map<String, String> partitionAndFileId, String instantTime) {
|
||||
return partitionAndFileId.entrySet().stream().allMatch(entry -> {
|
||||
String partition = entry.getKey();
|
||||
String fileId = entry.getValue();
|
||||
return fileExists(partition, instantTime, fileId);
|
||||
});
|
||||
}
|
||||
|
||||
public boolean filesExist(String partition, String instantTime, String... fileIds) {
|
||||
return Arrays.stream(fileIds).allMatch(f -> fileExists(partition, instantTime, f));
|
||||
}
|
||||
|
||||
public boolean fileExists(String partition, String instantTime, String fileId) {
|
||||
try {
|
||||
return fs.exists(new Path(Paths.get(basePath, partition,
|
||||
FSUtils.makeDataFileName(instantTime, "1-0-1", fileId)).toString()));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieTestTableException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) {
|
||||
return Arrays.stream(versions).allMatch(v -> logFileExists(partition, instantTime, fileId, v));
|
||||
}
|
||||
|
||||
public boolean logFileExists(String partition, String instantTime, String fileId, int version) {
|
||||
try {
|
||||
return fs.exists(new Path(Paths.get(basePath, partition,
|
||||
FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version, "1-0-1")).toString()));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieTestTableException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public List<FileStatus> listAllFiles(String partitionPath) throws IOException {
|
||||
return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString()));
|
||||
}
|
||||
|
||||
public List<FileStatus> listAllFilesInTempFolder() throws IOException {
|
||||
return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString()));
|
||||
}
|
||||
|
||||
/**
 * Runtime wrapper used by this test table to surface checked exceptions (e.g.
 * {@link IOException} from file-system probes) without forcing callers to declare them.
 */
public static class HoodieTestTableException extends RuntimeException {
  public HoodieTestTableException(Throwable t) {
    super(t);
  }
}
}
|
||||
@@ -100,7 +100,6 @@ import static org.junit.jupiter.api.Assertions.fail;
|
||||
*/
|
||||
public class HoodieTestUtils {
|
||||
|
||||
public static final String TEST_EXTENSION = ".test";
|
||||
public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
|
||||
public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
|
||||
public static final int DEFAULT_LOG_VERSION = 1;
|
||||
@@ -167,6 +166,9 @@ public class HoodieTestUtils {
|
||||
return COMMIT_FORMATTER.format(new Date());
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static void createCommitFiles(String basePath, String... instantTimes) throws IOException {
|
||||
for (String instantTime : instantTimes) {
|
||||
new File(
|
||||
@@ -177,20 +179,6 @@ public class HoodieTestUtils {
|
||||
+ HoodieTimeline.makeInflightCommitFileName(instantTime)).createNewFile();
|
||||
new File(
|
||||
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(instantTime))
|
||||
.createNewFile();
|
||||
}
|
||||
}
|
||||
|
||||
public static void createDeltaCommitFiles(String basePath, String... instantTimes) throws IOException {
|
||||
for (String instantTime : instantTimes) {
|
||||
new File(
|
||||
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
|
||||
+ HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile();
|
||||
new File(
|
||||
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
|
||||
+ HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile();
|
||||
new File(
|
||||
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(instantTime))
|
||||
.createNewFile();
|
||||
}
|
||||
}
|
||||
@@ -199,6 +187,9 @@ public class HoodieTestUtils {
|
||||
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static void createInflightCommitFiles(String basePath, String... instantTimes) throws IOException {
|
||||
|
||||
for (String instantTime : instantTimes) {
|
||||
@@ -212,11 +203,12 @@ public class HoodieTestUtils {
|
||||
public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... instantTimes) {
|
||||
for (String instantTime : instantTimes) {
|
||||
Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime),
|
||||
HoodieTimeline.makeInflightCleanerFileName(instantTime)).forEach(f -> {
|
||||
HoodieTimeline.makeInflightCleanerFileName(instantTime))
|
||||
.forEach(f -> {
|
||||
FSDataOutputStream os = null;
|
||||
try {
|
||||
Path commitFile = new Path(
|
||||
metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
|
||||
Path commitFile = new Path(Paths
|
||||
.get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
|
||||
os = metaClient.getFs().create(commitFile, true);
|
||||
// Write empty clean metadata
|
||||
os.write(TimelineMetadataUtils.serializeCleanerPlan(
|
||||
@@ -239,11 +231,12 @@ public class HoodieTestUtils {
|
||||
|
||||
public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) {
|
||||
Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime),
|
||||
HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> {
|
||||
HoodieTimeline.makeInflightCleanerFileName(commitTime))
|
||||
.forEach(f -> {
|
||||
FSDataOutputStream os = null;
|
||||
try {
|
||||
Path commitFile = new Path(
|
||||
metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
|
||||
Path commitFile = new Path(Paths
|
||||
.get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
|
||||
os = metaClient.getFs().create(commitFile, true);
|
||||
// Write empty clean metadata
|
||||
os.write(new byte[0]);
|
||||
@@ -261,18 +254,18 @@ public class HoodieTestUtils {
|
||||
});
|
||||
}
|
||||
|
||||
public static String createNewDataFile(String basePath, String partitionPath, String instantTime)
|
||||
throws IOException {
|
||||
String fileID = UUID.randomUUID().toString();
|
||||
return createDataFile(basePath, partitionPath, instantTime, fileID);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static String createNewDataFile(String basePath, String partitionPath, String instantTime, long length)
|
||||
throws IOException {
|
||||
String fileID = UUID.randomUUID().toString();
|
||||
return createDataFileFixLength(basePath, partitionPath, instantTime, fileID, length);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static String createDataFile(String basePath, String partitionPath, String instantTime, String fileID)
|
||||
throws IOException {
|
||||
String folderPath = basePath + "/" + partitionPath + "/";
|
||||
@@ -281,7 +274,7 @@ public class HoodieTestUtils {
|
||||
return fileID;
|
||||
}
|
||||
|
||||
public static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID,
|
||||
private static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID,
|
||||
long length) throws IOException {
|
||||
String folderPath = basePath + "/" + partitionPath + "/";
|
||||
Files.createDirectories(Paths.get(folderPath));
|
||||
@@ -293,6 +286,9 @@ public class HoodieTestUtils {
|
||||
return fileID;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String instantTime,
|
||||
String fileID, Option<Integer> version) throws IOException {
|
||||
String folderPath = basePath + "/" + partitionPath + "/";
|
||||
@@ -309,17 +305,6 @@ public class HoodieTestUtils {
|
||||
return fileID;
|
||||
}
|
||||
|
||||
public static void createCompactionCommitFiles(FileSystem fs, String basePath, String... instantTimes)
|
||||
throws IOException {
|
||||
for (String instantTime : instantTimes) {
|
||||
boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
|
||||
+ HoodieTimeline.makeCommitFileName(instantTime)));
|
||||
if (!createFile) {
|
||||
throw new IOException("cannot create commit file for commit " + instantTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant,
|
||||
List<Pair<String, FileSlice>> fileSliceList) throws IOException {
|
||||
HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty());
|
||||
@@ -328,10 +313,16 @@ public class HoodieTestUtils {
|
||||
TimelineMetadataUtils.serializeCompactionPlan(plan));
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static String getDataFilePath(String basePath, String partitionPath, String instantTime, String fileID) {
|
||||
return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static String getLogFilePath(String basePath, String partitionPath, String instantTime, String fileID,
|
||||
Option<Integer> version) {
|
||||
return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", instantTime,
|
||||
@@ -342,32 +333,39 @@ public class HoodieTestUtils {
|
||||
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static String getInflightCommitFilePath(String basePath, String instantTime) {
|
||||
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime
|
||||
+ HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static String getRequestedCompactionFilePath(String basePath, String instantTime) {
|
||||
return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instantTime
|
||||
+ HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static boolean doesDataFileExist(String basePath, String partitionPath, String instantTime,
|
||||
String fileID) {
|
||||
return new File(getDataFilePath(basePath, partitionPath, instantTime, fileID)).exists();
|
||||
}
|
||||
|
||||
public static boolean doesLogFileExist(String basePath, String partitionPath, String instantTime, String fileID,
|
||||
Option<Integer> version) {
|
||||
return new File(getLogFilePath(basePath, partitionPath, instantTime, fileID, version)).exists();
|
||||
}
|
||||
|
||||
public static boolean doesCommitExist(String basePath, String instantTime) {
|
||||
return new File(
|
||||
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION)
|
||||
.exists();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link HoodieTestTable} instead.
|
||||
*/
|
||||
public static boolean doesInflightExist(String basePath, String instantTime) {
|
||||
return new File(
|
||||
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_EXTENSION)
|
||||
|
||||
Reference in New Issue
Block a user