[HUDI-995] Use HoodieTestTable in more classes (#2079)

* [HUDI-995] Use HoodieTestTable in more classes

Migrate test data prep logic in
- TestStatsCommand
- TestHoodieROTablePathFilter

Re-implement the methods for creating new commit times in HoodieTestUtils and HoodieClientTestHarness
- Move relevant APIs to HoodieTestTable
- Migrate usages

After switching to the HoodieTestTable APIs, remove the now-unused deprecated APIs from HoodieTestUtils
Authored by Raymond Xu on 2020-09-17 09:29:07 -07:00, committed by GitHub
parent 581d54097c
commit 3201665295
9 changed files with 204 additions and 171 deletions
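
The bodies of the relocated helpers are not shown in this diff. The sketch below is a guess at their shape in org.apache.hudi.common.testutils.HoodieTestTable, inferred from the call sites in the hunks that follow and from the removed getNextInstant() generator in HoodieClientTestHarness (which used String.format("%09d", ...)); the padding width, the wrapper class, and the no-arg variant's clock-based format are assumptions, not code from this commit.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical sketch; not the actual HoodieTestTable implementation.
public class CommitTimeHelpersSketch {

  // Deterministic, sequence-based instant time; zero-padded so instant
  // times sort lexicographically by sequence (mirrors the format of the
  // getNextInstant() counter this commit removes).
  public static String makeNewCommitTime(int sequence) {
    return String.format("%09d", sequence);
  }

  // No-arg variant for tests that only need a fresh instant time; assumed
  // to stay wall-clock based like the old HoodieTestUtils helper.
  public static String makeNewCommitTime() {
    return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
  }

  // Stands in for HoodieTestUtils.monotonicIncreasingCommitTimestamps(n, 1):
  // n monotonically increasing commit times starting at sequence 1.
  public static List<String> makeIncrementalCommitTimes(int num) {
    return IntStream.rangeClosed(1, num)
        .mapToObj(CommitTimeHelpersSketch::makeNewCommitTime)
        .collect(Collectors.toList());
  }
}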

View File

@@ -53,7 +53,7 @@ import scala.Tuple2;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
-import static org.apache.hudi.common.testutils.HoodieTestUtils.makeNewCommitTime;
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.testutils.Transformations.recordsToPartitionRecordsMap;
import static org.junit.jupiter.api.Assertions.assertEquals;

View File

@@ -66,6 +66,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.action.clean.CleanPlanner;
import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -97,6 +98,8 @@ import java.util.stream.Stream;
import scala.Tuple3;
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes;
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -241,7 +244,7 @@ public class TestCleaner extends HoodieClientTestBase {
.map(e -> Pair.of(e.getKey().getPartitionPath(), e.getValue())).collect(Collectors.toList());
HoodieCompactionPlan compactionPlan =
CompactionUtils.buildFromFileSlices(partitionFileSlicePairs, Option.empty(), Option.empty());
-List<String> instantTimes = HoodieTestUtils.monotonicIncreasingCommitTimestamps(9, 1);
+List<String> instantTimes = makeIncrementalCommitTimes(9);
String compactionTime = instantTimes.get(0);
table.getActiveTimeline().saveToCompactionRequested(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionTime),
@@ -383,7 +386,7 @@ public class TestCleaner extends HoodieClientTestBase {
HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
// Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
-HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 1).forEach(newCommitTime -> {
+makeIncrementalCommitTimes(8).forEach(newCommitTime -> {
try {
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100);
@@ -432,7 +435,15 @@ public class TestCleaner extends HoodieClientTestBase {
* @param config HoodieWriteConfig
*/
private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config) throws IOException {
-return runCleaner(config, false);
+return runCleaner(config, false, 1);
}
+private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, int firstCommitSequence) throws IOException {
+return runCleaner(config, false, firstCommitSequence);
+}
+private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException {
+return runCleaner(config, simulateRetryFailure, 1);
+}
/**
@@ -440,9 +451,9 @@ public class TestCleaner extends HoodieClientTestBase {
*
* @param config HoodieWriteConfig
*/
-private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure) throws IOException {
+private List<HoodieCleanStat> runCleaner(HoodieWriteConfig config, boolean simulateRetryFailure, int firstCommitSequence) throws IOException {
HoodieWriteClient<?> writeClient = getHoodieWriteClient(config);
-String cleanInstantTs = getNextInstant();
+String cleanInstantTs = makeNewCommitTime(firstCommitSequence);
HoodieCleanMetadata cleanMetadata1 = writeClient.clean(cleanInstantTs);
if (null == cleanMetadata1) {
@@ -463,7 +474,7 @@ public class TestCleaner extends HoodieClientTestBase {
});
});
metaClient.reloadActiveTimeline().revertToInflight(completedCleanInstant);
-HoodieCleanMetadata newCleanMetadata = writeClient.clean(getNextInstant());
+HoodieCleanMetadata newCleanMetadata = writeClient.clean(makeNewCommitTime(firstCommitSequence + 1));
// No new clean metadata would be created. Only the previous one will be retried
assertNull(newCleanMetadata);
HoodieCleanMetadata cleanMetadata2 = CleanerUtils.getCleanerMetadata(metaClient, completedCleanInstant);
@@ -540,7 +551,7 @@ public class TestCleaner extends HoodieClientTestBase {
.withBaseFilesInPartition(p1, file1P1C0)
.withBaseFilesInPartitions(p0, p1);
-List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
+List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, 1);
// enableBootstrapSourceClean would delete the bootstrap base file as the same time
HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsTwo, p0);
assertEquals(enableBootstrapSourceClean ? 2 : 1, cleanStat.getSuccessDeleteFiles().size()
@@ -581,7 +592,7 @@ public class TestCleaner extends HoodieClientTestBase {
String file3P0C2 = testTable.addCommit("00000000000003")
.withBaseFilesInPartition(p0, file1P0C0, file2P0C1)
.withBaseFilesInPartitions(p0).get(p0);
-List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
+List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 3);
assertEquals(2,
getCleanStat(hoodieCleanStatsThree, p0)
.getSuccessDeleteFiles().size(), "Must clean two files");
@@ -1061,7 +1072,18 @@ public class TestCleaner extends HoodieClientTestBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
-HoodieTestUtils.createCorruptedPendingCleanFiles(metaClient, getNextInstant());
+String commitTime = makeNewCommitTime(1);
+List<String> cleanerFileNames = Arrays.asList(
+HoodieTimeline.makeRequestedCleanerFileName(commitTime),
+HoodieTimeline.makeInflightCleanerFileName(commitTime));
+for (String f : cleanerFileNames) {
+Path commitFile = new Path(Paths
+.get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
+try (FSDataOutputStream os = metaClient.getFs().create(commitFile, true)) {
+// Write empty clean metadata
+os.write(new byte[0]);
+}
+}
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> cleanStats = runCleaner(config);
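
The net effect of the runCleaner() changes in this file: instead of pulling the clean instant from the harness-wide getNextInstant() counter, a test can pass the sequence number its clean instant should start from, so the instant it gets no longer depends on how many instants earlier tests consumed. A condensed, hypothetical call pattern (the table-setup literals follow the hunks above; the config builder and partition variables are elided):

// Two commits occupy the "00000000000001"/"00000000000002" instant slots.
String file1P0 = testTable.addCommit("00000000000001").withBaseFilesInPartitions(p0).get(p0);
testTable.addCommit("00000000000002").withBaseFilesInPartition(p0, file1P0);
// The explicit sequence pins this test's clean instant; a simulated retry
// would then use makeNewCommitTime(firstCommitSequence + 1), per the hunk above.
List<HoodieCleanStat> stats = runCleaner(config, 1);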

View File

@@ -68,6 +68,7 @@ import java.util.Map;
import java.util.UUID;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateExpectedPartitionNumRecords;
import static org.apache.hudi.execution.bulkinsert.TestBulkInsertInternalPartitioner.generateTestRecordsForBulkInsert;
@@ -86,7 +87,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
String fileName = UUID.randomUUID().toString();
String partitionPath = "2016/05/04";
-String instantTime = HoodieTestUtils.makeNewCommitTime();
+String instantTime = makeNewCommitTime();
HoodieWriteConfig config = makeHoodieClientConfig();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
@@ -118,7 +119,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
public void testUpdateRecords() throws Exception {
// Prepare the AvroParquetIO
HoodieWriteConfig config = makeHoodieClientConfig();
-String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
+String firstCommitTime = makeNewCommitTime();
HoodieWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -182,7 +183,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
List<HoodieRecord> updatedRecords = Arrays.asList(updatedRecord1, insertedRecord1);
Thread.sleep(1000);
-String newCommitTime = HoodieTestUtils.makeNewCommitTime();
+String newCommitTime = makeNewCommitTime();
metaClient = HoodieTableMetaClient.reload(metaClient);
writeClient.startCommitWithTime(newCommitTime);
List<WriteStatus> statuses = writeClient.upsert(jsc.parallelize(updatedRecords), newCommitTime).collect();
@@ -263,7 +264,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
// Prepare the AvroParquetIO
HoodieWriteConfig config =
makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build();
-String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
+String firstCommitTime = makeNewCommitTime();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
@@ -317,7 +318,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
@Test
public void testInsertRecords() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfig();
-String instantTime = HoodieTestUtils.makeNewCommitTime();
+String instantTime = makeNewCommitTime();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
@@ -368,7 +369,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(HoodieStorageConfig.newBuilder()
.parquetMaxFileSize(64 * 1024).hfileMaxFileSize(64 * 1024)
.parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build();
-String instantTime = HoodieTestUtils.makeNewCommitTime();
+String instantTime = makeNewCommitTime();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
@@ -435,7 +436,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withBulkInsertParallelism(2).withBulkInsertSortMode(bulkInsertMode).build();
-String instantTime = HoodieTestUtils.makeNewCommitTime();
+String instantTime = makeNewCommitTime();
HoodieWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(instantTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
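
Every change in this file is the same mechanical swap: the statically imported HoodieTestTable.makeNewCommitTime() replaces the qualified HoodieTestUtils call. The resulting pattern in each test, condensed from the hunks above:

import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;

// was: String instantTime = HoodieTestUtils.makeNewCommitTime();
String instantTime = makeNewCommitTime();
writeClient.startCommitWithTime(instantTime);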

View File

@@ -38,18 +38,17 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
-import org.apache.log4j.Logger;
-import org.apache.log4j.LogManager;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* The test harness for resource initialization and cleanup.
@@ -66,17 +65,12 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
protected transient HoodieTestDataGenerator dataGen = null;
protected transient ExecutorService executorService;
protected transient HoodieTableMetaClient metaClient;
-private static AtomicInteger instantGen = new AtomicInteger(1);
protected transient HoodieWriteClient writeClient;
protected transient HoodieReadClient readClient;
protected transient HoodieTableFileSystemView tableView;
protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
-public String getNextInstant() {
-return String.format("%09d", instantGen.getAndIncrement());
-}
// dfs
protected String dfsBasePath;
protected transient HdfsTestService hdfsTestService;
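
For reference, the generator deleted above coupled every test through shared mutable state; reassembled from the removed lines:

// Removed from HoodieClientTestHarness by this commit:
private static AtomicInteger instantGen = new AtomicInteger(1);

public String getNextInstant() {
  return String.format("%09d", instantGen.getAndIncrement());
}

// Call sites now use the static HoodieTestTable helpers instead, e.g.
// makeNewCommitTime(firstCommitSequence) in TestCleaner.runCleaner(), so a
// test's instant times no longer depend on execution order across tests.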