diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 9fe83f199..43fe16858 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -296,7 +296,7 @@ public class SparkMain { SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false); HoodieWriteConfig config = client.getConfig(); HoodieEngineContext context = client.getEngineContext(); - HoodieSparkTable table = HoodieSparkTable.create(config, context, true); + HoodieSparkTable table = HoodieSparkTable.create(config, context); WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); return 0; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 5c485bed0..455cb644c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -296,11 +296,7 @@ public abstract class BaseHoodieWriteClient createTable(HoodieWriteConfig config, Configuration hadoopConf); void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) { try { @@ -365,7 +361,7 @@ public abstract class BaseHoodieWriteClient table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = createTable(config, hadoopConf); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); Option instant = Option.fromJavaOptional( inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst()); @@ -634,7 +630,7 @@ public abstract class 
BaseHoodieWriteClient table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = createTable(config, hadoopConf); if (table.getCompletedCommitsTimeline().empty()) { throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty"); } @@ -668,7 +664,7 @@ public abstract class BaseHoodieWriteClient table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = createTable(config, hadoopConf); table.savepoint(context, instantTime, user, comment); } @@ -680,7 +676,7 @@ public abstract class BaseHoodieWriteClient table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = createTable(config, hadoopConf); SavepointHelpers.deleteSavepoint(table, savepointTime); } @@ -1012,7 +1008,7 @@ public abstract class BaseHoodieWriteClient scheduleIndexing(List partitionTypes) { String instantTime = HoodieActiveTimeline.createNewInstantTime(); - Option indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled()) + Option indexPlan = createTable(config, hadoopConf) .scheduleIndexing(context, instantTime, partitionTypes); return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty(); } @@ -1024,7 +1020,7 @@ public abstract class BaseHoodieWriteClient} after successful indexing. */ public Option index(String indexInstantTime) { - return createTable(config, hadoopConf, config.isMetadataTableEnabled()).index(context, indexInstantTime); + return createTable(config, hadoopConf).index(context, indexInstantTime); } /** @@ -1339,17 +1335,17 @@ public abstract class BaseHoodieWriteClient clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled()) + Option clusteringPlan = createTable(config, hadoopConf) .scheduleClustering(context, instantTime, extraMetadata); return clusteringPlan.isPresent() ? 
Option.of(instantTime) : Option.empty(); case COMPACT: LOG.info("Scheduling compaction at instant time :" + instantTime); - Option compactionPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled()) + Option compactionPlan = createTable(config, hadoopConf) .scheduleCompaction(context, instantTime, extraMetadata); return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty(); case CLEAN: LOG.info("Scheduling cleaning at instant time :" + instantTime); - Option cleanerPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled()) + Option cleanerPlan = createTable(config, hadoopConf) .scheduleCleaning(context, instantTime, extraMetadata); return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty(); default: @@ -1702,6 +1698,6 @@ public abstract class BaseHoodieWriteClient REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT = ConfigProperty .key("hoodie.refresh.timeline.server.based.on.latest.commit") - .defaultValue(false) - .withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (false), "); + .defaultValue(true) + .withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. 
By default (true)."); public static final ConfigProperty INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty .key("hoodie.consistency.check.initial_interval_ms") @@ -2499,6 +2499,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withRefreshTimelineServerBasedOnLatestCommit(boolean refreshTimelineServerBasedOnLatestCommit) { + writeConfig.setValue(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT, Boolean.toString(refreshTimelineServerBasedOnLatestCommit)); + return this; + } + protected void setDefaults() { writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType)); // Check for mandatory properties diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index ddfbabaf3..b68cf97e9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -117,8 +117,7 @@ public class HoodieFlinkWriteClient extends } @Override - protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, - boolean refreshTimeline) { + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) { return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index 6eae15e7e..26149918c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -62,13 +62,6 @@ public abstract class HoodieFlinkTable public static HoodieFlinkTable create(HoodieWriteConfig 
config, HoodieFlinkEngineContext context, HoodieTableMetaClient metaClient) { - return HoodieFlinkTable.create(config, context, metaClient, config.isMetadataTableEnabled()); - } - - public static HoodieFlinkTable create(HoodieWriteConfig config, - HoodieFlinkEngineContext context, - HoodieTableMetaClient metaClient, - boolean refreshTimeline) { final HoodieFlinkTable hoodieFlinkTable; switch (metaClient.getTableType()) { case COPY_ON_WRITE: @@ -80,9 +73,6 @@ public abstract class HoodieFlinkTable default: throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } - if (refreshTimeline) { - hoodieFlinkTable.getHoodieView().sync(); - } return hoodieFlinkTable; } diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java index 50adabbd5..e23ee4ad5 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java @@ -104,7 +104,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); HoodieBloomIndex index = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance()); - HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient, false); + HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient); HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA); // Create some partitions, and put some files diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index 7f5dc19ba..fbfb85bab 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -89,9 +89,7 @@ public class HoodieJavaWriteClient extends } @Override - protected HoodieTable createTable(HoodieWriteConfig config, - Configuration hadoopConf, - boolean refreshTimeline) { + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) { return HoodieJavaTable.create(config, context); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 7f9ec05e3..fe6ea975e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -123,10 +123,8 @@ public class SparkRDDWriteClient extends } @Override - protected HoodieTable createTable(HoodieWriteConfig config, - Configuration hadoopConf, - boolean refreshTimeline) { - return HoodieSparkTable.create(config, context, refreshTimeline); + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) { + return HoodieSparkTable.create(config, context); } @Override @@ -333,7 +331,7 @@ public class SparkRDDWriteClient extends @Override protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { - HoodieSparkTable table = HoodieSparkTable.create(config, context, true); + HoodieSparkTable table = HoodieSparkTable.create(config, context); preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); 
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); @@ -352,7 +350,7 @@ public class SparkRDDWriteClient extends @Override public HoodieWriteMetadata> cluster(String clusteringInstant, boolean shouldComplete) { - HoodieSparkTable table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled()); + HoodieSparkTable table = HoodieSparkTable.create(config, context); preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient()); HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant); @@ -434,7 +432,7 @@ public class SparkRDDWriteClient extends } // Create a Hoodie table which encapsulated the commits and files visible - return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled()); + return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java index 20e3bd4c1..66d51c912 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java @@ -54,30 +54,18 @@ public abstract class HoodieSparkTable } public static HoodieSparkTable create(HoodieWriteConfig config, HoodieEngineContext context) { - return create(config, context, false); - } - - public static HoodieSparkTable create(HoodieWriteConfig config, HoodieEngineContext context, - boolean refreshTimeline) { HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath()) .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) .setFileSystemRetryConfig(config.getFileSystemRetryConfig()) .setProperties(config.getProps()).build(); - return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline); + return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient); } public static HoodieSparkTable create(HoodieWriteConfig config, HoodieSparkEngineContext context, HoodieTableMetaClient metaClient) { - return create(config, context, metaClient, false); - } - - public static HoodieSparkTable create(HoodieWriteConfig config, - HoodieSparkEngineContext context, - HoodieTableMetaClient metaClient, - boolean refreshTimeline) { HoodieSparkTable hoodieSparkTable; switch (metaClient.getTableType()) { case COPY_ON_WRITE: @@ -89,9 +77,6 @@ public abstract class HoodieSparkTable default: throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } - if (refreshTimeline) { - hoodieSparkTable.getHoodieView().sync(); - } return hoodieSparkTable; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index 1cb7bcbfc..98bcb1103 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -515,7 +515,13 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { return getConfigBuilder(schema) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build()) 
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) - .withAvroSchemaValidate(true); + .withAvroSchemaValidate(true) + // The test has rollback instants on the timeline. + // These rollback instants use real time as their instant time, which is always greater than + // the instant time of the normal commits. This breaks the refresh rule introduced in HUDI-2761: + // the last client instant is always a rollback instant rather than a normal commit. + // So always refresh the timeline when the client and the server have different timelines. + .withRefreshTimelineServerBasedOnLatestCommit(false); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 9a8fc55a2..e19c8fc1a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -111,7 +111,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase { assertEquals(fsPartitions, metadataPartitions, "Partitions should match"); // Files within each partition should match - HoodieTable table = HoodieSparkTable.create(writeConfig, context, true); + HoodieTable table = HoodieSparkTable.create(writeConfig, context); TableFileSystemView tableView = table.getHoodieView(); List fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList()); Map partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index b9f025223..0ce6ca0ee 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -247,7 +247,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness assertEquals(allPartitions.size(), testTable.listAllBaseFiles().length); // Verify that all data file has one log file - HoodieTable table = HoodieSparkTable.create(config, context(), metaClient, true); + HoodieTable table = HoodieSparkTable.create(config, context(), metaClient); for (String partitionPath : dataGen.getPartitionPaths()) { List groupedLogFiles = table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 4504c552c..d0365dced 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -559,7 +559,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im // Files within each partition should match metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext, true); + HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext); TableFileSystemView tableView = table.getHoodieView(); List fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList()); Map partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths); diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java index af0dc1301..02a406e7e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java @@ -199,6 +199,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste LOG.info("Deleting all rocksdb data associated with table filesystem view"); rocksDB.close(); rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath()); + schemaHelper.getAllColumnFamilies().forEach(rocksDB::addColumnFamily); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index cf941bb70..e8937b39d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -550,10 +550,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { @Override public void close() { - for (Pair partitionFileSlicePair : partitionReaders.keySet()) { - close(partitionFileSlicePair); - } - partitionReaders.clear(); + closePartitionReaders(); } /** @@ -567,6 +564,16 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { closeReader(readers); } + /** + * Close and clear all the partition readers. 
+ */ + private void closePartitionReaders() { + for (Pair partitionFileSlicePair : partitionReaders.keySet()) { + close(partitionFileSlicePair); + } + partitionReaders.clear(); + } + private void closeReader(Pair readers) { if (readers != null) { try { @@ -624,5 +631,11 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { public void reset() { initIfNeeded(); dataMetaClient.reloadActiveTimeline(); + if (metadataMetaClient != null) { + metadataMetaClient.reloadActiveTimeline(); + } + // The cached readers carry a max instant time restriction; clear them + // because the metadata timeline may have changed. + closePartitionReaders(); } } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java index 40669f50e..2ff216822 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java @@ -150,7 +150,7 @@ public class TimelineService { private int markerBatchNumThreads = 20; private long markerBatchIntervalMs = 50L; private int markerParallelism = 100; - private boolean refreshTimelineBasedOnLatestCommit = false; + private boolean refreshTimelineBasedOnLatestCommit = true; public Builder() { } @@ -240,6 +240,7 @@ public class TimelineService { config.markerBatchNumThreads = this.markerBatchNumThreads; config.markerBatchIntervalMs = this.markerBatchIntervalMs; config.markerParallelism = this.markerParallelism; + config.refreshTimelineBasedOnLatestCommit = this.refreshTimelineBasedOnLatestCommit; return config; } }