1
0

[HUDI-1486] Remove inline inflight rollback in hoodie writer (#2359)

1. Refactor rollback and move cleaning failed commits logic into cleaner
2. Introduce hoodie heartbeat to ascertain failed commits
3. Fix test cases
This commit is contained in:
n3nash
2021-02-19 20:12:22 -08:00
committed by GitHub
parent c9fcf964b2
commit ffcfb58bac
64 changed files with 1541 additions and 306 deletions

View File

@@ -75,13 +75,20 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
super(context, clientConfig);
}
@Deprecated
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
super(context, writeConfig, rollbackPending);
super(context, writeConfig);
}
@Deprecated
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, rollbackPending, timelineService);
super(context, writeConfig, timelineService);
}
public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
}
/**
@@ -131,9 +138,6 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
*/
@Override
public void bootstrap(Option<Map<String, String>> extraMetadata) {
if (rollbackPending) {
rollBackInflightBootstrap();
}
getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS).bootstrap(context, extraMetadata);
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.client;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -200,14 +201,11 @@ public class TestClientRollback extends HoodieClientTestBase {
.withBaseFilesInPartitions(partitionAndFileId3);
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(config, false)) {
// Rollback commit 1 (this should fail, since commit2 is still around)
assertThrows(HoodieRollbackException.class, () -> {
client.rollback(commitTime1);
}, "Should have thrown an exception ");
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
// Rollback commit3
client.rollback(commitTime3);
@@ -290,12 +288,14 @@ public class TestClientRollback extends HoodieClientTestBase {
.addInflightCommit(commitTime3)
.withBaseFilesInPartitions(partitionAndFileId3);
// Turn auto rollback off
// Set Failed Writes rollback to LAZY
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).build();
final String commitTime4 = "20160506030621";
try (SparkRDDWriteClient client = getHoodieWriteClient(config, false)) {
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
client.startCommitWithTime(commitTime4);
// Check results, nothing changed
assertTrue(testTable.commitExists(commitTime1));
@@ -306,9 +306,11 @@ public class TestClientRollback extends HoodieClientTestBase {
assertTrue(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
}
// Turn auto rollback on
// Set Failed Writes rollback to EAGER
config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
final String commitTime5 = "20160506030631";
try (SparkRDDWriteClient client = getHoodieWriteClient(config, true)) {
try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
client.startCommitWithTime(commitTime5);
assertTrue(testTable.commitExists(commitTime1));
assertFalse(testTable.inflightCommitExists(commitTime2));

View File

@@ -23,10 +23,12 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -43,8 +45,10 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
@@ -95,6 +99,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.Properties;
@@ -283,7 +290,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// Perform write-action and check
JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
try (SparkRDDWriteClient client = getHoodieWriteClient(getConfigBuilder().combineInput(true, true).build(), false);) {
try (SparkRDDWriteClient client = getHoodieWriteClient(getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.combineInput(true, true).build());) {
client.startCommitWithTime(newCommitTime);
List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
assertNoWriteErrors(statuses);
@@ -338,12 +346,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped)
throws Exception {
// Force using older timeline layout
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.withProps(config.getProps()).withTimelineLayoutVersion(
VERSION_0).build();
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
// Write 1 (only inserts)
String newCommitTime = "001";
@@ -373,7 +382,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// Now simulate an upgrade and perform a restore operation
HoodieWriteConfig newConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion(
TimelineLayoutVersion.CURR_VERSION).build();
client = getHoodieWriteClient(newConfig, false);
client = getHoodieWriteClient(newConfig);
client.restoreToInstant("004");
// Check the entire dataset has all records still
@@ -488,7 +497,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
// Write 1 (only inserts)
String newCommitTime = "001";
@@ -508,7 +517,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
writeBatch(client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime,
numRecords, recordGenFunction, SparkRDDWriteClient::insert, true, numRecords, 300,
2);
2, false);
}
/**
@@ -516,8 +525,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
*/
@Test
public void testDeletes() throws Exception {
SparkRDDWriteClient client = getHoodieWriteClient(getConfig(), false);
SparkRDDWriteClient client = getHoodieWriteClient(getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).build());
/**
* Write 1 (inserts and deletes) Write actual 200 insert records and ignore 100 delete records
*/
@@ -536,7 +544,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
};
writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime,
// unused as genFn uses hard-coded number of inserts/updates/deletes
-1, recordGenFunction, SparkRDDWriteClient::upsert, true, 200, 200, 1);
-1, recordGenFunction, SparkRDDWriteClient::upsert, true, 200, 200, 1, false);
/**
* Write 2 (deletes+writes).
@@ -553,7 +561,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
return recordsInSecondBatch;
};
writeBatch(client, newCommitTime, prevCommitTime, Option.empty(), initCommitTime, 100, recordGenFunction,
SparkRDDWriteClient::upsert, true, 50, 150, 2);
SparkRDDWriteClient::upsert, true, 50, 150, 2, false);
}
/**
@@ -563,8 +571,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
*/
@Test
public void testDeletesForInsertsInSameBatch() throws Exception {
SparkRDDWriteClient client = getHoodieWriteClient(getConfig(), false);
SparkRDDWriteClient client = getHoodieWriteClient(getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).build());
/**
* Write 200 inserts and issue deletes to a subset(50) of inserts.
*/
@@ -583,7 +590,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
};
writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime,
-1, recordGenFunction, SparkRDDWriteClient::upsert, true, 150, 150, 1);
-1, recordGenFunction, SparkRDDWriteClient::upsert, true, 150, 150, 1, false);
}
/**
@@ -625,7 +632,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
// Set rollback to LAZY so no inflights are deleted
hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP,
HoodieFailedWritesCleaningPolicy.LAZY.name());
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
// Write 1
String newCommitTime = "001";
@@ -779,7 +789,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
props.setProperty(ASYNC_CLUSTERING_ENABLE_OPT_KEY, "true");
HoodieWriteConfig config = getSmallInsertWriteConfig(100,
TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), props);
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
SparkRDDWriteClient client = getHoodieWriteClient(config);
HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient);
//1. insert to generate 2 file group
@@ -834,8 +844,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
SparkRDDWriteClient client = getHoodieWriteClient(config);
// Inserts => will write file1
String commitTime1 = "001";
@@ -946,7 +955,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
SparkRDDWriteClient client = getHoodieWriteClient(config);
// Inserts => will write file1
String commitTime1 = "001";
@@ -1026,7 +1035,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
SparkRDDWriteClient client = getHoodieWriteClient(config);
// Inserts => will write file1
String commitTime1 = "001";
@@ -1104,7 +1113,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private void testClustering(HoodieClusteringConfig clusteringConfig) throws Exception {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false, 10);
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator();
String commitTime = "100";
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 200);
@@ -1120,10 +1129,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
fileIdIntersection.retainAll(fileIds2);
assertEquals(0, fileIdIntersection.size());
config = getConfigBuilder().withClusteringConfig(clusteringConfig).build();
config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withClusteringConfig(clusteringConfig).build();
// create client with new config.
client = getHoodieWriteClient(config, false);
client = getHoodieWriteClient(config);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, true);
List<HoodieRecord> allRecords = Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList());
@@ -1176,7 +1185,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int batch2RecordsCount) throws Exception {
final String testPartitionPath = "americas";
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
// Do Inserts
@@ -1257,7 +1266,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
*/
private void verifyDeletePartitionsHandling(int batch1RecordsCount, int batch2RecordsCount, int batch3RecordsCount) throws Exception {
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false);
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator();
// Do Inserts for DEFAULT_FIRST_PARTITION_PATH
@@ -1405,8 +1414,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// setup the small file handling params
HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, true); // hold upto 200 records max
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
SparkRDDWriteClient client = getHoodieWriteClient(config);
// delete non existent keys
String commitTime1 = "001";
@@ -1612,6 +1620,181 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
testRollbackAfterConsistencyCheckFailureUsingFileList(true, enableOptimisticConsistencyGuard);
}
@ParameterizedTest
@EnumSource(value = HoodieFailedWritesCleaningPolicy.class, names = {"LAZY", "NEVER"})
public void testRollbackFailedCommits(HoodieFailedWritesCleaningPolicy cleaningPolicy) throws Exception {
HoodieTestUtils.init(hadoopConf, basePath);
// Perform 2 failed writes to table
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// refresh data generator to delete records generated from failed commits
dataGen = new HoodieTestDataGenerator();
// Perform 1 successful write
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, true);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 0);
assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2);
assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
// Await till enough time passes such that the first 2 failed commits heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("200");
Thread.sleep(2000);
}
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
// Perform 1 successful write
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, true);
client.clean();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
if (cleaningPolicy.isLazy()) {
assertTrue(
timeline
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION))
.countInstants()
== 2);
// Since we perform rollbacks instead of clean, there should be no clean action on the timeline
assertTrue(
timeline
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION))
.countInstants()
== 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
} else if (cleaningPolicy.isNever()) {
assertTrue(
timeline
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION))
.countInstants()
== 0);
// There should be no clean or rollback action on the timeline
assertTrue(
timeline
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION))
.countInstants()
== 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
}
}
@Test
public void testRollbackFailedCommitsToggleCleaningPolicy() throws Exception {
HoodieTestUtils.init(hadoopConf, basePath);
HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
// Perform 1 failed writes to table
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// Toggle cleaning policy to LAZY
cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
// Perform 2 failed writes to table
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
writeBatch(client, "200", "100", Option.of(Arrays.asList("200")), "200",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
writeBatch(client, "300", "200", Option.of(Arrays.asList("300")), "300",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// Await till enough time passes such that the first 2 failed commits heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
Thread.sleep(2000);
}
client.clean();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 3);
// Perform 2 failed commits
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
writeBatch(client, "400", "300", Option.of(Arrays.asList("400")), "400",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 300,
0, false);
client.close();
// Toggle cleaning policy to EAGER
cleaningPolicy = HoodieFailedWritesCleaningPolicy.EAGER;
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
client.startCommit();
timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 5);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 0);
}
@Test
public void testParallelInsertAndCleanPreviousFailedCommits() throws Exception {
HoodieFailedWritesCleaningPolicy cleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
ExecutorService service = Executors.newFixedThreadPool(2);
HoodieTestUtils.init(hadoopConf, basePath);
// Perform 2 failed writes to table
SparkRDDWriteClient client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
writeBatch(client, "100", "100", Option.of(Arrays.asList("100")), "100",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
0, false);
client.close();
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
writeBatch(client, "200", "200", Option.of(Arrays.asList("200")), "200",
100, dataGen::generateInserts, SparkRDDWriteClient::bulkInsert, false, 100, 100,
0, false);
client.close();
// refresh data generator to delete records generated from failed commits
dataGen = new HoodieTestDataGenerator();
// Create a successful commit
Future<JavaRDD<WriteStatus>> commit3 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy)),
"300", "200", Option.of(Arrays.asList("300")), "200", 100, dataGen::generateInserts,
SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
commit3.get();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 0);
assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 2);
assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 1);
client = new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy));
// Await till enough time passes such that the first 2 failed commits heartbeats are expired
boolean conditionMet = false;
while (!conditionMet) {
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("200");
Thread.sleep(2000);
}
Future<JavaRDD<WriteStatus>> commit4 = service.submit(() -> writeBatch(new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy)),
"400", "300", Option.of(Arrays.asList("400")), "400", 100, dataGen::generateInserts,
SparkRDDWriteClient::bulkInsert, false, 100, 100, 0, true));
Future<HoodieCleanMetadata> clean1 = service.submit(() -> new SparkRDDWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy)).clean());
commit4.get();
clean1.get();
HoodieActiveTimeline timeline = metaClient.getActiveTimeline().reload();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 2);
// Since we perform rollbacks instead of clean, there should be no clean action on the timeline
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).countInstants() == 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 2);
}
private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime, boolean enableOptimisticConsistencyGuard)
throws Exception {
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? (getConfigBuilder().withAutoCommit(false)
@@ -1746,6 +1929,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(smallFileSize)
// Set rollback to LAZY so no inflights are deleted
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.insertSplitSize(insertSplitSize).build())
.withStorageConfig(
HoodieStorageConfig.newBuilder()
@@ -1767,4 +1952,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
return clusteringInstant;
}
private HoodieWriteConfig getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy cleaningPolicy) {
return getConfigBuilder()
.withEmbeddedTimelineServerEnabled(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(cleaningPolicy)
.withAutoClean(false).build())
.withTimelineLayoutVersion(1)
.withHeartbeatIntervalInMs(3 * 1000)
.withAutoCommit(false).build();
}
}

View File

@@ -155,7 +155,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1);
HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
// Initial inserts with TRIP_EXAMPLE_SCHEMA
int numRecords = 10;
@@ -184,13 +184,13 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
// Insert with evolved schema is not allowed
HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED);
client = getHoodieWriteClient(hoodieDevolvedWriteConfig, false);
client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED);
try {
// We cannot use insertBatch directly here because we want to insert records
// with a devolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA.
writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, 0, 0, 0);
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, 0, 0, 0, false);
fail("Insert with devolved scheme should fail");
} catch (HoodieInsertException ex) {
// no new commit
@@ -213,13 +213,13 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
// Insert with an evolved scheme is allowed
HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED);
client = getHoodieWriteClient(hoodieEvolvedWriteConfig, false);
client = getHoodieWriteClient(hoodieEvolvedWriteConfig);
// We cannot use insertBatch directly here because we want to insert records
// with an evolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA.
final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("005", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
writeBatch(client, "005", "004", Option.empty(), initCommitTime, numRecords,
(String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, false, 0, 0, 0);
(String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, false, 0, 0, 0, false);
// new commit
checkLatestDeltaCommit("005");
@@ -228,14 +228,14 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
// Updates with evolved schema is allowed
final List<HoodieRecord> updateRecords = generateUpdatesWithSchema("006", numUpdateRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
writeBatch(client, "006", "005", Option.empty(), initCommitTime,
numUpdateRecords, (String s, Integer a) -> updateRecords, SparkRDDWriteClient::upsert, false, 0, 0, 0);
numUpdateRecords, (String s, Integer a) -> updateRecords, SparkRDDWriteClient::upsert, false, 0, 0, 0, false);
// new commit
checkLatestDeltaCommit("006");
checkReadRecords("000", 2 * numRecords);
// Now even the original schema cannot be used for updates as it is devolved in relation to the
// current schema of the dataset.
client = getHoodieWriteClient(hoodieWriteConfig, false);
client = getHoodieWriteClient(hoodieWriteConfig);
try {
updateBatch(hoodieWriteConfig, client, "007", "006", Option.empty(),
initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, 0, 0, 0);
@@ -256,7 +256,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
failedRecords.clear();
failedRecords.addAll(dataGen.generateInserts("007", numRecords));
writeBatch(client, "007", "006", Option.empty(), initCommitTime, numRecords,
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords, 1);
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords, 1, false);
fail("Insert with original scheme should fail");
} catch (HoodieInsertException ex) {
// no new commit
@@ -278,7 +278,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
checkLatestDeltaCommit("004");
// Updates with original schema are now allowed
client = getHoodieWriteClient(hoodieWriteConfig, false);
client = getHoodieWriteClient(hoodieWriteConfig);
updateBatch(hoodieWriteConfig, client, "008", "004", Option.empty(),
initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, 0, 0, 0);
// new commit
@@ -300,7 +300,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1);
HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
// Initial inserts with TRIP_EXAMPLE_SCHEMA
int numRecords = 10;
@@ -324,13 +324,13 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
// Insert with devolved schema is not allowed
HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_DEVOLVED);
client = getHoodieWriteClient(hoodieDevolvedWriteConfig, false);
client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_DEVOLVED);
try {
// We cannot use insertBatch directly here because we want to insert records
// with a devolved schema.
writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords, 1);
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords, 1, false);
fail("Insert with devolved scheme should fail");
} catch (HoodieInsertException ex) {
// no new commit
@@ -354,12 +354,12 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
// Insert with evolved scheme is allowed
HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED);
client = getHoodieWriteClient(hoodieEvolvedWriteConfig, false);
client = getHoodieWriteClient(hoodieEvolvedWriteConfig);
final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
// We cannot use insertBatch directly here because we want to insert records
// with a evolved schema.
writeBatch(client, "004", "003", Option.empty(), initCommitTime, numRecords,
(String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, true, numRecords, 2 * numRecords, 4);
(String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, true, numRecords, 2 * numRecords, 4, false);
// new commit
HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("004"));
@@ -368,12 +368,12 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
// Updates with evolved schema is allowed
final List<HoodieRecord> updateRecords = generateUpdatesWithSchema("005", numUpdateRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED);
writeBatch(client, "005", "004", Option.empty(), initCommitTime,
numUpdateRecords, (String s, Integer a) -> updateRecords, SparkRDDWriteClient::upsert, true, numUpdateRecords, 2 * numRecords, 5);
numUpdateRecords, (String s, Integer a) -> updateRecords, SparkRDDWriteClient::upsert, true, numUpdateRecords, 2 * numRecords, 5, false);
checkReadRecords("000", 2 * numRecords);
// Now even the original schema cannot be used for updates as it is devolved
// in relation to the current schema of the dataset.
client = getHoodieWriteClient(hoodieWriteConfig, false);
client = getHoodieWriteClient(hoodieWriteConfig);
try {
updateBatch(hoodieWriteConfig, client, "006", "005", Option.empty(),
initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
@@ -395,7 +395,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
failedRecords.clear();
failedRecords.addAll(dataGen.generateInserts("006", numRecords));
writeBatch(client, "006", "005", Option.empty(), initCommitTime, numRecords,
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords, 1);
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords, 1, false);
fail("Insert with original scheme should fail");
} catch (HoodieInsertException ex) {
// no new commit

View File

@@ -103,6 +103,9 @@ public class TestHBaseIndex extends FunctionalTestHarness {
utility.deleteTable(TABLE_NAME);
utility.shutdownMiniCluster();
}
if (spark != null) {
spark.close();
}
}
@BeforeAll
@@ -538,9 +541,9 @@ public class TestHBaseIndex extends FunctionalTestHarness {
final Map<String, Integer> fileIdPartitionMap = index.mapFileWithInsertsToUniquePartition(writeStatusRDD);
int numWriteStatusWithInserts = (int) index.getHBasePutAccessParallelism(writeStatusRDD)._2;
JavaRDD<WriteStatus> partitionedRDD = writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
.partitionBy(new SparkHoodieHBaseIndex
.WriteStatusPartitioner(fileIdPartitionMap,
numWriteStatusWithInserts)).map(w -> w._2());
.partitionBy(new SparkHoodieHBaseIndex
.WriteStatusPartitioner(fileIdPartitionMap,
numWriteStatusWithInserts)).map(w -> w._2());
assertEquals(numWriteStatusWithInserts, partitionedRDD.getNumPartitions());
int[] partitionIndexesBeforeRepartition = writeStatusRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
assertEquals(parallelism, partitionIndexesBeforeRepartition.length);
@@ -576,9 +579,9 @@ public class TestHBaseIndex extends FunctionalTestHarness {
final Map<String, Integer> fileIdPartitionMap = index.mapFileWithInsertsToUniquePartition(writeStatusRDD);
int numWriteStatusWithInserts = (int) index.getHBasePutAccessParallelism(writeStatusRDD)._2;
JavaRDD<WriteStatus> partitionedRDD = writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
.partitionBy(new SparkHoodieHBaseIndex
.WriteStatusPartitioner(fileIdPartitionMap,
numWriteStatusWithInserts)).map(w -> w._2());
.partitionBy(new SparkHoodieHBaseIndex
.WriteStatusPartitioner(fileIdPartitionMap,
numWriteStatusWithInserts)).map(w -> w._2());
assertEquals(numWriteStatusWithInserts, partitionedRDD.getNumPartitions());
int[] partitionIndexesBeforeRepartition = writeStatusRDD.partitions().stream().mapToInt(p -> p.index()).toArray();
assertEquals(parallelism, partitionIndexesBeforeRepartition.length);
@@ -749,4 +752,4 @@ public class TestHBaseIndex extends FunctionalTestHarness {
.hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
.build());
}
}
}

View File

@@ -408,11 +408,11 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
HoodieTimeline timeline = metaClient.getActiveTimeline().getWriteTimeline();
assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(context);
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
timeline = metaClient.getActiveTimeline().reload().getWriteTimeline();
assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")),
"Instants before oldest pending compaction can be removed");
assertEquals(7, timeline.countInstants(),

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.metadata;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -52,9 +54,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -610,13 +609,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
// Start the next commit which will rollback the previous one and also should update the metadata table by
// updating it with HoodieRollbackMetadata.
String newCommitTime = client.startCommit();
// Dangling commit but metadata should be valid at this time
validateMetadata(client);
// Next insert
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieCleanStat;
@@ -38,6 +39,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
@@ -96,6 +98,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -104,6 +107,7 @@ import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCo
import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -126,7 +130,7 @@ public class TestCleaner extends HoodieClientTestBase {
* @param insertFn Insertion API for testing
* @throws Exception in case of error
*/
private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, SparkRDDWriteClient client,
private Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, SparkRDDWriteClient client,
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
HoodieCleaningPolicy cleaningPolicy) throws Exception {
@@ -140,10 +144,9 @@ public class TestCleaner extends HoodieClientTestBase {
List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 5);
List<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime).collect();
JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime);
// Verify there are no errors
assertNoWriteErrors(statuses);
assertNoWriteErrors(statuses.collect());
// verify that there is a commit
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
@@ -151,13 +154,49 @@ public class TestCleaner extends HoodieClientTestBase {
// Should have 100 records in table (check using Index), all in locations marked at commit
HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);
assertFalse(table.getCompletedCommitsTimeline().empty());
if (client.getConfig().shouldAutoCommit()) {
assertFalse(table.getCompletedCommitsTimeline().empty());
}
// We no longer write empty cleaner plans when there is nothing to be cleaned.
assertTrue(table.getCompletedCleanTimeline().empty());
HoodieIndex index = SparkHoodieIndex.createIndex(cfg);
List<HoodieRecord> taggedRecords = ((JavaRDD<HoodieRecord>) index.tagLocation(jsc.parallelize(records, 1), context, table)).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
if (client.getConfig().shouldAutoCommit()) {
HoodieIndex index = SparkHoodieIndex.createIndex(cfg);
List<HoodieRecord> taggedRecords = ((JavaRDD<HoodieRecord>) index.tagLocation(jsc.parallelize(records, 1), context, table)).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
}
return Pair.of(newCommitTime, statuses);
}
/**
* Helper method to do first batch of insert for clean by versions/commits tests.
*
* @param cfg Hoodie Write Config
* @param client Hoodie Client
* @param recordGenFunction Function to generate records for insertion
* @param insertFn Insertion API for testing
* @throws Exception in case of error
*/
private Pair<String, JavaRDD<WriteStatus>> insertFirstFailedBigBatchForClientCleanerTest(HoodieWriteConfig cfg, SparkRDDWriteClient client,
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
HoodieCleaningPolicy cleaningPolicy) throws Exception {
/*
* do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
* in insert(), if the implementation diverges.)
*/
String newCommitTime = client.startCommit();
List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 5);
JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime);
// Verify there are no errors
assertNoWriteErrors(statuses.collect());
// Don't invoke commit to simulate failed write
client.getHeartbeatClient().stop(newCommitTime);
return Pair.of(newCommitTime, statuses);
}
/**
@@ -168,6 +207,14 @@ public class TestCleaner extends HoodieClientTestBase {
testInsertAndCleanByVersions(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false);
}
/**
* Test Clean-Failed-Writes when Cleaning policy is by VERSIONS using insert/upsert API.
*/
@Test
public void testInsertAndCleanFailedWritesByVersions() throws Exception {
testInsertAndCleanFailedWritesByVersions(SparkRDDWriteClient::insert, false);
}
/**
* Test Clean-By-Versions using prepped versions of insert/upsert API.
*/
@@ -323,7 +370,7 @@ public class TestCleaner extends HoodieClientTestBase {
}
/**
* Test Clean-By-Versions using insert/upsert API.
* Test Clean-By-Commits using insert/upsert API.
*/
@Test
public void testInsertAndCleanByCommits() throws Exception {
@@ -331,7 +378,15 @@ public class TestCleaner extends HoodieClientTestBase {
}
/**
* Test Clean-By-Versions using prepped version of insert/upsert API.
* Test Clean-By-Commits using insert/upsert API.
*/
@Test
public void testFailedInsertAndCleanByCommits() throws Exception {
testFailedInsertAndCleanByCommits(SparkRDDWriteClient::insert, false);
}
/**
* Test Clean-By-Commits using prepped version of insert/upsert API.
*/
@Test
public void testInsertPreppedAndCleanByCommits() throws Exception {
@@ -339,7 +394,7 @@ public class TestCleaner extends HoodieClientTestBase {
}
/**
* Test Clean-By-Versions using prepped versions of bulk-insert/upsert API.
* Test Clean-By-Commits using prepped versions of bulk-insert/upsert API.
*/
@Test
public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
@@ -349,7 +404,7 @@ public class TestCleaner extends HoodieClientTestBase {
}
/**
* Test Clean-By-Versions using bulk-insert/upsert API.
* Test Clean-By-Commits using bulk-insert/upsert API.
*/
@Test
public void testBulkInsertAndCleanByCommits() throws Exception {
@@ -431,6 +486,64 @@ public class TestCleaner extends HoodieClientTestBase {
});
}
/**
* Test Helper for Cleaning failed commits by commits logic from HoodieWriteClient API perspective.
*
* @param insertFn Insert API to be tested
* @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
* record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs)
* @throws Exception in case of errors
*/
private void testFailedInsertAndCleanByCommits(
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
throws Exception {
int maxCommits = 3; // keep upto 3 commits from the past
HoodieWriteConfig cfg = getConfigBuilder()
.withAutoCommit(false)
.withHeartbeatIntervalInMs(3000)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
client.commit(result.getLeft(), result.getRight());
HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);
assertTrue(table.getCompletedCleanTimeline().empty());
insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
Pair<String, JavaRDD<WriteStatus>> ret =
insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
// Await till enough time passes such that the last failed commits heartbeats are expired
await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
.isHeartbeatExpired(ret.getLeft()));
List<HoodieCleanStat> cleanStats = runCleaner(cfg);
assertEquals(0, cleanStats.size(), "Must not clean any files");
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
Option<HoodieInstant> rolleBackInstantForFailedCommit = timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
timeline.getInstantDetails(rolleBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
// Rollback of one of the failed writes should have deleted 3 files
assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
}
/**
* Helper to run cleaner and collect Clean Stats.
*
@@ -980,6 +1093,7 @@ public class TestCleaner extends HoodieClientTestBase {
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
@@ -1246,6 +1360,69 @@ public class TestCleaner extends HoodieClientTestBase {
assertEquals(0, cleanStats.size(), "Must not clean any files");
}
/**
* Test Helper for cleaning failed writes by versions logic from HoodieWriteClient API perspective.
*
* @param insertFn Insert API to be tested
* @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
* record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs)
* @throws Exception in case of errors
*/
private void testInsertAndCleanFailedWritesByVersions(
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn, boolean isPreppedAPI)
throws Exception {
int maxVersions = 3; // keep upto 3 versions for each file
HoodieWriteConfig cfg = getConfigBuilder()
.withAutoCommit(false)
.withHeartbeatIntervalInMs(3000)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
client.commit(result.getLeft(), result.getRight());
HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);
assertTrue(table.getCompletedCleanTimeline().empty());
insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
Pair<String, JavaRDD<WriteStatus>> ret =
insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
// Await till enough time passes such that the last failed commits heartbeats are expired
await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
.isHeartbeatExpired(ret.getLeft()));
List<HoodieCleanStat> cleanStats = runCleaner(cfg);
assertEquals(0, cleanStats.size(), "Must not clean any files");
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
Option<HoodieInstant> rolleBackInstantForFailedCommit = timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
timeline.getInstantDetails(rolleBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
// Rollback of one of the failed writes should have deleted 3 files
assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
}
}
/**
* Common test method for validating pending compactions.
*

View File

@@ -262,7 +262,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
protected List<FileSlice> getCurrentLatestFileSlices(HoodieTable table) {
HoodieTableFileSystemView view = new HoodieTableFileSystemView(table.getMetaClient(),
table.getMetaClient().getActiveTimeline().reload().getCommitsAndCompactionTimeline());
table.getMetaClient().getActiveTimeline().reload().getWriteTimeline());
return Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)
.flatMap(view::getLatestFileSlices).collect(Collectors.toList());
}

View File

@@ -60,7 +60,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
public void testRollbackForInflightCompaction() throws Exception {
// Rollback inflight compaction
HoodieWriteConfig cfg = getConfig(false);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);) {
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -120,7 +120,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
int numRecs = 2000;
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);) {
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
@@ -162,7 +162,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
public void testInflightCompaction() throws Exception {
// There is inflight compaction. Subsequent compaction run must work correctly
HoodieWriteConfig cfg = getConfig(true);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);) {
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -195,7 +195,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
public void testScheduleIngestionBeforePendingCompaction() throws Exception {
// Case: Failure case. Latest pending compaction instant time must be earlier than this instant time
HoodieWriteConfig cfg = getConfig(false);
SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
@@ -226,7 +226,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Case: Failure case. Earliest ingestion inflight instant time must be later than compaction time
HoodieWriteConfig cfg = getConfig(false);
SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
@@ -258,7 +258,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Case: Failure case. Earliest ingestion inflight instant time must be later than compaction time
HoodieWriteConfig cfg = getConfig(false);
SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
final String firstInstantTime = "001";
@@ -293,7 +293,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
public void testCompactionAfterTwoDeltaCommits() throws Exception {
// No Delta Commits after compaction request
HoodieWriteConfig cfg = getConfig(true);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);) {
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -314,7 +314,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
public void testInterleavedCompaction() throws Exception {
// Case: Two delta commits before and after compaction schedule
HoodieWriteConfig cfg = getConfig(true);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);) {
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";
@@ -342,7 +342,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
public void testCompactionOnReplacedFiles() throws Exception {
// Schedule a compaction. Replace those file groups and ensure compaction completes successfully.
HoodieWriteConfig cfg = getConfig(true);
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);) {
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
String firstInstantTime = "001";
String secondInstantTime = "004";

View File

@@ -60,7 +60,7 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
// Then: ensure no compaction is executedm since there are only 2 delta commits
assertEquals(2, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(2, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
}
}
@@ -82,7 +82,7 @@ public class TestInlineCompaction extends CompactionTestBase {
// Then: ensure the file slices are compacted as per policy
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
}
}
@@ -105,7 +105,7 @@ public class TestInlineCompaction extends CompactionTestBase {
// Then: ensure the file slices are compacted as per policy
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(3, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
}
}
@@ -125,14 +125,14 @@ public class TestInlineCompaction extends CompactionTestBase {
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
// 4th commit, that will trigger compaction because reach the time elapsed
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(6, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(6, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
}
}
@@ -148,14 +148,14 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
// Then: ensure no compaction is executedm since there are only 3 delta commits
assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(3, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
// 4th commit, that will trigger compaction
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
String finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(5, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(5, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
}
}
@@ -189,7 +189,7 @@ public class TestInlineCompaction extends CompactionTestBase {
// Then: 1 delta commit is done, the failed compaction is retried
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}
@@ -225,7 +225,7 @@ public class TestInlineCompaction extends CompactionTestBase {
// Then: 1 delta commit is done, the failed compaction is retried
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}
@@ -262,7 +262,7 @@ public class TestInlineCompaction extends CompactionTestBase {
// Then: 1 delta commit is done, the failed compaction is retried
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}
}

View File

@@ -18,6 +18,10 @@
package org.apache.hudi.table.upgrade;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
@@ -38,11 +42,6 @@ import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.testutils.Assertions;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -291,7 +290,7 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
params.put(HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name());
}
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withRollbackUsingMarkers(enableMarkedBasedRollback).withProps(params).build();
SparkRDDWriteClient client = getHoodieWriteClient(cfg, true);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
client.startCommitWithTime(newCommitTime);

View File

@@ -56,7 +56,7 @@ import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NA
public class FunctionalTestHarness implements SparkProvider, DFSProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {
private static transient SparkSession spark;
protected static transient SparkSession spark;
private static transient SQLContext sqlContext;
private static transient JavaSparkContext jsc;
protected static transient HoodieSparkEngineContext context;
@@ -126,7 +126,7 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider, Hoodie
@Override
public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
return new SparkRDDWriteClient(context(), cfg, false);
return new SparkRDDWriteClient(context(), cfg);
}
@BeforeEach

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -110,12 +111,8 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
*
* @return Config Builder
*/
public HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
}
public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
return getConfigBuilder(schemaStr, IndexType.BLOOM);
public HoodieWriteConfig.Builder getConfigBuilder(HoodieFailedWritesCleaningPolicy cleaningPolicy) {
return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, IndexType.BLOOM, cleaningPolicy);
}
/**
@@ -123,13 +120,32 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
*
* @return Config Builder
*/
public HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType, HoodieFailedWritesCleaningPolicy.EAGER);
}
public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
return getConfigBuilder(schemaStr, IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER);
}
public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
return getConfigBuilder(schemaStr, indexType, HoodieFailedWritesCleaningPolicy.EAGER);
}
/**
* Get Config builder with default configs set.
*
* @return Config Builder
*/
public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType,
HoodieFailedWritesCleaningPolicy cleaningPolicy) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.withWriteStatusClass(MetadataMergeWriteStatus.class)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy)
.compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
@@ -308,7 +324,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1, false);
}
/**
@@ -336,7 +352,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits);
recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits, false);
}
/**
@@ -368,7 +384,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
expTotalCommits);
expTotalCommits, false);
}
/**
@@ -416,13 +432,14 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
* @param expRecordsInThisCommit Expected number of records in this commit
* @param expTotalRecords Expected number of records when scanned
* @param expTotalCommits Expected number of commits (including this commit)
* @param doCommit                   Whether to explicitly commit the write (via {@code client.commit}) after the batch completes
* @throws Exception in case of error
*/
public JavaRDD<WriteStatus> writeBatch(SparkRDDWriteClient client, String newCommitTime, String prevCommitTime,
Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit) throws Exception {
// Write 1 (only inserts)
client.startCommitWithTime(newCommitTime);
@@ -434,6 +451,9 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
List<WriteStatus> statuses = result.collect();
assertNoWriteErrors(statuses);
if (doCommit) {
client.commit(newCommitTime, result);
}
// check the partition metadata is written out
assertPartitionMetadataForRecords(records, fs);

View File

@@ -327,21 +327,17 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
}
}
public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
return getHoodieWriteClient(cfg, false);
}
public HoodieReadClient getHoodieReadClient(String basePath) {
readClient = new HoodieReadClient(context, basePath, SQLContext.getOrCreate(jsc.sc()));
return readClient;
}
public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
if (null != writeClient) {
writeClient.close();
writeClient = null;
}
writeClient = new SparkRDDWriteClient(context, cfg, rollbackInflightCommit);
writeClient = new SparkRDDWriteClient(context, cfg);
return writeClient;
}