[HUDI-1717] Metadata Reader should merge all the un-synced but complete instants from the dataset timeline. (#3082)
This commit is contained in:
@@ -400,7 +400,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
// (re) init the metadata for reading.
|
||||
initTableMetadata();
|
||||
try {
|
||||
List<HoodieInstant> instantsToSync = metadata.findInstantsToSync();
|
||||
List<HoodieInstant> instantsToSync = metadata.findInstantsToSyncForWriter();
|
||||
if (instantsToSync.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
@@ -411,7 +411,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
||||
for (HoodieInstant instant : instantsToSync) {
|
||||
LOG.info("Syncing instant " + instant + " to metadata table");
|
||||
|
||||
Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, metadata.getSyncedInstantTime());
|
||||
Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, getLatestSyncedInstantTime());
|
||||
if (records.isPresent()) {
|
||||
commit(records.get(), MetadataPartitionType.FILES.partitionPath(), instant.getTimestamp());
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@@ -40,4 +41,9 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
|
||||
void update(HoodieRestoreMetadata restoreMetadata, String instantTime);
|
||||
|
||||
void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
|
||||
|
||||
/**
|
||||
* Return the timestamp of the latest instant synced to the metadata table.
|
||||
*/
|
||||
Option<String> getLatestSyncedInstantTime();
|
||||
}
|
||||
|
||||
@@ -29,6 +29,8 @@ import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
@@ -132,6 +134,23 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the timestamp of the latest instant synced.
|
||||
*
|
||||
* To sync a instant on dataset, we create a corresponding delta-commit on the metadata table. So return the latest
|
||||
* delta-commit.
|
||||
*/
|
||||
@Override
|
||||
public Option<String> getLatestSyncedInstantTime() {
|
||||
if (!enabled) {
|
||||
return Option.empty();
|
||||
}
|
||||
|
||||
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
|
||||
return timeline.getDeltaCommitTimeline().filterCompletedInstants()
|
||||
.lastInstant().map(HoodieInstant::getTimestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tag each record with the location.
|
||||
*
|
||||
|
||||
@@ -491,6 +491,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
||||
|
||||
// Various table operations without metadata table enabled
|
||||
String restoreToInstant;
|
||||
String inflightActionTimestamp;
|
||||
String beforeInflightActionTimestamp;
|
||||
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
|
||||
// updates
|
||||
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
@@ -523,6 +525,10 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
||||
assertTrue(metadata(client).isInSync());
|
||||
}
|
||||
|
||||
// Record a timestamp for creating an inflight instance for sync testing
|
||||
inflightActionTimestamp = HoodieActiveTimeline.createNewInstantTime();
|
||||
beforeInflightActionTimestamp = newCommitTime;
|
||||
|
||||
// Deletes
|
||||
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
|
||||
records = dataGen.generateDeletes(newCommitTime, 5);
|
||||
@@ -554,9 +560,41 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
|
||||
assertTrue(metadata(client).isInSync());
|
||||
}
|
||||
|
||||
// If there is an incomplete operation, the Metadata Table is not updated beyond that operations but the
|
||||
// in-memory merge should consider all the completed operations.
|
||||
Path inflightCleanPath = new Path(metaClient.getMetaPath(), HoodieTimeline.makeInflightCleanerFileName(inflightActionTimestamp));
|
||||
fs.create(inflightCleanPath).close();
|
||||
|
||||
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
|
||||
// Restore cannot be done until the metadata table is in sync. See HUDI-1502 for details
|
||||
client.syncTableMetadata();
|
||||
|
||||
// Table should sync only before the inflightActionTimestamp
|
||||
HoodieBackedTableMetadataWriter writer =
|
||||
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
|
||||
assertEquals(writer.getLatestSyncedInstantTime().get(), beforeInflightActionTimestamp);
|
||||
|
||||
// Reader should sync to all the completed instants
|
||||
HoodieTableMetadata metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
|
||||
client.getConfig().getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);
|
||||
assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
|
||||
|
||||
// Remove the inflight instance holding back table sync
|
||||
fs.delete(inflightCleanPath, false);
|
||||
client.syncTableMetadata();
|
||||
|
||||
writer =
|
||||
(HoodieBackedTableMetadataWriter)SparkHoodieBackedTableMetadataWriter.create(hadoopConf, client.getConfig(), context);
|
||||
assertEquals(writer.getLatestSyncedInstantTime().get(), newCommitTime);
|
||||
|
||||
// Reader should sync to all the completed instants
|
||||
metadata = HoodieTableMetadata.create(context, client.getConfig().getMetadataConfig(),
|
||||
client.getConfig().getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);
|
||||
assertEquals(metadata.getSyncedInstantTime().get(), newCommitTime);
|
||||
}
|
||||
|
||||
// Enable metadata table and ensure it is synced
|
||||
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
|
||||
client.restoreToInstant(restoreToInstant);
|
||||
assertFalse(metadata(client).isInSync());
|
||||
|
||||
|
||||
@@ -62,6 +62,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
|
||||
protected final HoodieMetadataConfig metadataConfig;
|
||||
// Directory used for Spillable Map when merging records
|
||||
protected final String spillableMapDirectory;
|
||||
private String syncedInstantTime;
|
||||
|
||||
protected boolean enabled;
|
||||
private TimelineMergedTableMetadata timelineMergedMetadata;
|
||||
@@ -277,17 +278,44 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
|
||||
|
||||
private void openTimelineScanner() {
|
||||
if (timelineMergedMetadata == null) {
|
||||
List<HoodieInstant> unSyncedInstants = findInstantsToSync();
|
||||
List<HoodieInstant> unSyncedInstants = findInstantsToSyncForReader();
|
||||
timelineMergedMetadata =
|
||||
new TimelineMergedTableMetadata(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), null);
|
||||
|
||||
syncedInstantTime = unSyncedInstants.isEmpty() ? getLatestDatasetInstantTime()
|
||||
: unSyncedInstants.get(unSyncedInstants.size() - 1).getTimestamp();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract List<HoodieInstant> findInstantsToSync();
|
||||
/**
|
||||
* Return the timestamp of the latest synced instant.
|
||||
*/
|
||||
@Override
|
||||
public Option<String> getSyncedInstantTime() {
|
||||
if (!enabled) {
|
||||
return Option.empty();
|
||||
}
|
||||
|
||||
return Option.ofNullable(syncedInstantTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the instants which are not-synced to the {@code HoodieTableMetadata}.
|
||||
*
|
||||
* This is the list of all completed but un-synched instants.
|
||||
*/
|
||||
protected abstract List<HoodieInstant> findInstantsToSyncForReader();
|
||||
|
||||
/**
|
||||
* Return the instants which are not-synced to the {@code HoodieTableMetadataWriter}.
|
||||
*
|
||||
* This is the list of all completed but un-synched instants which do not have any incomplete instants in between them.
|
||||
*/
|
||||
protected abstract List<HoodieInstant> findInstantsToSyncForWriter();
|
||||
|
||||
@Override
|
||||
public boolean isInSync() {
|
||||
return enabled && findInstantsToSync().isEmpty();
|
||||
return enabled && findInstantsToSyncForWriter().isEmpty();
|
||||
}
|
||||
|
||||
protected HoodieEngineContext getEngineContext() {
|
||||
|
||||
@@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||
@@ -265,7 +264,22 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
|
||||
* Return an ordered list of instants which have not been synced to the Metadata Table.
|
||||
*/
|
||||
@Override
|
||||
protected List<HoodieInstant> findInstantsToSync() {
|
||||
protected List<HoodieInstant> findInstantsToSyncForReader() {
|
||||
return findInstantsToSync(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an ordered list of instants which have not been synced to the Metadata Table.
|
||||
*/
|
||||
@Override
|
||||
protected List<HoodieInstant> findInstantsToSyncForWriter() {
|
||||
return findInstantsToSync(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return an ordered list of instants which have not been synced to the Metadata Table.
|
||||
*/
|
||||
private List<HoodieInstant> findInstantsToSync(boolean ignoreIncompleteInstants) {
|
||||
initIfNeeded();
|
||||
|
||||
// if there are no instants yet, return empty list, since there is nothing to sync here.
|
||||
@@ -277,7 +291,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
|
||||
// are candidates for sync.
|
||||
String latestMetadataInstantTime = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get().getTimestamp();
|
||||
HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE);
|
||||
Option<HoodieInstant> earliestIncompleteInstant = candidateTimeline.filterInflightsAndRequested().firstInstant();
|
||||
Option<HoodieInstant> earliestIncompleteInstant = ignoreIncompleteInstants ? Option.empty()
|
||||
: candidateTimeline.filterInflightsAndRequested().firstInstant();
|
||||
|
||||
if (earliestIncompleteInstant.isPresent()) {
|
||||
return candidateTimeline.filterCompletedInstants()
|
||||
@@ -289,20 +304,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the timestamp of the latest compaction instant.
|
||||
*/
|
||||
@Override
|
||||
public Option<String> getSyncedInstantTime() {
|
||||
if (!enabled) {
|
||||
return Option.empty();
|
||||
}
|
||||
|
||||
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
|
||||
return timeline.getDeltaCommitTimeline().filterCompletedInstants()
|
||||
.lastInstant().map(HoodieInstant::getTimestamp);
|
||||
}
|
||||
|
||||
public boolean enabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user