diff --git a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index d8b671b9d..730585b12 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -89,7 +89,9 @@ public class EmbeddedTimelineService { * Retrieves proper view storage configs for remote clients to access this service. */ public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() { - return FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST) + FileSystemViewStorageType viewStorageType = config.shouldEnableBackupForRemoteFileSystemView() + ? FileSystemViewStorageType.REMOTE_FIRST : FileSystemViewStorageType.REMOTE_ONLY; + return FileSystemViewStorageConfig.newBuilder().withStorageType(viewStorageType) .withRemoteServerHost(hostAddr).withRemoteServerPort(serverPort).build(); } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java index 112337455..3d6c123ae 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/CopyOnWriteRollbackActionExecutor.java @@ -66,6 +66,8 @@ public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecuto if (instantToRollback.isCompleted()) { LOG.info("Unpublishing instant " + instantToRollback); resolvedInstant = activeTimeline.revertToInflight(instantToRollback); + // reload meta-client to reflect latest timeline status + table.getMetaClient().reloadActiveTimeline(); } // For Requested State (like failure during index lookup), there is nothing to do rollback other than diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java index f5d84a85d..4b9aaddf9 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/MergeOnReadRollbackActionExecutor.java @@ -74,6 +74,8 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto if (instantToRollback.isCompleted()) { LOG.error("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants); resolvedInstant = table.getActiveTimeline().revertToInflight(instantToRollback); + // reload meta-client to reflect latest timeline status + table.getMetaClient().reloadActiveTimeline(); } List allRollbackStats = new ArrayList<>(); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index cd3adae19..a26b80b4f 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; @@ -1327,7 +1328,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { */ newCommitTime = "002"; client.startCommitWithTime(newCommitTime); - + metaClient.reloadActiveTimeline(); records = dataGen.generateUpdates(newCommitTime, records); writeRecords = jsc.parallelize(records, 1); statuses = client.upsert(writeRecords, newCommitTime).collect(); @@ -1340,6 +1341,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { final String partitionPath = records.get(0).getPartitionPath(); final String fileId = statuses.get(0).getFileId(); client.startCommitWithTime(newDeleteTime); + metaClient.reloadActiveTimeline(); List fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records); JavaRDD deleteRDD = jsc.parallelize(fewRecordsForDelete, 1); @@ -1377,6 +1379,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build()) .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") + .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() + .withEnableBackupForRemoteFileSystemView(false).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()); } diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 14123b264..1db208222 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -132,6 +132,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness { .forTable("test-trip-table") .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() + .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 2be4a9e63..c7a6230ff 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -103,7 +103,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline { @Override public HoodieTimeline filterCompletedAndCompactionInstants() { - return new HoodieDefaultTimeline(instants.stream().filter(s -> !s.isInflight() + return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted() || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java index 17eac93ac..5e21e43fc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java @@ -55,6 +55,15 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig { private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_COMPACTION = 0.01; private static final Long DEFAULT_MAX_MEMORY_FOR_VIEW = 100 * 1024 * 1024L; // 100 MB + /** + * Configs to control whether backup needs to be configured if clients were not able to reach + * timeline service. + */ + public static final String REMOTE_BACKUP_VIEW_HANDLER_ENABLE = + "hoodie.filesystem.remote.backup.view.enable"; + // Need to be disabled only for tests. + public static final String DEFAULT_REMOTE_BACKUP_VIEW_HANDLER_ENABLE = "true"; + public static FileSystemViewStorageConfig.Builder newBuilder() { return new Builder(); } @@ -98,6 +107,10 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig { return FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE)); } + public boolean shouldEnableBackupForRemoteFileSystemView() { + return Boolean.parseBoolean(props.getProperty(REMOTE_BACKUP_VIEW_HANDLER_ENABLE)); + } + public String getRocksdbBasePath() { return props.getProperty(ROCKSDB_BASE_PATH_PROP); } @@ -166,6 +179,11 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig { return this; } + public Builder withEnableBackupForRemoteFileSystemView(boolean enable) { + props.setProperty(REMOTE_BACKUP_VIEW_HANDLER_ENABLE, Boolean.toString(enable)); + return this; + } + public FileSystemViewStorageConfig build() { setDefaultOnCondition(props, !props.containsKey(FILESYSTEM_VIEW_STORAGE_TYPE), FILESYSTEM_VIEW_STORAGE_TYPE, DEFAULT_VIEW_STORAGE_TYPE.name()); @@ -188,6 +206,9 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(ROCKSDB_BASE_PATH_PROP), ROCKSDB_BASE_PATH_PROP, DEFAULT_ROCKSDB_BASE_PATH); + setDefaultOnCondition(props, !props.containsKey(REMOTE_BACKUP_VIEW_HANDLER_ENABLE), + REMOTE_BACKUP_VIEW_HANDLER_ENABLE, DEFAULT_REMOTE_BACKUP_VIEW_HANDLER_ENABLE); + // Validations FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_VIEW_STORAGE_TYPE)); FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE)); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 4fc9eaca6..34c705516 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -384,7 +384,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { // filterCompletedAndCompactionInstants // This cannot be done using checkFilter as it involves both states and actions final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants(); - final Set states = CollectionUtils.createSet(State.REQUESTED, State.COMPLETED); + final Set states = CollectionUtils.createSet(State.COMPLETED); final Set actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION); sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction())) .forEach(i -> assertTrue(t1.containsInstant(i)));