1
0

[HUDI-3015] Implement #reset and #sync for metadata filesystem view (#4307)

This commit is contained in:
Danny Chan
2021-12-16 15:26:16 +08:00
committed by GitHub
parent f5b07a77bc
commit ea2eba1a55
9 changed files with 63 additions and 40 deletions

View File

@@ -100,38 +100,33 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.build()).withAutoCommit(false).withProperties(properties).build(); .build()).withAutoCommit(false).withProperties(properties).build();
// Create the first commit // Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200); createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
try { ExecutorService executors = Executors.newFixedThreadPool(2);
ExecutorService executors = Executors.newFixedThreadPool(2); SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); Future future1 = executors.submit(() -> {
Future future1 = executors.submit(() -> { String newCommitTime = "004";
String newCommitTime = "004"; int numRecords = 100;
int numRecords = 100; String commitTimeBetweenPrevAndNew = "002";
String commitTimeBetweenPrevAndNew = "002"; try {
try { createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); } catch (Exception e1) {
} catch (Exception e1) { assertTrue(e1 instanceof HoodieWriteConflictException);
assertTrue(e1 instanceof HoodieWriteConflictException); throw new RuntimeException(e1);
throw new RuntimeException(e1); }
} });
}); Future future2 = executors.submit(() -> {
Future future2 = executors.submit(() -> { String newCommitTime = "005";
String newCommitTime = "005"; int numRecords = 100;
int numRecords = 100; String commitTimeBetweenPrevAndNew = "002";
String commitTimeBetweenPrevAndNew = "002"; try {
try { createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); } catch (Exception e2) {
} catch (Exception e2) { assertTrue(e2 instanceof HoodieWriteConflictException);
assertTrue(e2 instanceof HoodieWriteConflictException); throw new RuntimeException(e2);
throw new RuntimeException(e2); }
} });
}); future1.get();
future1.get(); future2.get();
future2.get();
fail("Should not reach here, this means concurrent writes were handled incorrectly");
} catch (Exception e) {
// Expected to fail due to overlapping commits
}
} }
@Test @Test

View File

@@ -257,7 +257,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
* Clears the partition Map and reset view states. * Clears the partition Map and reset view states.
*/ */
@Override @Override
public final void reset() { public void reset() {
try { try {
writeLock.lock(); writeLock.lock();
clear(); clear();
@@ -1135,8 +1135,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
*/ */
protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) {
refreshTimeline(newTimeline); refreshTimeline(newTimeline);
addedPartitions.clear(); clear();
resetViewState();
// Initialize with new Hoodie timeline. // Initialize with new Hoodie timeline.
init(metaClient, newTimeline); init(metaClient, newTimeline);
} }

View File

@@ -253,8 +253,8 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
@Override @Override
public void sync() { public void sync() {
preferredView.reset(); preferredView.sync();
secondaryView.reset(); secondaryView.sync();
} }
@Override @Override

View File

@@ -413,6 +413,8 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public boolean refresh() { public boolean refresh() {
Map<String, String> paramsMap = getParams(); Map<String, String> paramsMap = getParams();
try { try {
// refresh the local timeline first.
this.timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
return executeRequest(REFRESH_TABLE, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST); return executeRequest(REFRESH_TABLE, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
} catch (IOException e) { } catch (IOException e) {
throw new HoodieRemoteException(e); throw new HoodieRemoteException(e);
@@ -450,7 +452,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
@Override @Override
public void reset() { public void reset() {
timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
refresh(); refresh();
} }

View File

@@ -139,4 +139,9 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
public void close() throws Exception { public void close() throws Exception {
// no-op // no-op
} }
@Override
public void reset() {
// no-op
}
} }

View File

@@ -441,4 +441,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
} }
return Option.empty(); return Option.empty();
} }
@Override
public void reset() {
initIfNeeded();
dataMetaClient.reloadActiveTimeline();
}
} }

View File

@@ -65,6 +65,18 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
return tableMetadata.getAllFilesInPartition(partitionPath); return tableMetadata.getAllFilesInPartition(partitionPath);
} }
@Override
public void reset() {
super.reset();
tableMetadata.reset();
}
@Override
public void sync() {
super.sync();
tableMetadata.reset();
}
@Override @Override
public void close() { public void close() {
try { try {

View File

@@ -113,4 +113,9 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
* Returns the timestamp of the latest compaction. * Returns the timestamp of the latest compaction.
*/ */
Option<String> getLatestCompactionTime(); Option<String> getLatestCompactionTime();
/**
* Clear the states of the table metadata.
*/
void reset();
} }

View File

@@ -622,8 +622,8 @@ public class TestPriorityBasedFileSystemView {
@Test @Test
public void testSync() { public void testSync() {
fsView.sync(); fsView.sync();
verify(primary, times(1)).reset(); verify(primary, times(1)).sync();
verify(secondary, times(1)).reset(); verify(secondary, times(1)).sync();
} }
@Test @Test