[HUDI-990] Timeline API : filterCompletedAndCompactionInstants needs to handle requested state correctly. Also ensure timeline gets reloaded after we revert committed transactions
This commit is contained in:
committed by
Balaji Varadarajan
parent
a68180b179
commit
fb283934a3
@@ -89,7 +89,9 @@ public class EmbeddedTimelineService {
|
|||||||
* Retrieves proper view storage configs for remote clients to access this service.
|
* Retrieves proper view storage configs for remote clients to access this service.
|
||||||
*/
|
*/
|
||||||
public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
|
public FileSystemViewStorageConfig getRemoteFileSystemViewConfig() {
|
||||||
return FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
|
FileSystemViewStorageType viewStorageType = config.shouldEnableBackupForRemoteFileSystemView()
|
||||||
|
? FileSystemViewStorageType.REMOTE_FIRST : FileSystemViewStorageType.REMOTE_ONLY;
|
||||||
|
return FileSystemViewStorageConfig.newBuilder().withStorageType(viewStorageType)
|
||||||
.withRemoteServerHost(hostAddr).withRemoteServerPort(serverPort).build();
|
.withRemoteServerHost(hostAddr).withRemoteServerPort(serverPort).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -66,6 +66,8 @@ public class CopyOnWriteRollbackActionExecutor extends BaseRollbackActionExecuto
|
|||||||
if (instantToRollback.isCompleted()) {
|
if (instantToRollback.isCompleted()) {
|
||||||
LOG.info("Unpublishing instant " + instantToRollback);
|
LOG.info("Unpublishing instant " + instantToRollback);
|
||||||
resolvedInstant = activeTimeline.revertToInflight(instantToRollback);
|
resolvedInstant = activeTimeline.revertToInflight(instantToRollback);
|
||||||
|
// reload meta-client to reflect latest timeline status
|
||||||
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
}
|
}
|
||||||
|
|
||||||
// For Requested State (like failure during index lookup), there is nothing to do rollback other than
|
// For Requested State (like failure during index lookup), there is nothing to do rollback other than
|
||||||
|
|||||||
@@ -74,6 +74,8 @@ public class MergeOnReadRollbackActionExecutor extends BaseRollbackActionExecuto
|
|||||||
if (instantToRollback.isCompleted()) {
|
if (instantToRollback.isCompleted()) {
|
||||||
LOG.error("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants);
|
LOG.error("Un-publishing instant " + instantToRollback + ", deleteInstants=" + deleteInstants);
|
||||||
resolvedInstant = table.getActiveTimeline().revertToInflight(instantToRollback);
|
resolvedInstant = table.getActiveTimeline().revertToInflight(instantToRollback);
|
||||||
|
// reload meta-client to reflect latest timeline status
|
||||||
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
}
|
}
|
||||||
|
|
||||||
List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
|
List<HoodieRollbackStat> allRollbackStats = new ArrayList<>();
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
|||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||||
import org.apache.hudi.common.table.view.SyncableFileSystemView;
|
import org.apache.hudi.common.table.view.SyncableFileSystemView;
|
||||||
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
|
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
|
||||||
@@ -1327,7 +1328,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
|||||||
*/
|
*/
|
||||||
newCommitTime = "002";
|
newCommitTime = "002";
|
||||||
client.startCommitWithTime(newCommitTime);
|
client.startCommitWithTime(newCommitTime);
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
records = dataGen.generateUpdates(newCommitTime, records);
|
records = dataGen.generateUpdates(newCommitTime, records);
|
||||||
writeRecords = jsc.parallelize(records, 1);
|
writeRecords = jsc.parallelize(records, 1);
|
||||||
statuses = client.upsert(writeRecords, newCommitTime).collect();
|
statuses = client.upsert(writeRecords, newCommitTime).collect();
|
||||||
@@ -1340,6 +1341,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
|||||||
final String partitionPath = records.get(0).getPartitionPath();
|
final String partitionPath = records.get(0).getPartitionPath();
|
||||||
final String fileId = statuses.get(0).getFileId();
|
final String fileId = statuses.get(0).getFileId();
|
||||||
client.startCommitWithTime(newDeleteTime);
|
client.startCommitWithTime(newDeleteTime);
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records);
|
List<HoodieRecord> fewRecordsForDelete = dataGen.generateDeletesFromExistingRecords(records);
|
||||||
JavaRDD<HoodieRecord> deleteRDD = jsc.parallelize(fewRecordsForDelete, 1);
|
JavaRDD<HoodieRecord> deleteRDD = jsc.parallelize(fewRecordsForDelete, 1);
|
||||||
@@ -1377,6 +1379,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
|
|||||||
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
|
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
|
||||||
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
|
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024 * 1024).build())
|
||||||
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
|
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
|
||||||
|
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
|
||||||
|
.withEnableBackupForRemoteFileSystemView(false).build())
|
||||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build());
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -132,6 +132,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
|
|||||||
.forTable("test-trip-table")
|
.forTable("test-trip-table")
|
||||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
|
||||||
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
|
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
|
||||||
|
.withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
|
||||||
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
|
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -103,7 +103,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HoodieTimeline filterCompletedAndCompactionInstants() {
|
public HoodieTimeline filterCompletedAndCompactionInstants() {
|
||||||
return new HoodieDefaultTimeline(instants.stream().filter(s -> !s.isInflight()
|
return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted()
|
||||||
|| s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details);
|
|| s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -55,6 +55,15 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
|
|||||||
private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_COMPACTION = 0.01;
|
private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_COMPACTION = 0.01;
|
||||||
private static final Long DEFAULT_MAX_MEMORY_FOR_VIEW = 100 * 1024 * 1024L; // 100 MB
|
private static final Long DEFAULT_MAX_MEMORY_FOR_VIEW = 100 * 1024 * 1024L; // 100 MB
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configs to control whether backup needs to be configured if clients were not able to reach
|
||||||
|
* timeline service.
|
||||||
|
*/
|
||||||
|
public static final String REMOTE_BACKUP_VIEW_HANDLER_ENABLE =
|
||||||
|
"hoodie.filesystem.remote.backup.view.enable";
|
||||||
|
// Need to be disabled only for tests.
|
||||||
|
public static final String DEFAULT_REMOTE_BACKUP_VIEW_HANDLER_ENABLE = "true";
|
||||||
|
|
||||||
public static FileSystemViewStorageConfig.Builder newBuilder() {
|
public static FileSystemViewStorageConfig.Builder newBuilder() {
|
||||||
return new Builder();
|
return new Builder();
|
||||||
}
|
}
|
||||||
@@ -98,6 +107,10 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
|
|||||||
return FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE));
|
return FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean shouldEnableBackupForRemoteFileSystemView() {
|
||||||
|
return Boolean.parseBoolean(props.getProperty(REMOTE_BACKUP_VIEW_HANDLER_ENABLE));
|
||||||
|
}
|
||||||
|
|
||||||
public String getRocksdbBasePath() {
|
public String getRocksdbBasePath() {
|
||||||
return props.getProperty(ROCKSDB_BASE_PATH_PROP);
|
return props.getProperty(ROCKSDB_BASE_PATH_PROP);
|
||||||
}
|
}
|
||||||
@@ -166,6 +179,11 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withEnableBackupForRemoteFileSystemView(boolean enable) {
|
||||||
|
props.setProperty(REMOTE_BACKUP_VIEW_HANDLER_ENABLE, Boolean.toString(enable));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public FileSystemViewStorageConfig build() {
|
public FileSystemViewStorageConfig build() {
|
||||||
setDefaultOnCondition(props, !props.containsKey(FILESYSTEM_VIEW_STORAGE_TYPE), FILESYSTEM_VIEW_STORAGE_TYPE,
|
setDefaultOnCondition(props, !props.containsKey(FILESYSTEM_VIEW_STORAGE_TYPE), FILESYSTEM_VIEW_STORAGE_TYPE,
|
||||||
DEFAULT_VIEW_STORAGE_TYPE.name());
|
DEFAULT_VIEW_STORAGE_TYPE.name());
|
||||||
@@ -188,6 +206,9 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
|
|||||||
setDefaultOnCondition(props, !props.containsKey(ROCKSDB_BASE_PATH_PROP), ROCKSDB_BASE_PATH_PROP,
|
setDefaultOnCondition(props, !props.containsKey(ROCKSDB_BASE_PATH_PROP), ROCKSDB_BASE_PATH_PROP,
|
||||||
DEFAULT_ROCKSDB_BASE_PATH);
|
DEFAULT_ROCKSDB_BASE_PATH);
|
||||||
|
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(REMOTE_BACKUP_VIEW_HANDLER_ENABLE),
|
||||||
|
REMOTE_BACKUP_VIEW_HANDLER_ENABLE, DEFAULT_REMOTE_BACKUP_VIEW_HANDLER_ENABLE);
|
||||||
|
|
||||||
// Validations
|
// Validations
|
||||||
FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_VIEW_STORAGE_TYPE));
|
FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_VIEW_STORAGE_TYPE));
|
||||||
FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE));
|
FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE));
|
||||||
|
|||||||
@@ -384,7 +384,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
|
|||||||
// filterCompletedAndCompactionInstants
|
// filterCompletedAndCompactionInstants
|
||||||
// This cannot be done using checkFilter as it involves both states and actions
|
// This cannot be done using checkFilter as it involves both states and actions
|
||||||
final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
|
final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
|
||||||
final Set<State> states = CollectionUtils.createSet(State.REQUESTED, State.COMPLETED);
|
final Set<State> states = CollectionUtils.createSet(State.COMPLETED);
|
||||||
final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION);
|
final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION);
|
||||||
sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
|
sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
|
||||||
.forEach(i -> assertTrue(t1.containsInstant(i)));
|
.forEach(i -> assertTrue(t1.containsInstant(i)));
|
||||||
|
|||||||
Reference in New Issue
Block a user