[HUDI-4098] Metadata table heartbeat for instant has expired, last heartbeat 0 (#5583)
This commit is contained in:
@@ -138,6 +138,11 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
|
|||||||
// reuses the same instant time without rollback first. It is a no-op here as the
|
// reuses the same instant time without rollback first. It is a no-op here as the
|
||||||
// clean plan is the same, so we don't need to delete the requested and inflight instant
|
// clean plan is the same, so we don't need to delete the requested and inflight instant
|
||||||
// files in the active timeline.
|
// files in the active timeline.
|
||||||
|
|
||||||
|
// The metadata writer uses LAZY cleaning strategy without auto commit,
|
||||||
|
// write client then checks the heartbeat expiration when committing the instant,
|
||||||
|
// sets up the heartbeat explicitly to make the check pass.
|
||||||
|
writeClient.getHeartbeatClient().start(instantTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<WriteStatus> statuses = preppedRecordList.size() > 0
|
List<WriteStatus> statuses = preppedRecordList.size() > 0
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ import org.apache.hudi.client.WriteStatus;
|
|||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.configuration.HadoopConfigurations;
|
import org.apache.hudi.configuration.HadoopConfigurations;
|
||||||
@@ -253,6 +255,49 @@ public class TestStreamWriteOperatorCoordinator {
|
|||||||
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
|
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testSyncMetadataTableWithReusedInstant() throws Exception {
|
||||||
|
// reset
|
||||||
|
reset();
|
||||||
|
// override the default configuration
|
||||||
|
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||||
|
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
|
||||||
|
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
|
||||||
|
coordinator = new StreamWriteOperatorCoordinator(conf, context);
|
||||||
|
coordinator.start();
|
||||||
|
coordinator.setExecutor(new MockCoordinatorExecutor(context));
|
||||||
|
|
||||||
|
final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
|
||||||
|
|
||||||
|
coordinator.handleEventFromOperator(0, event0);
|
||||||
|
|
||||||
|
String instant = coordinator.getInstant();
|
||||||
|
assertNotEquals("", instant);
|
||||||
|
|
||||||
|
final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
|
||||||
|
HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf));
|
||||||
|
HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
|
||||||
|
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(1L));
|
||||||
|
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
|
||||||
|
|
||||||
|
// writes a normal commit
|
||||||
|
mockWriteWithMetadata();
|
||||||
|
instant = coordinator.getInstant();
|
||||||
|
// creates an inflight commit on the metadata timeline
|
||||||
|
metadataTableMetaClient.getActiveTimeline()
|
||||||
|
.createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant));
|
||||||
|
metadataTableMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant);
|
||||||
|
metadataTableMetaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
|
// write another commit with existing instant on the metadata timeline
|
||||||
|
instant = mockWriteWithMetadata();
|
||||||
|
metadataTableMetaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
|
completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
|
||||||
|
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(3L));
|
||||||
|
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Utilities
|
// Utilities
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user