1
0

[HUDI-1352] Add FileSystemView APIs to query pending clustering operations (#2202)

This commit is contained in:
satishkotha
2020-11-05 08:49:58 -08:00
committed by GitHub
parent 5f5c15b0d9
commit 33ec88fc38
26 changed files with 974 additions and 14 deletions

View File

@@ -84,6 +84,11 @@
<import>${basedir}/src/main/avro/HoodieBootstrapSourceFilePartitionInfo.avsc</import>
<import>${basedir}/src/main/avro/HoodieBootstrapIndexInfo.avsc</import>
<import>${basedir}/src/main/avro/HoodieBootstrapMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieSliceInfo.avsc</import>
<import>${basedir}/src/main/avro/HoodieClusteringGroup.avsc</import>
<import>${basedir}/src/main/avro/HoodieClusteringStrategy.avsc</import>
<import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
<import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
</imports>
</configuration>
</plugin>

View File

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
"type":"record",
"name":"HoodieClusteringGroup",
"type":"record",
"fields":[
{
/* Group of files that needs to merged. All the slices in a group will belong to same partition initially.
* Files of different partitions may be grouped later when we have better on disk layout with indexing support.
*/
"name":"slices",
"type":["null", {
"type":"array",
"items": "HoodieSliceInfo"
}],
"default": null
},
{
"name":"metrics",
"type":["null", {
"type":"map",
"values":"double"
}],
"default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
"type":"record",
"name":"HoodieClusteringPlan",
"fields":[
{
"name":"inputGroups",
"type":["null", {
"type":"array",
"items": "HoodieClusteringGroup"
}],
"default": null
},
{
"name":"strategy",
"type":["HoodieClusteringStrategy", "null"],
"default": null
},
{
"name":"extraMetadata",
"type":["null", {
"type":"map",
"values":"string"
}],
"default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
"name":"HoodieClusteringStrategy",
"type":"record",
"fields":[
{
"name":"strategyClassName", /* have to be subclass of ClusteringStrategy interface defined in hudi. ClusteringStrategy class include methods like getPartitioner */
"type":["null","string"],
"default": null
},
{
"name":"strategyParams", /* Parameters could be different for different strategies. example, if sorting is needed for the strategy, parameters can contain sortColumns */
"type":["null", {
"type":"map",
"values":"string"
}],
"default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
"type":"record",
"name":"HoodieRequestedReplaceMetadata",
"fields":[
{
"name":"operationType",
"type":["null", "string"],
"default": ""
},
{
"name":"clusteringPlan", /* only set if operationType == clustering" */
"type":["HoodieClusteringPlan", "null"],
"default": null
},
{
"name":"extraMetadata",
"type":["null", {
"type":"map",
"values":"string"
}],
"default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}

View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
"type":"record",
"name":"HoodieSliceInfo",
"fields":[
{
"name":"dataFilePath",
"type":["null","string"],
"default": null
},
{
"name":"deltaFilePaths",
"type":["null", {
"type":"array",
"items":"string"
}],
"default": null
},
{
"name":"fileId",
"type":["null","string"]
},
{
"name":"partitionPath",
"type":["null","string"],
"default": null
},
{
"name":"bootstrapFilePath",
"type":["null", "string"],
"default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}

View File

@@ -40,6 +40,8 @@ public enum WriteOperationType {
BOOTSTRAP("bootstrap"),
// insert overwrite
INSERT_OVERWRITE("insert_overwrite"),
// cluster
CLUSTER("cluster"),
// used for old version
UNKNOWN("unknown");

View File

@@ -412,6 +412,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
createFileInMetaPath(instant.getFileName(), content, overwrite);
}
/**
 * Persists the given content for a replacecommit instant that is still pending
 * (requested or inflight). Never overwrites an existing file.
 */
public void saveToPendingReplaceCommit(HoodieInstant instant, Option<byte[]> content) {
  final String action = instant.getAction();
  ValidationUtils.checkArgument(action.equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
  // overwrite=false: a pending replacecommit file must only ever be created once.
  createFileInMetaPath(instant.getFileName(), content, false);
}
public void saveToCleanRequested(HoodieInstant instant, Option<byte[]> content) {
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED));

View File

@@ -118,6 +118,12 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(s -> s.isCompleted()), details);
}
@Override
public HoodieTimeline filterPendingReplaceTimeline() {
  // "Pending" means requested or inflight, i.e. any replacecommit not yet completed.
  return new HoodieDefaultTimeline(
      instants.stream()
          .filter(instant -> instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION))
          .filter(instant -> !instant.isCompleted()),
      details);
}
@Override
public HoodieTimeline filterPendingCompactionTimeline() {
return new HoodieDefaultTimeline(

View File

@@ -151,6 +151,12 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filterPendingCompactionTimeline();
/**
* Filter this timeline to just include requested and inflight replacecommit instants.
*/
HoodieTimeline filterPendingReplaceTimeline();
/**
* Create a new Timeline with all the instants after startTs.
*/

View File

@@ -18,19 +18,6 @@
package org.apache.hudi.common.table.timeline;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
@@ -40,6 +27,19 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -115,6 +115,10 @@ public class TimelineMetadataUtils {
return serializeAvroMetadata(restoreMetadata, HoodieRestoreMetadata.class);
}
/**
 * Serializes the requested replacecommit metadata (e.g. carrying a clustering plan) to Avro bytes.
 */
public static Option<byte[]> serializeRequestedReplaceMetadata(HoodieRequestedReplaceMetadata requestedReplaceMetadata) throws IOException {
  return serializeAvroMetadata(requestedReplaceMetadata, HoodieRequestedReplaceMetadata.class);
}
public static <T extends SpecificRecordBase> Option<byte[]> serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
@@ -146,6 +150,10 @@ public class TimelineMetadataUtils {
return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
}
/**
 * Deserializes Avro bytes into a {@link HoodieRequestedReplaceMetadata}.
 *
 * @param bytes serialized requested-replace metadata
 * @return the deserialized metadata
 * @throws IOException if the bytes cannot be read as Avro
 */
public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
  return deserializeAvroMetadata(bytes, HoodieRequestedReplaceMetadata.class);
}

/**
 * @deprecated method name is misspelled; use
 * {@link #deserializeRequestedReplaceMetadata(byte[])}. Kept only for backward
 * compatibility with existing callers.
 */
@Deprecated
public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadta(byte[] bytes) throws IOException {
  return deserializeRequestedReplaceMetadata(bytes);
}
public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
throws IOException {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);

View File

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.timeline.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
/**
 * Data transfer object describing one file group that participates in a pending
 * clustering operation, together with the timeline instant of that operation.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClusteringOpDTO {

  @JsonProperty("id")
  private String fileId;

  @JsonProperty("partition")
  private String partitionPath;

  @JsonProperty("instantTime")
  private String instantTime;

  @JsonProperty("instantState")
  private String instantState;

  @JsonProperty("instantAction")
  private String instantAction;

  /**
   * Builds a DTO from a file group id and the clustering instant tracking it.
   */
  public static ClusteringOpDTO fromClusteringOp(HoodieFileGroupId fileGroupId, HoodieInstant instant) {
    ClusteringOpDTO dto = new ClusteringOpDTO();
    dto.fileId = fileGroupId.getFileId();
    dto.partitionPath = fileGroupId.getPartitionPath();
    dto.instantTime = instant.getTimestamp();
    dto.instantState = instant.getState().name();
    dto.instantAction = instant.getAction();
    return dto;
  }

  /**
   * Reconstructs the (file group id, instant) pair this DTO was created from.
   */
  public static Pair<HoodieFileGroupId, HoodieInstant> toClusteringOperation(ClusteringOpDTO dto) {
    HoodieFileGroupId fileGroupId = new HoodieFileGroupId(dto.partitionPath, dto.fileId);
    HoodieInstant instant =
        new HoodieInstant(HoodieInstant.State.valueOf(dto.instantState), dto.instantAction, dto.instantTime);
    return Pair.of(fileGroupId, instant);
  }
}

View File

@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -107,6 +108,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
resetPendingCompactionOperations(CompactionUtils.getAllPendingCompactionOperations(metaClient).values().stream()
.map(e -> Pair.of(e.getKey(), CompactionOperation.convertFromAvroRecordInstance(e.getValue()))));
resetBootstrapBaseFileMapping(Stream.empty());
resetFileGroupsInPendingClustering(ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(metaClient));
}
/**
@@ -678,6 +680,15 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
}
/**
 * Returns all file groups that have a clustering operation in a pending
 * (requested/inflight) state, each paired with the instant of that operation.
 */
@Override
public final Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering() {
try {
readLock.lock();
// NOTE(review): the returned Stream may be consumed after the read lock is released;
// this is safe only if fetchFileGroupsInPendingClustering() materializes its data
// eagerly in concrete subclasses — confirm.
return fetchFileGroupsInPendingClustering();
} finally {
readLock.unlock();
}
}
// Fetch APIs to be implemented by concrete sub-classes
@@ -710,6 +721,40 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
*/
abstract void removePendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations);
/**
* Check if there is an outstanding clustering operation (requested/inflight) scheduled for this file.
*
* @param fgId File-Group Id
* @return true if there is a pending clustering, false otherwise
*/
protected abstract boolean isPendingClusteringScheduledForFileId(HoodieFileGroupId fgId);
/**
* Get pending clustering instant time for specified file group. Return None if file group is not in pending
* clustering operation.
*/
protected abstract Option<HoodieInstant> getPendingClusteringInstant(final HoodieFileGroupId fileGroupId);
/**
* Fetch all file groups in pending clustering.
*/
protected abstract Stream<Pair<HoodieFileGroupId, HoodieInstant>> fetchFileGroupsInPendingClustering();
/**
* resets the pending clustering operation and overwrite with the new list.
*/
abstract void resetFileGroupsInPendingClustering(Map<HoodieFileGroupId, HoodieInstant> fgIdToInstantMap);
/**
* Add metadata for file groups in pending clustering operations to the view.
*/
abstract void addFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroups);
/**
* Remove metadata for file groups in pending clustering operations from the view.
*/
abstract void removeFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroups);
/**
* Return pending compaction operation for a file-group.
*

View File

@@ -45,6 +45,8 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
"hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction";
public static final String FILESYSTEM_VIEW_REPLACED_MEM_FRACTION =
"hoodie.filesystem.view.spillable.replaced.mem.fraction";
public static final String FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION =
"hoodie.filesystem.view.spillable.clustering.mem.fraction";
private static final String ROCKSDB_BASE_PATH_PROP = "hoodie.filesystem.view.rocksdb.base.path";
public static final String FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS =
"hoodie.filesystem.view.remote.timeout.secs";
@@ -62,6 +64,7 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_COMPACTION = 0.01;
private static final Double DEFAULT_MEM_FRACTION_FOR_EXTERNAL_DATA_FILE = 0.05;
private static final Double DEFAULT_MEM_FRACTION_FOR_REPLACED_FILEGROUPS = 0.01;
private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_CLUSTERING_FILEGROUPS = 0.01;
private static final Long DEFAULT_MAX_MEMORY_FOR_VIEW = 100 * 1024 * 1024L; // 100 MB
/**
@@ -126,6 +129,12 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
.longValue();
}
/**
 * Memory budget (in bytes) for tracking file groups in pending clustering, computed as
 * a configured fraction of the total spillable-view memory.
 */
public long getMaxMemoryForPendingClusteringFileGroups() {
  long totalMemory = Long.parseLong(props.getProperty(FILESYSTEM_VIEW_SPILLABLE_MEM));
  double fraction = Double.parseDouble(props.getProperty(FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION));
  // Plain cast instead of the deprecated `new Double(...).longValue()` boxing; truncation is identical.
  return (long) (totalMemory * fraction);
}
public String getBaseStoreDir() {
return props.getProperty(FILESYSTEM_VIEW_SPILLABLE_DIR);
}
@@ -245,6 +254,8 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
FILESYSTEM_VIEW_BOOTSTRAP_BASE_FILE_FRACTION, DEFAULT_MEM_FRACTION_FOR_EXTERNAL_DATA_FILE.toString());
setDefaultOnCondition(props, !props.containsKey(FILESYSTEM_VIEW_REPLACED_MEM_FRACTION),
FILESYSTEM_VIEW_REPLACED_MEM_FRACTION, DEFAULT_MEM_FRACTION_FOR_REPLACED_FILEGROUPS.toString());
setDefaultOnCondition(props, !props.containsKey(FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION),
FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION, DEFAULT_MEM_FRACTION_FOR_PENDING_CLUSTERING_FILEGROUPS.toString());
setDefaultOnCondition(props, !props.containsKey(ROCKSDB_BASE_PATH_PROP), ROCKSDB_BASE_PATH_PROP,
DEFAULT_ROCKSDB_BASE_PATH);

View File

@@ -70,6 +70,11 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
*/
protected Map<HoodieFileGroupId, HoodieInstant> fgIdToReplaceInstants;
/**
* Track file groups in pending clustering.
*/
protected Map<HoodieFileGroupId, HoodieInstant> fgIdToPendingClustering;
/**
* Flag to determine if closed.
*/
@@ -113,6 +118,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
this.partitionToFileGroupsMap = null;
this.fgIdToBootstrapBaseFile = null;
this.fgIdToReplaceInstants = null;
this.fgIdToPendingClustering = null;
}
protected Map<String, List<HoodieFileGroup>> createPartitionToFileGroups() {
@@ -134,6 +140,11 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
return replacedFileGroupsMap;
}
/**
 * Builds the in-memory map that tracks file groups in pending clustering; subclasses may
 * override to back the map with a spillable or external store.
 */
protected Map<HoodieFileGroupId, HoodieInstant> createFileIdToPendingClusteringMap(final Map<HoodieFileGroupId, HoodieInstant> fileGroupsInClustering) {
  return new ConcurrentHashMap<>(fileGroupsInClustering);
}
/**
* Create a file system view, as of the given timeline, with the provided file statuses.
*/
@@ -189,6 +200,49 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
});
}
@Override
protected boolean isPendingClusteringScheduledForFileId(HoodieFileGroupId fgId) {
  // Scheduled == tracked in the pending-clustering map (ConcurrentHashMap holds no null values).
  return fgIdToPendingClustering.get(fgId) != null;
}
@Override
protected Option<HoodieInstant> getPendingClusteringInstant(HoodieFileGroupId fgId) {
  // Absent entries map to Option.empty().
  HoodieInstant pendingInstant = fgIdToPendingClustering.get(fgId);
  return Option.ofNullable(pendingInstant);
}
@Override
protected Stream<Pair<HoodieFileGroupId, HoodieInstant>> fetchFileGroupsInPendingClustering() {
  // Expose the tracked map as a stream of (fileGroupId, instant) pairs.
  return fgIdToPendingClustering.entrySet().stream()
      .map(fgIdAndInstant -> Pair.of(fgIdAndInstant.getKey(), fgIdAndInstant.getValue()));
}
@Override
void resetFileGroupsInPendingClustering(Map<HoodieFileGroupId, HoodieInstant> fgIdToInstantMap) {
  // Replace the tracked state wholesale with a freshly-built map.
  this.fgIdToPendingClustering = createFileIdToPendingClusteringMap(fgIdToInstantMap);
}
/**
 * Adds the given (file group, instant) pairs to the pending-clustering view.
 * Fails if any file group already has a pending clustering instant tracked.
 */
@Override
void addFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroups) {
  fileGroups.forEach(fileGroupInstantPair -> {
    // Bug fix: the precondition must assert the file group is NOT already tracked.
    // The original checked containsKey(...) without negation, which inverted the intent
    // (the error message describes the "already present" case) and rejected every
    // first-time insert.
    ValidationUtils.checkArgument(!fgIdToPendingClustering.containsKey(fileGroupInstantPair.getLeft()),
        "Trying to add a FileGroupId which is already in pending clustering operation. FgId :"
            + fileGroupInstantPair.getLeft() + ", new instant: " + fileGroupInstantPair.getRight() + ", existing instant "
            + fgIdToPendingClustering.get(fileGroupInstantPair.getLeft()));
    fgIdToPendingClustering.put(fileGroupInstantPair.getLeft(), fileGroupInstantPair.getRight());
  });
}
/**
 * Removes the given file groups from the pending-clustering view; each must currently be tracked.
 */
@Override
void removeFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroups) {
  fileGroups.forEach(fileGroupInstantPair -> {
    HoodieFileGroupId fgId = fileGroupInstantPair.getLeft();
    ValidationUtils.checkArgument(fgIdToPendingClustering.containsKey(fgId),
        "Trying to remove a FileGroupId which is not found in pending clustering operation. FgId :"
            + fgId + ", new instant: " + fileGroupInstantPair.getRight());
    fgIdToPendingClustering.remove(fgId);
  });
}
/**
* Given a partition path, obtain all filegroups within that. All methods, that work at the partition level go through
* this.

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Functions.Function0;
@@ -203,6 +204,11 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
return execute(preferredView::getPendingCompactionOperations, secondaryView::getPendingCompactionOperations);
}
/**
 * Streams file groups in pending clustering, trying the preferred view first and
 * falling back to the secondary view if the preferred call fails.
 */
@Override
public Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering() {
return execute(preferredView::getFileGroupsInPendingClustering, secondaryView::getFileGroupsInPendingClustering);
}
@Override
public void close() {
preferredView.close();

View File

@@ -27,10 +27,12 @@ import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
@@ -89,6 +91,9 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
public static final String PENDING_CLUSTERING_FILEGROUPS = String.format("%s/%s", BASE_URL, "clustering/pending/");
public static final String LAST_INSTANT = String.format("%s/%s", BASE_URL, "timeline/instant/last");
public static final String LAST_INSTANTS = String.format("%s/%s", BASE_URL, "timeline/instants/last");
@@ -396,6 +401,18 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
}
}
/**
 * Fetches file groups in pending clustering from the remote timeline server and
 * converts the DTOs back into (file group id, instant) pairs.
 */
@Override
public Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering() {
  Map<String, String> requestParams = getParams();
  try {
    List<ClusteringOpDTO> clusteringOps = executeRequest(PENDING_CLUSTERING_FILEGROUPS, requestParams,
        new TypeReference<List<ClusteringOpDTO>>() {}, RequestMethod.GET);
    return clusteringOps.stream().map(ClusteringOpDTO::toClusteringOperation);
  } catch (IOException e) {
    throw new HoodieRemoteException(e);
  }
}
@Override
public void close() {
closed = true;

View File

@@ -135,6 +135,65 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
);
}
@Override
protected boolean isPendingClusteringScheduledForFileId(HoodieFileGroupId fgId) {
  // Scheduled == an entry exists in the pending-clustering column family.
  Option<HoodieInstant> pendingInstant = getPendingClusteringInstant(fgId);
  return pendingInstant.isPresent();
}
/**
 * Looks up the pending clustering instant for the given file group from RocksDB.
 *
 * @param fgId file group id to look up
 * @return the pending clustering instant, or empty if none is tracked
 */
@Override
protected Option<HoodieInstant> getPendingClusteringInstant(HoodieFileGroupId fgId) {
  String lookupKey = schemaHelper.getKeyForFileGroupsInPendingClustering(fgId);
  // Bug fix: this column family stores (fileGroupId, instant) pairs — see
  // addFileGroupsInPendingClustering (writes the whole pair) and
  // fetchFileGroupsInPendingClustering (reads pairs and maps getValue) — so reading
  // the value directly as a HoodieInstant would fail with a ClassCastException.
  Pair<HoodieFileGroupId, HoodieInstant> fgIdInstantPair =
      rocksDB.get(schemaHelper.getColFamilyForFileGroupsInPendingClustering(), lookupKey);
  return Option.ofNullable(fgIdInstantPair == null ? null : fgIdInstantPair.getRight());
}
/**
 * Streams all (file group id, instant) pairs tracked in the pending-clustering
 * column family by scanning it with an empty key prefix.
 */
@Override
public Stream<Pair<HoodieFileGroupId, HoodieInstant>> fetchFileGroupsInPendingClustering() {
// Empty prefix => full scan of the column family; stored values are the pairs themselves.
return rocksDB.<Pair<HoodieFileGroupId, HoodieInstant>>prefixSearch(schemaHelper.getColFamilyForFileGroupsInPendingClustering(), "")
.map(Pair::getValue);
}
/**
 * Replaces all pending-clustering entries in RocksDB with the given map: deletes the
 * existing column-family contents, then bulk-adds the new entries.
 */
@Override
void resetFileGroupsInPendingClustering(Map<HoodieFileGroupId, HoodieInstant> fgIdToInstantMap) {
  LOG.info("Resetting file groups in pending clustering to ROCKSDB based file-system view at "
      + config.getRocksdbBasePath() + ", Total file-groups=" + fgIdToInstantMap.size());
  // Delete all pending-clustering file groups (comment previously said "replaced file
  // groups" — a copy-paste from resetReplacedFileGroups).
  rocksDB.prefixDelete(schemaHelper.getColFamilyForFileGroupsInPendingClustering(), "part=");
  // Now add new entries
  addFileGroupsInPendingClustering(fgIdToInstantMap.entrySet().stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())));
  // Fixed copy-pasted log text that referred to "replacedFileGroups".
  LOG.info("Resetting file groups in pending clustering to ROCKSDB based file-system view complete");
}
/**
 * Adds (file group, clustering instant) pairs to the pending-clustering column family
 * in a single RocksDB write batch. Each file group must not already be tracked.
 */
@Override
void addFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroups) {
  rocksDB.writeBatch(batch ->
      fileGroups.forEach(fgIdToClusterInstant -> {
        // Fixed error message: it previously said "pending compaction" (copy-paste).
        ValidationUtils.checkArgument(!isPendingClusteringScheduledForFileId(fgIdToClusterInstant.getLeft()),
            "Duplicate FileGroupId found in pending clustering operations. FgId :"
                + fgIdToClusterInstant.getLeft());
        // The stored value is the whole pair; fetchFileGroupsInPendingClustering reads it back as such.
        rocksDB.putInBatch(batch, schemaHelper.getColFamilyForFileGroupsInPendingClustering(),
            schemaHelper.getKeyForFileGroupsInPendingClustering(fgIdToClusterInstant.getKey()), fgIdToClusterInstant);
      })
  );
}
/**
 * Removes the given file groups from the pending-clustering column family in a single
 * RocksDB write batch. Each file group must currently have a pending clustering entry.
 */
@Override
void removeFileGroupsInPendingClustering(Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroups) {
  rocksDB.writeBatch(batch ->
      fileGroups.forEach(fgToPendingClusteringInstant -> {
        // Bug fix: the precondition must assert the entry IS present before deleting it.
        // The original negated the check (and its message says "not found"), so removal
        // could only proceed for file groups that were never tracked. Also fixed the
        // copy-pasted "pending compaction" wording.
        ValidationUtils.checkArgument(
            isPendingClusteringScheduledForFileId(fgToPendingClusteringInstant.getLeft()),
            "Trying to remove a FileGroupId which is not found in pending clustering operations. FgId :"
                + fgToPendingClusteringInstant.getLeft());
        rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForFileGroupsInPendingClustering(),
            schemaHelper.getKeyForFileGroupsInPendingClustering(fgToPendingClusteringInstant.getLeft()));
      })
  );
}
@Override
protected void resetViewState() {
LOG.info("Deleting all rocksdb data associated with table filesystem view");

View File

@@ -51,6 +51,7 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
private final long maxMemoryForPendingCompaction;
private final long maxMemoryForBootstrapBaseFile;
private final long maxMemoryForReplaceFileGroups;
private final long maxMemoryForClusteringFileGroups;
private final String baseStoreDir;
public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
@@ -60,6 +61,7 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
this.maxMemoryForPendingCompaction = config.getMaxMemoryForPendingCompaction();
this.maxMemoryForBootstrapBaseFile = config.getMaxMemoryForBootstrapBaseFile();
this.maxMemoryForReplaceFileGroups = config.getMaxMemoryForReplacedFileGroups();
this.maxMemoryForClusteringFileGroups = config.getMaxMemoryForPendingClusteringFileGroups();
this.baseStoreDir = config.getBaseStoreDir();
init(metaClient, visibleActiveTimeline);
}
@@ -130,6 +132,21 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
}
}
/**
 * Builds the pending-clustering tracking map backed by an external spillable map, so
 * large sets of file groups stay within the configured memory budget by spilling to disk.
 */
@Override
protected Map<HoodieFileGroupId, HoodieInstant> createFileIdToPendingClusteringMap(final Map<HoodieFileGroupId, HoodieInstant> fileGroupsInClustering) {
  LOG.info("Creating file group id to clustering instant map using external spillable Map. Max Mem=" + maxMemoryForClusteringFileGroups
      + ", BaseDir=" + baseStoreDir);
  try {
    // Make sure the spill directory exists before the spillable map tries to use it.
    new File(baseStoreDir).mkdirs();
    Map<HoodieFileGroupId, HoodieInstant> fgIdToInstant = new ExternalSpillableMap<>(
        maxMemoryForClusteringFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>());
    fgIdToInstant.putAll(fileGroupsInClustering);
    return fgIdToInstant;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
@Override
public Stream<HoodieFileGroup> getAllFileGroups() {
return ((ExternalSpillableMap) partitionToFileGroupsMap).valueStream()

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
@@ -169,4 +170,9 @@ public interface TableFileSystemView {
* Stream all the replaced file groups before maxCommitTime.
*/
Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath);
/**
 * Streams all file groups that currently have a clustering operation in a pending
 * (requested or inflight) state, paired with the instant of that operation.
 */
Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering();
}

View File

@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.util;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Helper methods to create, serialize and read clustering plans from timeline metadata.
 */
public class ClusteringUtils {

  private static final Logger LOG = LogManager.getLogger(ClusteringUtils.class);

  public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB";
  public static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILES_SIZE";
  public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES";

  /**
   * Get all pending clustering plans along with their instants.
   * Non-clustering pending replace instants are filtered out.
   */
  public static Stream<Pair<HoodieInstant, HoodieClusteringPlan>> getAllPendingClusteringPlans(
      HoodieTableMetaClient metaClient) {
    List<HoodieInstant> pendingReplaceInstants =
        metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
    return pendingReplaceInstants.stream().map(instant -> getClusteringPlan(metaClient, instant))
        .filter(Option::isPresent).map(Option::get);
  }

  /**
   * Read the clustering plan stored for the given pending replace instant, if any.
   *
   * @param metaClient              meta client for the table
   * @param requestedReplaceInstant pending replacecommit instant to inspect
   * @return (instant, plan) when the instant is a clustering operation; empty otherwise
   * @throws HoodieIOException if the instant metadata cannot be deserialized
   */
  public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant requestedReplaceInstant) {
    try {
      Option<byte[]> content = metaClient.getActiveTimeline().getInstantDetails(requestedReplaceInstant);
      if (!content.isPresent() || content.get().length == 0) {
        // Few operations create a requested file without any content. Assume these are not clustering.
        LOG.warn("No content found in requested file for instant " + requestedReplaceInstant);
        return Option.empty();
      }
      // NOTE(review): "Metadta" is a typo in the TimelineMetadataUtils method name; fix at the declaration in a follow-up.
      HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
      if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) {
        return Option.of(Pair.of(requestedReplaceInstant, requestedReplaceMetadata.getClusteringPlan()));
      }
      return Option.empty();
    } catch (IOException e) {
      throw new HoodieIOException("Error reading clustering plan " + requestedReplaceInstant.getTimestamp(), e);
    }
  }

  /**
   * Get file-group to pending clustering instant mapping for all pending clustering plans.
   * This includes all clustering operations in 'requested' and 'inflight' states.
   */
  public static Map<HoodieFileGroupId, HoodieInstant> getAllFileGroupsInPendingClusteringPlans(
      HoodieTableMetaClient metaClient) {
    Stream<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans = getAllPendingClusteringPlans(metaClient);
    Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> resultStream = pendingClusteringPlans.flatMap(clusteringPlan ->
        // get all file groups in the plan
        getFileGroupEntriesInClusteringPlan(clusteringPlan.getLeft(), clusteringPlan.getRight()));

    Map<HoodieFileGroupId, HoodieInstant> resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    LOG.info("Found " + resultMap.size() + " files in pending clustering operations");
    return resultMap;
  }

  /**
   * Expand a clustering plan into (fileGroupId, instant) pairs, one per input slice.
   */
  public static Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClusteringInstant(
      HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
    Stream<HoodieFileGroupId> partitionToFileIdLists = clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
    return partitionToFileIdLists.map(e -> Pair.of(e, instant));
  }

  // Same as getFileGroupsInPendingClusteringInstant but as Map.Entry, for Collectors.toMap.
  private static Stream<Map.Entry<HoodieFileGroupId, HoodieInstant>> getFileGroupEntriesInClusteringPlan(
      HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
    return getFileGroupsInPendingClusteringInstant(instant, clusteringPlan).map(entry ->
        new AbstractMap.SimpleEntry<>(entry.getLeft(), entry.getRight()));
  }

  // Convert each slice in a clustering group to its file group id.
  private static Stream<HoodieFileGroupId> getFileGroupsFromClusteringGroup(HoodieClusteringGroup group) {
    return group.getSlices().stream().map(slice -> new HoodieFileGroupId(slice.getPartitionPath(), slice.getFileId()));
  }

  /**
   * Create a clustering plan from the input file slice groups.
   *
   * @param strategyClassName fully-qualified name of the clustering strategy implementation
   * @param strategyParams    strategy-specific parameters
   * @param fileSliceGroups   groups of file slices; each group becomes one HoodieClusteringGroup
   * @param extraMetadata     arbitrary metadata to attach to the plan
   */
  public static HoodieClusteringPlan createClusteringPlan(String strategyClassName,
                                                          Map<String, String> strategyParams,
                                                          List<FileSlice>[] fileSliceGroups,
                                                          Map<String, String> extraMetadata) {
    List<HoodieClusteringGroup> clusteringGroups = Arrays.stream(fileSliceGroups).map(fileSliceGroup -> {
      Map<String, Double> groupMetrics = buildMetrics(fileSliceGroup);
      List<HoodieSliceInfo> sliceInfos = getFileSliceInfo(fileSliceGroup);
      return HoodieClusteringGroup.newBuilder().setSlices(sliceInfos).setMetrics(groupMetrics).build();
    }).collect(Collectors.toList());

    HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
        .setStrategyClassName(strategyClassName).setStrategyParams(strategyParams)
        .build();

    HoodieClusteringPlan plan = HoodieClusteringPlan.newBuilder()
        .setInputGroups(clusteringGroups)
        .setExtraMetadata(extraMetadata)
        .setStrategy(strategy)
        .build();
    return plan;
  }

  // Convert file slices to the avro HoodieSliceInfo representation used inside plans.
  private static List<HoodieSliceInfo> getFileSliceInfo(List<FileSlice> slices) {
    // Use the static builder factory directly (the original allocated a throwaway
    // HoodieSliceInfo instance just to reach the static newBuilder()).
    return slices.stream().map(slice -> HoodieSliceInfo.newBuilder()
        .setPartitionPath(slice.getPartitionPath())
        .setFileId(slice.getFileId())
        .setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse(null))
        .setDeltaFilePaths(slice.getLogFiles().map(f -> f.getPath().getName()).collect(Collectors.toList()))
        .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(bbf -> bbf.getPath()).orElse(null)).orElse(null))
        .build()).collect(Collectors.toList());
  }

  /**
   * Build summary metrics (log file count/size, estimated read IO in MB) for one slice group.
   */
  private static Map<String, Double> buildMetrics(List<FileSlice> fileSlices) {
    int numLogFiles = 0;
    long totalLogFileSize = 0;
    long totalIORead = 0;

    for (FileSlice slice : fileSlices) {
      numLogFiles += slice.getLogFiles().count();
      // Total size of the log files belonging to this slice.
      long sliceLogFileSize = slice.getLogFiles().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)
          .reduce(Long::sum).orElse(0L);
      totalLogFileSize += sliceLogFileSize;
      // Read cost for this slice is its base file plus its log files. Accumulate across
      // slices (the original assignment overwrote the value each iteration, so only the
      // last slice's base file was counted).
      long baseFileSize = slice.getBaseFile().isPresent() ? slice.getBaseFile().get().getFileSize() : 0L;
      totalIORead += FSUtils.getSizeInMB(baseFileSize + sliceLogFileSize);
    }

    Map<String, Double> metrics = new HashMap<>();
    metrics.put(TOTAL_IO_READ_MB, (double) totalIORead);
    metrics.put(TOTAL_LOG_FILE_SIZE, (double) totalLogFileSize);
    metrics.put(TOTAL_LOG_FILES, (double) numLogFiles);
    return metrics;
  }
}

View File

@@ -48,6 +48,7 @@ public class RocksDBSchemaHelper {
private final String colFamilyForBootstrapBaseFile;
private final String colFamilyForStoredPartitions;
private final String colFamilyForReplacedFileGroups;
private final String colFamilyForPendingClusteringFileGroups;
public RocksDBSchemaHelper(HoodieTableMetaClient metaClient) {
this.colFamilyForBootstrapBaseFile = "hudi_bootstrap_basefile_" + metaClient.getBasePath().replace("/", "_");
@@ -55,11 +56,12 @@ public class RocksDBSchemaHelper {
this.colFamilyForStoredPartitions = "hudi_partitions_" + metaClient.getBasePath().replace("/", "_");
this.colFamilyForView = "hudi_view_" + metaClient.getBasePath().replace("/", "_");
this.colFamilyForReplacedFileGroups = "hudi_replaced_fg" + metaClient.getBasePath().replace("/", "_");
this.colFamilyForPendingClusteringFileGroups = "hudi_pending_clustering_fg" + metaClient.getBasePath().replace("/", "_");
}
public List<String> getAllColumnFamilies() {
return Arrays.asList(getColFamilyForView(), getColFamilyForPendingCompaction(), getColFamilyForBootstrapBaseFile(),
getColFamilyForStoredPartitions(), getColFamilyForReplacedFileGroups());
getColFamilyForStoredPartitions(), getColFamilyForReplacedFileGroups(), getColFamilyForFileGroupsInPendingClustering());
}
public String getKeyForPartitionLookup(String partition) {
@@ -78,6 +80,10 @@ public class RocksDBSchemaHelper {
return getPartitionFileIdBasedLookup(fgId);
}
/**
 * Lookup key for a file group in the pending-clustering column family.
 * Uses the same partition + fileId composite key as the other per-file-group families.
 */
public String getKeyForFileGroupsInPendingClustering(HoodieFileGroupId fgId) {
  return getPartitionFileIdBasedLookup(fgId);
}
public String getKeyForSliceView(HoodieFileGroup fileGroup, FileSlice slice) {
return getKeyForSliceView(fileGroup.getPartitionPath(), fileGroup.getFileGroupId().getFileId(),
slice.getBaseInstantTime());
@@ -135,4 +141,8 @@ public class RocksDBSchemaHelper {
public String getColFamilyForReplacedFileGroups() {
return colFamilyForReplacedFileGroups;
}
/** @return name of the RocksDB column family that tracks file groups in pending clustering. */
public String getColFamilyForFileGroupsInPendingClustering() {
  return colFamilyForPendingClusteringFileGroups;
}
}

View File

@@ -21,10 +21,12 @@ package org.apache.hudi.common.table.view;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieFSPermission;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.avro.model.HoodiePath;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
@@ -49,6 +51,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
@@ -1424,6 +1427,63 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
assertEquals(actualReplacedFileIds, allReplacedFileIds);
}
// Verifies that the file system view surfaces file groups referenced by a pending
// clustering plan (requested replacecommit with operation type CLUSTER), and only those.
@Test
public void testPendingClusteringOperations() throws IOException {
  String partitionPath1 = "2020/06/27";
  new File(basePath + "/" + partitionPath1).mkdirs();
  // Create three file groups in partition1; fileId1 and fileId2 will later be part of
  // a pending clustering plan, fileId3 stays out of it.
  String fileId1 = UUID.randomUUID().toString();
  String fileId2 = UUID.randomUUID().toString();
  String fileId3 = UUID.randomUUID().toString();

  assertFalse(roView.getLatestBaseFiles(partitionPath1)
      .anyMatch(dfile -> dfile.getFileId().equals(fileId1) || dfile.getFileId().equals(fileId2) || dfile.getFileId().equals(fileId3)),
      "No commit, should not find any data file");

  // Commit one base file per file group under a single commit.
  String commitTime1 = "1";
  String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1);
  String fileName2 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2);
  String fileName3 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId3);
  new File(basePath + "/" + partitionPath1 + "/" + fileName1).createNewFile();
  new File(basePath + "/" + partitionPath1 + "/" + fileName2).createNewFile();
  new File(basePath + "/" + partitionPath1 + "/" + fileName3).createNewFile();
  HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
  HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1);
  saveAsComplete(commitTimeline, instant1, Option.empty());

  refreshFsView();
  // All three base files should be visible after the commit.
  assertEquals(1, roView.getLatestBaseFiles(partitionPath1)
      .filter(dfile -> dfile.getFileId().equals(fileId1)).count());
  assertEquals(1, roView.getLatestBaseFiles(partitionPath1)
      .filter(dfile -> dfile.getFileId().equals(fileId2)).count());
  assertEquals(1, roView.getLatestBaseFiles(partitionPath1)
      .filter(dfile -> dfile.getFileId().equals(fileId3)).count());

  List<FileSlice>[] fileSliceGroups = new List[] {
      Collections.singletonList(fsView.getLatestFileSlice(partitionPath1, fileId1).get()),
      Collections.singletonList(fsView.getLatestFileSlice(partitionPath1, fileId2).get())
  };

  // Create a pending clustering operation - fileId1, fileId2 are being clustered in different groups.
  HoodieClusteringPlan plan = ClusteringUtils.createClusteringPlan("strategy", new HashMap<>(),
      fileSliceGroups, Collections.emptyMap());

  String clusterTime = "2";
  HoodieInstant instant2 = new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
  HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
      .setClusteringPlan(plan).setOperationType(WriteOperationType.CLUSTER.name()).build();
  metaClient.getActiveTimeline().saveToPendingReplaceCommit(instant2, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));

  // View should report exactly the file groups referenced by the pending plan (fileId1, fileId2),
  // and not fileId3, which is outside the plan.
  refreshFsView();
  Set<String> fileIds =
      fsView.getFileGroupsInPendingClustering().map(e -> e.getLeft().getFileId()).collect(Collectors.toSet());
  assertTrue(fileIds.contains(fileId1));
  assertTrue(fileIds.contains(fileId2));
  assertFalse(fileIds.contains(fileId3));
}
@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;

View File

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.util;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Tests for {@link ClusteringUtils}.
 */
public class TestClusteringUtils extends HoodieCommonTestHarness {

  private static final String CLUSTERING_STRATEGY_CLASS = "org.apache.hudi.DefaultClusteringStrategy";
  // Immutable single-entry map; avoids the double-brace-initialization anti-pattern
  // (anonymous HashMap subclass holding a reference to the enclosing test class).
  private static final Map<String, String> STRATEGY_PARAMS = Collections.singletonMap("sortColumn", "record_key");

  @BeforeEach
  public void init() throws IOException {
    initMetaClient();
  }

  // Validates that plans from multiple instants are merged, and that non-clustering
  // or empty replace.requested instants are ignored.
  @Test
  public void testClusteringPlanMultipleInstants() throws Exception {
    String partitionPath1 = "partition1";
    List<String> fileIds1 = new ArrayList<>();
    fileIds1.add(UUID.randomUUID().toString());
    fileIds1.add(UUID.randomUUID().toString());
    String clusterTime1 = "1";
    createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);

    List<String> fileIds2 = new ArrayList<>();
    fileIds2.add(UUID.randomUUID().toString());
    fileIds2.add(UUID.randomUUID().toString());
    fileIds2.add(UUID.randomUUID().toString());
    List<String> fileIds3 = new ArrayList<>();
    fileIds3.add(UUID.randomUUID().toString());
    String clusterTime = "2";
    createRequestedReplaceInstant(partitionPath1, clusterTime, fileIds2, fileIds3);

    // Create replace.requested without a clustering plan. This instant should be ignored by ClusteringUtils.
    createRequestedReplaceInstantNotClustering("3");

    // Create replace.requested without any metadata content. This instant should be ignored by ClusteringUtils.
    metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "4"));
    metaClient.reloadActiveTimeline();
    assertEquals(4, metaClient.getActiveTimeline().filterPendingReplaceTimeline().countInstants());
    Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap =
        ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(metaClient);

    // Only the two real clustering instants contribute file groups.
    assertEquals(fileIds1.size() + fileIds2.size() + fileIds3.size(), fileGroupToInstantMap.size());
    validateClusteringInstant(fileIds1, partitionPath1, clusterTime1, fileGroupToInstantMap);
    validateClusteringInstant(fileIds2, partitionPath1, clusterTime, fileGroupToInstantMap);
    validateClusteringInstant(fileIds3, partitionPath1, clusterTime, fileGroupToInstantMap);
  }

  // Assert that every file id maps back to the expected clustering instant time.
  private void validateClusteringInstant(List<String> fileIds, String partitionPath,
                                         String expectedInstantTime, Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
    for (String fileId : fileIds) {
      assertEquals(expectedInstantTime, fileGroupToInstantMap.get(new HoodieFileGroupId(partitionPath, fileId)).getTimestamp());
    }
  }

  // Writes a replace.requested instant whose operation type is not CLUSTER.
  private HoodieInstant createRequestedReplaceInstantNotClustering(String instantTime) throws IOException {
    HoodieInstant newRequestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);
    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
        .setOperationType(WriteOperationType.UNKNOWN.name()).build();
    metaClient.getActiveTimeline().saveToPendingReplaceCommit(newRequestedInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
    return newRequestedInstant;
  }

  // Writes a replace.requested instant with a clustering plan covering one slice group per fileIds list.
  @SuppressWarnings("unchecked")
  private HoodieInstant createRequestedReplaceInstant(String partitionPath1, String clusterTime, List<String>... fileIds) throws IOException {
    List<FileSlice>[] fileSliceGroups = new List[fileIds.length];
    for (int i = 0; i < fileIds.length; i++) {
      fileSliceGroups[i] = fileIds[i].stream().map(fileId -> generateFileSlice(partitionPath1, fileId, "0")).collect(Collectors.toList());
    }

    HoodieClusteringPlan clusteringPlan =
        ClusteringUtils.createClusteringPlan(CLUSTERING_STRATEGY_CLASS, STRATEGY_PARAMS, fileSliceGroups, Collections.emptyMap());

    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
        .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
    metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
    return clusteringInstant;
  }

  // Builds a file slice with just a base file, sufficient for plan generation.
  private FileSlice generateFileSlice(String partitionPath, String fileId, String baseInstant) {
    FileSlice fs = new FileSlice(new HoodieFileGroupId(partitionPath, fileId), baseInstant);
    fs.setBaseFile(new HoodieBaseFile(FSUtils.makeDataFileName(baseInstant, "1-0-1", fileId)));
    return fs;
  }

  @Override
  protected HoodieTableType getTableType() {
    return HoodieTableType.MERGE_ON_READ;
  }
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.timeline.service;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
@@ -297,6 +298,12 @@ public class FileSystemViewHandler {
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> {
List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
writeValueAsString(ctx, dtos);
}, true));
}
private static boolean isRefreshCheckDisabledInQuery(Context ctxt) {

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.timeline.service.handlers;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
@@ -93,6 +94,12 @@ public class FileSliceHandler extends Handler {
.collect(Collectors.toList());
}
/**
 * List file groups in pending clustering for the table at {@code basePath},
 * converted to DTOs for the remote file system view API.
 */
public List<ClusteringOpDTO> getFileGroupsInPendingClustering(String basePath) {
  return viewManager.getFileSystemView(basePath).getFileGroupsInPendingClustering()
      .map(fgInstant -> ClusteringOpDTO.fromClusteringOp(fgInstant.getLeft(), fgInstant.getRight()))
      .collect(Collectors.toList());
}
public boolean refreshTable(String basePath) {
viewManager.clearFileSystemView(basePath);
return true;