1
0

Revert "[HUDI-781] Introduce HoodieTestTable for test preparation (#1871)"

This reverts commit b2e703d442.
This commit is contained in:
Balaji Varadarajan
2020-08-10 22:13:02 -07:00
parent 9c24151929
commit 626f78f6f6
19 changed files with 470 additions and 645 deletions

View File

@@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.block.HoodieDataBlock; import org.apache.hudi.common.table.log.block.HoodieDataBlock;

View File

@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;

View File

@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats; import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;

View File

@@ -24,7 +24,6 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ReflectionUtils;
@@ -34,13 +33,13 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;

View File

@@ -9,15 +9,14 @@
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing, software
* software distributed under the License is distributed on an * distributed under the License is distributed on an "AS IS" BASIS,
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* KIND, either express or implied. See the License for the * See the License for the specific language governing permissions and
* specific language governing permissions and limitations * limitations under the License.
* under the License.
*/ */
package org.apache.hudi.common.model; package org.apache.hudi.io;
/** /**
* Types of lower level I/O operations done on each file slice. * Types of lower level I/O operations done on each file slice.

View File

@@ -18,27 +18,26 @@
package org.apache.hudi.table; package org.apache.hudi.table;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOType;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;

View File

@@ -18,10 +18,10 @@
package org.apache.hudi.table.action.rollback; package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -29,21 +29,19 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.io.IOType;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.MarkerFiles;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import scala.Tuple2;
/** /**
* Performs rollback using marker files generated during the write.. * Performs rollback using marker files generated during the write..
*/ */

View File

@@ -21,13 +21,13 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.io.IOType;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper; import org.apache.hudi.table.action.rollback.ListingBasedRollbackHelper;

View File

@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -48,6 +47,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.io.IOType;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.commit.WriteHelper; import org.apache.hudi.table.action.commit.WriteHelper;

View File

@@ -35,7 +35,6 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -44,8 +43,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator; import org.apache.hudi.common.table.timeline.versioning.clean.CleanMetadataMigrator;
import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.CollectionUtils;
@@ -57,16 +56,13 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@@ -486,97 +482,125 @@ public class TestCleaner extends HoodieClientTestBase {
* Test HoodieTable.clean() Cleaning by versions logic. * Test HoodieTable.clean() Cleaning by versions logic.
*/ */
@Test @Test
public void testKeepLatestFileVersions() throws Exception { public void testKeepLatestFileVersions() throws IOException {
HoodieWriteConfig config = HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build(); .build();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String p0 = "2020/01/01";
String p1 = "2020/01/02";
// make 1 commit, with 1 file per partition // make 1 commit, with 1 file per partition
Map<String, String> partitionAndFileId000 = testTable.addCommit("000").withInserts(p0, p1); HoodieTestUtils.createCommitFiles(basePath, "000");
String file1P0C0 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
String file1P1C0 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config); List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files"); assertEquals(0, hoodieCleanStatsOne.size(), "Must not clean any files");
assertTrue(testTable.filesExist(partitionAndFileId000, "000")); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
file1P1C0));
// make next commit, with 1 insert & 1 update per partition // make next commit, with 1 insert & 1 update per partition
String file1P0C0 = partitionAndFileId000.get(p0); HoodieTestUtils.createCommitFiles(basePath, "001");
String file1P1C0 = partitionAndFileId000.get(p1); metaClient = HoodieTableMetaClient.reload(metaClient);
Map<String, String> partitionAndFileId001 = testTable.addCommit("001")
.withUpdates(p0, file1P0C0) String file2P0C1 =
.withUpdates(p1, file1P1C0) HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
.withInserts(p0, p1); String file2P1C1 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config); List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
assertEquals(1, assertEquals(1,
getCleanStat(hoodieCleanStatsTwo, p0).getSuccessDeleteFiles() getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
.size(), "Must clean 1 file"); .size(), "Must clean 1 file");
assertEquals(1, assertEquals(1,
getCleanStat(hoodieCleanStatsTwo, p1).getSuccessDeleteFiles() getCleanStat(hoodieCleanStatsTwo, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
.size(), "Must clean 1 file"); .size(), "Must clean 1 file");
String file2P0C1 = partitionAndFileId001.get(p0); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
String file2P1C1 = partitionAndFileId001.get(p1); file2P0C1));
assertTrue(testTable.fileExists(p0, "001", file2P0C1)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
assertTrue(testTable.fileExists(p1, "001", file2P1C1)); file2P1C1));
assertFalse(testTable.fileExists(p0, "000", file1P0C0)); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
assertFalse(testTable.fileExists(p1, "000", file1P1C0)); file1P0C0));
assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
"000", file1P1C0));
// make next commit, with 2 updates to existing files, and 1 insert // make next commit, with 2 updates to existing files, and 1 insert
String file3P0C2 = testTable.addCommit("002") HoodieTestUtils.createCommitFiles(basePath, "002");
.withUpdates(p0, file1P0C0, file2P0C1) metaClient = HoodieTableMetaClient.reload(metaClient);
.withInserts(p0, "002").get(p0);
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
String file3P0C2 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config); List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
assertEquals(2, assertEquals(2,
getCleanStat(hoodieCleanStatsThree, p0) getCleanStat(hoodieCleanStatsThree, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.getSuccessDeleteFiles().size(), "Must clean two files"); .getSuccessDeleteFiles().size(), "Must clean two files");
assertFalse(testTable.fileExists(p0, "001", file1P0C0)); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
assertFalse(testTable.fileExists(p0, "001", file2P0C1)); file1P0C0));
assertTrue(testTable.fileExists(p0, "002", file3P0C2)); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
file3P0C2));
// No cleaning on partially written file, with no commit. // No cleaning on partially written file, with no commit.
testTable.forCommit("003").withUpdates(p0, file3P0C2); HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file3P0C2); // update
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config); List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files"); assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
assertTrue(testTable.fileExists(p0, "003", file3P0C2)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
file3P0C2));
} }
/** /**
* Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files. * Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files.
*/ */
@Test @Test
public void testKeepLatestFileVersionsMOR() throws Exception { public void testKeepLatestFileVersionsMOR() throws IOException {
HoodieWriteConfig config = HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build()) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build(); .build();
HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String p0 = "2020/01/01";
// Make 3 files, one base file and 2 log files associated with base file // Make 3 files, one base file and 2 log files associated with base file
String file1P0 = testTable.addDeltaCommit("000").withInserts(p0).get(p0); String file1P0 =
testTable.forDeltaCommit("000") HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
.withLogFile(p0, file1P0, 1) String file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath,
.withLogFile(p0, file1P0, 2); HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.empty());
HoodieTestUtils.createNewLogFile(fs, basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000", file1P0, Option.of(2));
// make 1 compaction commit
HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "000");
// Make 2 files, one base file and 1 log files associated with base file // Make 4 files, one base file and 3 log files associated with base file
testTable.addDeltaCommit("001") HoodieTestUtils.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0);
.withUpdates(p0, file1P0) file2P0L0 = HoodieTestUtils.createNewLogFile(fs, basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
.withLogFile(p0, file1P0, 3); "001", file1P0, Option.of(3));
// make 1 compaction commit
HoodieTestUtils.createCompactionCommitFiles(fs, basePath, "001");
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config); List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
assertEquals(3, assertEquals(3,
getCleanStat(hoodieCleanStats, p0).getSuccessDeleteFiles() getCleanStat(hoodieCleanStats, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
.size(), "Must clean three files, one parquet and 2 log files"); .size(), "Must clean three files, one parquet and 2 log files");
assertFalse(testTable.fileExists(p0, "000", file1P0)); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2)); file1P0));
assertTrue(testTable.fileExists(p0, "001", file1P0)); assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
assertTrue(testTable.logFileExists(p0, "001", file1P0, 3)); file2P0L0, Option.empty()));
assertFalse(HoodieTestUtils.doesLogFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
file2P0L0, Option.of(2)));
} }
@Test @Test
@@ -628,33 +652,33 @@ public class TestCleaner extends HoodieClientTestBase {
); );
metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1); metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1);
// Now upgrade and check // NOw upgrade and check
CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient); CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient);
metadata = metadataMigrator.upgradeToLatest(metadata, metadata.getVersion()); metadata = metadataMigrator.upgradeToLatest(metadata, metadata.getVersion());
assertCleanMetadataPathEquals(newExpected, metadata); testCleanMetadataPathEquality(metadata, newExpected);
CleanMetadataMigrator migrator = new CleanMetadataMigrator(metaClient); CleanMetadataMigrator migrator = new CleanMetadataMigrator(metaClient);
HoodieCleanMetadata oldMetadata = HoodieCleanMetadata oldMetadata =
migrator.migrateToVersion(metadata, metadata.getVersion(), CleanerUtils.CLEAN_METADATA_VERSION_1); migrator.migrateToVersion(metadata, metadata.getVersion(), CleanerUtils.CLEAN_METADATA_VERSION_1);
assertEquals(CleanerUtils.CLEAN_METADATA_VERSION_1, oldMetadata.getVersion()); assertEquals(CleanerUtils.CLEAN_METADATA_VERSION_1, oldMetadata.getVersion());
assertCleanMetadataEquals(metadata, oldMetadata); testCleanMetadataEquality(metadata, oldMetadata);
assertCleanMetadataPathEquals(oldExpected, oldMetadata); testCleanMetadataPathEquality(oldMetadata, oldExpected);
HoodieCleanMetadata newMetadata = migrator.upgradeToLatest(oldMetadata, oldMetadata.getVersion()); HoodieCleanMetadata newMetadata = migrator.upgradeToLatest(oldMetadata, oldMetadata.getVersion());
assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, newMetadata.getVersion()); assertEquals(CleanerUtils.LATEST_CLEAN_METADATA_VERSION, newMetadata.getVersion());
assertCleanMetadataEquals(oldMetadata, newMetadata); testCleanMetadataEquality(oldMetadata, newMetadata);
assertCleanMetadataPathEquals(newExpected, newMetadata); testCleanMetadataPathEquality(newMetadata, newExpected);
assertCleanMetadataPathEquals(oldExpected, oldMetadata); testCleanMetadataPathEquality(oldMetadata, oldExpected);
} }
private static void assertCleanMetadataEquals(HoodieCleanMetadata expected, HoodieCleanMetadata actual) { public void testCleanMetadataEquality(HoodieCleanMetadata input1, HoodieCleanMetadata input2) {
assertEquals(expected.getEarliestCommitToRetain(), actual.getEarliestCommitToRetain()); assertEquals(input1.getEarliestCommitToRetain(), input2.getEarliestCommitToRetain());
assertEquals(expected.getStartCleanTime(), actual.getStartCleanTime()); assertEquals(input1.getStartCleanTime(), input2.getStartCleanTime());
assertEquals(expected.getTimeTakenInMillis(), actual.getTimeTakenInMillis()); assertEquals(input1.getTimeTakenInMillis(), input2.getTimeTakenInMillis());
assertEquals(expected.getTotalFilesDeleted(), actual.getTotalFilesDeleted()); assertEquals(input1.getTotalFilesDeleted(), input2.getTotalFilesDeleted());
Map<String, HoodieCleanPartitionMetadata> map1 = expected.getPartitionMetadata(); Map<String, HoodieCleanPartitionMetadata> map1 = input1.getPartitionMetadata();
Map<String, HoodieCleanPartitionMetadata> map2 = actual.getPartitionMetadata(); Map<String, HoodieCleanPartitionMetadata> map2 = input2.getPartitionMetadata();
assertEquals(map1.keySet(), map2.keySet()); assertEquals(map1.keySet(), map2.keySet());
@@ -669,7 +693,7 @@ public class TestCleaner extends HoodieClientTestBase {
assertEquals(policies1, policies2); assertEquals(policies1, policies2);
} }
private static void assertCleanMetadataPathEquals(Map<String, Tuple3> expected, HoodieCleanMetadata metadata) { private void testCleanMetadataPathEquality(HoodieCleanMetadata metadata, Map<String, Tuple3> expected) {
Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = metadata.getPartitionMetadata(); Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = metadata.getPartitionMetadata();
@@ -683,40 +707,54 @@ public class TestCleaner extends HoodieClientTestBase {
} }
} }
private static Stream<Arguments> argumentsForTestKeepLatestCommits() { /**
return Stream.of( * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
Arguments.of(false, false), */
Arguments.of(true, false), @Test
Arguments.of(false, true) public void testKeepLatestCommits() throws IOException {
); testKeepLatestCommits(false, false);
} }
/** /**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated
* such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds.
*/ */
@ParameterizedTest @Test
@MethodSource("argumentsForTestKeepLatestCommits") public void testKeepLatestCommitsWithFailureRetry() throws IOException {
public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws Exception { testKeepLatestCommits(true, false);
}
/**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
*/
@Test
public void testKeepLatestCommitsIncrMode() throws IOException {
testKeepLatestCommits(false, true);
}
/**
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files.
*/
private void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIncrementalClean) throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean) .withIncrementalCleaningMode(enableIncrementalClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build(); .build();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String p0 = "2020/01/01";
String p1 = "2020/01/02";
// make 1 commit, with 1 file per partition // make 1 commit, with 1 file per partition
Map<String, String> partitionAndFileId000 = testTable.addInflightCommit("000").withInserts(p0, p1); HoodieTestUtils.createInflightCommitFiles(basePath, "000");
String file1P0C0 = partitionAndFileId000.get(p0);
String file1P1C0 = partitionAndFileId000.get(p1); String file1P0C0 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
String file1P1C0 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
HoodieCommitMetadata commitMetadata = generateCommitMetadata( HoodieCommitMetadata commitMetadata = generateCommitMetadata(
Collections.unmodifiableMap(new HashMap<String, List<String>>() { Collections.unmodifiableMap(new HashMap<String, List<String>>() {
{ {
put(p0, CollectionUtils.createImmutableList(file1P0C0)); put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0));
put(p1, CollectionUtils.createImmutableList(file1P1C0)); put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0));
} }
}) })
); );
@@ -728,20 +766,29 @@ public class TestCleaner extends HoodieClientTestBase {
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
assertTrue(testTable.fileExists(p0, "000", file1P0C0)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
assertTrue(testTable.fileExists(p1, "000", file1P1C0)); file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
file1P1C0));
// make next commit, with 1 insert & 1 update per partition // make next commit, with 1 insert & 1 update per partition
Map<String, String> partitionAndFileId001 = testTable.addInflightCommit("001").withInserts(p0, p1); HoodieTestUtils.createInflightCommitFiles(basePath, "001");
String file2P0C1 = partitionAndFileId001.get(p0); metaClient = HoodieTableMetaClient.reload(metaClient);
String file2P1C1 = partitionAndFileId001.get(p1);
testTable.forCommit("001") String file2P0C1 =
.withUpdates(p0, file1P0C0) HoodieTestUtils
.withUpdates(p1, file1P1C0); .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
String file2P1C1 =
HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001"); // insert
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001", file1P0C0); // update
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001", file1P1C0); // update
commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() { commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
{ {
put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1)); put(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1)); put(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
} }
}); });
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
@@ -749,18 +796,28 @@ public class TestCleaner extends HoodieClientTestBase {
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
assertTrue(testTable.fileExists(p0, "001", file2P0C1)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
assertTrue(testTable.fileExists(p1, "001", file2P1C1)); file2P0C1));
assertTrue(testTable.fileExists(p0, "000", file1P0C0)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
assertTrue(testTable.fileExists(p1, "000", file1P1C0)); file2P1C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
file1P1C0));
// make next commit, with 2 updates to existing files, and 1 insert // make next commit, with 2 updates to existing files, and 1 insert
String file3P0C2 = testTable.addInflightCommit("002") HoodieTestUtils.createInflightCommitFiles(basePath, "002");
.withUpdates(p0, file1P0C0) metaClient = HoodieTableMetaClient.reload(metaClient);
.withUpdates(p0, file2P0C1)
.withInserts(p0).get(p0); HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file2P0C1); // update
String file3P0C2 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002");
commitMetadata = generateCommitMetadata(CollectionUtils commitMetadata = generateCommitMetadata(CollectionUtils
.createImmutableMap(p0, .createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2))); CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file3P0C2)));
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"), new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "002"),
@@ -769,35 +826,49 @@ public class TestCleaner extends HoodieClientTestBase {
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, simulateFailureRetry);
assertEquals(0, hoodieCleanStatsThree.size(), assertEquals(0, hoodieCleanStatsThree.size(),
"Must not clean any file. We have to keep 1 version before the latest commit time to keep"); "Must not clean any file. We have to keep 1 version before the latest commit time to keep");
assertTrue(testTable.fileExists(p0, "000", file1P0C0)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
file1P0C0));
// make next commit, with 2 updates to existing files, and 1 insert // make next commit, with 2 updates to existing files, and 1 insert
String file4P0C3 = testTable.addInflightCommit("003") HoodieTestUtils.createInflightCommitFiles(basePath, "003");
.withUpdates(p0, file1P0C0) metaClient = HoodieTableMetaClient.reload(metaClient);
.withUpdates(p0, file2P0C1)
.withInserts(p0).get(p0); HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file2P0C1); // update
String file4P0C3 =
HoodieTestUtils.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003");
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap( commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(
p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3))); HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, file4P0C3)));
metaClient.getActiveTimeline().saveAsComplete( metaClient.getActiveTimeline().saveAsComplete(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"), new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "003"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, simulateFailureRetry);
assertEquals(1, assertEquals(1,
getCleanStat(hoodieCleanStatsFour, p0).getSuccessDeleteFiles() getCleanStat(hoodieCleanStatsFour, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
.size(), "Must not clean one old file"); .size(), "Must not clean one old file");
assertFalse(testTable.fileExists(p0, "000", file1P0C0)); assertFalse(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
assertTrue(testTable.fileExists(p0, "001", file1P0C0)); file1P0C0));
assertTrue(testTable.fileExists(p0, "002", file1P0C0)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
assertTrue(testTable.fileExists(p0, "001", file2P0C1)); file1P0C0));
assertTrue(testTable.fileExists(p0, "002", file2P0C1)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
assertTrue(testTable.fileExists(p0, "002", file3P0C2)); file1P0C0));
assertTrue(testTable.fileExists(p0, "003", file4P0C3)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002",
file3P0C2));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003",
file4P0C3));
// No cleaning on partially written file, with no commit. // No cleaning on partially written file, with no commit.
testTable.forCommit("004").withUpdates(p0, file3P0C2); HoodieTestUtils
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(p0, .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "004", file3P0C2); // update
commitMetadata = generateCommitMetadata(CollectionUtils.createImmutableMap(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
CollectionUtils.createImmutableList(file3P0C2))); CollectionUtils.createImmutableList(file3P0C2)));
metaClient.getActiveTimeline().createNewInstant( metaClient.getActiveTimeline().createNewInstant(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004")); new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"));
@@ -805,40 +876,41 @@ public class TestCleaner extends HoodieClientTestBase {
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"), new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "004"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry); List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, simulateFailureRetry);
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, p0); HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
assertNull(cleanStat, "Must not clean any files"); assertEquals(0,
assertTrue(testTable.fileExists(p0, "001", file1P0C0)); cleanStat != null ? cleanStat.getSuccessDeleteFiles().size() : 0, "Must not clean any files");
assertTrue(testTable.fileExists(p0, "001", file2P0C1)); assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
assertTrue(testTable.fileExists(p0, "004", file3P0C2)); file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
file2P0C1));
} }
/** /**
* Test Cleaning functionality of table.rollback() API. * Test Cleaning functionality of table.rollback() API.
*/ */
@Test @Test
public void testCleanMarkerDataFilesOnRollback() throws Exception { public void testCleanMarkerDataFilesOnRollback() throws IOException {
HoodieTestTable testTable = HoodieTestTable.of(metaClient) List<String> markerFiles = createMarkerFiles("000", 10);
.addRequestedCommit("000") assertEquals(10, markerFiles.size(), "Some marker files are created.");
.withMarkerFiles("default", 10, IOType.MERGE); assertEquals(markerFiles.size(), getTotalTempFiles(), "Some marker files are created.");
final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size();
assertEquals(10, numTempFilesBefore, "Some marker files are created.");
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf); HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED,
HoodieTimeline.COMMIT_ACTION, "000"));
table.getActiveTimeline().transitionRequestedToInflight( table.getActiveTimeline().transitionRequestedToInflight(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty()); new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
metaClient.reloadActiveTimeline(); metaClient.reloadActiveTimeline();
table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true); table.rollback(jsc, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size(); assertEquals(0, getTotalTempFiles(), "All temp files are deleted.");
assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
} }
/** /**
* Test CLeaner Stat when there are no partition paths. * Test CLeaner Stat when there are no partition paths.
*/ */
@Test @Test
public void testCleaningWithZeroPartitionPaths() throws Exception { public void testCleaningWithZeroPartitionPaths() throws IOException {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
@@ -847,7 +919,9 @@ public class TestCleaner extends HoodieClientTestBase {
// Make a commit, although there are no partitionPaths. // Make a commit, although there are no partitionPaths.
// Example use-case of this is when a client wants to create a table // Example use-case of this is when a client wants to create a table
// with just some commit metadata, but no data/partitionPaths. // with just some commit metadata, but no data/partitionPaths.
HoodieTestTable.of(metaClient).addCommit("000"); HoodieTestUtils.createCommitFiles(basePath, "000");
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config); List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config);
assertTrue(hoodieCleanStatsOne.isEmpty(), "HoodieCleanStats should be empty for a table with empty partitionPaths"); assertTrue(hoodieCleanStatsOne.isEmpty(), "HoodieCleanStats should be empty for a table with empty partitionPaths");
@@ -878,9 +952,21 @@ public class TestCleaner extends HoodieClientTestBase {
* Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated * Test HoodieTable.clean() Cleaning by commit logic for MOR table with Log files. Here the operations are simulated
* such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds. * such that first clean attempt failed after files were cleaned and a subsequent cleanup succeeds.
*/ */
@ParameterizedTest @Test
@ValueSource(booleans = {false, true}) public void testKeepLatestVersionsWithPendingCompactions() throws IOException {
public void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException { testKeepLatestVersionsWithPendingCompactions(false);
}
/**
* Test Keep Latest Versions when there are pending compactions.
*/
@Test
public void testKeepLatestVersionsWithPendingCompactionsAndFailureRetry() throws IOException {
testKeepLatestVersionsWithPendingCompactions(true);
}
private void testKeepLatestVersionsWithPendingCompactions(boolean retryFailure) throws IOException {
HoodieWriteConfig config = HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -1032,6 +1118,33 @@ public class TestCleaner extends HoodieClientTestBase {
"Correct number of files under compaction deleted"); "Correct number of files under compaction deleted");
} }
/**
* Utility method to create temporary data files.
*
* @param instantTime Commit Timestamp
* @param numFiles Number for files to be generated
* @return generated files
* @throws IOException in case of error
*/
private List<String> createMarkerFiles(String instantTime, int numFiles) throws IOException {
List<String> files = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
files.add(HoodieClientTestUtils.createNewMarkerFile(basePath, "2019/03/29", instantTime));
}
return files;
}
/***
* Helper method to return temporary files count.
*
* @return Number of temporary files found
* @throws IOException in case of error
*/
private int getTotalTempFiles() throws IOException {
return FileSystemTestUtils.listRecursive(fs, new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME))
.size();
}
private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient, private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(final HoodieTableMetaClient metaClient,
List<String> paths) { List<String> paths) {
Predicate<String> roFilePredicate = Predicate<String> roFilePredicate =

View File

@@ -22,8 +22,8 @@ import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard; import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.fs.OptimisticConsistencyGuard; import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@@ -66,9 +66,9 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@ParameterizedTest @ParameterizedTest
@MethodSource("consistencyGuardType") @MethodSource("consistencyGuardType")
public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception { public void testCheckPassingAppearAndDisAppear(String consistencyGuardType) throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f2"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f3"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000); ConsistencyGuardConfig config = getConsistencyGuardConfig(1, 1000, 1000);
ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName()) ConsistencyGuard passing = consistencyGuardType.equals(FailSafeConsistencyGuard.class.getName())
@@ -88,7 +88,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test @Test
public void testCheckFailingAppearFailSafe() throws Exception { public void testCheckFailingAppearFailSafe() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> { assertThrows(TimeoutException.class, () -> {
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
@@ -98,7 +98,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test @Test
public void testCheckFailingAppearTimedWait() throws Exception { public void testCheckFailingAppearTimedWait() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays passing.waitTillAllFilesAppear(basePath + "/partition/path", Arrays
.asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); .asList(basePath + "/partition/path/f1_1-0-2_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -106,7 +106,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test @Test
public void testCheckFailingAppearsFailSafe() throws Exception { public void testCheckFailingAppearsFailSafe() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> { assertThrows(TimeoutException.class, () -> {
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
@@ -115,14 +115,14 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test @Test
public void testCheckFailingAppearsTimedWait() throws Exception { public void testCheckFailingAppearsTimedWait() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet")); passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
} }
@Test @Test
public void testCheckFailingDisappearFailSafe() throws Exception { public void testCheckFailingDisappearFailSafe() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> { assertThrows(TimeoutException.class, () -> {
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
@@ -132,7 +132,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test @Test
public void testCheckFailingDisappearTimedWait() throws Exception { public void testCheckFailingDisappearTimedWait() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays passing.waitTillAllFilesDisappear(basePath + "/partition/path", Arrays
.asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet")); .asList(basePath + "/partition/path/f1_1-0-1_000.parquet", basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -140,8 +140,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test @Test
public void testCheckFailingDisappearsFailSafe() throws Exception { public void testCheckFailingDisappearsFailSafe() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig()); ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, getConsistencyGuardConfig());
assertThrows(TimeoutException.class, () -> { assertThrows(TimeoutException.class, () -> {
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
@@ -150,8 +150,8 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@Test @Test
public void testCheckFailingDisappearsTimedWait() throws Exception { public void testCheckFailingDisappearsTimedWait() throws Exception {
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
FileCreateUtils.createDataFile(basePath, "partition/path", "000", "f1"); HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig()); ConsistencyGuard passing = new OptimisticConsistencyGuard(fs, getConsistencyGuardConfig());
passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet")); passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
} }

View File

@@ -18,17 +18,17 @@
package org.apache.hudi.table; package org.apache.hudi.table;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.IOType;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;

View File

@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieCompactionConfig;
@@ -35,6 +34,7 @@ import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile; import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@@ -73,8 +73,8 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
.insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build()) .insertSplitSize(100).autoTuneInsertSplits(autoSplitInserts).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build(); .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
FileCreateUtils.createCommit(basePath, "001"); HoodieClientTestUtils.fakeCommit(basePath, "001");
FileCreateUtils.createDataFile(basePath, testPartitionPath, "001", "file1", fileSize); HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
@@ -193,7 +193,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0) .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
.insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build(); .insertSplitSize(totalInsertNum / 2).autoTuneInsertSplits(false).build()).build();
FileCreateUtils.createCommit(basePath, "001"); HoodieClientTestUtils.fakeCommit(basePath, "001");
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf); HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath}); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});

View File

@@ -53,9 +53,6 @@ import org.junit.jupiter.api.Test;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -169,9 +166,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file"); assertEquals(1, fileSlice.getLogFiles().count(), "There should be 1 log file written for every data file");
} }
} }
createDeltaCommit(basePath, newCommitTime); HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime);
createRequestedDeltaCommit(basePath, newCommitTime);
createInflightDeltaCommit(basePath, newCommitTime);
// Do a compaction // Do a compaction
table = HoodieTable.create(config, hadoopConf); table = HoodieTable.create(config, hadoopConf);

View File

@@ -20,14 +20,16 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.FileSystemTestUtils;
import org.apache.hudi.io.IOType;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -53,20 +55,38 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
cleanupResources(); cleanupResources();
} }
private void givenCommit0(boolean isDeltaCommit) throws Exception {
HoodieClientTestUtils.fakeDataFile(basePath, "partA", "000", "f2");
if (isDeltaCommit) {
HoodieClientTestUtils.fakeDeltaCommit(basePath, "000");
} else {
HoodieClientTestUtils.fakeCommit(basePath, "000");
}
}
private void givenInflightCommit1(boolean isDeltaCommit) throws Exception {
HoodieClientTestUtils.fakeDataFile(basePath, "partB", "001", "f1");
HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f1", IOType.CREATE);
HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f3", IOType.CREATE);
if (isDeltaCommit) {
HoodieClientTestUtils.fakeLogFile(basePath, "partA", "001", "f2", 0);
HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.APPEND);
HoodieClientTestUtils.createMarkerFile(basePath, "partB", "001", "f4", IOType.APPEND);
HoodieClientTestUtils.fakeInflightDeltaCommit(basePath, "001");
} else {
HoodieClientTestUtils.fakeDataFile(basePath, "partA", "001", "f2");
HoodieClientTestUtils.createMarkerFile(basePath, "partA", "001", "f2", IOType.MERGE);
HoodieClientTestUtils.fakeInFlightCommit(basePath, "001");
}
}
@Test @Test
public void testCopyOnWriteRollback() throws Exception { public void testCopyOnWriteRollback() throws Exception {
// given: wrote some base files and corresponding markers // given: wrote some base files and corresponding markers
HoodieTestTable testTable = HoodieTestTable.of(metaClient); givenCommit0(false);
String f0 = testTable.addRequestedCommit("000") givenInflightCommit1(false);
.withInserts("partA").get("partA");
String f1 = testTable.addCommit("001")
.withUpdates("partA", f0)
.withInserts("partB").get("partB");
String f2 = "f2";
testTable.forCommit("001")
.withMarkerFile("partA", f0, IOType.MERGE)
.withMarkerFile("partB", f1, IOType.CREATE)
.withMarkerFile("partA", f2, IOType.CREATE);
// when // when
List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002") List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
@@ -75,8 +95,8 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
// then: ensure files are deleted correctly, non-existent files reported as failed deletes // then: ensure files are deleted correctly, non-existent files reported as failed deletes
assertEquals(2, stats.size()); assertEquals(2, stats.size());
List<FileStatus> partAFiles = testTable.listAllFiles("partA"); List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
List<FileStatus> partBFiles = testTable.listAllFiles("partB"); List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
assertEquals(0, partBFiles.size()); assertEquals(0, partBFiles.size());
assertEquals(1, partAFiles.size()); assertEquals(1, partAFiles.size());
@@ -87,19 +107,8 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
@Test @Test
public void testMergeOnReadRollback() throws Exception { public void testMergeOnReadRollback() throws Exception {
// given: wrote some base + log files and corresponding markers // given: wrote some base + log files and corresponding markers
HoodieTestTable testTable = HoodieTestTable.of(metaClient); givenCommit0(true);
String f2 = testTable.addRequestedDeltaCommit("000") givenInflightCommit1(true);
.withInserts("partA").get("partA");
String f1 = testTable.addDeltaCommit("001")
.withLogFile("partA", f2)
.withInserts("partB").get("partB");
String f3 = "f3";
String f4 = "f4";
testTable.forDeltaCommit("001")
.withMarkerFile("partB", f1, IOType.CREATE)
.withMarkerFile("partA", f3, IOType.CREATE)
.withMarkerFile("partA", f2, IOType.APPEND)
.withMarkerFile("partB", f4, IOType.APPEND);
// when // when
List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002") List<HoodieRollbackStat> stats = new MarkerBasedRollbackStrategy(HoodieTable.create(metaClient, getConfig(), hadoopConf), jsc, getConfig(), "002")
@@ -108,12 +117,12 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
// then: ensure files are deleted, rollback block is appended (even if append does not exist) // then: ensure files are deleted, rollback block is appended (even if append does not exist)
assertEquals(2, stats.size()); assertEquals(2, stats.size());
// will have the log file // will have the log file
List<FileStatus> partBFiles = testTable.listAllFiles("partB"); List<FileStatus> partBFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partB"));
assertEquals(1, partBFiles.size()); assertEquals(1, partBFiles.size());
assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())); assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
assertTrue(partBFiles.get(0).getLen() > 0); assertTrue(partBFiles.get(0).getLen() > 0);
List<FileStatus> partAFiles = testTable.listAllFiles("partA"); List<FileStatus> partAFiles = FileSystemTestUtils.listRecursive(fs, new Path(basePath + "/partA"));
assertEquals(3, partAFiles.size()); assertEquals(3, partAFiles.size());
assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count()); assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count()); assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());

View File

@@ -35,10 +35,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.IOType;
import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetWriter; import org.apache.hudi.io.storage.HoodieParquetWriter;
@@ -59,6 +59,7 @@ import org.apache.spark.sql.SQLContext;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@@ -73,6 +74,57 @@ import java.util.stream.Collectors;
public class HoodieClientTestUtils { public class HoodieClientTestUtils {
private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class); private static final Logger LOG = LogManager.getLogger(HoodieClientTestUtils.class);
public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
private static void fakeMetaFile(String basePath, String instantTime, String suffix) throws IOException {
String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME;
new File(parentPath).mkdirs();
new File(parentPath + "/" + instantTime + suffix).createNewFile();
}
public static void fakeCommit(String basePath, String instantTime) throws IOException {
fakeMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
}
public static void fakeDeltaCommit(String basePath, String instantTime) throws IOException {
fakeMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
}
public static void fakeInflightDeltaCommit(String basePath, String instantTime) throws IOException {
fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
}
public static void fakeInFlightCommit(String basePath, String instantTime) throws IOException {
fakeMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_EXTENSION);
}
public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId)
throws Exception {
fakeDataFile(basePath, partitionPath, instantTime, fileId, 0);
}
public static void fakeDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length)
throws Exception {
String parentPath = String.format("%s/%s", basePath, partitionPath);
new File(parentPath).mkdirs();
String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
new File(path).createNewFile();
new RandomAccessFile(path, "rw").setLength(length);
}
public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version)
throws Exception {
fakeLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
}
public static void fakeLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length)
throws Exception {
String parentPath = String.format("%s/%s", basePath, partitionPath);
new File(parentPath).mkdirs();
String path = String.format("%s/%s", parentPath, FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, "1-0-1"));
new File(path).createNewFile();
new RandomAccessFile(path, "rw").setLength(length);
}
/** /**
* Returns a Spark config for this test. * Returns a Spark config for this test.
@@ -101,7 +153,7 @@ public class HoodieClientTestUtils {
return HoodieReadClient.addHoodieSupport(sparkConf); return HoodieReadClient.addHoodieSupport(sparkConf);
} }
private static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline, public static HashMap<String, String> getLatestFileIDsToFullPath(String basePath, HoodieTimeline commitTimeline,
List<HoodieInstant> commitsToReturn) throws IOException { List<HoodieInstant> commitsToReturn) throws IOException {
HashMap<String, String> fileIdToFullPath = new HashMap<>(); HashMap<String, String> fileIdToFullPath = new HashMap<>();
for (HoodieInstant commit : commitsToReturn) { for (HoodieInstant commit : commitsToReturn) {
@@ -175,8 +227,6 @@ public class HoodieClientTestUtils {
/** /**
* Find total basefiles for passed in paths. * Find total basefiles for passed in paths.
* <p>
* TODO move to {@link FileCreateUtils}.
*/ */
public static Map<String, Integer> getBaseFileCountForPaths(String basePath, FileSystem fs, public static Map<String, Integer> getBaseFileCountForPaths(String basePath, FileSystem fs,
String... paths) { String... paths) {
@@ -195,9 +245,6 @@ public class HoodieClientTestUtils {
} }
} }
/**
* TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
*/
public static String writeParquetFile(String basePath, String partitionPath, String filename, public static String writeParquetFile(String basePath, String partitionPath, String filename,
List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException { List<HoodieRecord> records, Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException {
@@ -231,9 +278,6 @@ public class HoodieClientTestUtils {
return filename; return filename;
} }
/**
* TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
*/
public static String writeParquetFile(String basePath, String partitionPath, List<HoodieRecord> records, public static String writeParquetFile(String basePath, String partitionPath, List<HoodieRecord> records,
Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException { Schema schema, BloomFilter filter, boolean createCommitTime) throws IOException, InterruptedException {
Thread.sleep(1000); Thread.sleep(1000);
@@ -245,9 +289,27 @@ public class HoodieClientTestUtils {
createCommitTime); createCommitTime);
} }
/** public static String createNewMarkerFile(String basePath, String partitionPath, String instantTime)
* TODO move to {@link FileCreateUtils}. throws IOException {
*/ return createMarkerFile(basePath, partitionPath, instantTime);
}
public static String createMarkerFile(String basePath, String partitionPath, String instantTime)
throws IOException {
return createMarkerFile(basePath, partitionPath, instantTime, UUID.randomUUID().toString(), IOType.MERGE);
}
public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType)
throws IOException {
String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + instantTime + "/" + partitionPath + "/";
new File(folderPath).mkdirs();
String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, DEFAULT_WRITE_TOKEN, instantTime,
HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType);
File f = new File(folderPath + markerFileName);
f.createNewFile();
return f.getAbsolutePath();
}
public static void createMarkerFile(String basePath, String partitionPath, String instantTime, String dataFileName) throws IOException { public static void createMarkerFile(String basePath, String partitionPath, String instantTime, String dataFileName) throws IOException {
createTempFolderForMarkerFiles(basePath); createTempFolderForMarkerFiles(basePath);
String folderPath = getTempFolderName(basePath); String folderPath = getTempFolderName(basePath);
@@ -256,9 +318,6 @@ public class HoodieClientTestUtils {
new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" + dataFileName + ".marker.MERGE").createNewFile(); new File(folderPath + "/" + instantTime + "/" + partitionPath + "/" + dataFileName + ".marker.MERGE").createNewFile();
} }
/**
* TODO Incorporate into {@link org.apache.hudi.common.testutils.HoodieTestTable}.
*/
public static int getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime) { public static int getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime) {
String folderPath = getTempFolderName(basePath); String folderPath = getTempFolderName(basePath);
File markerDir = new File(folderPath + "/" + instantTime + "/" + partitionPath); File markerDir = new File(folderPath + "/" + instantTime + "/" + partitionPath);
@@ -268,11 +327,11 @@ public class HoodieClientTestUtils {
return 0; return 0;
} }
private static void createTempFolderForMarkerFiles(String basePath) { public static void createTempFolderForMarkerFiles(String basePath) {
new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs(); new File(basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME).mkdirs();
} }
private static String getTempFolderName(String basePath) { public static String getTempFolderName(String basePath) {
return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME; return basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME;
} }
} }

View File

@@ -1,113 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.testutils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
/**
 * Static helpers that create hoodie timeline meta files, base (data) files, log files and
 * marker files directly on the local file system, for test preparation.
 *
 * <p>All paths are derived from a table base path using {@code java.nio.file}; existing files
 * are never truncated or overwritten by the meta-file helpers.
 */
public class FileCreateUtils {

  /** Write token baked into every generated data/log/marker file name. */
  private static final String WRITE_TOKEN = "1-0-1";

  private FileCreateUtils() {
    // utility class: no instances
  }

  /**
   * Creates an empty timeline meta file {@code <instantTime><suffix>} under the meta folder,
   * creating the folder first if needed. A pre-existing file is left untouched.
   */
  private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException {
    Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
    Files.createDirectories(parentPath);
    Path metaFilePath = parentPath.resolve(instantTime + suffix);
    if (Files.notExists(metaFilePath)) {
      Files.createFile(metaFilePath);
    }
  }

  /** Creates a completed commit meta file for the given instant. */
  public static void createCommit(String basePath, String instantTime) throws IOException {
    createMetaFile(basePath, instantTime, HoodieTimeline.COMMIT_EXTENSION);
  }

  /** Creates a requested commit meta file for the given instant. */
  public static void createRequestedCommit(String basePath, String instantTime) throws IOException {
    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_COMMIT_EXTENSION);
  }

  /** Creates an inflight commit meta file for the given instant. */
  public static void createInflightCommit(String basePath, String instantTime) throws IOException {
    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_COMMIT_EXTENSION);
  }

  /** Creates a completed delta commit meta file for the given instant. */
  public static void createDeltaCommit(String basePath, String instantTime) throws IOException {
    createMetaFile(basePath, instantTime, HoodieTimeline.DELTA_COMMIT_EXTENSION);
  }

  /** Creates a requested delta commit meta file for the given instant. */
  public static void createRequestedDeltaCommit(String basePath, String instantTime) throws IOException {
    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_DELTA_COMMIT_EXTENSION);
  }

  /** Creates an inflight delta commit meta file for the given instant. */
  public static void createInflightDeltaCommit(String basePath, String instantTime) throws IOException {
    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_DELTA_COMMIT_EXTENSION);
  }

  /** Creates an empty (zero-length) base file in the given partition. */
  public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId)
      throws Exception {
    createDataFile(basePath, partitionPath, instantTime, fileId, 0);
  }

  /**
   * Creates a base file in the given partition and sets it to {@code length} bytes.
   *
   * @param length desired on-disk size; the file content is unspecified (sparse/zero-filled)
   */
  public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileId, long length)
      throws Exception {
    Path parentPath = Paths.get(basePath, partitionPath);
    Files.createDirectories(parentPath);
    Path dataFilePath = parentPath.resolve(FSUtils.makeDataFileName(instantTime, WRITE_TOKEN, fileId));
    if (Files.notExists(dataFilePath)) {
      Files.createFile(dataFilePath);
    }
    // try-with-resources: the previous version leaked this RandomAccessFile handle
    try (RandomAccessFile raf = new RandomAccessFile(dataFilePath.toFile(), "rw")) {
      raf.setLength(length);
    }
  }

  /** Creates an empty (zero-length) log file in the given partition. */
  public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version)
      throws Exception {
    createLogFile(basePath, partitionPath, baseInstantTime, fileId, version, 0);
  }

  /**
   * Creates a log file in the given partition and sets it to {@code length} bytes.
   *
   * @param version log file version encoded into the file name
   */
  public static void createLogFile(String basePath, String partitionPath, String baseInstantTime, String fileId, int version, int length)
      throws Exception {
    Path parentPath = Paths.get(basePath, partitionPath);
    Files.createDirectories(parentPath);
    Path logFilePath = parentPath.resolve(
        FSUtils.makeLogFileName(fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseInstantTime, version, WRITE_TOKEN));
    if (Files.notExists(logFilePath)) {
      Files.createFile(logFilePath);
    }
    // try-with-resources: the previous version leaked this RandomAccessFile handle
    try (RandomAccessFile raf = new RandomAccessFile(logFilePath.toFile(), "rw")) {
      raf.setLength(length);
    }
  }

  /**
   * Creates a marker file for {@code fileID} under {@code .hoodie/.temp/<instantTime>/<partitionPath>}.
   *
   * @return absolute path of the (possibly pre-existing) marker file
   */
  public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileID, IOType ioType)
      throws IOException {
    Path folderPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
    Files.createDirectories(folderPath);
    String markerFileName = String.format("%s_%s_%s%s%s.%s", fileID, WRITE_TOKEN, instantTime,
        HoodieFileFormat.PARQUET.getFileExtension(), HoodieTableMetaClient.MARKER_EXTN, ioType);
    Path markerFilePath = folderPath.resolve(markerFileName);
    if (Files.notExists(markerFilePath)) {
      Files.createFile(markerFilePath);
    }
    return markerFilePath.toAbsolutePath().toString();
  }
}

View File

@@ -1,232 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.testutils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.IntStream;
import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
public class HoodieTestTable {
private final String basePath;
private final FileSystem fs;
private HoodieTableMetaClient metaClient;
private String currentInstantTime;
/**
 * Binds this helper to a table. The supplied basePath and fs must match the ones the
 * metaClient itself reports; any mismatch fails fast.
 */
private HoodieTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient) {
// guard: caller-supplied path/fs must agree with the meta client's own view of the table
ValidationUtils.checkArgument(Objects.equals(basePath, metaClient.getBasePath()));
ValidationUtils.checkArgument(Objects.equals(fs, metaClient.getRawFs()));
this.basePath = basePath;
this.fs = fs;
this.metaClient = metaClient;
}
/**
 * Builds a test-table helper bound to the given meta client's base path and raw file system.
 */
public static HoodieTestTable of(HoodieTableMetaClient metaClient) {
  final String tableBasePath = metaClient.getBasePath();
  final FileSystem rawFs = metaClient.getRawFs();
  return new HoodieTestTable(tableBasePath, rawFs, metaClient);
}
/**
 * Registers a requested (not yet inflight) commit instant on the timeline.
 *
 * @param instantTime instant to register; becomes the current instant for chained calls
 * @return this, for chaining
 */
public HoodieTestTable addRequestedCommit(String instantTime) throws Exception {
  createRequestedCommit(basePath, instantTime);
  // remember the instant and pick up the new timeline state
  this.currentInstantTime = instantTime;
  this.metaClient = HoodieTableMetaClient.reload(metaClient);
  return this;
}
/**
 * Registers a requested (not yet inflight) delta commit instant on the timeline.
 *
 * @param instantTime instant to register; becomes the current instant for chained calls
 * @return this, for chaining
 */
public HoodieTestTable addRequestedDeltaCommit(String instantTime) throws Exception {
  createRequestedDeltaCommit(basePath, instantTime);
  // remember the instant and pick up the new timeline state
  this.currentInstantTime = instantTime;
  this.metaClient = HoodieTableMetaClient.reload(metaClient);
  return this;
}
/**
 * Registers a commit instant that has been requested and transitioned to inflight,
 * but not completed.
 *
 * @param instantTime instant to register; becomes the current instant for chained calls
 * @return this, for chaining
 */
public HoodieTestTable addInflightCommit(String instantTime) throws Exception {
  // requested first, then inflight, mirroring the real instant lifecycle
  createRequestedCommit(basePath, instantTime);
  createInflightCommit(basePath, instantTime);
  this.currentInstantTime = instantTime;
  this.metaClient = HoodieTableMetaClient.reload(metaClient);
  return this;
}
/**
 * Registers a delta commit instant that has been requested and transitioned to inflight,
 * but not completed.
 *
 * @param instantTime instant to register; becomes the current instant for chained calls
 * @return this, for chaining
 */
public HoodieTestTable addInflightDeltaCommit(String instantTime) throws Exception {
  // requested first, then inflight, mirroring the real instant lifecycle
  createRequestedDeltaCommit(basePath, instantTime);
  createInflightDeltaCommit(basePath, instantTime);
  this.currentInstantTime = instantTime;
  this.metaClient = HoodieTableMetaClient.reload(metaClient);
  return this;
}
/**
 * Registers a fully completed commit instant (requested, inflight and completed meta files).
 *
 * @param instantTime instant to register; becomes the current instant for chained calls
 * @return this, for chaining
 */
public HoodieTestTable addCommit(String instantTime) throws Exception {
  // lay down the full lifecycle: requested -> inflight -> completed
  createRequestedCommit(basePath, instantTime);
  createInflightCommit(basePath, instantTime);
  createCommit(basePath, instantTime);
  this.currentInstantTime = instantTime;
  this.metaClient = HoodieTableMetaClient.reload(metaClient);
  return this;
}
/**
 * Registers a fully completed delta commit instant (requested, inflight and completed meta files).
 *
 * @param instantTime instant to register; becomes the current instant for chained calls
 * @return this, for chaining
 */
public HoodieTestTable addDeltaCommit(String instantTime) throws Exception {
  // lay down the full lifecycle: requested -> inflight -> completed
  createRequestedDeltaCommit(basePath, instantTime);
  createInflightDeltaCommit(basePath, instantTime);
  createDeltaCommit(basePath, instantTime);
  this.currentInstantTime = instantTime;
  this.metaClient = HoodieTableMetaClient.reload(metaClient);
  return this;
}
/**
 * Points the builder at an existing commit instant without touching the timeline.
 *
 * @return this, for chaining
 */
public HoodieTestTable forCommit(String instantTime) {
  this.currentInstantTime = instantTime;
  return this;
}
/**
 * Points the builder at an existing delta commit instant without touching the timeline.
 *
 * @return this, for chaining
 */
public HoodieTestTable forDeltaCommit(String instantTime) {
  this.currentInstantTime = instantTime;
  return this;
}
/**
 * Creates a marker for a freshly generated random file id under the current instant.
 *
 * @return this, for chaining
 */
public HoodieTestTable withMarkerFile(String partitionPath, IOType ioType) throws IOException {
  String randomFileId = UUID.randomUUID().toString();
  return withMarkerFile(partitionPath, randomFileId, ioType);
}
/**
 * Creates a marker of the given IO type for {@code fileId} under the current instant.
 *
 * @return this, for chaining
 */
public HoodieTestTable withMarkerFile(String partitionPath, String fileId, IOType ioType) throws IOException {
  // marker lives under .hoodie/.temp/<currentInstantTime>/<partitionPath>
  createMarkerFile(basePath, partitionPath, currentInstantTime, fileId, ioType);
  return this;
}
/**
 * Creates {@code num} markers with random file ids under the current instant.
 *
 * @return this, for chaining
 */
public HoodieTestTable withMarkerFiles(String partitionPath, int num, IOType ioType) throws IOException {
  String[] randomFileIds = new String[num];
  for (int i = 0; i < num; i++) {
    randomFileIds[i] = UUID.randomUUID().toString();
  }
  return withMarkerFiles(partitionPath, randomFileIds, ioType);
}
/**
 * Creates one marker per given file id under the current instant.
 *
 * @return this, for chaining
 */
public HoodieTestTable withMarkerFiles(String partitionPath, String[] fileIds, IOType ioType) throws IOException {
  for (String markerFileId : fileIds) {
    createMarkerFile(basePath, partitionPath, currentInstantTime, markerFileId, ioType);
  }
  return this;
}
/**
 * Insert one base file to each of the given distinct partitions, using a random file id each.
 *
 * @return A {@link Map} of partition and its newly inserted file's id.
 */
public Map<String, String> withInserts(String... partitions) throws Exception {
  Map<String, String> newFileIdByPartition = new HashMap<>();
  for (String partition : partitions) {
    String newFileId = UUID.randomUUID().toString();
    FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, newFileId);
    newFileIdByPartition.put(partition, newFileId);
  }
  return newFileIdByPartition;
}
/**
 * Rewrites (updates) the given existing file ids in a partition at the current instant.
 *
 * @return this, for chaining
 */
public HoodieTestTable withUpdates(String partition, String... fileIds) throws Exception {
  for (String updatedFileId : fileIds) {
    FileCreateUtils.createDataFile(basePath, partition, currentInstantTime, updatedFileId);
  }
  return this;
}
/**
 * Creates a version-0 log file with a random file id in the given partition.
 *
 * @return the generated file id
 */
public String withLogFile(String partitionPath) throws Exception {
  String generatedFileId = UUID.randomUUID().toString();
  withLogFile(partitionPath, generatedFileId);
  return generatedFileId;
}
/**
 * Creates a version-0 log file for {@code fileId} in the given partition.
 *
 * @return this, for chaining
 */
public HoodieTestTable withLogFile(String partitionPath, String fileId) throws Exception {
  // version 0 is the first log file for a file slice
  return withLogFile(partitionPath, fileId, 0);
}
/**
 * Creates a log file for the given file id and log version in the given
 * partition for the current instant.
 *
 * @param partitionPath partition to place the log file in
 * @param fileId        file id the log file belongs to
 * @param version       log file version number
 * @return this test table, for call chaining
 */
public HoodieTestTable withLogFile(String partitionPath, String fileId, int version) throws Exception {
FileCreateUtils.createLogFile(basePath, partitionPath, currentInstantTime, fileId, version);
return this;
}
/**
 * Checks whether a base file exists for every (partition, fileId) pair in
 * the given map at the given instant time.
 *
 * @param partitionAndFileId partition-to-fileId pairs to check
 * @param instantTime        instant time encoded in the expected file names
 * @return true if every mapped file exists (vacuously true for an empty map)
 */
public boolean filesExist(Map<String, String> partitionAndFileId, String instantTime) {
  for (Map.Entry<String, String> partitionToFileId : partitionAndFileId.entrySet()) {
    if (!fileExists(partitionToFileId.getKey(), instantTime, partitionToFileId.getValue())) {
      return false;
    }
  }
  return true;
}
/**
 * Checks whether a base file exists in the given partition at the given
 * instant time for every one of the given file ids.
 *
 * @return true if all files exist (vacuously true for no file ids)
 */
public boolean filesExist(String partition, String instantTime, String... fileIds) {
  for (String fileId : fileIds) {
    if (!fileExists(partition, instantTime, fileId)) {
      return false;
    }
  }
  return true;
}
/**
 * Checks whether the base file for the given file id exists in the given
 * partition at the given instant time.
 *
 * @throws HoodieTestTableException wrapping any I/O failure from the filesystem
 */
public boolean fileExists(String partition, String instantTime, String fileId) {
  // "1-0-1" is the fixed write token this test table uses for all data files.
  String dataFileName = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId);
  Path dataFilePath = new Path(Paths.get(basePath, partition, dataFileName).toString());
  try {
    return fs.exists(dataFilePath);
  } catch (IOException e) {
    throw new HoodieTestTableException(e);
  }
}
/**
 * Checks whether a log file exists for the given file id at every one of
 * the given versions, in the given partition and instant time.
 *
 * @return true if all versions exist (vacuously true for no versions)
 */
public boolean logFilesExist(String partition, String instantTime, String fileId, int... versions) {
  for (int logVersion : versions) {
    if (!logFileExists(partition, instantTime, fileId, logVersion)) {
      return false;
    }
  }
  return true;
}
/**
 * Checks whether the log file for the given file id and version exists in
 * the given partition at the given instant time.
 *
 * @throws HoodieTestTableException wrapping any I/O failure from the filesystem
 */
public boolean logFileExists(String partition, String instantTime, String fileId, int version) {
  // "1-0-1" is the fixed write token this test table uses for all log files.
  String logFileName = FSUtils.makeLogFileName(
      fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, version, "1-0-1");
  Path logFilePath = new Path(Paths.get(basePath, partition, logFileName).toString());
  try {
    return fs.exists(logFilePath);
  } catch (IOException e) {
    throw new HoodieTestTableException(e);
  }
}
/**
 * Recursively lists every file under the given partition of this test table.
 *
 * @param partitionPath partition to list, relative to the table base path
 * @return statuses of all files found under the partition
 */
public List<FileStatus> listAllFiles(String partitionPath) throws IOException {
  Path partitionDir = new Path(Paths.get(basePath, partitionPath).toString());
  return FileSystemTestUtils.listRecursive(fs, partitionDir);
}
/**
 * Recursively lists every file under this table's temp folder
 * ({@code .hoodie/.temp}), where marker files are written.
 *
 * @return statuses of all files found under the temp folder
 */
public List<FileStatus> listAllFilesInTempFolder() throws IOException {
  Path tempDir = new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString());
  return FileSystemTestUtils.listRecursive(fs, tempDir);
}
/**
 * Unchecked wrapper used by this test table to surface {@link IOException}s
 * from filesystem checks without forcing callers to declare checked exceptions.
 */
public static class HoodieTestTableException extends RuntimeException {
public HoodieTestTableException(Throwable t) {
super(t);
}
}
}

View File

@@ -99,6 +99,7 @@ import static org.junit.jupiter.api.Assertions.fail;
*/ */
public class HoodieTestUtils { public class HoodieTestUtils {
public static final String TEST_EXTENSION = ".test";
public static final String RAW_TRIPS_TEST_NAME = "raw_trips"; public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
public static final String DEFAULT_WRITE_TOKEN = "1-0-1"; public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
public static final int DEFAULT_LOG_VERSION = 1; public static final int DEFAULT_LOG_VERSION = 1;
@@ -165,9 +166,6 @@ public class HoodieTestUtils {
return COMMIT_FORMATTER.format(new Date()); return COMMIT_FORMATTER.format(new Date());
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static void createCommitFiles(String basePath, String... instantTimes) throws IOException { public static void createCommitFiles(String basePath, String... instantTimes) throws IOException {
for (String instantTime : instantTimes) { for (String instantTime : instantTimes) {
new File( new File(
@@ -182,13 +180,24 @@ public class HoodieTestUtils {
} }
} }
public static void createDeltaCommitFiles(String basePath, String... instantTimes) throws IOException {
for (String instantTime : instantTimes) {
new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeRequestedDeltaFileName(instantTime)).createNewFile();
new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeInflightDeltaFileName(instantTime)).createNewFile();
new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(instantTime))
.createNewFile();
}
}
public static void createMetadataFolder(String basePath) { public static void createMetadataFolder(String basePath) {
new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs();
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static void createInflightCommitFiles(String basePath, String... instantTimes) throws IOException { public static void createInflightCommitFiles(String basePath, String... instantTimes) throws IOException {
for (String instantTime : instantTimes) { for (String instantTime : instantTimes) {
@@ -202,12 +211,11 @@ public class HoodieTestUtils {
public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... instantTimes) { public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... instantTimes) {
for (String instantTime : instantTimes) { for (String instantTime : instantTimes) {
Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime), Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime),
HoodieTimeline.makeInflightCleanerFileName(instantTime)) HoodieTimeline.makeInflightCleanerFileName(instantTime)).forEach(f -> {
.forEach(f -> {
FSDataOutputStream os = null; FSDataOutputStream os = null;
try { try {
Path commitFile = new Path(Paths Path commitFile = new Path(
.get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
os = metaClient.getFs().create(commitFile, true); os = metaClient.getFs().create(commitFile, true);
// Write empty clean metadata // Write empty clean metadata
os.write(TimelineMetadataUtils.serializeCleanerPlan( os.write(TimelineMetadataUtils.serializeCleanerPlan(
@@ -229,12 +237,11 @@ public class HoodieTestUtils {
public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) { public static void createCorruptedPendingCleanFiles(HoodieTableMetaClient metaClient, String commitTime) {
Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime), Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime),
HoodieTimeline.makeInflightCleanerFileName(commitTime)) HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> {
.forEach(f -> {
FSDataOutputStream os = null; FSDataOutputStream os = null;
try { try {
Path commitFile = new Path(Paths Path commitFile = new Path(
.get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString()); metaClient.getBasePath() + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
os = metaClient.getFs().create(commitFile, true); os = metaClient.getFs().create(commitFile, true);
// Write empty clean metadata // Write empty clean metadata
os.write(new byte[0]); os.write(new byte[0]);
@@ -252,18 +259,18 @@ public class HoodieTestUtils {
}); });
} }
/** public static String createNewDataFile(String basePath, String partitionPath, String instantTime)
* @deprecated Use {@link HoodieTestTable} instead. throws IOException {
*/ String fileID = UUID.randomUUID().toString();
return createDataFile(basePath, partitionPath, instantTime, fileID);
}
public static String createNewDataFile(String basePath, String partitionPath, String instantTime, long length) public static String createNewDataFile(String basePath, String partitionPath, String instantTime, long length)
throws IOException { throws IOException {
String fileID = UUID.randomUUID().toString(); String fileID = UUID.randomUUID().toString();
return createDataFileFixLength(basePath, partitionPath, instantTime, fileID, length); return createDataFileFixLength(basePath, partitionPath, instantTime, fileID, length);
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static String createDataFile(String basePath, String partitionPath, String instantTime, String fileID) public static String createDataFile(String basePath, String partitionPath, String instantTime, String fileID)
throws IOException { throws IOException {
String folderPath = basePath + "/" + partitionPath + "/"; String folderPath = basePath + "/" + partitionPath + "/";
@@ -272,7 +279,7 @@ public class HoodieTestUtils {
return fileID; return fileID;
} }
private static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID, public static String createDataFileFixLength(String basePath, String partitionPath, String instantTime, String fileID,
long length) throws IOException { long length) throws IOException {
String folderPath = basePath + "/" + partitionPath + "/"; String folderPath = basePath + "/" + partitionPath + "/";
Files.createDirectories(Paths.get(folderPath)); Files.createDirectories(Paths.get(folderPath));
@@ -284,9 +291,6 @@ public class HoodieTestUtils {
return fileID; return fileID;
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String instantTime, public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String instantTime,
String fileID, Option<Integer> version) throws IOException { String fileID, Option<Integer> version) throws IOException {
String folderPath = basePath + "/" + partitionPath + "/"; String folderPath = basePath + "/" + partitionPath + "/";
@@ -303,6 +307,17 @@ public class HoodieTestUtils {
return fileID; return fileID;
} }
public static void createCompactionCommitFiles(FileSystem fs, String basePath, String... instantTimes)
throws IOException {
for (String instantTime : instantTimes) {
boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(instantTime)));
if (!createFile) {
throw new IOException("cannot create commit file for commit " + instantTime);
}
}
}
public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant,
List<Pair<String, FileSlice>> fileSliceList) throws IOException { List<Pair<String, FileSlice>> fileSliceList) throws IOException {
HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty()); HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty());
@@ -311,16 +326,10 @@ public class HoodieTestUtils {
TimelineMetadataUtils.serializeCompactionPlan(plan)); TimelineMetadataUtils.serializeCompactionPlan(plan));
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static String getDataFilePath(String basePath, String partitionPath, String instantTime, String fileID) { public static String getDataFilePath(String basePath, String partitionPath, String instantTime, String fileID) {
return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID); return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(instantTime, DEFAULT_WRITE_TOKEN, fileID);
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static String getLogFilePath(String basePath, String partitionPath, String instantTime, String fileID, public static String getLogFilePath(String basePath, String partitionPath, String instantTime, String fileID,
Option<Integer> version) { Option<Integer> version) {
return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", instantTime, return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", instantTime,
@@ -331,39 +340,32 @@ public class HoodieTestUtils {
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION; return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION;
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static String getInflightCommitFilePath(String basePath, String instantTime) { public static String getInflightCommitFilePath(String basePath, String instantTime) {
return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime
+ HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static String getRequestedCompactionFilePath(String basePath, String instantTime) { public static String getRequestedCompactionFilePath(String basePath, String instantTime) {
return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instantTime return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instantTime
+ HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static boolean doesDataFileExist(String basePath, String partitionPath, String instantTime, public static boolean doesDataFileExist(String basePath, String partitionPath, String instantTime,
String fileID) { String fileID) {
return new File(getDataFilePath(basePath, partitionPath, instantTime, fileID)).exists(); return new File(getDataFilePath(basePath, partitionPath, instantTime, fileID)).exists();
} }
public static boolean doesLogFileExist(String basePath, String partitionPath, String instantTime, String fileID,
Option<Integer> version) {
return new File(getLogFilePath(basePath, partitionPath, instantTime, fileID, version)).exists();
}
public static boolean doesCommitExist(String basePath, String instantTime) { public static boolean doesCommitExist(String basePath, String instantTime) {
return new File( return new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION) basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.COMMIT_EXTENSION)
.exists(); .exists();
} }
/**
* @deprecated Use {@link HoodieTestTable} instead.
*/
public static boolean doesInflightExist(String basePath, String instantTime) { public static boolean doesInflightExist(String basePath, String instantTime) {
return new File( return new File(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_EXTENSION) basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + instantTime + HoodieTimeline.INFLIGHT_EXTENSION)