diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index be06b8d72..4fbc63da7 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -84,6 +84,11 @@
${basedir}/src/main/avro/HoodieBootstrapSourceFilePartitionInfo.avsc
${basedir}/src/main/avro/HoodieBootstrapIndexInfo.avsc
${basedir}/src/main/avro/HoodieBootstrapMetadata.avsc
+ ${basedir}/src/main/avro/HoodieSliceInfo.avsc
+ ${basedir}/src/main/avro/HoodieClusteringGroup.avsc
+ ${basedir}/src/main/avro/HoodieClusteringStrategy.avsc
+ ${basedir}/src/main/avro/HoodieClusteringPlan.avsc
+ ${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc
diff --git a/hudi-common/src/main/avro/HoodieClusteringGroup.avsc b/hudi-common/src/main/avro/HoodieClusteringGroup.avsc
new file mode 100644
index 000000000..fb41f6ef5
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieClusteringGroup.avsc
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace":"org.apache.hudi.avro.model",
+ "type":"record",
+ "name":"HoodieClusteringGroup",
+ "type":"record",
+ "fields":[
+ {
+ /* Group of files that needs to merged. All the slices in a group will belong to same partition initially.
+ * Files of different partitions may be grouped later when we have better on disk layout with indexing support.
+ */
+ "name":"slices",
+ "type":["null", {
+ "type":"array",
+ "items": "HoodieSliceInfo"
+ }],
+ "default": null
+ },
+ {
+ "name":"metrics",
+ "type":["null", {
+ "type":"map",
+ "values":"double"
+ }],
+ "default": null
+ },
+ {
+ "name":"version",
+ "type":["int", "null"],
+ "default": 1
+ }
+ ]
+}
diff --git a/hudi-common/src/main/avro/HoodieClusteringPlan.avsc b/hudi-common/src/main/avro/HoodieClusteringPlan.avsc
new file mode 100644
index 000000000..709a0eb72
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieClusteringPlan.avsc
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace":"org.apache.hudi.avro.model",
+ "type":"record",
+ "name":"HoodieClusteringPlan",
+ "fields":[
+ {
+ "name":"inputGroups",
+ "type":["null", {
+ "type":"array",
+ "items": "HoodieClusteringGroup"
+ }],
+ "default": null
+ },
+ {
+ "name":"strategy",
+ "type":["HoodieClusteringStrategy", "null"],
+ "default": null
+ },
+ {
+ "name":"extraMetadata",
+ "type":["null", {
+ "type":"map",
+ "values":"string"
+ }],
+ "default": null
+ },
+ {
+ "name":"version",
+ "type":["int", "null"],
+ "default": 1
+ }
+ ]
+}
diff --git a/hudi-common/src/main/avro/HoodieClusteringStrategy.avsc b/hudi-common/src/main/avro/HoodieClusteringStrategy.avsc
new file mode 100644
index 000000000..caaf02631
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieClusteringStrategy.avsc
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace":"org.apache.hudi.avro.model",
+ "name":"HoodieClusteringStrategy",
+ "type":"record",
+ "fields":[
+ {
+ "name":"strategyClassName", /* have to be subclass of ClusteringStrategy interface defined in hudi. ClusteringStrategy class include methods like getPartitioner */
+ "type":["null","string"],
+ "default": null
+ },
+ {
+ "name":"strategyParams", /* Parameters could be different for different strategies. example, if sorting is needed for the strategy, parameters can contain sortColumns */
+ "type":["null", {
+ "type":"map",
+ "values":"string"
+ }],
+ "default": null
+ },
+ {
+ "name":"version",
+ "type":["int", "null"],
+ "default": 1
+ }
+ ]
+}
diff --git a/hudi-common/src/main/avro/HoodieRequestedReplaceMetadata.avsc b/hudi-common/src/main/avro/HoodieRequestedReplaceMetadata.avsc
new file mode 100644
index 000000000..f98f42410
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieRequestedReplaceMetadata.avsc
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace":"org.apache.hudi.avro.model",
+ "type":"record",
+ "name":"HoodieRequestedReplaceMetadata",
+ "fields":[
+ {
+ "name":"operationType",
+ "type":["null", "string"],
+ "default": ""
+ },
+ {
+ "name":"clusteringPlan", /* only set if operationType == clustering" */
+ "type":["HoodieClusteringPlan", "null"],
+ "default": null
+ },
+ {
+ "name":"extraMetadata",
+ "type":["null", {
+ "type":"map",
+ "values":"string"
+ }],
+ "default": null
+ },
+ {
+ "name":"version",
+ "type":["int", "null"],
+ "default": 1
+ }
+ ]
+}
diff --git a/hudi-common/src/main/avro/HoodieSliceInfo.avsc b/hudi-common/src/main/avro/HoodieSliceInfo.avsc
new file mode 100644
index 000000000..eff025184
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieSliceInfo.avsc
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace":"org.apache.hudi.avro.model",
+ "type":"record",
+ "name":"HoodieSliceInfo",
+ "fields":[
+ {
+ "name":"dataFilePath",
+ "type":["null","string"],
+ "default": null
+ },
+ {
+ "name":"deltaFilePaths",
+ "type":["null", {
+ "type":"array",
+ "items":"string"
+ }],
+ "default": null
+ },
+ {
+ "name":"fileId",
+ "type":["null","string"]
+ },
+ {
+ "name":"partitionPath",
+ "type":["null","string"],
+ "default": null
+ },
+ {
+ "name":"bootstrapFilePath",
+ "type":["null", "string"],
+ "default": null
+ },
+ {
+ "name":"version",
+ "type":["int", "null"],
+ "default": 1
+ }
+ ]
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 71e0ff2ab..f6386b94d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -40,6 +40,8 @@ public enum WriteOperationType {
BOOTSTRAP("bootstrap"),
// insert overwrite
INSERT_OVERWRITE("insert_overwrite"),
+ // cluster
+ CLUSTER("cluster"),
// used for old version
UNKNOWN("unknown");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 8e5b0b664..99fd793e3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -412,6 +412,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
createFileInMetaPath(instant.getFileName(), content, overwrite);
}
+ /**
+ * Saves content for inflight/requested REPLACE instant.
+ */
+ public void saveToPendingReplaceCommit(HoodieInstant instant, Option content) {
+ ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
+ createFileInMetaPath(instant.getFileName(), content, false);
+ }
+
public void saveToCleanRequested(HoodieInstant instant, Option content) {
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 8ced025af..484d91b27 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -118,6 +118,12 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(s -> s.isCompleted()), details);
}
+ @Override
+ public HoodieTimeline filterPendingReplaceTimeline() {
+ return new HoodieDefaultTimeline(instants.stream().filter(
+ s -> s.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) && !s.isCompleted()), details);
+ }
+
@Override
public HoodieTimeline filterPendingCompactionTimeline() {
return new HoodieDefaultTimeline(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 03ba43c11..ff251e314 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -151,6 +151,12 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filterPendingCompactionTimeline();
+ /**
+ * Filter this timeline to just include requested and inflight replacecommit instants.
+ */
+ HoodieTimeline filterPendingReplaceTimeline();
+
+
/**
* Create a new Timeline with all the instants after startTs.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 80e7c66be..640d4894f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -18,19 +18,6 @@
package org.apache.hudi.common.table.timeline;
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.avro.model.HoodieInstantInfo;
-import org.apache.hudi.avro.model.HoodieRestoreMetadata;
-import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
-import org.apache.hudi.avro.model.HoodieSavepointMetadata;
-import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
-import org.apache.hudi.common.HoodieRollbackStat;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
@@ -40,6 +27,19 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieInstantInfo;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
+import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -115,6 +115,10 @@ public class TimelineMetadataUtils {
return serializeAvroMetadata(restoreMetadata, HoodieRestoreMetadata.class);
}
+ public static Option serializeRequestedReplaceMetadata(HoodieRequestedReplaceMetadata clusteringPlan) throws IOException {
+ return serializeAvroMetadata(clusteringPlan, HoodieRequestedReplaceMetadata.class);
+ }
+
public static Option serializeAvroMetadata(T metadata, Class clazz)
throws IOException {
DatumWriter datumWriter = new SpecificDatumWriter<>(clazz);
@@ -146,6 +150,10 @@ public class TimelineMetadataUtils {
return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
}
+ public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadta(byte[] bytes) throws IOException {
+ return deserializeAvroMetadata(bytes, HoodieRequestedReplaceMetadata.class);
+ }
+
public static T deserializeAvroMetadata(byte[] bytes, Class clazz)
throws IOException {
DatumReader reader = new SpecificDatumReader<>(clazz);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/ClusteringOpDTO.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/ClusteringOpDTO.java
new file mode 100644
index 000000000..e87226c99
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/ClusteringOpDTO.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline.dto;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * The data transfer object of clustering.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ClusteringOpDTO {
+
+ @JsonProperty("id")
+ private String fileId;
+
+ @JsonProperty("partition")
+ private String partitionPath;
+
+ @JsonProperty("instantTime")
+ private String instantTime;
+
+ @JsonProperty("instantState")
+ private String instantState;
+
+ @JsonProperty("instantAction")
+ private String instantAction;
+
+ public static ClusteringOpDTO fromClusteringOp(HoodieFileGroupId fileGroupId, HoodieInstant instant) {
+ ClusteringOpDTO dto = new ClusteringOpDTO();
+ dto.fileId = fileGroupId.getFileId();
+ dto.partitionPath = fileGroupId.getPartitionPath();
+ dto.instantAction = instant.getAction();
+ dto.instantState = instant.getState().name();
+ dto.instantTime = instant.getTimestamp();
+ return dto;
+ }
+
+ public static Pair toClusteringOperation(ClusteringOpDTO dto) {
+ return Pair.of(new HoodieFileGroupId(dto.partitionPath, dto.fileId),
+ new HoodieInstant(HoodieInstant.State.valueOf(dto.instantState), dto.instantAction, dto.instantTime));
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 0c6d9adbd..1dd6b006b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -107,6 +108,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
resetPendingCompactionOperations(CompactionUtils.getAllPendingCompactionOperations(metaClient).values().stream()
.map(e -> Pair.of(e.getKey(), CompactionOperation.convertFromAvroRecordInstance(e.getValue()))));
resetBootstrapBaseFileMapping(Stream.empty());
+ resetFileGroupsInPendingClustering(ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(metaClient));
}
/**
@@ -678,6 +680,15 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxCommitTime));
}
+ @Override
+ public final Stream> getFileGroupsInPendingClustering() {
+ try {
+ readLock.lock();
+ return fetchFileGroupsInPendingClustering();
+ } finally {
+ readLock.unlock();
+ }
+ }
// Fetch APIs to be implemented by concrete sub-classes
@@ -710,6 +721,40 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
*/
abstract void removePendingCompactionOperations(Stream> operations);
+ /**
+ * Check if there is an outstanding clustering operation (requested/inflight) scheduled for this file.
+ *
+ * @param fgId File-Group Id
+ * @return true if there is a pending clustering, false otherwise
+ */
+ protected abstract boolean isPendingClusteringScheduledForFileId(HoodieFileGroupId fgId);
+
+ /**
+ * Get pending clustering instant time for specified file group. Return None if file group is not in pending
+ * clustering operation.
+ */
+ protected abstract Option getPendingClusteringInstant(final HoodieFileGroupId fileGroupId);
+
+ /**
+ * Fetch all file groups in pending clustering.
+ */
+ protected abstract Stream> fetchFileGroupsInPendingClustering();
+
+ /**
+ * resets the pending clustering operation and overwrite with the new list.
+ */
+ abstract void resetFileGroupsInPendingClustering(Map fgIdToInstantMap);
+
+ /**
+ * Add metadata for file groups in pending clustering operations to the view.
+ */
+ abstract void addFileGroupsInPendingClustering(Stream> fileGroups);
+
+ /**
+ * Remove metadata for file groups in pending clustering operations from the view.
+ */
+ abstract void removeFileGroupsInPendingClustering(Stream> fileGroups);
+
/**
* Return pending compaction operation for a file-group.
*
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
index 2ba4ff57a..ff3a78f77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java
@@ -45,6 +45,8 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
"hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction";
public static final String FILESYSTEM_VIEW_REPLACED_MEM_FRACTION =
"hoodie.filesystem.view.spillable.replaced.mem.fraction";
+ public static final String FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION =
+ "hoodie.filesystem.view.spillable.clustering.mem.fraction";
private static final String ROCKSDB_BASE_PATH_PROP = "hoodie.filesystem.view.rocksdb.base.path";
public static final String FILESTYSTEM_REMOTE_TIMELINE_CLIENT_TIMEOUT_SECS =
"hoodie.filesystem.view.remote.timeout.secs";
@@ -62,6 +64,7 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_COMPACTION = 0.01;
private static final Double DEFAULT_MEM_FRACTION_FOR_EXTERNAL_DATA_FILE = 0.05;
private static final Double DEFAULT_MEM_FRACTION_FOR_REPLACED_FILEGROUPS = 0.01;
+ private static final Double DEFAULT_MEM_FRACTION_FOR_PENDING_CLUSTERING_FILEGROUPS = 0.01;
private static final Long DEFAULT_MAX_MEMORY_FOR_VIEW = 100 * 1024 * 1024L; // 100 MB
/**
@@ -126,6 +129,12 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
.longValue();
}
+ public long getMaxMemoryForPendingClusteringFileGroups() {
+ long totalMemory = Long.parseLong(props.getProperty(FILESYSTEM_VIEW_SPILLABLE_MEM));
+ return new Double(totalMemory * Double.parseDouble(props.getProperty(FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION)))
+ .longValue();
+ }
+
public String getBaseStoreDir() {
return props.getProperty(FILESYSTEM_VIEW_SPILLABLE_DIR);
}
@@ -245,6 +254,8 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
FILESYSTEM_VIEW_BOOTSTRAP_BASE_FILE_FRACTION, DEFAULT_MEM_FRACTION_FOR_EXTERNAL_DATA_FILE.toString());
setDefaultOnCondition(props, !props.containsKey(FILESYSTEM_VIEW_REPLACED_MEM_FRACTION),
FILESYSTEM_VIEW_REPLACED_MEM_FRACTION, DEFAULT_MEM_FRACTION_FOR_REPLACED_FILEGROUPS.toString());
+ setDefaultOnCondition(props, !props.containsKey(FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION),
+ FILESYSTEM_VIEW_PENDING_CLUSTERING_MEM_FRACTION, DEFAULT_MEM_FRACTION_FOR_PENDING_CLUSTERING_FILEGROUPS.toString());
setDefaultOnCondition(props, !props.containsKey(ROCKSDB_BASE_PATH_PROP), ROCKSDB_BASE_PATH_PROP,
DEFAULT_ROCKSDB_BASE_PATH);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index fcf7eccc3..f0c095f59 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -70,6 +70,11 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
*/
protected Map fgIdToReplaceInstants;
+ /**
+ * Track file groups in pending clustering.
+ */
+ protected Map fgIdToPendingClustering;
+
/**
* Flag to determine if closed.
*/
@@ -113,6 +118,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
this.partitionToFileGroupsMap = null;
this.fgIdToBootstrapBaseFile = null;
this.fgIdToReplaceInstants = null;
+ this.fgIdToPendingClustering = null;
}
protected Map> createPartitionToFileGroups() {
@@ -134,6 +140,11 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
return replacedFileGroupsMap;
}
+ protected Map createFileIdToPendingClusteringMap(final Map fileGroupsInClustering) {
+ Map fgInpendingClustering = new ConcurrentHashMap<>(fileGroupsInClustering);
+ return fgInpendingClustering;
+ }
+
/**
* Create a file system view, as of the given timeline, with the provided file statuses.
*/
@@ -189,6 +200,49 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
});
}
+ @Override
+ protected boolean isPendingClusteringScheduledForFileId(HoodieFileGroupId fgId) {
+ return fgIdToPendingClustering.containsKey(fgId);
+ }
+
+ @Override
+ protected Option getPendingClusteringInstant(HoodieFileGroupId fgId) {
+ return Option.ofNullable(fgIdToPendingClustering.get(fgId));
+ }
+
+ @Override
+ protected Stream> fetchFileGroupsInPendingClustering() {
+ return fgIdToPendingClustering.entrySet().stream().map(entry -> Pair.of(entry.getKey(), entry.getValue()));
+ }
+
+ @Override
+ void resetFileGroupsInPendingClustering(Map fgIdToInstantMap) {
+ fgIdToPendingClustering = createFileIdToPendingClusteringMap(fgIdToInstantMap);
+ }
+
+ @Override
+ void addFileGroupsInPendingClustering(Stream> fileGroups) {
+ fileGroups.forEach(fileGroupInstantPair -> {
+ ValidationUtils.checkArgument(fgIdToPendingClustering.containsKey(fileGroupInstantPair.getLeft()),
+ "Trying to add a FileGroupId which is already in pending clustering operation. FgId :"
+ + fileGroupInstantPair.getLeft() + ", new instant: " + fileGroupInstantPair.getRight() + ", existing instant "
+ + fgIdToPendingClustering.get(fileGroupInstantPair.getLeft()));
+
+ fgIdToPendingClustering.put(fileGroupInstantPair.getLeft(), fileGroupInstantPair.getRight());
+ });
+ }
+
+ @Override
+ void removeFileGroupsInPendingClustering(Stream> fileGroups) {
+ fileGroups.forEach(fileGroupInstantPair -> {
+ ValidationUtils.checkArgument(fgIdToPendingClustering.containsKey(fileGroupInstantPair.getLeft()),
+ "Trying to remove a FileGroupId which is not found in pending clustering operation. FgId :"
+ + fileGroupInstantPair.getLeft() + ", new instant: " + fileGroupInstantPair.getRight());
+
+ fgIdToPendingClustering.remove(fileGroupInstantPair.getLeft());
+ });
+ }
+
/**
* Given a partition path, obtain all filegroups within that. All methods, that work at the partition level go through
* this.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 5d34b385e..f7244eefd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Functions.Function0;
@@ -203,6 +204,11 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
return execute(preferredView::getPendingCompactionOperations, secondaryView::getPendingCompactionOperations);
}
+ @Override
+ public Stream> getFileGroupsInPendingClustering() {
+ return execute(preferredView::getFileGroupsInPendingClustering, secondaryView::getFileGroupsInPendingClustering);
+ }
+
@Override
public void close() {
preferredView.close();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index ec62361d8..d42592bd1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -27,10 +27,12 @@ import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
+import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
@@ -89,6 +91,9 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
+ public static final String PENDING_CLUSTERING_FILEGROUPS = String.format("%s/%s", BASE_URL, "clustering/pending/");
+
+
public static final String LAST_INSTANT = String.format("%s/%s", BASE_URL, "timeline/instant/last");
public static final String LAST_INSTANTS = String.format("%s/%s", BASE_URL, "timeline/instants/last");
@@ -396,6 +401,18 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
}
}
+ @Override
+ public Stream> getFileGroupsInPendingClustering() {
+ Map paramsMap = getParams();
+ try {
+ List dtos = executeRequest(PENDING_CLUSTERING_FILEGROUPS, paramsMap,
+ new TypeReference>() {}, RequestMethod.GET);
+ return dtos.stream().map(ClusteringOpDTO::toClusteringOperation);
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
@Override
public void close() {
closed = true;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
index 0210ae1fc..af0dc1301 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
@@ -135,6 +135,65 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
);
}
+ @Override
+ protected boolean isPendingClusteringScheduledForFileId(HoodieFileGroupId fgId) {
+ return getPendingClusteringInstant(fgId).isPresent();
+ }
+
+ @Override
+ protected Option getPendingClusteringInstant(HoodieFileGroupId fgId) {
+ String lookupKey = schemaHelper.getKeyForFileGroupsInPendingClustering(fgId);
+ HoodieInstant pendingClusteringInstant =
+ rocksDB.get(schemaHelper.getColFamilyForFileGroupsInPendingClustering(), lookupKey);
+ return Option.ofNullable(pendingClusteringInstant);
+ }
+
+ @Override
+ public Stream> fetchFileGroupsInPendingClustering() {
+ return rocksDB.>prefixSearch(schemaHelper.getColFamilyForFileGroupsInPendingClustering(), "")
+ .map(Pair::getValue);
+ }
+
+ @Override
+ void resetFileGroupsInPendingClustering(Map fgIdToInstantMap) {
+ LOG.info("Resetting file groups in pending clustering to ROCKSDB based file-system view at "
+ + config.getRocksdbBasePath() + ", Total file-groups=" + fgIdToInstantMap.size());
+
+ // Delete all replaced file groups
+ rocksDB.prefixDelete(schemaHelper.getColFamilyForFileGroupsInPendingClustering(), "part=");
+ // Now add new entries
+ addFileGroupsInPendingClustering(fgIdToInstantMap.entrySet().stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())));
+ LOG.info("Resetting replacedFileGroups to ROCKSDB based file-system view complete");
+ }
+
+ @Override
+ void addFileGroupsInPendingClustering(Stream> fileGroups) {
+ rocksDB.writeBatch(batch ->
+ fileGroups.forEach(fgIdToClusterInstant -> {
+ ValidationUtils.checkArgument(!isPendingClusteringScheduledForFileId(fgIdToClusterInstant.getLeft()),
+ "Duplicate FileGroupId found in pending compaction operations. FgId :"
+ + fgIdToClusterInstant.getLeft());
+
+ rocksDB.putInBatch(batch, schemaHelper.getColFamilyForFileGroupsInPendingClustering(),
+ schemaHelper.getKeyForFileGroupsInPendingClustering(fgIdToClusterInstant.getKey()), fgIdToClusterInstant);
+ })
+ );
+ }
+
+ @Override
+ void removeFileGroupsInPendingClustering(Stream> fileGroups) {
+ rocksDB.writeBatch(batch ->
+ fileGroups.forEach(fgToPendingClusteringInstant -> {
+ ValidationUtils.checkArgument(
+ !isPendingClusteringScheduledForFileId(fgToPendingClusteringInstant.getLeft()),
+ "Trying to remove a FileGroupId which is not found in pending compaction operations. FgId :"
+ + fgToPendingClusteringInstant.getLeft());
+ rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForFileGroupsInPendingClustering(),
+ schemaHelper.getKeyForFileGroupsInPendingClustering(fgToPendingClusteringInstant.getLeft()));
+ })
+ );
+ }
+
@Override
protected void resetViewState() {
LOG.info("Deleting all rocksdb data associated with table filesystem view");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
index 9f807da82..d72516921 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
@@ -51,6 +51,7 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
private final long maxMemoryForPendingCompaction;
private final long maxMemoryForBootstrapBaseFile;
private final long maxMemoryForReplaceFileGroups;
+ private final long maxMemoryForClusteringFileGroups;
private final String baseStoreDir;
public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
@@ -60,6 +61,7 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
this.maxMemoryForPendingCompaction = config.getMaxMemoryForPendingCompaction();
this.maxMemoryForBootstrapBaseFile = config.getMaxMemoryForBootstrapBaseFile();
this.maxMemoryForReplaceFileGroups = config.getMaxMemoryForReplacedFileGroups();
+ this.maxMemoryForClusteringFileGroups = config.getMaxMemoryForPendingClusteringFileGroups();
this.baseStoreDir = config.getBaseStoreDir();
init(metaClient, visibleActiveTimeline);
}
@@ -130,6 +132,21 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
}
}
+ @Override
+ protected Map createFileIdToPendingClusteringMap(final Map fileGroupsInClustering) {
+ try {
+ LOG.info("Creating file group id to clustering instant map using external spillable Map. Max Mem=" + maxMemoryForClusteringFileGroups
+ + ", BaseDir=" + baseStoreDir);
+ new File(baseStoreDir).mkdirs();
+ Map pendingMap = new ExternalSpillableMap<>(
+ maxMemoryForClusteringFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>());
+ pendingMap.putAll(fileGroupsInClustering);
+ return pendingMap;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public Stream getAllFileGroups() {
return ((ExternalSpillableMap) partitionToFileGroupsMap).valueStream()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index e8fe68ad4..504f95a9e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
@@ -169,4 +170,9 @@ public interface TableFileSystemView {
* Stream all the replaced file groups before maxCommitTime.
*/
Stream getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath);
+
+ /**
+ * Filegroups that are in pending clustering.
+ */
+ Stream> getFileGroupsInPendingClustering();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
new file mode 100644
index 000000000..c0c88c04a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Helper class to generate clustering plan from metadata.
+ */
+public class ClusteringUtils {
+
+ private static final Logger LOG = LogManager.getLogger(ClusteringUtils.class);
+
+ public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB";
+ public static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILES_SIZE";
+ public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES";
+
+ /**
+ * Get all pending clustering plans along with their instants.
+ */
+ public static Stream> getAllPendingClusteringPlans(
+ HoodieTableMetaClient metaClient) {
+ List pendingReplaceInstants =
+ metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
+ return pendingReplaceInstants.stream().map(instant -> getClusteringPlan(metaClient, instant))
+ .filter(Option::isPresent).map(Option::get);
+ }
+
+ public static Option> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant requestedReplaceInstant) {
+ try {
+ Option content = metaClient.getActiveTimeline().getInstantDetails(requestedReplaceInstant);
+ if (!content.isPresent() || content.get().length == 0) {
+ // few operations create requested file without any content. Assume these are not clustering
+ LOG.warn("No content found in requested file for instant " + requestedReplaceInstant);
+ return Option.empty();
+ }
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
+ if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) {
+ return Option.of(Pair.of(requestedReplaceInstant, requestedReplaceMetadata.getClusteringPlan()));
+ }
+ return Option.empty();
+ } catch (IOException e) {
+ throw new HoodieIOException("Error reading clustering plan " + requestedReplaceInstant.getTimestamp(), e);
+ }
+ }
+
+ /**
+ * Get filegroups to pending clustering instant mapping for all pending clustering plans.
+ * This includes all clustering operattions in 'requested' and 'inflight' states.
+ */
+ public static Map getAllFileGroupsInPendingClusteringPlans(
+ HoodieTableMetaClient metaClient) {
+ Stream> pendingClusteringPlans = getAllPendingClusteringPlans(metaClient);
+ Stream> resultStream = pendingClusteringPlans.flatMap(clusteringPlan ->
+ // get all filegroups in the plan
+ getFileGroupEntriesInClusteringPlan(clusteringPlan.getLeft(), clusteringPlan.getRight()));
+
+ Map resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ LOG.info("Found " + resultMap.size() + " files in pending clustering operations");
+ return resultMap;
+ }
+
+ public static Stream> getFileGroupsInPendingClusteringInstant(
+ HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
+ Stream partitionToFileIdLists = clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup);
+ return partitionToFileIdLists.map(e -> Pair.of(e, instant));
+ }
+
+ private static Stream> getFileGroupEntriesInClusteringPlan(
+ HoodieInstant instant, HoodieClusteringPlan clusteringPlan) {
+ return getFileGroupsInPendingClusteringInstant(instant, clusteringPlan).map(entry ->
+ new AbstractMap.SimpleEntry<>(entry.getLeft(), entry.getRight()));
+ }
+
+ private static Stream getFileGroupsFromClusteringGroup(HoodieClusteringGroup group) {
+ return group.getSlices().stream().map(slice -> new HoodieFileGroupId(slice.getPartitionPath(), slice.getFileId()));
+ }
+
+ /**
+ * Create clustering plan from input fileSliceGroups.
+ */
+ public static HoodieClusteringPlan createClusteringPlan(String strategyClassName,
+ Map strategyParams,
+ List[] fileSliceGroups,
+ Map extraMetadata) {
+ List clusteringGroups = Arrays.stream(fileSliceGroups).map(fileSliceGroup -> {
+ Map groupMetrics = buildMetrics(fileSliceGroup);
+ List sliceInfos = getFileSliceInfo(fileSliceGroup);
+ return HoodieClusteringGroup.newBuilder().setSlices(sliceInfos).setMetrics(groupMetrics).build();
+ }).collect(Collectors.toList());
+
+ HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
+ .setStrategyClassName(strategyClassName).setStrategyParams(strategyParams)
+ .build();
+
+ HoodieClusteringPlan plan = HoodieClusteringPlan.newBuilder()
+ .setInputGroups(clusteringGroups)
+ .setExtraMetadata(extraMetadata)
+ .setStrategy(strategy)
+ .build();
+
+ return plan;
+ }
+
+ private static List getFileSliceInfo(List slices) {
+ return slices.stream().map(slice -> new HoodieSliceInfo().newBuilder()
+ .setPartitionPath(slice.getPartitionPath())
+ .setFileId(slice.getFileId())
+ .setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse(null))
+ .setDeltaFilePaths(slice.getLogFiles().map(f -> f.getPath().getName()).collect(Collectors.toList()))
+ .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(bbf -> bbf.getPath()).orElse(null)).orElse(null))
+ .build()).collect(Collectors.toList());
+ }
+
+ private static Map buildMetrics(List fileSlices) {
+ int numLogFiles = 0;
+ long totalLogFileSize = 0;
+ long totalIORead = 0;
+
+ for (FileSlice slice : fileSlices) {
+ numLogFiles += slice.getLogFiles().count();
+ // Total size of all the log files
+ totalLogFileSize += slice.getLogFiles().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)
+ .reduce(Long::sum).orElse(0L);
+ // Total read will be the base file + all the log files
+ totalIORead =
+ FSUtils.getSizeInMB((slice.getBaseFile().isPresent() ? slice.getBaseFile().get().getFileSize() : 0L) + totalLogFileSize);
+ }
+
+ Map metrics = new HashMap<>();
+ metrics.put(TOTAL_IO_READ_MB, (double) totalIORead);
+ metrics.put(TOTAL_LOG_FILE_SIZE, (double) totalLogFileSize);
+ metrics.put(TOTAL_LOG_FILES, (double) numLogFiles);
+ return metrics;
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
index be8ccefc2..e9377a667 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBSchemaHelper.java
@@ -48,6 +48,7 @@ public class RocksDBSchemaHelper {
private final String colFamilyForBootstrapBaseFile;
private final String colFamilyForStoredPartitions;
private final String colFamilyForReplacedFileGroups;
+ private final String colFamilyForPendingClusteringFileGroups;
public RocksDBSchemaHelper(HoodieTableMetaClient metaClient) {
this.colFamilyForBootstrapBaseFile = "hudi_bootstrap_basefile_" + metaClient.getBasePath().replace("/", "_");
@@ -55,11 +56,12 @@ public class RocksDBSchemaHelper {
this.colFamilyForStoredPartitions = "hudi_partitions_" + metaClient.getBasePath().replace("/", "_");
this.colFamilyForView = "hudi_view_" + metaClient.getBasePath().replace("/", "_");
this.colFamilyForReplacedFileGroups = "hudi_replaced_fg" + metaClient.getBasePath().replace("/", "_");
+ this.colFamilyForPendingClusteringFileGroups = "hudi_pending_clustering_fg" + metaClient.getBasePath().replace("/", "_");
}
public List getAllColumnFamilies() {
return Arrays.asList(getColFamilyForView(), getColFamilyForPendingCompaction(), getColFamilyForBootstrapBaseFile(),
- getColFamilyForStoredPartitions(), getColFamilyForReplacedFileGroups());
+ getColFamilyForStoredPartitions(), getColFamilyForReplacedFileGroups(), getColFamilyForFileGroupsInPendingClustering());
}
public String getKeyForPartitionLookup(String partition) {
@@ -78,6 +80,10 @@ public class RocksDBSchemaHelper {
return getPartitionFileIdBasedLookup(fgId);
}
+ public String getKeyForFileGroupsInPendingClustering(HoodieFileGroupId fgId) {
+ return getPartitionFileIdBasedLookup(fgId);
+ }
+
public String getKeyForSliceView(HoodieFileGroup fileGroup, FileSlice slice) {
return getKeyForSliceView(fileGroup.getPartitionPath(), fileGroup.getFileGroupId().getFileId(),
slice.getBaseInstantTime());
@@ -135,4 +141,8 @@ public class RocksDBSchemaHelper {
public String getColFamilyForReplacedFileGroups() {
return colFamilyForReplacedFileGroups;
}
+
+ public String getColFamilyForFileGroupsInPendingClustering() {
+ return colFamilyForPendingClusteringFileGroups;
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index c58df38c0..3fceee3bb 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -21,10 +21,12 @@ package org.apache.hudi.common.table.view;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieFSPermission;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.avro.model.HoodiePath;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
@@ -49,6 +51,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
@@ -1424,6 +1427,63 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
assertEquals(actualReplacedFileIds, allReplacedFileIds);
}
+ @Test
+ public void testPendingClusteringOperations() throws IOException {
+ String partitionPath1 = "2020/06/27";
+ new File(basePath + "/" + partitionPath1).mkdirs();
+
+ // create 2 fileId in partition1 - fileId1 is replaced later on.
+ String fileId1 = UUID.randomUUID().toString();
+ String fileId2 = UUID.randomUUID().toString();
+ String fileId3 = UUID.randomUUID().toString();
+
+ assertFalse(roView.getLatestBaseFiles(partitionPath1)
+ .anyMatch(dfile -> dfile.getFileId().equals(fileId1) || dfile.getFileId().equals(fileId2) || dfile.getFileId().equals(fileId3)),
+ "No commit, should not find any data file");
+ // Only one commit
+ String commitTime1 = "1";
+ String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1);
+ String fileName2 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2);
+ String fileName3 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId3);
+ new File(basePath + "/" + partitionPath1 + "/" + fileName1).createNewFile();
+ new File(basePath + "/" + partitionPath1 + "/" + fileName2).createNewFile();
+ new File(basePath + "/" + partitionPath1 + "/" + fileName3).createNewFile();
+
+ HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+ HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1);
+ saveAsComplete(commitTimeline, instant1, Option.empty());
+ refreshFsView();
+ assertEquals(1, roView.getLatestBaseFiles(partitionPath1)
+ .filter(dfile -> dfile.getFileId().equals(fileId1)).count());
+ assertEquals(1, roView.getLatestBaseFiles(partitionPath1)
+ .filter(dfile -> dfile.getFileId().equals(fileId2)).count());
+ assertEquals(1, roView.getLatestBaseFiles(partitionPath1)
+ .filter(dfile -> dfile.getFileId().equals(fileId3)).count());
+
+ List[] fileSliceGroups = new List[] {
+ Collections.singletonList(fsView.getLatestFileSlice(partitionPath1, fileId1).get()),
+ Collections.singletonList(fsView.getLatestFileSlice(partitionPath1, fileId2).get())
+ };
+
+ // create pending clustering operation - fileId1, fileId2 are being clustered in different groups
+ HoodieClusteringPlan plan = ClusteringUtils.createClusteringPlan("strategy", new HashMap<>(),
+ fileSliceGroups, Collections.emptyMap());
+
+ String clusterTime = "2";
+ HoodieInstant instant2 = new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+ .setClusteringPlan(plan).setOperationType(WriteOperationType.CLUSTER.name()).build();
+ metaClient.getActiveTimeline().saveToPendingReplaceCommit(instant2, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+
+ //make sure view doesnt include fileId1
+ refreshFsView();
+ Set fileIds =
+ fsView.getFileGroupsInPendingClustering().map(e -> e.getLeft().getFileId()).collect(Collectors.toSet());
+ assertTrue(fileIds.contains(fileId1));
+ assertTrue(fileIds.contains(fileId2));
+ assertFalse(fileIds.contains(fileId3));
+ }
+
@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
new file mode 100644
index 000000000..5d82bbce7
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link ClusteringUtils}.
+ */
+public class TestClusteringUtils extends HoodieCommonTestHarness {
+
+ private static final String CLUSTERING_STRATEGY_CLASS = "org.apache.hudi.DefaultClusteringStrategy";
+ private static final Map STRATEGY_PARAMS = new HashMap() {
+ {
+ put("sortColumn", "record_key");
+ }
+ };
+
+ @BeforeEach
+ public void init() throws IOException {
+ initMetaClient();
+ }
+
+ @Test
+ public void testClusteringPlanMultipleInstants() throws Exception {
+ String partitionPath1 = "partition1";
+ List fileIds1 = new ArrayList<>();
+ fileIds1.add(UUID.randomUUID().toString());
+ fileIds1.add(UUID.randomUUID().toString());
+ String clusterTime1 = "1";
+ createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
+
+ List fileIds2 = new ArrayList<>();
+ fileIds2.add(UUID.randomUUID().toString());
+ fileIds2.add(UUID.randomUUID().toString());
+ fileIds2.add(UUID.randomUUID().toString());
+
+ List fileIds3 = new ArrayList<>();
+ fileIds3.add(UUID.randomUUID().toString());
+
+ String clusterTime = "2";
+ createRequestedReplaceInstant(partitionPath1, clusterTime, fileIds2, fileIds3);
+
+ // create replace.requested without clustering plan. this instant should be ignored by ClusteringUtils
+ createRequestedReplaceInstantNotClustering("3");
+
+ // create replace.requested without any metadata content. This instant should be ignored by ClusteringUtils
+ metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "4"));
+
+ metaClient.reloadActiveTimeline();
+ assertEquals(4, metaClient.getActiveTimeline().filterPendingReplaceTimeline().countInstants());
+
+ Map fileGroupToInstantMap =
+ ClusteringUtils.getAllFileGroupsInPendingClusteringPlans(metaClient);
+ assertEquals(fileIds1.size() + fileIds2.size() + fileIds3.size(), fileGroupToInstantMap.size());
+ validateClusteringInstant(fileIds1, partitionPath1, clusterTime1, fileGroupToInstantMap);
+ validateClusteringInstant(fileIds2, partitionPath1, clusterTime, fileGroupToInstantMap);
+ validateClusteringInstant(fileIds3, partitionPath1, clusterTime, fileGroupToInstantMap);
+ }
+
+ private void validateClusteringInstant(List fileIds, String partitionPath,
+ String expectedInstantTime, Map fileGroupToInstantMap) {
+ for (String fileId : fileIds) {
+ assertEquals(expectedInstantTime, fileGroupToInstantMap.get(new HoodieFileGroupId(partitionPath, fileId)).getTimestamp());
+ }
+ }
+
+ private HoodieInstant createRequestedReplaceInstantNotClustering(String instantTime) throws IOException {
+ HoodieInstant newRequestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+ .setOperationType(WriteOperationType.UNKNOWN.name()).build();
+ metaClient.getActiveTimeline().saveToPendingReplaceCommit(newRequestedInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+ return newRequestedInstant;
+ }
+
+ private HoodieInstant createRequestedReplaceInstant(String partitionPath1, String clusterTime, List... fileIds) throws IOException {
+ List[] fileSliceGroups = new List[fileIds.length];
+ for (int i = 0; i < fileIds.length; i++) {
+ fileSliceGroups[i] = fileIds[i].stream().map(fileId -> generateFileSlice(partitionPath1, fileId, "0")).collect(Collectors.toList());
+ }
+
+ HoodieClusteringPlan clusteringPlan =
+ ClusteringUtils.createClusteringPlan(CLUSTERING_STRATEGY_CLASS, STRATEGY_PARAMS, fileSliceGroups, Collections.emptyMap());
+
+ HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+ .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
+ metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+ return clusteringInstant;
+ }
+
+ private FileSlice generateFileSlice(String partitionPath, String fileId, String baseInstant) {
+ FileSlice fs = new FileSlice(new HoodieFileGroupId(partitionPath, fileId), baseInstant);
+ fs.setBaseFile(new HoodieBaseFile(FSUtils.makeDataFileName(baseInstant, "1-0-1", fileId)));
+ return fs;
+ }
+
+ @Override
+ protected HoodieTableType getTableType() {
+ return HoodieTableType.MERGE_ON_READ;
+ }
+}
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
index a4bb52c86..e008fc542 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
@@ -20,6 +20,7 @@ package org.apache.hudi.timeline.service;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
+import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
@@ -297,6 +298,12 @@ public class FileSystemViewHandler {
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
writeValueAsString(ctx, dtos);
}, true));
+
+ app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> {
+ List dtos = sliceHandler.getFileGroupsInPendingClustering(
+ ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
+ writeValueAsString(ctx, dtos);
+ }, true));
}
private static boolean isRefreshCheckDisabledInQuery(Context ctxt) {
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index 56cb7a870..18c5eb17c 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -19,6 +19,7 @@
package org.apache.hudi.timeline.service.handlers;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
@@ -93,6 +94,12 @@ public class FileSliceHandler extends Handler {
.collect(Collectors.toList());
}
+ public List getFileGroupsInPendingClustering(String basePath) {
+ return viewManager.getFileSystemView(basePath).getFileGroupsInPendingClustering()
+ .map(fgInstant -> ClusteringOpDTO.fromClusteringOp(fgInstant.getLeft(), fgInstant.getRight()))
+ .collect(Collectors.toList());
+ }
+
public boolean refreshTable(String basePath) {
viewManager.clearFileSystemView(basePath);
return true;