1
0

[HUDI-2488][HUDI-3175] Implement async metadata indexing (#4693)

- Add a new action called INDEX, whose state transition is described in the RFC.
- Changes in timeline to support the new action.
- Add an index planner in ScheduleIndexActionExecutor.
- Add index plan executor in RunIndexActionExecutor.
- Add 3 APIs in HoodieTableMetadataWriter; a) scheduleIndex: will generate an index plan based on latest completed instant, initialize file groups and add a requested INDEX instant, b) index: executes the index plan and also takes care of writes that happened after indexing was requested, c) dropIndex: will drop index by removing the given metadata partition.
- Add 2 new table configs to serve as the source of truth for inflight and completed indexes.
- Support upgrade/downgrade taking care of the newly added configs.
- Add tool to trigger indexing in HoodieIndexer.
- Handle corner cases related to partial failures.
- Abort gracefully after deleting partition and instant.
- Handle other actions in the timeline that must be considered before catching up.
This commit is contained in:
Sagar Sumit
2022-04-01 01:33:12 +05:30
committed by GitHub
parent 1da196c1e8
commit 28dafa774e
44 changed files with 2123 additions and 150 deletions

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.cli.commands;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.cli.DeDupeType;
import org.apache.hudi.cli.DedupeSparkJob;
@@ -54,9 +53,10 @@ import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieCompactionAdminTool;
import org.apache.hudi.utilities.HoodieCompactionAdminTool.Operation;
import org.apache.hudi.utilities.HoodieCompactor;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -67,6 +67,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.buildProperties;
import static org.apache.hudi.utilities.UtilHelpers.readConfig;
/**
* This class deals with initializing spark context based on command entered to hudi-cli.
*/
@@ -194,7 +200,7 @@ public class SparkMain {
configs.addAll(Arrays.asList(args).subList(9, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
Integer.parseInt(args[7]), HoodieClusteringJob.EXECUTE, propsFilePath, configs);
Integer.parseInt(args[7]), EXECUTE, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE_AND_EXECUTE:
assert (args.length >= 8);
@@ -207,7 +213,7 @@ public class SparkMain {
configs.addAll(Arrays.asList(args).subList(8, args.length));
}
returnCode = cluster(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[2],
Integer.parseInt(args[6]), HoodieClusteringJob.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
Integer.parseInt(args[6]), SCHEDULE_AND_EXECUTE, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE:
assert (args.length >= 7);
@@ -220,7 +226,7 @@ public class SparkMain {
configs.addAll(Arrays.asList(args).subList(7, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
0, HoodieClusteringJob.SCHEDULE, propsFilePath, configs);
0, SCHEDULE, propsFilePath, configs);
break;
case CLEAN:
assert (args.length >= 5);
@@ -413,8 +419,8 @@ public class SparkMain {
String bootstrapIndexClass, String selectorClass, String keyGenerator, String fullBootstrapInputProvider,
String payloadClassName, String enableHiveSync, String propsFilePath, List<String> configs) throws IOException {
TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
: UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);
TypedProperties properties = propsFilePath == null ? buildProperties(configs)
: readConfig(jsc.hadoopConfiguration(), new Path(propsFilePath), configs).getProps(true);
properties.setProperty(HoodieBootstrapConfig.BASE_PATH.key(), sourcePath);

View File

@@ -25,6 +25,8 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -63,12 +65,14 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
@@ -405,7 +409,6 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
public abstract O bulkInsert(I records, final String instantTime,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);
/**
* Loads the given HoodieRecords, as inserts into the table. This is suitable for doing big bulk loads into a Hoodie
* table for the very first time (e.g: converting an existing table to Hoodie). The input records should contain no
@@ -956,6 +959,53 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
}
/**
 * Schedules an INDEX action on the timeline.
 *
 * <p>Creates a new instant time and asks the table to generate an index plan for the
 * given partition types. The instant time is only returned when a plan was actually
 * produced and the requested INDEX instant was added.
 *
 * @param partitionTypes - list of {@link MetadataPartitionType} which needs to be indexed
 * @return instant time for the requested INDEX action if a plan was generated, empty otherwise
 */
public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
.scheduleIndexing(context, instantTime, partitionTypes);
// Surface the instant time only when scheduling produced a plan.
return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
}
/**
 * Runs the INDEX action to build out the metadata partitions as planned for the given
 * instant time. Delegates to the table's {@code index} implementation.
 *
 * @param indexInstantTime - instant time for the requested INDEX action
 * @return {@code Option<HoodieIndexCommitMetadata>} after successful indexing, empty otherwise
 */
public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
return createTable(config, hadoopConf, config.isMetadataTableEnabled()).index(context, indexInstantTime);
}
/**
 * Drops the index and removes the metadata partitions.
 *
 * <p>Runs inside a transaction (presumably to guard against concurrent writers —
 * TODO confirm against the txnManager contract) and delegates the actual partition
 * removal to the table's metadata writer, if one is present.
 *
 * @param partitionTypes - list of {@link MetadataPartitionType} whose metadata partitions need to be dropped
 * @throws HoodieIndexException if dropping the metadata partitions fails with an I/O error
 */
public void dropIndex(List<MetadataPartitionType> partitionTypes) {
HoodieTable table = createTable(config, hadoopConf);
String dropInstant = HoodieActiveTimeline.createNewInstantTime();
this.txnManager.beginTransaction();
try {
context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
// No-op when the table has no metadata writer.
table.getMetadataWriter(dropInstant).ifPresent(w -> {
try {
((HoodieTableMetadataWriter) w).dropMetadataPartitions(partitionTypes);
} catch (IOException e) {
throw new HoodieIndexException("Failed to drop metadata index. ", e);
}
});
} finally {
// Always release the transaction, even if the drop failed.
this.txnManager.endTransaction();
}
}
/**
* Performs Compaction for the workload stored in instant-time.
*

View File

@@ -1507,8 +1507,20 @@ public class HoodieWriteConfig extends HoodieConfig {
return isMetadataTableEnabled() && getMetadataConfig().isBloomFilterIndexEnabled();
}
public boolean isMetadataIndexColumnStatsForAllColumnsEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isMetadataColumnStatsIndexForAllColumnsEnabled();
public boolean isMetadataColumnStatsIndexEnabled() {
return isMetadataTableEnabled() && getMetadataConfig().isColumnStatsIndexEnabled();
}
/**
 * @return the configured column list for the column stats index, as read from the
 *         metadata config (format/semantics defined there).
 */
public String getColumnsEnabledForColumnStatsIndex() {
return getMetadataConfig().getColumnsEnabledForColumnStatsIndex();
}
/**
 * @return the configured column list for the bloom filter index, as read from the
 *         metadata config (format/semantics defined there).
 */
public String getColumnsEnabledForBloomFilterIndex() {
return getMetadataConfig().getColumnsEnabledForBloomFilterIndex();
}
/**
 * @return the indexing check timeout in seconds, delegated to the metadata config.
 */
public int getIndexingCheckTimeoutSeconds() {
return getMetadataConfig().getIndexingCheckTimeoutSeconds();
}
public int getColumnStatsIndexParallelism() {
@@ -1892,6 +1904,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieMetadataConfig.ASYNC_CLEAN_ENABLE);
}
/**
 * @return whether async metadata indexing is enabled
 *         ({@code HoodieMetadataConfig.ASYNC_INDEX_ENABLE}, falling back to its default).
 */
public boolean isMetadataAsyncIndex() {
return getBooleanOrDefault(HoodieMetadataConfig.ASYNC_INDEX_ENABLE);
}
public int getMetadataMaxCommitsToKeep() {
return getInt(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP);
}

View File

@@ -120,7 +120,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
// Step 2: Load all involved files as <Partition, filename> pairs
List<Pair<String, BloomIndexFileInfo>> fileInfoList;
if (config.getBloomIndexPruneByRanges()) {
fileInfoList = (config.getMetadataConfig().isColumnStatsIndexEnabled()
fileInfoList = (config.isMetadataColumnStatsIndexEnabled()
? loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable)
: loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable));
} else {

View File

@@ -50,12 +50,14 @@ import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SizeEstimator;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
@@ -69,8 +71,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.accumulateColumnRanges;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.aggregateColumnStats;
@@ -343,16 +347,27 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
updateWriteStatus(stat, result);
}
if (config.isMetadataIndexColumnStatsForAllColumnsEnabled()) {
if (config.isMetadataColumnStatsIndexEnabled()) {
final List<Schema.Field> fieldsToIndex;
if (!StringUtils.isNullOrEmpty(config.getColumnsEnabledForColumnStatsIndex())) {
Set<String> columnsToIndex = Stream.of(config.getColumnsEnabledForColumnStatsIndex().split(","))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
.filter(field -> columnsToIndex.contains(field.name())).collect(Collectors.toList());
} else {
// if column stats index is enabled but columns not configured then we assume that all columns should be indexed
fieldsToIndex = writeSchemaWithMetaFields.getFields();
}
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
? stat.getRecordsStats().get().getStats() : new HashMap<>();
final String filePath = stat.getPath();
// initialize map of column name to map of stats name to stats value
Map<String, Map<String, Object>> columnToStats = new HashMap<>();
writeSchemaWithMetaFields.getFields().forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
fieldsToIndex.forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
// collect stats for columns at once per record and keep iterating through every record to eventually find col stats for all fields.
recordList.forEach(record -> aggregateColumnStats(record, writeSchemaWithMetaFields, columnToStats, config.isConsistentLogicalTimestampEnabled()));
writeSchemaWithMetaFields.getFields().forEach(field -> accumulateColumnRanges(field, filePath, columnRangeMap, columnToStats));
recordList.forEach(record -> aggregateColumnStats(record, fieldsToIndex, columnToStats, config.isConsistentLogicalTimestampEnabled()));
fieldsToIndex.forEach(field -> accumulateColumnRanges(field, filePath, columnRangeMap, columnToStats));
stat.setRecordsStats(new HoodieDeltaWriteStat.RecordsStats<>(columnRangeMap));
}

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -54,6 +55,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -62,6 +64,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.avro.specific.SpecificRecordBase;
@@ -80,14 +83,19 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
/**
* Writer implementation backed by an internal hudi table. Partition and file listing are saved within an internal MOR table
@@ -113,7 +121,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
protected boolean enabled;
protected SerializableConfiguration hadoopConf;
protected final transient HoodieEngineContext engineContext;
// TODO: HUDI-3258 Support secondary key via multiple partitions within a single type
protected final List<MetadataPartitionType> enabledPartitionTypes;
/**
@@ -363,6 +370,18 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
Option<String> inflightInstantTimestamp) throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
if (!exists) {
// Initialize for the first time by listing partitions and files directly from the file system
if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
}
}
}
private <T extends SpecificRecordBase> boolean metadataTableExists(HoodieTableMetaClient dataMetaClient,
Option<T> actionMetadata) throws IOException {
boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(),
HoodieTableMetaClient.METAFOLDER_NAME));
boolean reInitialize = false;
@@ -391,12 +410,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
exists = false;
}
if (!exists) {
// Initialize for the first time by listing partitions and files directly from the file system
if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
}
}
return exists;
}
/**
@@ -451,7 +465,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
final String INSTANT_ACTION = (actionMetadata.get() instanceof HoodieRollbackMetadata
? HoodieTimeline.ROLLBACK_ACTION
: (actionMetadata.get() instanceof HoodieRestoreMetadata ? HoodieTimeline.RESTORE_ACTION : ""));
: (actionMetadata.get() instanceof HoodieRestoreMetadata ? HoodieTimeline.RESTORE_ACTION : EMPTY_STRING));
List<String> affectedInstantTimestamps;
switch (INSTANT_ACTION) {
@@ -511,16 +525,33 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
initializeMetaClient(dataWriteConfig.getMetadataConfig().populateMetaFields());
initTableMetadata();
initializeEnabledFileGroups(dataMetaClient, createInstantTime);
// if async metadata indexing is enabled,
// then only initialize files partition as other partitions will be built using HoodieIndexer
List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>();
if (dataWriteConfig.isMetadataAsyncIndex()) {
enabledPartitionTypes.add(MetadataPartitionType.FILES);
} else {
// all enabled ones should be initialized
enabledPartitionTypes = this.enabledPartitionTypes;
}
initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes);
// During cold startup, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out
// of these large number of files and calling the existing update(HoodieCommitMetadata) function does not scale
// well. Hence, we have a special commit just for the initialization scenario.
initialCommit(createInstantTime);
initialCommit(createInstantTime, enabledPartitionTypes);
updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
return true;
}
private HoodieTableMetaClient initializeMetaClient(boolean populatMetaFields) throws IOException {
/**
 * Records the given partition types as completed metadata partitions in the data table's
 * config: merges them into the existing completed set, writes the merged set back to
 * {@code TABLE_METADATA_PARTITIONS}, and persists the table config to storage.
 *
 * @param partitionTypes - partition types to mark as initialized/completed
 */
private void updateInitializedPartitionsInTableConfig(List<MetadataPartitionType> partitionTypes) {
Set<String> completedPartitions = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
completedPartitions.addAll(partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
// Persist the updated config so the completed set survives process restarts.
HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
}
private HoodieTableMetaClient initializeMetaClient(boolean populateMetaFields) throws IOException {
return HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.MERGE_ON_READ)
.setTableName(tableName)
@@ -528,7 +559,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.setPayloadClassName(HoodieMetadataPayload.class.getName())
.setBaseFileFormat(HoodieFileFormat.HFILE.toString())
.setRecordKeyFields(RECORD_KEY_FIELD_NAME)
.setPopulateMetaFields(populatMetaFields)
.setPopulateMetaFields(populateMetaFields)
.setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
.initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
}
@@ -553,7 +584,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// In each round we will list a section of directories
int numDirsToList = Math.min(fileListingParallelism, pathsToList.size());
// List all directories in parallel
List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
List<DirectoryInfo> processedDirectories = engineContext.map(pathsToList.subList(0, numDirsToList), path -> {
FileSystem fs = path.getFileSystem(conf.get());
String relativeDirPath = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path);
return new DirectoryInfo(relativeDirPath, fs.listStatus(path));
@@ -595,13 +626,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* @param createInstantTime - Metadata table create instant time
* @throws IOException
*/
private void initializeEnabledFileGroups(HoodieTableMetaClient dataMetaClient, String createInstantTime) throws IOException {
for (MetadataPartitionType enabledPartitionType : this.enabledPartitionTypes) {
// Initializes the file groups for each of the given metadata partition types, using
// each type's own configured file group count.
private void initializeEnabledFileGroups(HoodieTableMetaClient dataMetaClient, String createInstantTime, List<MetadataPartitionType> partitionTypes) throws IOException {
for (MetadataPartitionType enabledPartitionType : partitionTypes) {
initializeFileGroups(dataMetaClient, enabledPartitionType, createInstantTime,
enabledPartitionType.getFileGroupCount());
}
}
/**
 * Initializes file groups for the given metadata partitions at the given instant time,
 * one call per partition type using that type's configured file group count.
 *
 * @param dataMetaClient     - meta client of the data table
 * @param metadataPartitions - partition types whose file groups should be initialized
 * @param instantTime        - instant time to initialize the file groups at
 * @throws IOException on failure to initialize a partition's file groups
 */
public void initializeMetadataPartitions(HoodieTableMetaClient dataMetaClient, List<MetadataPartitionType> metadataPartitions, String instantTime) throws IOException {
for (MetadataPartitionType partitionType : metadataPartitions) {
initializeFileGroups(dataMetaClient, partitionType, instantTime, partitionType.getFileGroupCount());
}
}
/**
* Initialize file groups for a partition. For file listing, we just have one file group.
*
@@ -614,7 +651,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
*/
private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime,
int fileGroupCount) throws IOException {
final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
// Archival of data table has a dependency on compaction(base files) in metadata table.
@@ -645,12 +681,34 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
}
}
/**
 * Drops the given metadata partitions: removes each partition from whichever table-config
 * set currently tracks it (inflight or completed), persists the updated config, then
 * deletes the partition directory under the metadata table's base path.
 *
 * @param metadataPartitions - list of {@link MetadataPartitionType} to drop
 * @throws IOException on failure to delete a partition path
 */
public void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException {
Set<String> completedIndexes = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
Set<String> inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig());
for (MetadataPartitionType partitionType : metadataPartitions) {
String partitionPath = partitionType.getPartitionPath();
// first update table config, so readers/writers stop seeing the partition before its data is deleted
if (inflightIndexes.contains(partitionPath)) {
inflightIndexes.remove(partitionPath);
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightIndexes));
} else if (completedIndexes.contains(partitionPath)) {
completedIndexes.remove(partitionPath);
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), String.join(",", completedIndexes));
}
// Persist the config change per partition so a partial failure leaves config consistent
// with what has actually been deleted so far.
HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
LOG.warn("Deleting Metadata Table partitions: " + partitionPath);
dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true);
}
}
private MetadataRecordsGenerationParams getRecordsGenerationParams() {
return new MetadataRecordsGenerationParams(
dataMetaClient, enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.isMetadataIndexColumnStatsForAllColumnsEnabled(),
dataWriteConfig.getColumnStatsIndexParallelism());
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
dataWriteConfig.getColumnStatsIndexParallelism(),
StringUtils.toList(dataWriteConfig.getColumnsEnabledForColumnStatsIndex()),
StringUtils.toList(dataWriteConfig.getColumnsEnabledForBloomFilterIndex()));
}
/**
@@ -663,20 +721,82 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
/**
* Processes commit metadata from data table and commits to metadata table.
*
* @param instantTime instant time of interest.
* @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
* @param <T> type of commit metadata.
* @param canTriggerTableService true if table services can be triggered. false otherwise.
*/
private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
if (enabled && metadata != null) {
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
commit(instantTime, partitionRecordsMap, canTriggerTableService);
if (!dataWriteConfig.isMetadataTableEnabled()) {
return;
}
Set<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
Set<String> inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig());
// if indexing is inflight then do not trigger table service
boolean doNotTriggerTableService = partitionsToUpdate.stream().anyMatch(inflightIndexes::contains);
if (enabled && metadata != null) {
// convert metadata and filter only the entries whose partition path are in partitionsToUpdate
Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata().entrySet().stream()
.filter(entry -> partitionsToUpdate.contains(entry.getKey().getPartitionPath())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
commit(instantTime, partitionRecordsMap, !doNotTriggerTableService && canTriggerTableService);
}
}
/**
 * Determines which metadata partitions a commit should update: the completed partitions
 * from the table config plus any inflight ones (whose file groups already exist, so
 * writers can log updates to them). Falls back to all enabled partition types when the
 * table config tracks no partitions at all.
 *
 * @return set of metadata partition paths to update
 */
private Set<String> getMetadataPartitionsToUpdate() {
// fetch partitions to update from table config
Set<String> partitionsToUpdate = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
// add inflight indexes as well because the file groups have already been initialized, so writers can log updates
partitionsToUpdate.addAll(getInflightMetadataPartitions(dataMetaClient.getTableConfig()));
if (!partitionsToUpdate.isEmpty()) {
return partitionsToUpdate;
}
// fallback to all enabled partitions if table config returned no partitions
return getEnabledPartitionTypes().stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
}
/**
 * Builds the metadata partitions listed in the index plan by bootstrapping them up to
 * the plan's index-upto instant.
 *
 * <p>For every partition in the plan this verifies that its file groups were already
 * initialized during index scheduling and that the corresponding
 * {@link MetadataPartitionType} is enabled, records the partitions as inflight in the
 * table config, and finally runs the initial commit that writes the index records.
 *
 * @param engineContext       engine context
 * @param indexPartitionInfos partitions to index; the first entry's indexUptoInstant is
 *                            used for all of them (assumes the plan shares one instant —
 *                            TODO confirm against the planner)
 * @throws HoodieIndexException if a partition's file groups are missing, the existence
 *                              check fails, or the partition type is not enabled
 */
@Override
public void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) {
  if (indexPartitionInfos.isEmpty()) {
    LOG.warn("No partition to index in the plan");
    return;
  }
  String indexUptoInstantTime = indexPartitionInfos.get(0).getIndexUptoInstant();
  List<MetadataPartitionType> partitionTypes = new ArrayList<>();
  indexPartitionInfos.forEach(indexPartitionInfo -> {
    String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
    LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s",
        relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime));
    try {
      // file group should have already been initialized while scheduling index for this partition
      if (!dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), relativePartitionPath))) {
        throw new HoodieIndexException(String.format("File group not initialized for metadata partition: %s, indexUptoInstant: %s. Looks like index scheduling failed!",
            relativePartitionPath, indexUptoInstantTime));
      }
    } catch (IOException e) {
      // Preserve the underlying I/O failure as the cause instead of dropping it,
      // so the root error is diagnosable from the stack trace.
      throw new HoodieIndexException(String.format("Unable to check whether file group is initialized for metadata partition: %s, indexUptoInstant: %s",
          relativePartitionPath, indexUptoInstantTime), e);
    }
    // return early and populate enabledPartitionTypes correctly (check in initialCommit)
    MetadataPartitionType partitionType = MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
    if (!enabledPartitionTypes.contains(partitionType)) {
      throw new HoodieIndexException(String.format("Indexing for metadata partition: %s is not enabled", partitionType));
    }
    partitionTypes.add(partitionType);
  });
  // before initial commit update inflight indexes in table config
  Set<String> inflightIndexes = getInflightMetadataPartitions(dataMetaClient.getTableConfig());
  inflightIndexes.addAll(indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
  dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightIndexes));
  HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
  initialCommit(indexUptoInstantTime, partitionTypes);
}
/**
* Update from {@code HoodieCommitMetadata}.
*
* @param commitMetadata {@code HoodieCommitMetadata}
* @param instantTime Timestamp at which the commit was performed
* @param isTableServiceAction {@code true} if commit metadata is pertaining to a table service. {@code false} otherwise.
@@ -776,12 +896,18 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), partitionName);
if (fileSlices.isEmpty()) {
// scheduling of INDEX only initializes the file group and not add commit
// so if there are no committed file slices, look for inflight slices
fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient, Option.ofNullable(fsView), partitionName);
}
ValidationUtils.checkArgument(fileSlices.size() == fileGroupCount,
String.format("Invalid number of file groups for partition:%s, found=%d, required=%d",
partitionName, fileSlices.size(), fileGroupCount));
List<FileSlice> finalFileSlices = fileSlices;
HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
FileSlice slice = finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
fileGroupCount));
r.setCurrentLocation(new HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
return r;
@@ -850,7 +976,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* This is invoked to initialize metadata table for a dataset. Bootstrap Commit has special handling mechanism due to its scale compared to
* other regular commits.
*/
private void initialCommit(String createInstantTime) {
private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) {
// List all partitions in the basePath of the containing dataset
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions");
@@ -877,6 +1003,29 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
return;
}
if (partitionTypes.contains(MetadataPartitionType.FILES)) {
HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
}
if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
}
if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
}
LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
commit(createInstantTime, partitionToRecordsMap, false);
}
private HoodieData<HoodieRecord> getFilesPartitionRecords(String createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord allPartitionRecord) {
HoodieData<HoodieRecord> filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1);
if (!partitionInfoList.isEmpty()) {
HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(partitionInfoList, partitionInfoList.size()).map(partitionInfo -> {
@@ -893,23 +1042,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
});
filesPartitionRecords = filesPartitionRecords.union(fileListRecords);
}
ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
}
if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
}
LOG.info("Committing " + partitions.size() + " partitions and " + totalFiles + " files to metadata");
commit(createInstantTime, partitionToRecordsMap, false);
return filesPartitionRecords;
}
/**

View File

@@ -19,45 +19,79 @@
package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
/**
 * Interface that supports updating metadata for a given table, as actions complete.
 */
public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {

  /**
   * Builds the given metadata partitions to create index.
   *
   * @param engineContext       - engine context used to run the index-building jobs
   * @param indexPartitionInfos - information about partitions to build such as partition type and base instant time
   */
  void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos);

  /**
   * Initialize file groups for the given metadata partitions when indexing is requested.
   *
   * @param dataMetaClient     - meta client for the data table
   * @param metadataPartitions - metadata partitions for which file groups needs to be initialized
   * @param instantTime        - instant time of the index action
   * @throws IOException if the file groups cannot be initialized
   */
  void initializeMetadataPartitions(HoodieTableMetaClient dataMetaClient, List<MetadataPartitionType> metadataPartitions, String instantTime) throws IOException;

  /**
   * Drop the given metadata partitions.
   *
   * @param metadataPartitions - metadata partitions to be dropped from the metadata table
   * @throws IOException if a partition cannot be deleted
   */
  void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException;

  /**
   * Update the metadata table due to a COMMIT operation.
   *
   * @param commitMetadata       commit metadata of the operation of interest.
   * @param instantTime          instant time of the commit.
   * @param isTableServiceAction true if caller is a table service. false otherwise. Only regular write operations can trigger metadata table services and this argument
   *                             will assist in this.
   */
  void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction);

  /**
   * Update the metadata table due to a CLEAN operation.
   *
   * @param cleanMetadata clean metadata of the operation of interest.
   * @param instantTime   instant time of the clean.
   */
  void update(HoodieCleanMetadata cleanMetadata, String instantTime);

  /**
   * Update the metadata table due to a RESTORE operation.
   *
   * @param restoreMetadata restore metadata of the operation of interest.
   * @param instantTime     instant time of the restore.
   */
  void update(HoodieRestoreMetadata restoreMetadata, String instantTime);

  /**
   * Update the metadata table due to a ROLLBACK operation.
   *
   * @param rollbackMetadata rollback metadata of the operation of interest.
   * @param instantTime      instant time of the rollback.
   */
  void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
}

View File

@@ -28,6 +28,8 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -71,6 +73,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.marker.WriteMarkers;
@@ -431,7 +434,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
*/
public abstract void rollbackBootstrap(HoodieEngineContext context, String instantTime);
/**
* Schedule cleaning for the instant time.
*
@@ -481,6 +483,25 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
boolean deleteInstants,
boolean skipLocking);
/**
* Schedules Indexing for the table to the given instant.
*
* @param context HoodieEngineContext
* @param indexInstantTime Instant time for scheduling index action.
* @param partitionsToIndex List of {@link MetadataPartitionType} that should be indexed.
* @return HoodieIndexPlan containing metadata partitions and instant upto which they should be indexed.
*/
public abstract Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex);
/**
* Execute requested index action.
*
* @param context HoodieEngineContext
* @param indexInstantTime Instant time for which index action was scheduled.
* @return HoodieIndexCommitMetadata containing write stats for each metadata partition.
*/
public abstract Option<HoodieIndexCommitMetadata> index(HoodieEngineContext context, String indexInstantTime);
/**
* Create a savepoint at the specified instant, so that the table can be restored
* to this point-in-timeline later if needed.
@@ -748,7 +769,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
* Get Table metadata writer.
*
* @param triggeringInstantTimestamp - The instant that is triggering this metadata write
* @return instance of {@link HoodieTableMetadataWriter
* @return instance of {@link HoodieTableMetadataWriter}
*/
public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
return getMetadataWriter(triggeringInstantTimestamp, Option.empty());

View File

@@ -0,0 +1,390 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.index;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.INDEXING_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.RESTORE_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
/**
 * Reads the index plan and executes the plan.
 * It also reconciles updates on data timeline while indexing was in progress.
 */
public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {

  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
  // only a single catchup task is spawned; see assumption on currentCaughtupInstant below
  private static final int MAX_CONCURRENT_INDEXING = 1;
  // how long the catchup task sleeps between timeline reloads while waiting for an inflight writer
  private static final int TIMELINE_RELOAD_INTERVAL_MILLIS = 5000;

  // we use this to update the latest instant in data timeline that has been indexed in metadata table
  // this needs to be volatile as it can be updated in the IndexingCatchupTask spawned by this executor
  // assumption is that only one indexer can execute at a time
  private volatile String currentCaughtupInstant;

  private final TransactionManager txnManager;

  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
    super(context, config, table, instantTime);
    this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
  }

  /**
   * Executes the index plan for the requested INDEX instant: builds the requested metadata
   * partitions upto the plan's base instant, catches up with instants that completed on the data
   * timeline while indexing was in progress, and finally saves the index commit metadata and
   * updates the table config within a transaction.
   *
   * @return {@link HoodieIndexCommitMetadata} carrying the final per-partition indexing info.
   * @throws HoodieIndexException if the plan cannot be read, partitions are already indexed or
   *                              inflight, or indexing/catchup fails.
   */
  @Override
  public Option<HoodieIndexCommitMetadata> execute() {
    HoodieInstant indexInstant = validateAndGetIndexInstant();
    // read HoodieIndexPlan
    HoodieIndexPlan indexPlan;
    try {
      indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
    } catch (IOException e) {
      // chain the cause so the underlying read/deserialization failure is not lost
      throw new HoodieIndexException("Failed to read the index plan for instant: " + indexInstant, e);
    }
    List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
    try {
      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
      }
      // ensure the metadata partitions for the requested indexes are not already available (or inflight)
      Set<String> indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
      Set<String> requestedPartitions = indexPartitionInfos.stream()
          .map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
      requestedPartitions.retainAll(indexesInflightOrCompleted);
      if (!requestedPartitions.isEmpty()) {
        throw new HoodieIndexException(String.format("Following partitions already exist or inflight: %s", requestedPartitions));
      }
      // transition requested indexInstant to inflight
      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
      // start indexing for each partition
      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
      // this will only build index upto base instant as generated by the plan, we will be doing catchup later
      // all partition infos in the plan share the same base instant, so the first one is representative
      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
      LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
      metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);

      // get remaining instants to catchup
      List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
      LOG.info("Total remaining instants to index: " + instantsToCatchup.size());

      // reconcile with metadata table timeline
      String metadataBasePath = getMetadataTableBasePath(table.getMetaClient().getBasePath());
      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
      Set<String> metadataCompletedTimestamps = getCompletedArchivedAndActiveInstantsAfter(indexUptoInstant, metadataMetaClient).stream()
          .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());

      // index catchup for all remaining instants with a timeout
      currentCaughtupInstant = indexUptoInstant;
      catchupWithInflightWriters(metadataWriter, instantsToCatchup, metadataMetaClient, metadataCompletedTimestamps);
      // save index commit metadata and update table config; each partition is now indexed upto currentCaughtupInstant
      List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = indexPartitionInfos.stream()
          .map(info -> new HoodieIndexPartitionInfo(
              info.getVersion(),
              info.getMetadataPartitionPath(),
              currentCaughtupInstant))
          .collect(Collectors.toList());
      HoodieIndexCommitMetadata indexCommitMetadata = HoodieIndexCommitMetadata.newBuilder()
          .setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build();
      updateTableConfigAndTimeline(indexInstant, finalIndexPartitionInfos, indexCommitMetadata);
      return Option.of(indexCommitMetadata);
    } catch (IOException e) {
      // NOTE(review): only IOException triggers the graceful abort; a runtime failure (e.g. from
      // buildMetadataPartitions or the catchup task) propagates without cleanup — confirm intended.
      // abort gracefully
      abort(indexInstant, indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
      // chain the cause so callers can see why indexing failed
      throw new HoodieIndexException(String.format("Unable to index instant: %s", indexInstant), e);
    }
  }

  /**
   * Rolls back a failed indexing attempt: clears the requested partitions from the inflight and
   * completed table configs, deletes any metadata partition that was physically created, and
   * removes the inflight INDEX instant from the timeline.
   *
   * @param indexInstant        the INDEX instant being aborted
   * @param requestedPartitions metadata partition paths that this index action had requested
   */
  private void abort(HoodieInstant indexInstant, Set<String> requestedPartitions) {
    Set<String> inflightPartitions = getInflightMetadataPartitions(table.getMetaClient().getTableConfig());
    Set<String> completedPartitions = getCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
    // update table config
    requestedPartitions.forEach(partition -> {
      inflightPartitions.remove(partition);
      completedPartitions.remove(partition);
    });
    table.getMetaClient().getTableConfig().setValue(TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightPartitions));
    table.getMetaClient().getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
    HoodieTableConfig.update(table.getMetaClient().getFs(), new Path(table.getMetaClient().getMetaPath()), table.getMetaClient().getTableConfig().getProps());

    // delete metadata partition
    requestedPartitions.forEach(partition -> {
      MetadataPartitionType partitionType = MetadataPartitionType.valueOf(partition.toUpperCase(Locale.ROOT));
      if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType)) {
        deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType);
      }
    });

    // delete inflight instant
    table.getMetaClient().reloadActiveTimeline().deleteInstantFileIfExists(HoodieTimeline.getIndexInflightInstant(indexInstant.getTimestamp()));
  }

  /**
   * Computes the instants that completed on the data timeline after the plan's base instant and
   * must still be reflected in the metadata partitions being built.
   *
   * @param indexUptoInstant base instant upto which the plan has already indexed
   * @return instants (archived + active) to replay into the metadata table, excluding INDEX actions
   */
  private List<HoodieInstant> getInstantsToCatchup(String indexUptoInstant) {
    // since only write timeline was considered while scheduling index, which gives us the indexUpto instant
    // here we consider other valid actions to pick catchupStart instant
    Set<String> validActions = CollectionUtils.createSet(CLEAN_ACTION, RESTORE_ACTION, ROLLBACK_ACTION);
    Option<HoodieInstant> catchupStartInstant = table.getMetaClient().reloadActiveTimeline()
        .getTimelineOfActions(validActions)
        .filterInflightsAndRequested()
        .findInstantsBefore(indexUptoInstant)
        .firstInstant();
    // get all instants since the plan completed (both from active timeline and archived timeline)
    List<HoodieInstant> instantsToIndex;
    if (catchupStartInstant.isPresent()) {
      // an older clean/restore/rollback is still pending; start catchup from there to not miss its effect
      instantsToIndex = getRemainingArchivedAndActiveInstantsSince(catchupStartInstant.get().getTimestamp(), table.getMetaClient());
    } else {
      instantsToIndex = getRemainingArchivedAndActiveInstantsSince(indexUptoInstant, table.getMetaClient());
    }
    return instantsToIndex;
  }

  /**
   * Ensures optimistic concurrency control with a lock provider is configured (an inflight writer
   * can race with indexing) and returns the REQUESTED index instant matching {@code instantTime}.
   *
   * @throws HoodieIndexException if concurrency control is not configured or no matching requested instant exists
   */
  private HoodieInstant validateAndGetIndexInstant() {
    // ensure lock provider configured
    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
      throw new HoodieIndexException(String.format("Need to set %s as %s and configure lock provider class",
          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name()));
    }

    return table.getActiveTimeline()
        .filterPendingIndexTimeline()
        .filter(instant -> instant.getTimestamp().equals(instantTime) && REQUESTED.equals(instant.getState()))
        .lastInstant()
        .orElseThrow(() -> new HoodieIndexException(String.format("No requested index instant found: %s", instantTime)));
  }

  /**
   * Marks the requested metadata partitions as completed in the table config and transitions the
   * inflight INDEX instant to completed, both under the transaction lock since another indexer
   * (or an inflight writer) could be updating the same files concurrently.
   */
  private void updateTableConfigAndTimeline(HoodieInstant indexInstant,
                                            List<HoodieIndexPartitionInfo> finalIndexPartitionInfos,
                                            HoodieIndexCommitMetadata indexCommitMetadata) throws IOException {
    try {
      // update the table config and timeline in a lock as there could be another indexer running
      txnManager.beginTransaction();
      updateMetadataPartitionsTableConfig(table.getMetaClient(),
          finalIndexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
      table.getActiveTimeline().saveAsComplete(
          new HoodieInstant(true, INDEXING_ACTION, indexInstant.getTimestamp()),
          TimelineMetadataUtils.serializeIndexCommitMetadata(indexCommitMetadata));
    } finally {
      txnManager.endTransaction();
    }
  }

  /**
   * Runs the {@link IndexingCatchupTask} on a single-threaded executor, bounded by the configured
   * indexing check timeout; the task is cancelled and the executor shut down on failure or timeout.
   */
  private void catchupWithInflightWriters(HoodieTableMetadataWriter metadataWriter, List<HoodieInstant> instantsToIndex,
                                          HoodieTableMetaClient metadataMetaClient, Set<String> metadataCompletedTimestamps) {
    ExecutorService executorService = Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
    Future<?> indexingCatchupTaskFuture = executorService.submit(
        new IndexingCatchupTask(metadataWriter, instantsToIndex, metadataCompletedTimestamps, table.getMetaClient(), metadataMetaClient));
    try {
      LOG.info("Starting index catchup task");
      indexingCatchupTaskFuture.get(config.getIndexingCheckTimeoutSeconds(), TimeUnit.SECONDS);
    } catch (Exception e) {
      indexingCatchupTaskFuture.cancel(true);
      throw new HoodieIndexException(String.format("Index catchup failed. Current indexed instant = %s. Aborting!", currentCaughtupInstant), e);
    } finally {
      executorService.shutdownNow();
    }
  }

  /**
   * Returns instants at-or-after {@code instant} from both the archived and active timelines,
   * excluding INDEX actions (indexing instants themselves are never replayed into metadata).
   */
  private static List<HoodieInstant> getRemainingArchivedAndActiveInstantsSince(String instant, HoodieTableMetaClient metaClient) {
    List<HoodieInstant> remainingInstantsToIndex = metaClient.getArchivedTimeline().getInstants()
        .filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), GREATER_THAN_OR_EQUALS, instant))
        .filter(i -> !INDEXING_ACTION.equals(i.getAction()))
        .collect(Collectors.toList());
    remainingInstantsToIndex.addAll(metaClient.getActiveTimeline().findInstantsAfter(instant).getInstants()
        .filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), GREATER_THAN_OR_EQUALS, instant))
        .filter(i -> !INDEXING_ACTION.equals(i.getAction()))
        .collect(Collectors.toList()));
    return remainingInstantsToIndex;
  }

  /**
   * Returns completed instants after {@code instant} from both the archived and (reloaded) active
   * timelines of the given meta client, excluding INDEX actions.
   */
  private static List<HoodieInstant> getCompletedArchivedAndActiveInstantsAfter(String instant, HoodieTableMetaClient metaClient) {
    List<HoodieInstant> completedInstants = metaClient.getArchivedTimeline().filterCompletedInstants().findInstantsAfter(instant)
        .getInstants().filter(i -> !INDEXING_ACTION.equals(i.getAction())).collect(Collectors.toList());
    completedInstants.addAll(metaClient.reloadActiveTimeline().filterCompletedInstants().findInstantsAfter(instant)
        .getInstants().filter(i -> !INDEXING_ACTION.equals(i.getAction())).collect(Collectors.toList()));
    return completedInstants;
  }

  /**
   * Moves the given metadata partitions from the inflight table config entry to the completed one
   * and persists the updated config. Callers must hold the transaction lock.
   */
  private void updateMetadataPartitionsTableConfig(HoodieTableMetaClient metaClient, Set<String> metadataPartitions) {
    // remove from inflight and update completed indexes
    Set<String> inflightPartitions = getInflightMetadataPartitions(metaClient.getTableConfig());
    Set<String> completedPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
    inflightPartitions.removeAll(metadataPartitions);
    completedPartitions.addAll(metadataPartitions);
    // update table config
    metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS_INFLIGHT.key(), String.join(",", inflightPartitions));
    metaClient.getTableConfig().setValue(TABLE_METADATA_PARTITIONS.key(), String.join(",", completedPartitions));
    HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
  }

  /**
   * Indexing check runs for instants that completed after the base instant (in the index plan).
   * It will check if these later instants have logged updates to metadata table or not.
   * If not, then it will do the update. If a later instant is inflight, it will wait until it is completed or the task times out.
   */
  class IndexingCatchupTask implements Runnable {

    private final HoodieTableMetadataWriter metadataWriter;
    private final List<HoodieInstant> instantsToIndex;
    private final Set<String> metadataCompletedInstants;
    private final HoodieTableMetaClient metaClient;
    private final HoodieTableMetaClient metadataMetaClient;

    IndexingCatchupTask(HoodieTableMetadataWriter metadataWriter,
                        List<HoodieInstant> instantsToIndex,
                        Set<String> metadataCompletedInstants,
                        HoodieTableMetaClient metaClient,
                        HoodieTableMetaClient metadataMetaClient) {
      this.metadataWriter = metadataWriter;
      this.instantsToIndex = instantsToIndex;
      this.metadataCompletedInstants = metadataCompletedInstants;
      this.metaClient = metaClient;
      this.metadataMetaClient = metadataMetaClient;
    }

    @Override
    public void run() {
      for (HoodieInstant instant : instantsToIndex) {
        // metadata index already updated for this instant
        if (!metadataCompletedInstants.isEmpty() && metadataCompletedInstants.contains(instant.getTimestamp())) {
          currentCaughtupInstant = instant.getTimestamp();
          continue;
        }
        while (!instant.isCompleted()) {
          try {
            LOG.warn("instant not completed, reloading timeline " + instant);
            // reload timeline and fetch instant details again wait until timeout
            String instantTime = instant.getTimestamp();
            Option<HoodieInstant> currentInstant = metaClient.reloadActiveTimeline()
                .filterCompletedInstants().filter(i -> i.getTimestamp().equals(instantTime)).firstInstant();
            instant = currentInstant.orElse(instant);
            // so that timeline is not reloaded very frequently
            Thread.sleep(TIMELINE_RELOAD_INTERVAL_MILLIS);
          } catch (InterruptedException e) {
            throw new HoodieIndexException(String.format("Thread interrupted while running indexing check for instant: %s", instant), e);
          }
        }
        // if instant completed, ensure that there was metadata commit, else update metadata for this completed instant
        if (COMPLETED.equals(instant.getState())) {
          String instantTime = instant.getTimestamp();
          Option<HoodieInstant> metadataInstant = metadataMetaClient.reloadActiveTimeline()
              .filterCompletedInstants().filter(i -> i.getTimestamp().equals(instantTime)).firstInstant();
          if (metadataInstant.isPresent()) {
            currentCaughtupInstant = instantTime;
            continue;
          }
          try {
            // we need take a lock here as inflight writer could also try to update the timeline
            txnManager.beginTransaction(Option.of(instant), Option.empty());
            LOG.info("Updating metadata table for instant: " + instant);
            switch (instant.getAction()) {
              // TODO: see if this can be moved to metadata writer itself
              case HoodieTimeline.COMMIT_ACTION:
              case HoodieTimeline.DELTA_COMMIT_ACTION:
              case HoodieTimeline.REPLACE_COMMIT_ACTION:
                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
                    table.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
                // do not trigger any table service as partition is not fully built out yet
                metadataWriter.update(commitMetadata, instant.getTimestamp(), false);
                break;
              case CLEAN_ACTION:
                HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(table.getMetaClient(), instant);
                metadataWriter.update(cleanMetadata, instant.getTimestamp());
                break;
              case RESTORE_ACTION:
                HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
                    table.getActiveTimeline().getInstantDetails(instant).get());
                metadataWriter.update(restoreMetadata, instant.getTimestamp());
                break;
              case ROLLBACK_ACTION:
                HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
                    table.getActiveTimeline().getInstantDetails(instant).get());
                metadataWriter.update(rollbackMetadata, instant.getTimestamp());
                break;
              default:
                throw new IllegalStateException("Unexpected value: " + instant.getAction());
            }
          } catch (IOException e) {
            throw new HoodieIndexException(String.format("Could not update metadata partition for instant: %s", instant), e);
          } finally {
            txnManager.endTransaction(Option.of(instant));
          }
        }
      }
    }
  }
}

View File

@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.index;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataPartition;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getInflightAndCompletedMetadataPartitions;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
/**
 * Schedules the INDEX action on the data timeline.
 * <ol>
 *   <li>Fetches the last completed instant on the data timeline.</li>
 *   <li>Writes the index plan to {@code <instant>.index.requested}.</li>
 *   <li>Initializes file groups for the enabled partition types within a transaction.</li>
 * </ol>
 */
public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexPlan>> {

  private static final Logger LOG = LogManager.getLogger(ScheduleIndexActionExecutor.class);
  private static final Integer INDEX_PLAN_VERSION_1 = 1;
  private static final Integer LATEST_INDEX_PLAN_VERSION = INDEX_PLAN_VERSION_1;

  // Metadata partition types that the caller asked to index.
  private final List<MetadataPartitionType> partitionIndexTypes;
  // Serializes plan persistence and file group initialization against concurrent writers.
  private final TransactionManager txnManager;

  /**
   * @param context             engine context used by the metadata writer
   * @param config              write config of the data table
   * @param table               the data table for which indexes are being scheduled
   * @param instantTime         instant time at which indexing is requested
   * @param partitionIndexTypes metadata partition types to build indexes for
   */
  public ScheduleIndexActionExecutor(HoodieEngineContext context,
                                     HoodieWriteConfig config,
                                     HoodieTable<T, I, K, O> table,
                                     String instantTime,
                                     List<MetadataPartitionType> partitionIndexTypes) {
    super(context, config, table, instantTime);
    this.partitionIndexTypes = partitionIndexTypes;
    this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
  }

  /**
   * Builds and persists the index plan.
   *
   * @return the generated {@link HoodieIndexPlan}, or {@link Option#empty()} when every requested
   *         partition is already indexed/inflight, or when no completed write instant exists yet.
   * @throws HoodieIndexException if validation fails or the metadata writer cannot be obtained
   * @throws HoodieIOException    if file group initialization or plan serialization fails
   */
  @Override
  public Option<HoodieIndexPlan> execute() {
    validateBeforeScheduling();
    // Idempotency: drop partitions that are already indexed or have a pending index operation.
    Set<String> indexesInflightOrCompleted = getInflightAndCompletedMetadataPartitions(table.getMetaClient().getTableConfig());
    Set<String> requestedPartitions = partitionIndexTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
    requestedPartitions.removeAll(indexesInflightOrCompleted);
    if (requestedPartitions.isEmpty()) {
      LOG.error("All requested index types are inflight or completed: " + partitionIndexTypes);
      return Option.empty();
    }
    if (!indexesInflightOrCompleted.isEmpty()) {
      // Warn only when some requested partitions were actually filtered out; previously this
      // fired on every scheduling attempt, even with nothing inflight or completed.
      LOG.warn(String.format("Following partitions already exist or inflight: %s. Going to index only these partitions: %s",
          indexesInflightOrCompleted, requestedPartitions));
    }
    List<MetadataPartitionType> finalPartitionsToIndex = partitionIndexTypes.stream()
        .filter(p -> requestedPartitions.contains(p.getPartitionPath())).collect(Collectors.toList());
    final HoodieInstant indexInstant = HoodieTimeline.getIndexRequestedInstant(instantTime);
    try {
      this.txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
      // Index up to the last completed write instant with no earlier pending write before it.
      Option<HoodieInstant> indexUptoInstant = table.getActiveTimeline().getContiguousCompletedWriteTimeline().lastInstant();
      if (indexUptoInstant.isPresent()) {
        // Start initializing file groups. In case the FILES partition itself was not initialized
        // before (i.e. metadata was never enabled), this will initialize synchronously.
        HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
            .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to initialize filegroups for indexing for instant: %s", instantTime)));
        metadataWriter.initializeMetadataPartitions(table.getMetaClient(), finalPartitionsToIndex, indexInstant.getTimestamp());
        // For each partition to index, record the upto-instant in the plan.
        List<HoodieIndexPartitionInfo> indexPartitionInfos = finalPartitionsToIndex.stream()
            .map(p -> new HoodieIndexPartitionInfo(LATEST_INDEX_PLAN_VERSION, p.getPartitionPath(), indexUptoInstant.get().getTimestamp()))
            .collect(Collectors.toList());
        HoodieIndexPlan indexPlan = new HoodieIndexPlan(LATEST_INDEX_PLAN_VERSION, indexPartitionInfos);
        // Update the data timeline with the requested instant carrying the serialized plan.
        table.getActiveTimeline().saveToPendingIndexAction(indexInstant, TimelineMetadataUtils.serializeIndexPlan(indexPlan));
        return Option.of(indexPlan);
      }
    } catch (IOException e) {
      LOG.error("Could not initialize file groups", e);
      // Abort gracefully: remove partially initialized metadata partitions and the requested instant.
      abort(indexInstant);
      throw new HoodieIOException(e.getMessage(), e);
    } finally {
      this.txnManager.endTransaction(Option.of(indexInstant));
    }
    return Option.empty();
  }

  /**
   * Validates that the requested partition types are known, and that optimistic concurrency
   * control plus a lock provider are configured, since indexing runs alongside regular writers.
   */
  private void validateBeforeScheduling() {
    if (!EnumSet.allOf(MetadataPartitionType.class).containsAll(partitionIndexTypes)) {
      throw new HoodieIndexException("Not all index types are valid: " + partitionIndexTypes);
    }
    // Ensure a lock provider is configured.
    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() || StringUtils.isNullOrEmpty(config.getLockProviderClass())) {
      throw new HoodieIndexException(String.format("Need to set %s as %s and configure lock provider class",
          WRITE_CONCURRENCY_MODE.key(), OPTIMISTIC_CONCURRENCY_CONTROL.name()));
    }
  }

  /**
   * Cleans up after a failed scheduling attempt: deletes any metadata partitions that may have
   * been (partially) initialized and removes the requested index instant from the timeline.
   */
  private void abort(HoodieInstant indexInstant) {
    // Delete metadata partitions created for the requested index types, if present.
    partitionIndexTypes.forEach(partitionType -> {
      if (metadataPartitionExists(table.getMetaClient().getBasePath(), context, partitionType)) {
        deleteMetadataPartition(table.getMetaClient().getBasePath(), context, partitionType);
      }
    });
    // Delete the requested instant file, if it was already written.
    table.getMetaClient().reloadActiveTimeline().deleteInstantFileIfExists(indexInstant);
  }
}

View File

@@ -22,6 +22,7 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import java.util.Collections;
import java.util.Map;
@@ -33,6 +34,11 @@ public class FourToThreeDowngradeHandler implements DowngradeHandler {
@Override
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
if (config.isMetadataTableEnabled()) {
// Metadata Table in version 4 has a schema that is not forward compatible.
// Hence, it is safe to delete the metadata table, which will be re-initialized in subsequent commit.
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
}
return Collections.emptyMap();
}
}

View File

@@ -23,10 +23,15 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.MetadataPartitionType;
import java.util.Hashtable;
import java.util.Map;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
/**
* UpgradeHandler to assist in upgrading {@link org.apache.hudi.table.HoodieTable} from version 3 to 4.
*/
@@ -35,7 +40,12 @@ public class ThreeToFourUpgradeHandler implements UpgradeHandler {
@Override
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
tablePropsToAdd.put(HoodieTableConfig.TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
tablePropsToAdd.put(TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
// if metadata is enabled and files partition exists then update TABLE_METADATA_PARTITIONS
// schema for the files partition is same between the two versions
if (config.isMetadataTableEnabled() && metadataPartitionExists(config.getBasePath(), context, MetadataPartitionType.FILES)) {
tablePropsToAdd.put(TABLE_METADATA_PARTITIONS, MetadataPartitionType.FILES.getPartitionPath());
}
return tablePropsToAdd;
}
}

View File

@@ -22,6 +22,8 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -48,6 +50,7 @@ import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
@@ -330,6 +333,16 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
}
@Override
public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex) {
throw new HoodieNotSupportedException("Metadata indexing is not supported for a Flink table yet.");
}
@Override
public Option<HoodieIndexCommitMetadata> index(HoodieEngineContext context, String indexInstantTime) {
throw new HoodieNotSupportedException("Metadata indexing is not supported for a Flink table yet.");
}
@Override
public HoodieSavepointMetadata savepoint(HoodieEngineContext context, String instantToSavepoint, String user, String comment) {
throw new HoodieNotSupportedException("Savepoint is not supported yet");

View File

@@ -22,6 +22,8 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -44,6 +46,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
@@ -60,6 +63,8 @@ import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor
import org.apache.hudi.table.action.commit.JavaMergeHelper;
import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.index.RunIndexActionExecutor;
import org.apache.hudi.table.action.index.ScheduleIndexActionExecutor;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
@@ -232,6 +237,16 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
}
@Override
public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex) {
return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute();
}
@Override
public Option<HoodieIndexCommitMetadata> index(HoodieEngineContext context, String indexInstantTime) {
return new RunIndexActionExecutor<>(context, config, this, indexInstantTime).execute();
}
@Override
public HoodieSavepointMetadata savepoint(HoodieEngineContext context,
String instantToSavepoint,

View File

@@ -22,6 +22,8 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -49,6 +51,7 @@ import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
@@ -67,6 +70,8 @@ import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitAction
import org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.index.RunIndexActionExecutor;
import org.apache.hudi.table.action.index.ScheduleIndexActionExecutor;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
@@ -276,6 +281,16 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
deleteInstants, skipLocking).execute();
}
@Override
public Option<HoodieIndexPlan> scheduleIndexing(HoodieEngineContext context, String indexInstantTime, List<MetadataPartitionType> partitionsToIndex) {
return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute();
}
@Override
public Option<HoodieIndexCommitMetadata> index(HoodieEngineContext context, String indexInstantTime) {
return new RunIndexActionExecutor<>(context, config, this, indexInstantTime).execute();
}
@Override
public HoodieSavepointMetadata savepoint(HoodieEngineContext context, String instantToSavepoint, String user, String comment) {
return new SavepointActionExecutor<>(context, config, this, instantToSavepoint, user, comment).execute();

View File

@@ -90,6 +90,9 @@
<import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
<import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieIndexPartitionInfo.avsc</import>
<import>${basedir}/src/main/avro/HoodieIndexPlan.avsc</import>
<import>${basedir}/src/main/avro/HoodieIndexCommitMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
</imports>
</configuration>

View File

@@ -120,6 +120,14 @@
"HoodieCommitMetadata"
],
"default": null
},
{
"name":"hoodieIndexCommitMetadata",
"type":[
"null",
"HoodieIndexCommitMetadata"
],
"default": null
}
]
}

View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieIndexCommitMetadata",
"fields": [
{
"name": "version",
"doc": "Version of the index commit metadata schema",
"type": [
"int",
"null"
],
"default": 1
},
{
"name": "operationType",
"doc": "Type of the index operation performed (e.g. build or drop)",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "indexPartitionInfos",
"doc": "This field contains the info for each partition that got indexed",
"type": [
"null",
{
"type": "array",
"items": "HoodieIndexPartitionInfo"
}
],
"default": null
}
]
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieIndexPartitionInfo",
"fields": [
{
"name": "version",
"type": [
"int",
"null"
],
"default": 1
},
{
"name": "metadataPartitionPath",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "indexUptoInstant",
"type": [
"null",
"string"
],
"default": null
}
]
}

View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieIndexPlan",
"fields": [
{
"name": "version",
"type": [
"int",
"null"
],
"default": 1
},
{
"name": "indexPartitionInfos",
"type": [
"null",
{
"type": "array",
"items": "HoodieIndexPartitionInfo"
}
],
"default": null
}
]
}

View File

@@ -78,7 +78,7 @@ public class SimpleBloomFilter implements BloomFilter {
@Override
public void add(String key) {
if (key == null) {
throw new NullPointerException("Key cannot by null");
throw new NullPointerException("Key cannot be null");
}
filter.add(new Key(key.getBytes(StandardCharsets.UTF_8)));
}
@@ -86,7 +86,7 @@ public class SimpleBloomFilter implements BloomFilter {
@Override
public boolean mightContain(String key) {
if (key == null) {
throw new NullPointerException("Key cannot by null");
throw new NullPointerException("Key cannot be null");
}
return filter.membershipTest(new Key(key.getBytes(StandardCharsets.UTF_8)));
}

View File

@@ -71,6 +71,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Enable asynchronous cleaning for metadata table");
// Async index
public static final ConfigProperty<Boolean> ASYNC_INDEX_ENABLE = ConfigProperty
.key(METADATA_PREFIX + ".index.async")
.defaultValue(false)
.sinceVersion("0.11.0")
.withDocumentation("Enable asynchronous indexing of metadata table.");
// Maximum delta commits before compaction occurs
public static final ConfigProperty<Integer> COMPACT_NUM_DELTA_COMMITS = ConfigProperty
.key(METADATA_PREFIX + ".compact.max.delta.commits")
@@ -175,6 +182,25 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Parallelism to use, when generating column stats index.");
public static final ConfigProperty<String> COLUMN_STATS_INDEX_FOR_COLUMNS = ConfigProperty
.key(METADATA_PREFIX + ".index.column.stats.column.list")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of columns for which column stats index will be built. If not set, all columns will be indexed");
public static final ConfigProperty<String> BLOOM_FILTER_INDEX_FOR_COLUMNS = ConfigProperty
.key(METADATA_PREFIX + ".index.bloom.filter.column.list")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of columns for which bloom filter index will be built. If not set, only record key will be indexed.");
public static final ConfigProperty<Integer> METADATA_INDEX_CHECK_TIMEOUT_SECONDS = ConfigProperty
.key(METADATA_PREFIX + ".index.check.timeout.seconds")
.defaultValue(900)
.sinceVersion("0.11.0")
.withDocumentation("After the async indexer has finished indexing upto the base instant, it will ensure that all inflight writers "
+ "reliably write index updates as well. If this timeout expires, then the indexer will abort itself safely.");
public static final ConfigProperty<Boolean> POPULATE_META_FIELDS = ConfigProperty
.key(METADATA_PREFIX + ".populate.meta.fields")
.defaultValue(false)
@@ -221,6 +247,14 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getBooleanOrDefault(ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS);
}
public String getColumnsEnabledForColumnStatsIndex() {
return getString(COLUMN_STATS_INDEX_FOR_COLUMNS);
}
public String getColumnsEnabledForBloomFilterIndex() {
return getString(BLOOM_FILTER_INDEX_FOR_COLUMNS);
}
public int getBloomFilterIndexFileGroupCount() {
return getIntOrDefault(METADATA_INDEX_BLOOM_FILTER_FILE_GROUP_COUNT);
}
@@ -233,6 +267,10 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getIntOrDefault(COLUMN_STATS_INDEX_PARALLELISM);
}
public int getIndexingCheckTimeoutSeconds() {
return getIntOrDefault(METADATA_INDEX_CHECK_TIMEOUT_SECONDS);
}
public boolean enableMetrics() {
return getBoolean(METRICS_ENABLE);
}
@@ -305,6 +343,21 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return this;
}
public Builder withColumnStatsIndexForColumns(String columns) {
metadataConfig.setValue(COLUMN_STATS_INDEX_FOR_COLUMNS, columns);
return this;
}
public Builder withBloomFilterIndexForColumns(String columns) {
metadataConfig.setValue(BLOOM_FILTER_INDEX_FOR_COLUMNS, columns);
return this;
}
public Builder withIndexingCheckTimeout(int timeoutInSeconds) {
metadataConfig.setValue(METADATA_INDEX_CHECK_TIMEOUT_SECONDS, String.valueOf(timeoutInSeconds));
return this;
}
public Builder enableMetrics(boolean enableMetrics) {
metadataConfig.setValue(METRICS_ENABLE, String.valueOf(enableMetrics));
return this;
@@ -320,6 +373,11 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return this;
}
public Builder withAsyncIndex(boolean asyncIndex) {
metadataConfig.setValue(ASYNC_INDEX_ENABLE, String.valueOf(asyncIndex));
return this;
}
public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) {
metadataConfig.setValue(COMPACT_NUM_DELTA_COMMITS, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
return this;

View File

@@ -48,6 +48,8 @@ public enum WriteOperationType {
INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
// compact
COMPACT("compact"),
INDEX("index"),
// used for old version
UNKNOWN("unknown");
@@ -86,6 +88,8 @@ public enum WriteOperationType {
return CLUSTER;
case "compact":
return COMPACT;
case "index":
return INDEX;
case "unknown":
return UNKNOWN;
default:

View File

@@ -36,6 +36,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.BinaryUtil;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -208,6 +209,20 @@ public class HoodieTableConfig extends HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Table checksum is used to guard against partial writes in HDFS. It is added as the last entry in hoodie.properties and then used to validate while reading table config.");
public static final ConfigProperty<String> TABLE_METADATA_PARTITIONS_INFLIGHT = ConfigProperty
.key("hoodie.table.metadata.partitions.inflight")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of metadata partitions whose building is in progress. "
+ "These partitions are not yet ready for use by the readers.");
public static final ConfigProperty<String> TABLE_METADATA_PARTITIONS = ConfigProperty
.key("hoodie.table.metadata.partitions")
.noDefaultValue()
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of metadata partitions that have been completely built and in-sync with data table. "
+ "These partitions are ready for use by the readers");
private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>
public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
@@ -585,6 +600,14 @@ public class HoodieTableConfig extends HoodieConfig {
return getLong(TABLE_CHECKSUM);
}
public String getMetadataPartitionsInflight() {
return getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, StringUtils.EMPTY_STRING);
}
public String getMetadataPartitions() {
return getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING);
}
public Map<String, String> propsMap() {
return props.entrySet().stream()
.collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));

View File

@@ -73,7 +73,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION,
REQUESTED_RESTORE_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION,
REQUESTED_INDEX_COMMIT_EXTENSION, INFLIGHT_INDEX_COMMIT_EXTENSION, INDEX_COMMIT_EXTENSION));
private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
protected HoodieTableMetaClient metaClient;
@@ -99,7 +100,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return HoodieInstantTimeGenerator.createNewInstantTime(0);
}
/**
* Returns next instant time that adds N milliseconds to current time.
* Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
@@ -225,7 +225,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
deleteInstantFile(instant);
}
private void deleteInstantFileIfExists(HoodieInstant instant) {
public void deleteInstantFileIfExists(HoodieInstant instant) {
LOG.info("Deleting instant " + instant);
Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName());
try {
@@ -339,6 +339,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
}
public Option<byte[]> readIndexPlanAsBytes(HoodieInstant instant) {
return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
}
/**
* Revert compaction State from inflight to requested.
*
@@ -652,6 +656,65 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
createFileInMetaPath(instant.getFileName(), content, false);
}
/**
* Transition index instant state from requested to inflight.
*
* @param requestedInstant Inflight Instant
* @return inflight instant
*/
public HoodieInstant transitionIndexRequestedToInflight(HoodieInstant requestedInstant, Option<byte[]> data) {
ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.INDEXING_ACTION),
String.format("%s is not equal to %s action", requestedInstant.getAction(), INDEXING_ACTION));
ValidationUtils.checkArgument(requestedInstant.isRequested(),
String.format("Instant %s not in requested state", requestedInstant.getTimestamp()));
HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, INDEXING_ACTION, requestedInstant.getTimestamp());
transitionState(requestedInstant, inflightInstant, data);
return inflightInstant;
}
/**
* Transition index instant state from inflight to completed.
* @param inflightInstant Inflight Instant
* @return completed instant
*/
public HoodieInstant transitionIndexInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.INDEXING_ACTION),
String.format("%s is not equal to %s action", inflightInstant.getAction(), INDEXING_ACTION));
ValidationUtils.checkArgument(inflightInstant.isInflight(),
String.format("Instant %s not inflight", inflightInstant.getTimestamp()));
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, INDEXING_ACTION, inflightInstant.getTimestamp());
transitionState(inflightInstant, commitInstant, data);
return commitInstant;
}
/**
* Revert index instant state from inflight to requested.
* @param inflightInstant Inflight Instant
* @return requested instant
*/
public HoodieInstant revertIndexInflightToRequested(HoodieInstant inflightInstant) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.INDEXING_ACTION),
String.format("%s is not equal to %s action", inflightInstant.getAction(), INDEXING_ACTION));
ValidationUtils.checkArgument(inflightInstant.isInflight(),
String.format("Instant %s not inflight", inflightInstant.getTimestamp()));
HoodieInstant requestedInstant = new HoodieInstant(State.REQUESTED, INDEXING_ACTION, inflightInstant.getTimestamp());
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
transitionState(inflightInstant, requestedInstant, Option.empty());
} else {
deleteInflight(inflightInstant);
}
return requestedInstant;
}
/**
* Save content for inflight/requested index instant.
*/
public void saveToPendingIndexAction(HoodieInstant instant, Option<byte[]> content) {
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.INDEXING_ACTION),
String.format("%s is not equal to %s action", instant.getAction(), INDEXING_ACTION));
createFileInMetaPath(instant.getFileName(), content, false);
}
private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
Path fullPath = new Path(metaClient.getMetaPath(), filename);
if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {

View File

@@ -118,7 +118,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
*
* @deprecated
*/
public HoodieArchivedTimeline() {}
public HoodieArchivedTimeline() {
}
/**
* This method is only used when this object is deserialized in a spark executor.
@@ -207,6 +208,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return Option.of("hoodieCompactionPlan");
case HoodieTimeline.REPLACE_COMMIT_ACTION:
return Option.of("hoodieReplaceCommitMetadata");
case HoodieTimeline.INDEXING_ACTION:
return Option.of("hoodieIndexCommitMetadata");
default:
LOG.error(String.format("Unknown action in metadata (%s)", action));
return Option.empty();

View File

@@ -75,7 +75,8 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
*
* @deprecated
*/
public HoodieDefaultTimeline() {}
public HoodieDefaultTimeline() {
}
@Override
public HoodieTimeline filterInflights() {
@@ -112,6 +113,16 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
}
/**
 * Returns the contiguous completed prefix of the write timeline: all completed write instants
 * strictly before the earliest pending (inflight or requested) write instant. When nothing is
 * pending, the entire completed write timeline is returned.
 */
@Override
public HoodieTimeline getContiguousCompletedWriteTimeline() {
  HoodieTimeline completedWrites = getWriteTimeline().filterCompletedInstants();
  Option<HoodieInstant> earliestPending = getWriteTimeline().filterInflightsAndRequested().firstInstant();
  if (!earliestPending.isPresent()) {
    return completedWrites;
  }
  String pendingTs = earliestPending.get().getTimestamp();
  return completedWrites.filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), LESSER_THAN, pendingTs));
}
@Override
public HoodieTimeline getCompletedReplaceTimeline() {
return new HoodieDefaultTimeline(
@@ -181,6 +192,16 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
return new HoodieDefaultTimeline(instants.stream().filter(filter), details);
}
/**
 * Filter this timeline down to index instants that are not yet completed (requested or inflight).
 */
@Override
public HoodieTimeline filterPendingIndexTimeline() {
  return new HoodieDefaultTimeline(
      instants.stream().filter(instant -> instant.getAction().equals(INDEXING_ACTION) && !instant.isCompleted()),
      details);
}
/**
 * Filter this timeline down to index instants that have completed.
 */
@Override
public HoodieTimeline filterCompletedIndexTimeline() {
  return new HoodieDefaultTimeline(
      instants.stream().filter(instant -> instant.getAction().equals(INDEXING_ACTION) && instant.isCompleted()),
      details);
}
/**
* Get all instants (commits, delta commits) that produce new data, in the active timeline.
*/
@@ -189,12 +210,12 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
}
/**
* Get all instants (commits, delta commits, compaction, clean, savepoint, rollback) that result in actions,
* Get all instants (commits, delta commits, compaction, clean, savepoint, rollback, replace commits, index) that result in actions,
* in the active timeline.
*/
public HoodieTimeline getAllCommitsTimeline() {
return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION,
CLEAN_ACTION, COMPACTION_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION, REPLACE_COMMIT_ACTION));
CLEAN_ACTION, COMPACTION_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION, REPLACE_COMMIT_ACTION, INDEXING_ACTION));
}
/**

View File

@@ -172,6 +172,10 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
return isInflight() ? HoodieTimeline.makeInflightReplaceFileName(timestamp)
: isRequested() ? HoodieTimeline.makeRequestedReplaceFileName(timestamp)
: HoodieTimeline.makeReplaceFileName(timestamp);
} else if (HoodieTimeline.INDEXING_ACTION.equals(action)) {
return isInflight() ? HoodieTimeline.makeInflightIndexFileName(timestamp)
: isRequested() ? HoodieTimeline.makeRequestedIndexFileName(timestamp)
: HoodieTimeline.makeIndexCommitFileName(timestamp);
}
throw new IllegalArgumentException("Cannot get file name for unknown action " + action);
}

View File

@@ -55,10 +55,11 @@ public interface HoodieTimeline extends Serializable {
String COMPACTION_ACTION = "compaction";
String REQUESTED_EXTENSION = ".requested";
String RESTORE_ACTION = "restore";
String INDEXING_ACTION = "indexing";
String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
COMPACTION_ACTION, REPLACE_COMMIT_ACTION};
COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEXING_ACTION};
String COMMIT_EXTENSION = "." + COMMIT_ACTION;
String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
@@ -84,6 +85,9 @@ public interface HoodieTimeline extends Serializable {
String INFLIGHT_REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION + INFLIGHT_EXTENSION;
String REQUESTED_REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION + REQUESTED_EXTENSION;
String REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION;
String INFLIGHT_INDEX_COMMIT_EXTENSION = "." + INDEXING_ACTION + INFLIGHT_EXTENSION;
String REQUESTED_INDEX_COMMIT_EXTENSION = "." + INDEXING_ACTION + REQUESTED_EXTENSION;
String INDEX_COMMIT_EXTENSION = "." + INDEXING_ACTION;
String INVALID_INSTANT_TS = "0";
@@ -139,6 +143,15 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline getWriteTimeline();
/**
* Timeline to just include commits (commit/deltacommit), compaction and replace actions that are completed and contiguous.
* For example, if timeline is [C0.completed, C1.completed, C2.completed, C3.inflight, C4.completed].
* Then, a timeline of [C0.completed, C1.completed, C2.completed] will be returned.
*
* @return
*/
HoodieTimeline getContiguousCompletedWriteTimeline();
/**
* Timeline to just include replace instants that have valid (commit/deltacommit) actions.
*
@@ -198,6 +211,16 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filter(Predicate<HoodieInstant> filter);
/**
* Filter this timeline to just include requested and inflight index instants.
*/
HoodieTimeline filterPendingIndexTimeline();
/**
* Filter this timeline to just include completed index instants.
*/
HoodieTimeline filterCompletedIndexTimeline();
/**
* If the timeline has any instants.
*
@@ -341,6 +364,14 @@ public interface HoodieTimeline extends Serializable {
return instant.isRequested() ? instant : HoodieTimeline.getRequestedInstant(instant);
}
/**
 * Returns a REQUESTED-state {@link HoodieInstant} for the indexing action at the given timestamp.
 */
static HoodieInstant getIndexRequestedInstant(final String timestamp) {
  return new HoodieInstant(State.REQUESTED, INDEXING_ACTION, timestamp);
}
/**
 * Returns an INFLIGHT-state {@link HoodieInstant} for the indexing action at the given timestamp.
 */
static HoodieInstant getIndexInflightInstant(final String timestamp) {
  return new HoodieInstant(State.INFLIGHT, INDEXING_ACTION, timestamp);
}
/**
* Returns the inflight instant corresponding to the instant being passed. Takes care of changes in action names
* between inflight and completed instants (compaction <=> commit).
@@ -454,4 +485,16 @@ public interface HoodieTimeline extends Serializable {
static String makeFileNameAsInflight(String fileName) {
return StringUtils.join(fileName, HoodieTimeline.INFLIGHT_EXTENSION);
}
/**
 * Builds the completed index commit file name, i.e. "&lt;instant&gt;.indexing".
 */
static String makeIndexCommitFileName(String instant) {
  return StringUtils.join(instant, HoodieTimeline.INDEX_COMMIT_EXTENSION);
}
/**
 * Builds the inflight index file name by appending the inflight index extension to the instant time.
 */
static String makeInflightIndexFileName(String instant) {
  return StringUtils.join(instant, HoodieTimeline.INFLIGHT_INDEX_COMMIT_EXTENSION);
}
/**
 * Builds the requested index file name, i.e. "&lt;instant&gt;.indexing.requested".
 */
static String makeRequestedIndexFileName(String instant) {
  return StringUtils.join(instant, HoodieTimeline.REQUESTED_INDEX_COMMIT_EXTENSION);
}
}

View File

@@ -22,6 +22,8 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
@@ -137,6 +139,14 @@ public class TimelineMetadataUtils {
return serializeAvroMetadata(clusteringPlan, HoodieRequestedReplaceMetadata.class);
}
/**
 * Serializes the given {@link HoodieIndexPlan} to Avro-encoded bytes.
 *
 * @throws IOException if Avro serialization fails
 */
public static Option<byte[]> serializeIndexPlan(HoodieIndexPlan indexPlan) throws IOException {
  return serializeAvroMetadata(indexPlan, HoodieIndexPlan.class);
}
/**
 * Serializes the given {@link HoodieIndexCommitMetadata} to Avro-encoded bytes.
 *
 * @throws IOException if Avro serialization fails
 */
public static Option<byte[]> serializeIndexCommitMetadata(HoodieIndexCommitMetadata indexCommitMetadata) throws IOException {
  return serializeAvroMetadata(indexCommitMetadata, HoodieIndexCommitMetadata.class);
}
public static <T extends SpecificRecordBase> Option<byte[]> serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
@@ -180,6 +190,14 @@ public class TimelineMetadataUtils {
return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class);
}
/**
 * Deserializes Avro-encoded bytes into a {@link HoodieIndexPlan}.
 *
 * @throws IOException if Avro deserialization fails
 */
public static HoodieIndexPlan deserializeIndexPlan(byte[] bytes) throws IOException {
  return deserializeAvroMetadata(bytes, HoodieIndexPlan.class);
}
/**
 * Deserializes Avro-encoded bytes into a {@link HoodieIndexCommitMetadata}.
 *
 * @throws IOException if Avro deserialization fails
 */
public static HoodieIndexCommitMetadata deserializeIndexCommitMetadata(byte[] bytes) throws IOException {
  return deserializeAvroMetadata(bytes, HoodieIndexCommitMetadata.class);
}
public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
throws IOException {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);

View File

@@ -18,9 +18,9 @@
package org.apache.hudi.common.table.view;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.model.BootstrapBaseFileMapping;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -29,6 +29,8 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -358,6 +360,19 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
return Option.ofNullable(fgIdToReplaceInstants.get(fileGroupId));
}
/**
 * Get the latest file slice of each file group in a given partition, including the inflight ones
 * (as reported by {@link HoodieFileGroup#getLatestFileSlicesIncludingInflight}).
 *
 * @param partitionPath partition whose file groups are scanned
 * @return Stream of latest {@link FileSlice} in the partition path.
 */
public Stream<FileSlice> fetchLatestFileSlicesIncludingInflight(String partitionPath) {
  return fetchAllStoredFileGroups(partitionPath)
      // each file group reports an Option of its latest slice; keep only present ones
      .map(HoodieFileGroup::getLatestFileSlicesIncludingInflight)
      .filter(Option::isPresent)
      .map(Option::get);
}
@Override
public void close() {
super.close();

View File

@@ -20,12 +20,24 @@ package org.apache.hudi.common.util;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Simple utility for operations on strings.
*/
public class StringUtils {
public static final String EMPTY_STRING = "";

// Splits a comma-delimited string, trims each token, and drops empty tokens, collecting to a Set.
private static final Function<String, Set<String>> STRING_TO_SET = (str) -> Stream.of(str.split(","))
    .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());

// Same tokenization as STRING_TO_SET, but collects to a List (preserving order and duplicates).
private static final Function<String, List<String>> STRING_TO_LIST = (str) -> Stream.of(str.split(","))
    .map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
/**
* <p>
@@ -46,7 +58,7 @@ public class StringUtils {
* </pre>
*/
public static <T> String join(final String... elements) {
return join(elements, "");
return join(elements, EMPTY_STRING);
}
public static <T> String joinUsingDelim(String delim, final String... elements) {
@@ -100,4 +112,24 @@ public class StringUtils {
private static boolean stringIsNullOrEmpty(@Nullable String string) {
return string == null || string.isEmpty();
}
/**
 * Converts the input string, delimited by comma, to a set of trimmed, non-empty tokens.
 *
 * @param input nullable comma-delimited string
 * @return set of tokens; an empty set when the input is null or empty
 */
public static Set<String> toSet(@Nullable String input) {
  if (input == null || input.isEmpty()) {
    return new HashSet<>();
  }
  return Stream.of(input.split(","))
      .map(String::trim)
      .filter(token -> !token.isEmpty())
      .collect(Collectors.toSet());
}
/**
 * Converts the input string, delimited by comma, to a list of trimmed, non-empty tokens,
 * preserving the order in which they appear.
 *
 * @param input nullable comma-delimited string
 * @return list of tokens; an empty list when the input is null or empty
 */
public static List<String> toList(@Nullable String input) {
  if (input == null || input.isEmpty()) {
    return new ArrayList<>();
  }
  return Stream.of(input.split(","))
      .map(String::trim)
      .filter(token -> !token.isEmpty())
      .collect(Collectors.toList());
}
}

View File

@@ -284,7 +284,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
List<String> partitions = Collections.emptyList();
if (hoodieRecord.isPresent()) {
mayBeHandleSpuriousDeletes(hoodieRecord, "\"all partitions\"");
handleSpuriousDeletes(hoodieRecord, "\"all partitions\"");
partitions = hoodieRecord.get().getData().getFilenames();
// Partition-less tables have a single empty partition
if (partitions.contains(NON_PARTITIONED_NAME)) {
@@ -315,7 +315,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
FileStatus[] statuses = {};
if (hoodieRecord.isPresent()) {
mayBeHandleSpuriousDeletes(hoodieRecord, partitionName);
handleSpuriousDeletes(hoodieRecord, partitionName);
statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath);
}
@@ -350,7 +350,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry: partitionsFileStatus) {
if (entry.getValue().isPresent()) {
mayBeHandleSpuriousDeletes(entry.getValue(), entry.getKey());
handleSpuriousDeletes(entry.getValue(), entry.getKey());
result.put(partitionInfo.get(entry.getKey()).toString(), entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey())));
}
}
@@ -360,11 +360,11 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
/**
* Maybe handle spurious deletes. Depending on config, throw an exception or log a warn msg.
* Handle spurious deletes. Depending on config, throw an exception or log a warn msg.
* @param hoodieRecord instance of {@link HoodieRecord} of interest.
* @param partitionName partition name of interest.
*/
private void mayBeHandleSpuriousDeletes(Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord, String partitionName) {
private void handleSpuriousDeletes(Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord, String partitionName) {
if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
if (metadataConfig.ignoreSpuriousDeletes()) {
LOG.warn("Metadata record for " + partitionName + " encountered some files to be deleted which was not added before. "

View File

@@ -74,6 +74,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -121,6 +122,39 @@ public class HoodieTableMetadataUtil {
}
}
/**
 * Deletes the metadata partition from the file system.
 *
 * @param basePath      - base path of the dataset
 * @param context       - instance of {@link HoodieEngineContext}
 * @param partitionType - {@link MetadataPartitionType} of the partition to delete
 */
public static void deleteMetadataPartition(String basePath, HoodieEngineContext context, MetadataPartitionType partitionType) {
  String metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
  Path partitionToDelete = new Path(metadataTablePath, partitionType.getPartitionPath());
  FileSystem fs = FSUtils.getFs(metadataTablePath, context.getHadoopConf().get());
  try {
    // recursive delete: removes the partition directory and everything under it
    fs.delete(partitionToDelete, true);
  } catch (Exception e) {
    throw new HoodieMetadataException(
        String.format("Failed to remove metadata partition %s from path %s", partitionType, metadataTablePath), e);
  }
}
/**
 * Check if the given metadata partition exists.
 *
 * @param basePath base path of the dataset
 * @param context instance of {@link HoodieEngineContext}.
 * @param partitionType type of the metadata partition to look for
 * @return true if the partition directory exists under the metadata table path
 * @throws HoodieIOException if the existence check against the file system fails
 */
public static boolean metadataPartitionExists(String basePath, HoodieEngineContext context, MetadataPartitionType partitionType) {
  final String metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
  FileSystem fs = FSUtils.getFs(metadataTablePath, context.getHadoopConf().get());
  try {
    return fs.exists(new Path(metadataTablePath, partitionType.getPartitionPath()));
  } catch (Exception e) {
    // Chain the original exception so the root cause is not lost (it was previously dropped).
    throw new HoodieIOException(
        String.format("Failed to check metadata partition %s exists.", partitionType.getPartitionPath()), e);
  }
}
/**
* Convert commit action to metadata records for the enabled partition types.
*
@@ -885,6 +919,24 @@ public class HoodieTableMetadataUtil {
return fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList());
}
/**
 * Get the latest file slices for a given partition including the inflight ones.
 *
 * @param metaClient     - instance of {@link HoodieTableMetaClient}
 * @param fileSystemView - hoodie table file system view; fetched from the meta client when absent
 * @param partition      - name of the partition whose file slices are to be loaded
 * @return latest file slices, sorted by file id
 */
public static List<FileSlice> getPartitionLatestFileSlicesIncludingInflight(HoodieTableMetaClient metaClient,
                                                                            Option<HoodieTableFileSystemView> fileSystemView,
                                                                            String partition) {
  HoodieTableFileSystemView fsView = fileSystemView.orElse(getFileSystemView(metaClient));
  return fsView.fetchLatestFileSlicesIncludingInflight(partition)
      .sorted(Comparator.comparing(FileSlice::getFileId))
      .collect(Collectors.toList());
}
public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
HoodieEngineContext engineContext,
MetadataRecordsGenerationParams recordsGenerationParams) {
@@ -900,8 +952,8 @@ public class HoodieTableMetadataUtil {
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
.flatMap(writerSchemaStr ->
isNullOrEmpty(writerSchemaStr)
? Option.empty()
: Option.of(new Schema.Parser().parse(writerSchemaStr)));
? Option.empty()
: Option.of(new Schema.Parser().parse(writerSchemaStr)));
HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
HoodieTableConfig tableConfig = dataTableMetaClient.getTableConfig();
@@ -1062,18 +1114,18 @@ public class HoodieTableMetadataUtil {
* Aggregates column stats for each field.
*
* @param record - current record
* @param schema - write schema
* @param fields - fields for which stats will be aggregated
* @param columnToStats - map of column to map of each stat and its value which gets updates in this method
* @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
*/
public static void aggregateColumnStats(IndexedRecord record, Schema schema,
public static void aggregateColumnStats(IndexedRecord record, List<Schema.Field> fields,
Map<String, Map<String, Object>> columnToStats,
boolean consistentLogicalTimestampEnabled) {
if (!(record instanceof GenericRecord)) {
throw new HoodieIOException("Record is not a generic type to get column range metadata!");
}
schema.getFields().forEach(field -> {
fields.forEach(field -> {
Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
// update stats
@@ -1114,4 +1166,18 @@ public class HoodieTableMetadataUtil {
throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e);
}
}
/**
 * Returns the set of metadata partition names currently inflight, parsed from the table config's
 * comma-delimited inflight-partitions value.
 */
public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
  return StringUtils.toSet(tableConfig.getMetadataPartitionsInflight());
}
/**
 * Returns the set of completed metadata partition names, parsed from the table config's
 * comma-delimited partitions value.
 */
public static Set<String> getCompletedMetadataPartitions(HoodieTableConfig tableConfig) {
  return StringUtils.toSet(tableConfig.getMetadataPartitions());
}
/**
 * Returns the union of inflight and completed metadata partition names from the table config.
 */
public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableConfig tableConfig) {
  // start from the inflight set and merge in the completed partitions
  Set<String> allPartitions = getInflightMetadataPartitions(tableConfig);
  allPartitions.addAll(getCompletedMetadataPartitions(tableConfig));
  return allPartitions;
}
}

View File

@@ -35,15 +35,19 @@ public class MetadataRecordsGenerationParams implements Serializable {
private final int bloomIndexParallelism;
private final boolean isAllColumnStatsIndexEnabled;
private final int columnStatsIndexParallelism;
private final List<String> columnsToIndex;
private final List<String> bloomSecondaryKeys;
MetadataRecordsGenerationParams(HoodieTableMetaClient dataMetaClient, List<MetadataPartitionType> enabledPartitionTypes, String bloomFilterType, int bloomIndexParallelism,
boolean isAllColumnStatsIndexEnabled, int columnStatsIndexParallelism) {
boolean isAllColumnStatsIndexEnabled, int columnStatsIndexParallelism, List<String> columnsToIndex, List<String> bloomSecondaryKeys) {
this.dataMetaClient = dataMetaClient;
this.enabledPartitionTypes = enabledPartitionTypes;
this.bloomFilterType = bloomFilterType;
this.bloomIndexParallelism = bloomIndexParallelism;
this.isAllColumnStatsIndexEnabled = isAllColumnStatsIndexEnabled;
this.columnStatsIndexParallelism = columnStatsIndexParallelism;
this.columnsToIndex = columnsToIndex;
this.bloomSecondaryKeys = bloomSecondaryKeys;
}
public HoodieTableMetaClient getDataMetaClient() {
@@ -69,4 +73,12 @@ public class MetadataRecordsGenerationParams implements Serializable {
public int getColumnStatsIndexParallelism() {
return columnStatsIndexParallelism;
}
public List<String> getColumnsToIndex() {
return columnsToIndex;
}
public List<String> getBloomSecondaryKeys() {
return bloomSecondaryKeys;
}
}

View File

@@ -199,6 +199,46 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
assertTrue(activeCommitTimeline.isBeforeTimelineStarts("00"));
}
@Test
public void testGetContiguousCompletedWriteTimeline() {
  // a mock timeline with holes: completed 01..07 and 13..17, inflight 09/11/19 —
  // the contiguous completed prefix must end just before the earliest inflight (09), i.e. at 07
  timeline = new MockHoodieTimeline(Stream.of("01", "03", "05", "07", "13", "15", "17"),
      Stream.of("09", "11", "19"));
  assertTrue(timeline.getContiguousCompletedWriteTimeline().lastInstant().isPresent());
  assertEquals("07", timeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp());
  // add some instants where two are inflight and one of them (instant8 below) is not part of write timeline
  HoodieInstant instant1 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
  HoodieInstant instant2 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "2");
  HoodieInstant instant3 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "3");
  HoodieInstant instant4 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "4");
  HoodieInstant instant5 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "5");
  HoodieInstant instant6 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "6");
  HoodieInstant instant7 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "7");
  HoodieInstant instant8 = new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, "8");
  timeline = new HoodieActiveTimeline(metaClient);
  timeline.createNewInstant(instant1);
  timeline.createNewInstant(instant2);
  timeline.createNewInstant(instant3);
  timeline.createNewInstant(instant4);
  timeline.createNewInstant(instant5);
  timeline.createNewInstant(instant6);
  timeline.createNewInstant(instant7);
  timeline.createNewInstant(instant8);
  timeline.setInstants(Stream.of(instant1, instant2, instant3, instant4, instant5, instant6, instant7, instant8).collect(Collectors.toList()));
  // instant5 is inflight, so the contiguous completed prefix stops at instant4
  assertTrue(timeline.getContiguousCompletedWriteTimeline().lastInstant().isPresent());
  assertEquals(instant4.getTimestamp(), timeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp());
  // transition both inflight instants to complete
  timeline.saveAsComplete(new HoodieInstant(true, instant5.getAction(), instant5.getTimestamp()), Option.empty());
  timeline.saveAsComplete(new HoodieInstant(true, instant8.getAction(), instant8.getTimestamp()), Option.empty());
  timeline = timeline.reload();
  // instant8 in not considered in write timeline, so last completed instant in timeline should be instant7
  assertTrue(timeline.getContiguousCompletedWriteTimeline().lastInstant().isPresent());
  assertEquals(instant7.getTimestamp(), timeline.getContiguousCompletedWriteTimeline().lastInstant().get().getTimestamp());
}
@Test
public void testTimelineGetOperations() {
List<HoodieInstant> allInstants = getAllInstants();
@@ -218,20 +258,19 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
// Test that various types of getXXX operations from HoodieActiveTimeline
// return the correct set of Instant
checkTimeline.accept(timeline.getCommitsTimeline(),
CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION));
checkTimeline.accept(timeline.getWriteTimeline(),
CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION));
checkTimeline.accept(timeline.getCommitsTimeline(), CollectionUtils.createSet(
HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION));
checkTimeline.accept(timeline.getWriteTimeline(), CollectionUtils.createSet(
HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION));
checkTimeline.accept(timeline.getCommitTimeline(), CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION));
checkTimeline.accept(timeline.getDeltaCommitTimeline(), Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION));
checkTimeline.accept(timeline.getCleanerTimeline(), Collections.singleton(HoodieTimeline.CLEAN_ACTION));
checkTimeline.accept(timeline.getRollbackTimeline(), Collections.singleton(HoodieTimeline.ROLLBACK_ACTION));
checkTimeline.accept(timeline.getRestoreTimeline(), Collections.singleton(HoodieTimeline.RESTORE_ACTION));
checkTimeline.accept(timeline.getSavePointTimeline(), Collections.singleton(HoodieTimeline.SAVEPOINT_ACTION));
checkTimeline.accept(timeline.getAllCommitsTimeline(),
CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION,
HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION,
HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION));
checkTimeline.accept(timeline.getAllCommitsTimeline(), CollectionUtils.createSet(
HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.INDEXING_ACTION));
// Get some random Instants
Random rand = new Random();

View File

@@ -20,6 +20,12 @@ package org.apache.hudi.common.util;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -61,4 +67,20 @@ public class TestStringUtils {
assertNotEquals(null, StringUtils.isNullOrEmpty("this is not empty"));
assertTrue(StringUtils.isNullOrEmpty(""));
}
@Test
public void testStringToSet() {
  // null and empty inputs both yield an empty set
  assertEquals(new HashSet<>(), StringUtils.toSet(null));
  assertEquals(new HashSet<>(), StringUtils.toSet(""));
  // tokens are split on comma and trimmed (" c" becomes "c")
  Set<String> expected = new HashSet<>(Arrays.asList("a", "b", "c"));
  assertEquals(expected, StringUtils.toSet("a,b, c"));
}
@Test
public void testStringToList() {
  // null and empty inputs both yield an empty list
  assertEquals(new ArrayList<>(), StringUtils.toList(null));
  assertEquals(new ArrayList<>(), StringUtils.toList(""));
  // tokens are split on comma, trimmed, and order is preserved
  List<String> expected = Arrays.asList("a", "b", "c");
  assertEquals(expected, StringUtils.toList("a,b, c"));
}
}

View File

@@ -20,24 +20,19 @@ package org.apache.hudi.utilities;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -49,15 +44,15 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
public class HoodieClusteringJob {
public static final String EXECUTE = "execute";
public static final String SCHEDULE = "schedule";
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
private static final Logger LOG = LogManager.getLogger(HoodieClusteringJob.class);
private final Config cfg;
private transient FileSystem fs;
private TypedProperties props;
private final TypedProperties props;
private final JavaSparkContext jsc;
private final HoodieTableMetaClient metaClient;
@@ -83,34 +78,34 @@ public class HoodieClusteringJob {
@Parameter(names = {"--instant-time", "-it"}, description = "Clustering Instant time, only used when set --mode execute. "
+ "If the instant time is not provided with --mode execute, "
+ "the earliest scheduled clustering instant time is used by default. "
+ "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.", required = false)
+ "When set \"--mode scheduleAndExecute\" this instant-time will be ignored.")
public String clusteringInstantTime = null;
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = false)
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert")
public int parallelism = 1;
@Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
@Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
public String sparkMaster = null;
@Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
public String sparkMemory = null;
@Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
@Parameter(names = {"--retry", "-rt"}, description = "number of retries")
public int retry = 0;
@Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering @desperate soon please use \"--mode schedule\" instead")
public Boolean runSchedule = false;
@Parameter(names = {"--retry-last-failed-clustering-job", "-rc"}, description = "Take effect when using --mode/-m scheduleAndExecute. Set true means "
+ "check, rollback and execute last failed clustering plan instead of planing a new clustering job directly.", required = false)
+ "check, rollback and execute last failed clustering plan instead of planing a new clustering job directly.")
public Boolean retryLastFailedClusteringJob = false;
@Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" means make a cluster plan; "
+ "Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; "
+ "Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately", required = false)
+ "Set \"execute\" means execute a cluster plan at given instant which means --instant-time is needed here; "
+ "Set \"scheduleAndExecute\" means make a cluster plan first and execute that plan immediately")
public String runningMode = null;
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@Parameter(names = {"--job-max-processing-time-ms", "-jt"}, description = "Take effect when using --mode/-m scheduleAndExecute and --retry-last-failed-clustering-job/-rc true. "
+ "If maxProcessingTimeMs passed but clustering job is still unfinished, hoodie would consider this job as failed and relaunch.", required = false)
+ "If maxProcessingTimeMs passed but clustering job is still unfinished, hoodie would consider this job as failed and relaunch.")
public long maxProcessingTimeMs = 0;
@Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
@@ -119,7 +114,7 @@ public class HoodieClusteringJob {
@Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
splitter = IdentitySplitter.class)
splitter = IdentitySplitter.class)
public List<String> configs = new ArrayList<>();
}
@@ -155,10 +150,9 @@ public class HoodieClusteringJob {
}
public int cluster(int retry) {
this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
// need to do validate in case that users call cluster() directly without setting cfg.runningMode
validateRunningMode(cfg);
int ret = UtilHelpers.retry(retry, () -> {
return UtilHelpers.retry(retry, () -> {
switch (cfg.runningMode.toLowerCase()) {
case SCHEDULE: {
LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
@@ -183,20 +177,10 @@ public class HoodieClusteringJob {
}
}
}, "Cluster failed");
return ret;
}
private String getSchemaFromLatestInstant() throws Exception {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
throw new HoodieException("Cannot run clustering without any completed commits");
}
Schema schema = schemaResolver.getTableAvroSchema(false);
return schema.toString();
}
private int doCluster(JavaSparkContext jsc) throws Exception {
String schemaStr = getSchemaFromLatestInstant();
String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
if (StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
// Instant time is not specified
@@ -224,7 +208,7 @@ public class HoodieClusteringJob {
}
private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
String schemaStr = getSchemaFromLatestInstant();
String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
return doSchedule(client);
}
@@ -240,7 +224,7 @@ public class HoodieClusteringJob {
private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
LOG.info("Step 1: Do schedule");
String schemaStr = getSchemaFromLatestInstant();
String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
Option<String> instantTime = Option.empty();

View File

@@ -0,0 +1,307 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.MetadataPartitionType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.jetbrains.annotations.TestOnly;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
/**
* A tool to run metadata indexing asynchronously.
* <p>
* Example command (assuming indexer.properties contains related index configs, see {@link org.apache.hudi.common.config.HoodieMetadataConfig} for configs):
* <p>
* spark-submit \
* --class org.apache.hudi.utilities.HoodieIndexer \
* /path/to/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
* --props /path/to/indexer.properties \
* --mode scheduleAndExecute \
* --base-path /tmp/hudi_trips_cow \
* --table-name hudi_trips_cow \
* --index-types COLUMN_STATS \
* --parallelism 1 \
* --spark-memory 1g
* <p>
* A sample indexer.properties file:
* <p>
* hoodie.metadata.index.async=true
* hoodie.metadata.index.column.stats.enable=true
* hoodie.metadata.index.check.timeout.seconds=60
* hoodie.write.concurrency.mode=optimistic_concurrency_control
* hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
*/
public class HoodieIndexer {

  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
  // Running mode that drops (removes) the requested metadata partitions instead of building them.
  private static final String DROP_INDEX = "dropindex";

  private final HoodieIndexer.Config cfg;
  private TypedProperties props;
  private final JavaSparkContext jsc;
  private final HoodieTableMetaClient metaClient;

  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
    this.cfg = cfg;
    this.jsc = jsc;
    // If a props file is given it is the base config; --hoodie-conf overrides are merged in either way.
    this.props = isNullOrEmpty(cfg.propsFilePath)
        ? UtilHelpers.buildProperties(cfg.configs)
        : readConfigFromFileSystem(jsc, cfg);
    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
  }

  /**
   * Reads indexing configs from the props file on local FS or DFS, overlaying any --hoodie-conf overrides.
   */
  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
        .getProps(true);
  }

  public static class Config implements Serializable {
    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
    public String basePath = null;
    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
    public String tableName = null;
    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
    public String indexInstantTime = null;
    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
    public int parallelism = 1;
    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
    public String sparkMaster = null;
    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
    public String sparkMemory = null;
    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
    public int retry = 0;
    @Parameter(names = {"--index-types", "-ixt"}, description = "Comma-separated index types to be built, e.g. BLOOM_FILTERS,COLUMN_STATS", required = true)
    public String indexTypes = null;
    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately; "
        + "Set \"dropindex\" to drop the index types specified in --index-types;")
    public String runningMode = null;
    @Parameter(names = {"--help", "-h"}, help = true)
    public Boolean help = false;
    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for hoodie client for indexing")
    public String propsFilePath = null;
    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
        splitter = IdentitySplitter.class)
    public List<String> configs = new ArrayList<>();
  }

  public static void main(String[] args) {
    final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
    JCommander cmd = new JCommander(cfg, null, args);
    if (cfg.help || args.length == 0) {
      cmd.usage();
      System.exit(1);
    }
    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
    HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
    int result = indexer.start(cfg.retry);
    String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s",
        cfg.basePath, cfg.tableName, cfg.runningMode);
    // Any non-zero return is a failure (start() returns 0 on success, -1 on failure).
    if (result != 0) {
      LOG.error(resultMsg + " failed");
    } else {
      LOG.info(resultMsg + " success");
    }
    jsc.stop();
  }

  /**
   * Runs the indexer in the configured mode after validating that the metadata table is enabled.
   *
   * @param retry number of retries on failure
   * @return 0 on success, -1 on failure
   */
  public int start(int retry) {
    // indexing should be done only if metadata is enabled; use the defaulted lookup so a
    // missing config key is reported as "not enabled" instead of throwing.
    if (!props.getBoolean(HoodieMetadataConfig.ENABLE.key(), false)) {
      LOG.error(String.format("Metadata is not enabled. Please set %s to true.", HoodieMetadataConfig.ENABLE.key()));
      return -1;
    }
    // --mode is optional at parse time; guard against NPE on toLowerCase below.
    if (isNullOrEmpty(cfg.runningMode)) {
      LOG.error("--mode (-m) is not specified, quit the job directly");
      return -1;
    }
    return UtilHelpers.retry(retry, () -> {
      switch (cfg.runningMode.toLowerCase(Locale.ROOT)) {
        case SCHEDULE: {
          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
          Option<String> instantTime = scheduleIndexing(jsc);
          int result = instantTime.isPresent() ? 0 : -1;
          if (result == 0) {
            LOG.info("The schedule instant time is " + instantTime.get());
          }
          return result;
        }
        case SCHEDULE_AND_EXECUTE: {
          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
          return scheduleAndRunIndexing(jsc);
        }
        case EXECUTE: {
          LOG.info("Running Mode: [" + EXECUTE + "];");
          return runIndexing(jsc);
        }
        case DROP_INDEX: {
          LOG.info("Running Mode: [" + DROP_INDEX + "];");
          return dropIndex(jsc);
        }
        default: {
          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
          return -1;
        }
      }
    }, "Indexer failed");
  }

  @TestOnly
  public Option<String> doSchedule() throws Exception {
    return this.scheduleIndexing(jsc);
  }

  /**
   * Schedules an indexing plan for the requested partition type and returns the indexing instant time, if any.
   */
  private Option<String> scheduleIndexing(JavaSparkContext jsc) throws Exception {
    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
      return doSchedule(client);
    }
  }

  private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
    List<MetadataPartitionType> partitionTypes = getRequestedPartitionTypes(cfg.indexTypes);
    checkArgument(partitionTypes.size() == 1, "Currently, only one index type can be scheduled at a time.");
    // Nothing to do if the requested index is already built.
    if (indexExists(partitionTypes)) {
      return Option.empty();
    }
    Option<String> indexingInstant = client.scheduleIndexing(partitionTypes);
    if (!indexingInstant.isPresent()) {
      LOG.error("Scheduling of index action did not return any instant.");
    }
    return indexingInstant;
  }

  /**
   * Returns true (and logs an error) if any requested index partition is already completed.
   */
  private boolean indexExists(List<MetadataPartitionType> partitionTypes) {
    Set<String> indexedMetadataPartitions = getCompletedMetadataPartitions(metaClient.getTableConfig());
    Set<String> requestedIndexPartitionPaths = partitionTypes.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
    requestedIndexPartitionPaths.retainAll(indexedMetadataPartitions);
    if (!requestedIndexPartitionPaths.isEmpty()) {
      LOG.error("Following indexes already built: " + requestedIndexPartitionPaths);
      return true;
    }
    return false;
  }

  /**
   * Executes a previously scheduled indexing plan. If no instant time was given, picks the
   * earliest pending indexing instant on the timeline.
   *
   * @return 0 on success, -1 on failure
   */
  private int runIndexing(JavaSparkContext jsc) throws Exception {
    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
      if (isNullOrEmpty(cfg.indexInstantTime)) {
        // Instant time is not specified
        // Find the earliest scheduled indexing instant for execution
        Option<HoodieInstant> earliestPendingIndexInstant = metaClient.getActiveTimeline()
            .filterPendingIndexTimeline()
            .firstInstant();
        if (earliestPendingIndexInstant.isPresent()) {
          cfg.indexInstantTime = earliestPendingIndexInstant.get().getTimestamp();
          LOG.info("Found the earliest scheduled indexing instant which will be executed: "
              + cfg.indexInstantTime);
        } else {
          throw new HoodieIndexException("There is no scheduled indexing in the table.");
        }
      }
      // Return -1 (not 1) on failure so callers that treat non-zero/-1 as failure report correctly.
      return handleResponse(client.index(cfg.indexInstantTime)) ? 0 : -1;
    }
  }

  /**
   * Schedules an indexing plan and immediately executes it.
   *
   * @return 0 on success, -1 on failure (including when scheduling returned no instant)
   */
  private int scheduleAndRunIndexing(JavaSparkContext jsc) throws Exception {
    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
      Option<String> indexingInstantTime = doSchedule(client);
      if (indexingInstantTime.isPresent()) {
        return handleResponse(client.index(indexingInstantTime.get())) ? 0 : -1;
      } else {
        return -1;
      }
    }
  }

  /**
   * Drops the metadata partitions for the requested index types.
   *
   * @return 0 on success, -1 on failure
   */
  private int dropIndex(JavaSparkContext jsc) throws Exception {
    List<MetadataPartitionType> partitionTypes = getRequestedPartitionTypes(cfg.indexTypes);
    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
      client.dropIndex(partitionTypes);
      return 0;
    } catch (Exception e) {
      LOG.error("Failed to drop index. ", e);
      return -1;
    }
  }

  /**
   * Validates the index commit metadata: indexing succeeded only if metadata is present and
   * every requested index type was built.
   */
  private boolean handleResponse(Option<HoodieIndexCommitMetadata> commitMetadata) {
    if (!commitMetadata.isPresent()) {
      LOG.error("Indexing failed as no commit metadata present.");
      return false;
    }
    List<HoodieIndexPartitionInfo> indexPartitionInfos = commitMetadata.get().getIndexPartitionInfos();
    LOG.info(String.format("Indexing complete for partitions: %s",
        indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList())));
    return isIndexBuiltForAllRequestedTypes(indexPartitionInfos);
  }

  /**
   * Returns true if every requested index type appears in the built index partition infos.
   */
  boolean isIndexBuiltForAllRequestedTypes(List<HoodieIndexPartitionInfo> indexPartitionInfos) {
    Set<String> indexedPartitions = indexPartitionInfos.stream()
        .map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet());
    Set<String> requestedPartitions = getRequestedPartitionTypes(cfg.indexTypes).stream()
        .map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
    requestedPartitions.removeAll(indexedPartitions);
    return requestedPartitions.isEmpty();
  }

  /**
   * Parses the comma-separated --index-types value into partition types, excluding FILES.
   */
  List<MetadataPartitionType> getRequestedPartitionTypes(String indexTypes) {
    List<String> requestedIndexTypes = Arrays.asList(indexTypes.split(","));
    return requestedIndexTypes.stream()
        .map(p -> MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)))
        // FILES partition is initialized synchronously while getting metadata writer
        .filter(p -> !MetadataPartitionType.FILES.equals(p))
        .collect(Collectors.toList());
  }
}

View File

@@ -104,21 +104,26 @@ import java.util.Properties;
* Bunch of helper methods.
*/
public class UtilHelpers {
public static final String EXECUTE = "execute";
public static final String SCHEDULE = "schedule";
public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
SparkSession sparkSession, SchemaProvider schemaProvider,
HoodieDeltaStreamerMetrics metrics) throws IOException {
SparkSession sparkSession, SchemaProvider schemaProvider,
HoodieDeltaStreamerMetrics metrics) throws IOException {
try {
try {
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[]{TypedProperties.class, JavaSparkContext.class,
new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
SparkSession.class, SchemaProvider.class,
HoodieDeltaStreamerMetrics.class},
cfg, jssc, sparkSession, schemaProvider, metrics);
} catch (HoodieException e) {
return (Source) ReflectionUtils.loadClass(sourceClass,
new Class<?>[]{TypedProperties.class, JavaSparkContext.class,
new Class<?>[] {TypedProperties.class, JavaSparkContext.class,
SparkSession.class, SchemaProvider.class},
cfg, jssc, sparkSession, schemaProvider);
}
@@ -238,7 +243,7 @@ public class UtilHelpers {
/**
* Parse Schema from file.
*
* @param fs File System
* @param fs File System
* @param schemaFile Schema File
*/
public static String parseSchema(FileSystem fs, String schemaFile) throws Exception {
@@ -300,13 +305,13 @@ public class UtilHelpers {
/**
* Build Hoodie write client.
*
* @param jsc Java Spark Context
* @param basePath Base Path
* @param schemaStr Schema
* @param jsc Java Spark Context
* @param basePath Base Path
* @param schemaStr Schema
* @param parallelism Parallelism
*/
public static SparkRDDWriteClient<HoodieRecordPayload> createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
int parallelism, Option<String> compactionStrategyClass, TypedProperties properties) {
HoodieCompactionConfig compactionConfig = compactionStrategyClass
.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
@@ -466,8 +471,7 @@ public class UtilHelpers {
Option.ofNullable(createSchemaPostProcessor(schemaPostProcessorClass, cfg, jssc)));
}
public static SchemaProvider createRowBasedSchemaProvider(StructType structType,
TypedProperties cfg, JavaSparkContext jssc) {
public static SchemaProvider createRowBasedSchemaProvider(StructType structType, TypedProperties cfg, JavaSparkContext jssc) {
SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null);
}
@@ -476,13 +480,13 @@ public class UtilHelpers {
* Create latest schema provider for Target schema.
*
* @param structType spark data type of incoming batch.
* @param jssc instance of {@link JavaSparkContext}.
* @param fs instance of {@link FileSystem}.
* @param basePath base path of the table.
* @param jssc instance of {@link JavaSparkContext}.
* @param fs instance of {@link FileSystem}.
* @param basePath base path of the table.
* @return the schema provider where target schema refers to latest schema(either incoming schema or table schema).
*/
public static SchemaProvider createLatestSchemaProvider(StructType structType,
JavaSparkContext jssc, FileSystem fs, String basePath) {
JavaSparkContext jssc, FileSystem fs, String basePath) {
SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
Schema writeSchema = rowSchemaProvider.getTargetSchema();
Schema latestTableSchema = writeSchema;
@@ -540,4 +544,12 @@ public class UtilHelpers {
return ret;
}
/**
 * Resolves the table's Avro schema from the latest completed commit.
 *
 * <p>Shared by the clustering and indexing jobs, so the error message is kept generic.
 *
 * @param metaClient meta client for the table whose schema is needed
 * @return the table Avro schema as a JSON string
 * @throws HoodieException if the table has no completed commits yet
 * @throws Exception if schema resolution fails
 */
public static String getSchemaFromLatestInstant(HoodieTableMetaClient metaClient) throws Exception {
  TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
  if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
    // This helper used to be clustering-only; the message no longer mentions clustering.
    throw new HoodieException("Cannot get schema because there are no completed commits in the table.");
  }
  Schema schema = schemaResolver.getTableAvroSchema(false);
  return schema.toString();
}
}

View File

@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Unit tests for {@link HoodieIndexer} helper logic (requested partition-type parsing and
 * index-completeness checks). Uses a lazily-created, class-shared local Spark session.
 */
public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider {

  // Shared across all tests in this class; created once on first init() and reused (static + transient).
  private static transient SparkSession spark;
  private static transient SQLContext sqlContext;
  private static transient JavaSparkContext jsc;
  private static transient HoodieSparkEngineContext context;

  @BeforeEach
  public void init() throws IOException {
    // Only bootstrap Spark on the first test; subsequent tests reuse the static session.
    boolean initialized = spark != null;
    if (!initialized) {
      SparkConf sparkConf = conf();
      SparkRDDWriteClient.registerClasses(sparkConf);
      HoodieReadClient.addHoodieSupport(sparkConf);
      spark = SparkSession.builder().config(sparkConf).getOrCreate();
      sqlContext = spark.sqlContext();
      jsc = new JavaSparkContext(spark.sparkContext());
      context = new HoodieSparkEngineContext(jsc);
    }
    // A fresh table base path and meta client are created for every test.
    initPath();
    metaClient = HoodieTestUtils.init(basePath, getTableType());
  }

  @Test
  public void testGetRequestedPartitionTypes() {
    HoodieIndexer.Config config = new HoodieIndexer.Config();
    config.basePath = basePath;
    config.tableName = "indexer_test";
    config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS";
    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
    List<MetadataPartitionType> partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes);
    // FILES is filtered out because it is initialized synchronously by the metadata writer.
    assertFalse(partitionTypes.contains(MetadataPartitionType.FILES));
    assertTrue(partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS));
    assertTrue(partitionTypes.contains(MetadataPartitionType.COLUMN_STATS));
  }

  @Test
  public void testIsIndexBuiltForAllRequestedTypes() {
    HoodieIndexer.Config config = new HoodieIndexer.Config();
    config.basePath = basePath;
    config.tableName = "indexer_test";
    config.indexTypes = "BLOOM_FILTERS,COLUMN_STATS";
    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
    // Commit metadata only reports COLUMN_STATS as built.
    HoodieIndexCommitMetadata commitMetadata = HoodieIndexCommitMetadata.newBuilder()
        .setIndexPartitionInfos(Arrays.asList(new HoodieIndexPartitionInfo(
            1,
            MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
            "0000")))
        .build();
    // Two types requested but only one built -> not complete.
    assertFalse(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos()));

    // Narrow the request to the one built type -> complete.
    config.indexTypes = "COLUMN_STATS";
    indexer = new HoodieIndexer(jsc, config);
    assertTrue(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos()));
  }

  @Override
  public HoodieEngineContext context() {
    return context;
  }

  @Override
  public SparkSession spark() {
    return spark;
  }

  @Override
  public SQLContext sqlContext() {
    return sqlContext;
  }

  @Override
  public JavaSparkContext jsc() {
    return jsc;
  }
}

View File

@@ -129,6 +129,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties", dfs, dfsBasePath + "/config/uber_config.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties", dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties", dfs, dfsBasePath + "/clusteringjob.properties");
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/indexer.properties", dfs, dfsBasePath + "/indexer.properties");
writeCommonPropsToFile(dfs, dfsBasePath);

View File

@@ -57,6 +57,7 @@ import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
@@ -129,6 +130,9 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
@@ -397,6 +401,22 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected);
}
/** Asserts exactly one pending (requested/inflight) indexing instant exists on the timeline. */
static void assertPendingIndexCommit(String tablePath, FileSystem fs) {
  assertIndexCommitCount(tablePath, fs, true);
}

/** Asserts exactly one completed indexing instant exists on the timeline. */
static void assertCompletedIndexCommit(String tablePath, FileSystem fs) {
  assertIndexCommitCount(tablePath, fs, false);
}

/**
 * Shared assertion for the two checks above: reloads the active timeline, filters indexing
 * instants by pending/completed state, and expects exactly one.
 *
 * @param pending true to count pending indexing instants, false to count completed ones
 */
private static void assertIndexCommitCount(String tablePath, FileSystem fs, boolean pending) {
  HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
  HoodieTimeline allCommits = meta.getActiveTimeline().getAllCommitsTimeline();
  HoodieTimeline timeline = pending ? allCommits.filterPendingIndexTimeline() : allCommits.filterCompletedIndexTimeline();
  LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
  int numIndexCommits = (int) timeline.getInstants().count();
  assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
}
static void assertNoReplaceCommits(String tablePath, FileSystem fs) {
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline();
@@ -961,6 +981,53 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
return config;
}
/**
 * Builds a {@link HoodieIndexer.Config} for the given table and running mode, pointing at the
 * shared indexer.properties file on DFS.
 */
private HoodieIndexer.Config buildIndexerConfig(String basePath,
                                                String tableName,
                                                String indexInstantTime,
                                                String runningMode,
                                                String indexTypes) {
  HoodieIndexer.Config indexerConfig = new HoodieIndexer.Config();
  indexerConfig.basePath = basePath;
  indexerConfig.tableName = tableName;
  indexerConfig.runningMode = runningMode;
  indexerConfig.indexTypes = indexTypes;
  indexerConfig.indexInstantTime = indexInstantTime;
  // Index configs (async index enable, column stats, lock provider, ...) come from this props file.
  indexerConfig.propsFilePath = dfsBasePath + "/indexer.properties";
  return indexerConfig;
}
@Test
public void testHoodieIndexer() throws Exception {
  String tableBasePath = dfsBasePath + "/asyncindexer";
  // Run a continuous delta streamer and drive the indexer from the test callback.
  HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 1000, "false");

  deltaStreamerTestRunner(ds, (r) -> {
    // Wait until there are at least two completed commits before scheduling indexing.
    TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);

    // Step 1: schedule an indexing plan for COLUMN_STATS (SCHEDULE mode only).
    Option<String> scheduleIndexInstantTime = Option.empty();
    try {
      HoodieIndexer scheduleIndexingJob = new HoodieIndexer(jsc,
          buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, null, SCHEDULE, "COLUMN_STATS"));
      scheduleIndexInstantTime = scheduleIndexingJob.doSchedule();
    } catch (Exception e) {
      // Returning false keeps the runner polling; scheduling is retried on the next callback.
      LOG.info("Schedule indexing failed", e);
      return false;
    }
    if (scheduleIndexInstantTime.isPresent()) {
      // A requested (pending) index instant should now be on the timeline.
      TestHelpers.assertPendingIndexCommit(tableBasePath, dfs);
      LOG.info("Schedule indexing success, now build index with instant time " + scheduleIndexInstantTime.get());
      // Step 2: execute the scheduled plan at that instant (EXECUTE mode).
      HoodieIndexer runIndexingJob = new HoodieIndexer(jsc,
          buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, scheduleIndexInstantTime.get(), EXECUTE, "COLUMN_STATS"));
      runIndexingJob.start(0);
      LOG.info("Metadata indexing success");
      TestHelpers.assertCompletedIndexCommit(tableBasePath, dfs);
    } else {
      LOG.warn("Metadata indexing failed");
    }
    return true;
  });
}
@Disabled("HUDI-3710 to fix the ConcurrentModificationException")
@ParameterizedTest
@ValueSource(booleans = {true, false})
@@ -1131,28 +1198,28 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
LOG.info("Cluster success");
} else {
LOG.warn("Import failed");
if (!runningMode.toLowerCase().equals(HoodieClusteringJob.EXECUTE)) {
if (!runningMode.toLowerCase().equals(EXECUTE)) {
return false;
}
}
} catch (Exception e) {
LOG.warn("ScheduleAndExecute clustering failed", e);
exception = e;
if (!runningMode.equalsIgnoreCase(HoodieClusteringJob.EXECUTE)) {
if (!runningMode.equalsIgnoreCase(EXECUTE)) {
return false;
}
}
switch (runningMode.toLowerCase()) {
case HoodieClusteringJob.SCHEDULE_AND_EXECUTE: {
case SCHEDULE_AND_EXECUTE: {
TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
return true;
}
case HoodieClusteringJob.SCHEDULE: {
case SCHEDULE: {
TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs);
TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
return true;
}
case HoodieClusteringJob.EXECUTE: {
case EXECUTE: {
TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
return true;
}

View File

@@ -0,0 +1,25 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
hoodie.metadata.enable=true
hoodie.metadata.index.async=true
hoodie.metadata.index.column.stats.enable=true
hoodie.metadata.index.check.timeout.seconds=60
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider