
[HUDI-3899] Drop index to delete pending index instants from timeline if applicable (#5342)

Author: Sagar Sumit
Co-authored-by: sivabalan <n.siva.b@gmail.com>
Date: 2022-04-19 07:58:46 +05:30
Committed by: GitHub
Parent: 52d878c52b
Commit: 4f44e6aeb5

4 changed files with 225 additions and 17 deletions
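
In short: dropping an index now also cleans up any pending indexing instant on the data timeline, not just the metadata partition. A minimal sketch of the flow this commit exercises, mirroring the new tests below — the Spark master, base path, table name, and properties path are illustrative stand-ins, not values from this commit:

import org.apache.hudi.utilities.HoodieIndexer;
import org.apache.spark.api.java.JavaSparkContext;

public class DropIndexExample {
  public static void main(String[] args) {
    // Assumption: a local Spark context; any initialized JavaSparkContext works.
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "drop-index-example");
    HoodieIndexer.Config config = new HoodieIndexer.Config();
    config.basePath = "file:///tmp/hudi/indexer_test";  // hypothetical table base path
    config.tableName = "indexer_test";
    config.indexTypes = "COLUMN_STATS";                 // metadata partition to drop
    config.runningMode = "dropindex";                   // HoodieIndexer.DROP_INDEX
    config.propsFilePath = "/tmp/indexer.properties";   // hypothetical props file
    // With this change, besides deleting the metadata partition, drop-index also
    // removes a matching pending indexing.requested/inflight instant from the timeline.
    int exitCode = new HoodieIndexer(jsc, config).start(0);  // returns 0 on success
    jsc.stop();
  }
}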

File: HoodieBackedTableMetadataWriter.java (package org.apache.hudi.metadata)

@@ -18,13 +18,9 @@
 package org.apache.hudi.metadata;
 
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
 import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -70,6 +66,12 @@ import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieMetadataException;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -88,6 +90,9 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -727,9 +732,33 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
       LOG.warn("Deleting Metadata Table partitions: " + partitionPath);
       dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true);
+      // delete the corresponding pending indexing instant file from the timeline
+      LOG.warn("Deleting pending indexing instant from the timeline for partition: " + partitionPath);
+      deletePendingIndexingInstant(dataMetaClient, partitionPath);
     }
   }
 
+  /**
+   * Deletes any pending indexing instant, if it exists.
+   * It reads the plan from the indexing.requested file and deletes both the requested and inflight instants
+   * if the partition path in the plan matches the given partition path.
+   */
+  private static void deletePendingIndexingInstant(HoodieTableMetaClient metaClient, String partitionPath) {
+    metaClient.reloadActiveTimeline().filterPendingIndexTimeline().getInstants().filter(instant -> REQUESTED.equals(instant.getState()))
+        .forEach(instant -> {
+          try {
+            HoodieIndexPlan indexPlan = deserializeIndexPlan(metaClient.getActiveTimeline().readIndexPlanAsBytes(instant).get());
+            if (indexPlan.getIndexPartitionInfos().stream()
+                .anyMatch(indexPartitionInfo -> indexPartitionInfo.getMetadataPartitionPath().equals(partitionPath))) {
+              metaClient.getActiveTimeline().deleteInstantFileIfExists(instant);
+              metaClient.getActiveTimeline().deleteInstantFileIfExists(getIndexInflightInstant(instant.getTimestamp()));
+            }
+          } catch (IOException e) {
+            LOG.error("Failed to delete the instant file corresponding to " + instant, e);
+          }
+        });
+  }
 
   private MetadataRecordsGenerationParams getRecordsGenerationParams() {
     return new MetadataRecordsGenerationParams(
         dataMetaClient,
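
For orientation, while an index is only scheduled the data timeline holds the instant files that deletePendingIndexingInstant targets — roughly the following, assuming Hudi's <instant_time>.indexing.* naming for index instants (timestamps here are illustrative):

20220419075846.indexing.requested    <- carries the serialized HoodieIndexPlan read above
20220419075846.indexing.inflight     <- present once indexing has started

Both files are deleted when the plan's metadataPartitionPath matches the partition being dropped; a completed indexing instant is left alone, which the new tests below assert for a finished bloom_filters index.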

File: HoodieIndexer.java (package org.apache.hudi.utilities)

@@ -85,7 +85,7 @@ import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
 public class HoodieIndexer {
 
   private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
-  private static final String DROP_INDEX = "dropindex";
+  static final String DROP_INDEX = "dropindex";
 
   private final HoodieIndexer.Config cfg;
   private TypedProperties props;

File: TestHoodieIndexer.java (package org.apache.hudi.utilities, test sources)

@@ -28,14 +28,17 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.testutils.providers.SparkProvider;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -50,10 +53,16 @@ import java.util.List;
 import java.util.Objects;
 
 import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.utilities.HoodieIndexer.DROP_INDEX;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -81,6 +90,14 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
     initMetaClient();
   }
 
+  protected void initMetaClient() throws IOException {
+    String rootPathStr = "file://" + tempDir.toAbsolutePath().toString();
+    Path rootPath = new Path(rootPathStr);
+    rootPath.getFileSystem(jsc.hadoopConfiguration()).mkdirs(rootPath);
+    metaClient = HoodieTestUtils.init(rootPathStr, getTableType());
+    basePath = metaClient.getBasePath();
+  }
+
   @Test
   public void testGetRequestedPartitionTypes() {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
@@ -132,29 +149,166 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
     assertNoWriteErrors(statuses);
 
     // validate table config
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
 
     // build indexer config which has only column_stats enabled (files is enabled by default)
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
     config.basePath = basePath;
     config.tableName = tableName;
-    config.indexTypes = "COLUMN_STATS";
-    config.runningMode = "scheduleAndExecute";
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE_AND_EXECUTE;
     config.propsFilePath = propsPath;
 
     // start the indexer and validate column_stats index is also complete
     HoodieIndexer indexer = new HoodieIndexer(jsc, config);
     assertEquals(0, indexer.start(0));
 
     // validate table config
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
     // validate metadata partitions actually exist
-    assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, FILES));
-    assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, COLUMN_STATS));
-    assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
   }
 
+  @Test
+  public void testIndexerDropPartitionDeletesInstantFromTimeline() {
+    initTestDataGenerator();
+    String tableName = "indexer_test";
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    // enable files and bloom_filters on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
+    // do one upsert with synchronous metadata update
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    String instant = "0001";
+    writeClient.startCommitWithTime(instant);
+    List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // validate partitions built successfully
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+
+    // build indexer config which has only column_stats enabled (files is enabled by default)
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
+    config.basePath = basePath;
+    config.tableName = tableName;
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE;
+    config.propsFilePath = propsPath;
+
+    // schedule indexing and validate column_stats index is also initialized
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    Option<HoodieInstant> indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertTrue(indexInstantInTimeline.isPresent());
+    assertEquals(REQUESTED, indexInstantInTimeline.get().getState());
+    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+
+    // drop column_stats and validate indexing.requested is also removed from the timeline
+    config.runningMode = DROP_INDEX;
+    indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertFalse(indexInstantInTimeline.isPresent());
+    assertFalse(metadataPartitionExists(basePath, context, COLUMN_STATS));
+
+    // check other partitions are intact
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+  }
+
+  @Test
+  public void testTwoIndexersOneCreateOneDropPartition() {
+    initTestDataGenerator();
+    String tableName = "indexer_test";
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    // enable files on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
+    // do one upsert with synchronous metadata update
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    String instant = "0001";
+    writeClient.startCommitWithTime(instant);
+    List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // validate files partition built successfully
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+
+    // build indexer config which has only bloom_filters enabled
+    HoodieIndexer.Config config = getHoodieIndexConfig(BLOOM_FILTERS.name(), SCHEDULE_AND_EXECUTE, "delta-streamer-config/indexer-only-bloom.properties");
+    // start the indexer and validate bloom_filters index is also complete
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    // keep the completed index instant for later validation
+    Option<HoodieInstant> bloomIndexInstant = metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant();
+    assertTrue(bloomIndexInstant.isPresent());
+
+    // build indexer config which has only column_stats enabled
+    config = getHoodieIndexConfig(COLUMN_STATS.name(), SCHEDULE, "delta-streamer-config/indexer.properties");
+    // schedule indexing and validate column_stats index is initialized
+    // and an indexing.requested instant is present
+    indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    Option<HoodieInstant> columnStatsIndexInstant = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertTrue(columnStatsIndexInstant.isPresent());
+    assertEquals(REQUESTED, columnStatsIndexInstant.get().getState());
+    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+
+    // drop column_stats and validate indexing.requested is also removed from the timeline,
+    // and the completed indexing instant corresponding to the bloom_filters index is still present
+    dropIndexAndAssert(COLUMN_STATS, "delta-streamer-config/indexer.properties", Option.empty());
+
+    // check other partitions are intact
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+
+    // drop the bloom_filters partition; timeline files should not be deleted since the index building is complete
+    dropIndexAndAssert(BLOOM_FILTERS, "delta-streamer-config/indexer-only-bloom.properties", bloomIndexInstant);
+  }
+
+  private void dropIndexAndAssert(MetadataPartitionType indexType, String resourceFilePath, Option<HoodieInstant> completedIndexInstant) {
+    HoodieIndexer.Config config = getHoodieIndexConfig(indexType.name(), DROP_INDEX, resourceFilePath);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    Option<HoodieInstant> pendingIndexInstant = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertFalse(pendingIndexInstant.isPresent());
+    assertFalse(metadataPartitionExists(basePath, context, indexType));
+    if (completedIndexInstant.isPresent()) {
+      assertEquals(completedIndexInstant, metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant());
+    }
+  }
+
+  private HoodieIndexer.Config getHoodieIndexConfig(String indexType, String runMode, String resourceFilePath) {
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource(resourceFilePath)).getPath();
+    config.basePath = basePath;
+    config.tableName = tableName;
+    config.indexTypes = indexType;
+    config.runningMode = runMode;
+    config.propsFilePath = propsPath;
+    return config;
+  }
+
   private static HoodieWriteConfig.Builder getWriteConfigBuilder(String basePath, String tableName) {

File: delta-streamer-config/indexer-only-bloom.properties (new file, test resources)

@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+hoodie.metadata.enable=true
+hoodie.metadata.index.async=true
+hoodie.metadata.index.bloom.filter.enable=true
+hoodie.metadata.index.check.timeout.seconds=60
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider