[HUDI-4167] Remove the timeline refresh with initializing hoodie table (#5716)
The timeline refresh on table initialization invokes the fs view #sync, which has two actions now: 1. reload the timeline of the fs view, so that the next fs view request is based on this timeline metadata 2. if this is a local fs view, clear all the local states; if this is a remote fs view, send a request to sync the remote fs view But, let's look at the construction: the meta client is instantiated freshly so the timeline is already the latest, and the table is also constructed freshly, so the fs view has no local states; that means the #sync is totally unnecessary. In this patch, the metadata lifecycle and data set fs view are kept in sync: when the fs view is refreshed, the underlying metadata is also refreshed synchronously. The freshness of the metadata follows the same rules as the data fs view: 1. if the fs view is local, the visibility is based on the table meta client's latest commit 2. if the fs view is remote, the timeline server would #sync the fs view and metadata together based on the lagging server-local timeline From the perspective of the client, there is no need to care about the refresh action anymore, no matter whether the metadata table is enabled or not. That makes the client logic clearer and less error-prone. Removing the timeline refresh has another benefit: it avoids unnecessary #refresh of the remote fs view; if all the clients send requests to #sync the remote fs view, the server would encounter conflicts and the client would encounter a response error.
This commit is contained in:
@@ -123,10 +123,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HoodieTable createTable(HoodieWriteConfig config,
|
||||
Configuration hadoopConf,
|
||||
boolean refreshTimeline) {
|
||||
return HoodieSparkTable.create(config, context, refreshTimeline);
|
||||
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
|
||||
return HoodieSparkTable.create(config, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -333,7 +331,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
|
||||
@Override
|
||||
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
|
||||
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, true);
|
||||
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
|
||||
preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
|
||||
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
|
||||
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
|
||||
@@ -352,7 +350,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
|
||||
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
|
||||
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
|
||||
preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
|
||||
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
|
||||
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
|
||||
@@ -434,7 +432,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
|
||||
}
|
||||
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
|
||||
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -54,30 +54,18 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
|
||||
}
|
||||
|
||||
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return create(config, context, false);
|
||||
}
|
||||
|
||||
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context,
|
||||
boolean refreshTimeline) {
|
||||
HoodieTableMetaClient metaClient =
|
||||
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
|
||||
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
|
||||
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
|
||||
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
|
||||
.setProperties(config.getProps()).build();
|
||||
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
|
||||
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
|
||||
}
|
||||
|
||||
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
|
||||
HoodieSparkEngineContext context,
|
||||
HoodieTableMetaClient metaClient) {
|
||||
return create(config, context, metaClient, false);
|
||||
}
|
||||
|
||||
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
|
||||
HoodieSparkEngineContext context,
|
||||
HoodieTableMetaClient metaClient,
|
||||
boolean refreshTimeline) {
|
||||
HoodieSparkTable<T> hoodieSparkTable;
|
||||
switch (metaClient.getTableType()) {
|
||||
case COPY_ON_WRITE:
|
||||
@@ -89,9 +77,6 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
|
||||
default:
|
||||
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
|
||||
}
|
||||
if (refreshTimeline) {
|
||||
hoodieSparkTable.getHoodieView().sync();
|
||||
}
|
||||
return hoodieSparkTable;
|
||||
}
|
||||
|
||||
|
||||
@@ -515,7 +515,13 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
|
||||
return getConfigBuilder(schema)
|
||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
|
||||
.withAvroSchemaValidate(true);
|
||||
.withAvroSchemaValidate(true)
|
||||
// The test has rollback instants on the timeline,
|
||||
// these rollback instants use real time as instant time, whose instant time is always greater than
|
||||
// the normal commits instant time, this breaks the refresh rule introduced in HUDI-2761:
|
||||
// The last client instant is always the rollback instant but not the normal commit.
|
||||
// Always refresh the timeline when client and server have different timeline.
|
||||
.withRefreshTimelineServerBasedOnLatestCommit(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -111,7 +111,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
|
||||
assertEquals(fsPartitions, metadataPartitions, "Partitions should match");
|
||||
|
||||
// Files within each partition should match
|
||||
HoodieTable table = HoodieSparkTable.create(writeConfig, context, true);
|
||||
HoodieTable table = HoodieSparkTable.create(writeConfig, context);
|
||||
TableFileSystemView tableView = table.getHoodieView();
|
||||
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
|
||||
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
|
||||
|
||||
@@ -247,7 +247,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
|
||||
assertEquals(allPartitions.size(), testTable.listAllBaseFiles().length);
|
||||
|
||||
// Verify that all data file has one log file
|
||||
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient, true);
|
||||
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
|
||||
for (String partitionPath : dataGen.getPartitionPaths()) {
|
||||
List<FileSlice> groupedLogFiles =
|
||||
table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
|
||||
|
||||
@@ -559,7 +559,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
|
||||
|
||||
// Files within each partition should match
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext, true);
|
||||
HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
|
||||
TableFileSystemView tableView = table.getHoodieView();
|
||||
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
|
||||
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
|
||||
|
||||
Reference in New Issue
Block a user