diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 2c6be7ff0..fd1d4b51b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -21,6 +21,7 @@ package org.apache.hudi.client; import com.codahale.metrics.Timer; import org.apache.hadoop.conf.Configuration; import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -38,9 +39,12 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; @@ -88,6 +92,7 @@ public abstract class AbstractHoodieWriteClient table) { + table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> { + Option> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant); + if (instantPlan.isPresent()) { + LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft()); + cluster(instant.getTimestamp(), true); + } + }); + } + /** * Handle auto clean during commit. * @@ -725,6 +750,54 @@ public abstract class AbstractHoodieWriteClient scheduleClustering(Option> extraMetadata) throws HoodieIOException { + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + return scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty(); + } + + /** + * Schedules a new clustering instant with passed-in instant time. + * + * @param instantTime clustering Instant Time + * @param extraMetadata Extra Metadata to be stored + */ + public boolean scheduleClusteringAtInstant(String instantTime, Option> extraMetadata) throws HoodieIOException { + LOG.info("Scheduling clustering at instant time :" + instantTime); + Option plan = createTable(config, hadoopConf) + .scheduleClustering(context, instantTime, extraMetadata); + return plan.isPresent(); + } + + /** + * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata. + * + * @param clusteringInstant Clustering Instant Time + * @return Collection of Write Status + */ + public abstract HoodieWriteMetadata cluster(String clusteringInstant, boolean shouldComplete); + + /** + * Executes a clustering plan on a table, serially before or after an insert/upsert action. 
+ */ + protected Option&lt;String&gt; inlineCluster(Option&lt;Map&lt;String, String&gt;&gt; extraMetadata) { + Option&lt;String&gt; clusteringInstantOpt = scheduleClustering(extraMetadata); + clusteringInstantOpt.ifPresent(clusteringInstant -> { + // inline clustering should auto commit as the user is never given control + cluster(clusteringInstant, true); + }); + return clusteringInstantOpt; + } + + protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable&lt;T, I, K, O&gt; table) { + table.rollback(context, HoodieActiveTimeline.createNewInstantTime(), inflightInstant, false); + table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant); + } + /** * Finalize Write operation. * diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java new file mode 100644 index 000000000..aa6c29b08 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ConcatenatingIterator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.utils; + +import org.apache.hudi.common.util.ValidationUtils; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +/** + * Provides an iterator interface over a List of iterators. Consumes all records from the first iterator + * before moving to the next iterator in the list. That is, it concatenates elements across multiple iterators. + * + * @param &lt;T&gt; type of the elements returned by this iterator + */ +public class ConcatenatingIterator&lt;T&gt; implements Iterator&lt;T&gt; { + + private final Queue&lt;Iterator&lt;T&gt;&gt; allIterators; + + public ConcatenatingIterator(List&lt;Iterator&lt;T&gt;&gt; iterators) { + allIterators = new LinkedList&lt;&gt;(iterators); + } + + @Override + public boolean hasNext() { + while (!allIterators.isEmpty()) { + if (allIterators.peek().hasNext()) { + return true; + } + // iterator at current head is done. move ahead + allIterators.poll(); + } + + return false; + } + + @Override + public T next() { + ValidationUtils.checkArgument(hasNext(), "No more elements left"); + return allIterators.peek().next(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/FileSliceMetricUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/FileSliceMetricUtils.java new file mode 100644 index 000000000..347f86cfb --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/FileSliceMetricUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.utils; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieLogFile; + +import java.util.List; +import java.util.Map; + +/** + * A utility class for calculating metrics related to FileSlice. + */ +public class FileSliceMetricUtils { + + public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB"; + public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB"; + public static final String TOTAL_IO_MB = "TOTAL_IO_MB"; + public static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILES_SIZE"; + public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES"; + + public static void addFileSliceCommonMetrics(List&lt;FileSlice&gt; fileSlices, Map&lt;String, Double&gt; metrics, long defaultBaseFileSize) { + int numLogFiles = 0; + long totalLogFileSize = 0; + long totalIORead = 0; + long totalIOWrite = 0; + long totalIO = 0; + + for (FileSlice slice : fileSlices) { + numLogFiles += slice.getLogFiles().count(); + // Total size of all the log files in this slice + long logFilesSize = slice.getLogFiles().map(HoodieLogFile::getFileSize).filter(size -> size >= 0) + .reduce(Long::sum).orElse(0L); + totalLogFileSize += logFilesSize; + + long baseFileSize = slice.getBaseFile().isPresent() ? slice.getBaseFile().get().getFileSize() : 0L; + // Read for this slice will be the base file + all its log files + totalIORead += FSUtils.getSizeInMB(baseFileSize + logFilesSize); + // Write for this slice will be similar to the size of the base file + totalIOWrite += FSUtils.getSizeInMB(baseFileSize > 0 ? baseFileSize : defaultBaseFileSize); + // Total IO will be the IO for read + write + totalIO = totalIORead + totalIOWrite; + } + + metrics.put(TOTAL_IO_READ_MB, (double) totalIORead); + metrics.put(TOTAL_IO_WRITE_MB, (double) totalIOWrite); + metrics.put(TOTAL_IO_MB, (double) totalIO); + metrics.put(TOTAL_LOG_FILE_SIZE, (double) totalLogFileSize); + metrics.put(TOTAL_LOG_FILES, (double) numLogFiles); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java new file mode 100644 index 000000000..ae805ca01 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.DefaultHoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +/** + * Clustering specific configs. + */ +public class HoodieClusteringConfig extends DefaultHoodieConfig { + + // Config to provide a strategy class to create ClusteringPlan. Class has to be subclass of ClusteringPlanStrategy + public static final String CLUSTERING_PLAN_STRATEGY_CLASS = "hoodie.clustering.plan.strategy.class"; + public static final String DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS = + "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy"; + + // Config to provide a strategy class to execute a ClusteringPlan. Class has to be subclass of RunClusteringStrategy + public static final String CLUSTERING_EXECUTION_STRATEGY_CLASS = "hoodie.clustering.execution.strategy.class"; + public static final String DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS = + "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy"; + + // Turn on inline clustering - clustering will be run after write operation is complete. + public static final String INLINE_CLUSTERING_PROP = "hoodie.clustering.inline"; + private static final String DEFAULT_INLINE_CLUSTERING = "false"; + + // Config to control frequency of clustering + public static final String INLINE_CLUSTERING_MAX_COMMIT_PROP = "hoodie.clustering.inline.max.commits"; + private static final String DEFAULT_INLINE_CLUSTERING_NUM_COMMITS = "4"; + + // Any strategy specific params can be saved with this prefix + public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy."; + + // Number of partitions to list to create ClusteringPlan. + public static final String CLUSTERING_TARGET_PARTITIONS = CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.lookback.partitions"; + public static final String DEFAULT_CLUSTERING_TARGET_PARTITIONS = String.valueOf(2); + + // Files smaller than the size specified here are candidates for clustering. + public static final String CLUSTERING_PLAN_SMALL_FILE_LIMIT = CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit"; + public static final String DEFAULT_CLUSTERING_PLAN_SMALL_FILE_LIMIT = String.valueOf(600 * 1024 * 1024L); // 600MB + + // Each clustering operation can create multiple groups. Total amount of data processed by clustering operation + // is defined by below two properties (CLUSTERING_MAX_BYTES_PER_GROUP * CLUSTERING_MAX_NUM_GROUPS). + // Max amount of data to be included in one group + public static final String CLUSTERING_MAX_BYTES_PER_GROUP = CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group"; + public static final String DEFAULT_CLUSTERING_MAX_GROUP_SIZE = String.valueOf(2 * 1024 * 1024 * 1024L); + + // Maximum number of groups to create as part of ClusteringPlan. Increasing groups will increase parallelism. 
+ public static final String CLUSTERING_MAX_NUM_GROUPS = CLUSTERING_STRATEGY_PARAM_PREFIX + "max.num.groups"; + public static final String DEFAULT_CLUSTERING_MAX_NUM_GROUPS = "30"; + + // Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups. + public static final String CLUSTERING_TARGET_FILE_MAX_BYTES = CLUSTERING_STRATEGY_PARAM_PREFIX + "target.file.max.bytes"; + public static final String DEFAULT_CLUSTERING_TARGET_FILE_MAX_BYTES = String.valueOf(1 * 1024 * 1024 * 1024L); // 1GB + + // constants related to clustering that may be used by more than 1 strategy. + public static final String CLUSTERING_SORT_COLUMNS_PROPERTY = HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns"; + + public HoodieClusteringConfig(Properties props) { + super(props); + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private final Properties props = new Properties(); + + public Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + this.props.load(reader); + return this; + } + } + + public Builder withClusteringPlanStrategyClass(String clusteringStrategyClass) { + props.setProperty(CLUSTERING_PLAN_STRATEGY_CLASS, clusteringStrategyClass); + return this; + } + + public Builder withClusteringExecutionStrategyClass(String runClusteringStrategyClass) { + props.setProperty(CLUSTERING_EXECUTION_STRATEGY_CLASS, runClusteringStrategyClass); + return this; + } + + public Builder withClusteringTargetPartitions(int clusteringTargetPartitions) { + props.setProperty(CLUSTERING_TARGET_PARTITIONS, String.valueOf(clusteringTargetPartitions)); + return this; + } + + public Builder withClusteringPlanSmallFileLimit(long clusteringSmallFileLimit) { + props.setProperty(CLUSTERING_PLAN_SMALL_FILE_LIMIT, String.valueOf(clusteringSmallFileLimit)); + return this; + } + + public Builder withClusteringSortColumns(String sortColumns) { + props.setProperty(CLUSTERING_SORT_COLUMNS_PROPERTY, sortColumns); + return this; + } + + public Builder withClusteringMaxBytesInGroup(long clusteringMaxGroupSize) { + props.setProperty(CLUSTERING_MAX_BYTES_PER_GROUP, String.valueOf(clusteringMaxGroupSize)); + return this; + } + + public Builder withClusteringMaxNumGroups(int maxNumGroups) { + props.setProperty(CLUSTERING_MAX_NUM_GROUPS, String.valueOf(maxNumGroups)); + return this; + } + + public Builder withClusteringTargetFileMaxBytes(long targetFileSize) { + props.setProperty(CLUSTERING_TARGET_FILE_MAX_BYTES, String.valueOf(targetFileSize)); + return this; + } + + public Builder withInlineClustering(Boolean inlineCompaction) { + props.setProperty(INLINE_CLUSTERING_PROP, String.valueOf(inlineCompaction)); + return this; + } + + public Builder withInlineClusteringNumCommits(int numCommits) { + props.setProperty(INLINE_CLUSTERING_MAX_COMMIT_PROP, String.valueOf(numCommits)); + return this; + } + + public Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public HoodieClusteringConfig build() { + HoodieClusteringConfig config = new HoodieClusteringConfig(props); + setDefaultOnCondition(props, !props.containsKey(CLUSTERING_PLAN_STRATEGY_CLASS), + CLUSTERING_PLAN_STRATEGY_CLASS, DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS); + setDefaultOnCondition(props, !props.containsKey(CLUSTERING_EXECUTION_STRATEGY_CLASS), + CLUSTERING_EXECUTION_STRATEGY_CLASS, DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS); + setDefaultOnCondition(props, 
!props.containsKey(CLUSTERING_MAX_BYTES_PER_GROUP), CLUSTERING_MAX_BYTES_PER_GROUP, + DEFAULT_CLUSTERING_MAX_GROUP_SIZE); + setDefaultOnCondition(props, !props.containsKey(CLUSTERING_MAX_NUM_GROUPS), CLUSTERING_MAX_NUM_GROUPS, + DEFAULT_CLUSTERING_MAX_NUM_GROUPS); + setDefaultOnCondition(props, !props.containsKey(CLUSTERING_TARGET_FILE_MAX_BYTES), CLUSTERING_TARGET_FILE_MAX_BYTES, + DEFAULT_CLUSTERING_TARGET_FILE_MAX_BYTES); + setDefaultOnCondition(props, !props.containsKey(INLINE_CLUSTERING_PROP), INLINE_CLUSTERING_PROP, + DEFAULT_INLINE_CLUSTERING); + setDefaultOnCondition(props, !props.containsKey(INLINE_CLUSTERING_MAX_COMMIT_PROP), INLINE_CLUSTERING_MAX_COMMIT_PROP, + DEFAULT_INLINE_CLUSTERING_NUM_COMMITS); + setDefaultOnCondition(props, !props.containsKey(CLUSTERING_TARGET_PARTITIONS), CLUSTERING_TARGET_PARTITIONS, + DEFAULT_CLUSTERING_TARGET_PARTITIONS); + setDefaultOnCondition(props, !props.containsKey(CLUSTERING_PLAN_SMALL_FILE_LIMIT), CLUSTERING_PLAN_SMALL_FILE_LIMIT, + DEFAULT_CLUSTERING_PLAN_SMALL_FILE_LIMIT); + return config; + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index e5baaf6ec..bf9e20362 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -390,6 +390,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP)); } + public boolean isInlineClustering() { + return Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_PROP)); + } + + public int getInlineClusterMaxCommits() { + return Integer.parseInt(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP)); + } + public String getPayloadClass() { return props.getProperty(HoodieCompactionConfig.PAYLOAD_CLASS_PROP); } @@ -406,6 +414,41 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED)); } + /** + * Clustering properties. + */ + public String getClusteringPlanStrategyClass() { + return props.getProperty(HoodieClusteringConfig.CLUSTERING_PLAN_STRATEGY_CLASS); + } + + public String getClusteringExecutionStrategyClass() { + return props.getProperty(HoodieClusteringConfig.CLUSTERING_EXECUTION_STRATEGY_CLASS); + } + + public long getClusteringMaxBytesInGroup() { + return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_MAX_BYTES_PER_GROUP)); + } + + public long getClusteringSmallFileLimit() { + return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_PLAN_SMALL_FILE_LIMIT)); + } + + public int getClusteringMaxNumGroups() { + return Integer.parseInt(props.getProperty(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS)); + } + + public long getClusteringTargetFileMaxBytes() { + return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_TARGET_FILE_MAX_BYTES)); + } + + public int getTargetPartitionsForClustering() { + return Integer.parseInt(props.getProperty(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS)); + } + + public String getClusteringSortColumns() { + return props.getProperty(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY); + } + /** * index properties. 
*/ @@ -804,6 +847,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private boolean isIndexConfigSet = false; private boolean isStorageConfigSet = false; private boolean isCompactionConfigSet = false; + private boolean isClusteringConfigSet = false; private boolean isMetricsConfigSet = false; private boolean isBootstrapConfigSet = false; private boolean isMemoryConfigSet = false; @@ -933,6 +977,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withClusteringConfig(HoodieClusteringConfig clusteringConfig) { + props.putAll(clusteringConfig.getProps()); + isClusteringConfigSet = true; + return this; + } + public Builder withMetricsConfig(HoodieMetricsConfig metricsConfig) { props.putAll(metricsConfig.getProps()); isMetricsConfigSet = true; @@ -1087,6 +1137,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !isStorageConfigSet, HoodieStorageConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isCompactionConfigSet, HoodieCompactionConfig.newBuilder().fromProperties(props).build()); + setDefaultOnCondition(props, !isClusteringConfigSet, + HoodieClusteringConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isBootstrapConfigSet, HoodieBootstrapConfig.newBuilder().fromProperties(props).build()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java new file mode 100644 index 000000000..bb6aaa247 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.exception; + +public class HoodieClusteringException extends HoodieException { + + public HoodieClusteringException(String msg) { + super(msg); + } + + public HoodieClusteringException(String msg, Throwable e) { + super(msg, e); + } +} + diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index 292039b83..c8c112fc2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -37,6 +37,7 @@ public class HoodieMetrics { public String cleanTimerName = null; public String commitTimerName = null; public String deltaCommitTimerName = null; + public String replaceCommitTimerName = null; public String finalizeTimerName = null; public String compactionTimerName = null; public String indexTimerName = null; @@ -48,6 +49,7 @@ public class HoodieMetrics { private Timer deltaCommitTimer = null; private Timer finalizeTimer = null; private Timer compactionTimer = null; + private Timer clusteringTimer = null; private Timer indexTimer = null; public HoodieMetrics(HoodieWriteConfig config, String tableName) { @@ -59,6 +61,7 @@ public class HoodieMetrics { this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION); this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION); this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION); + this.replaceCommitTimerName = getMetricsName("timer", HoodieTimeline.REPLACE_COMMIT_ACTION); this.finalizeTimerName = getMetricsName("timer", "finalize"); this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION); this.indexTimerName = getMetricsName("timer", "index"); @@ -83,6 +86,13 @@ public class HoodieMetrics { return compactionTimer == null ? null : compactionTimer.time(); } + public Timer.Context getClusteringCtx() { + if (config.isMetricsOn() && clusteringTimer == null) { + clusteringTimer = createTimer(replaceCommitTimerName); + } + return clusteringTimer == null ? null : clusteringTimer.time(); + } + public Timer.Context getCleanCtx() { if (config.isMetricsOn() && cleanTimer == null) { cleanTimer = createTimer(cleanTimerName); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 644eb6273..36cd89add 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -337,6 +338,27 @@ public abstract class HoodieTable implem public abstract HoodieWriteMetadata compact(HoodieEngineContext context, String compactionInstantTime); + + /** + * Schedule clustering for the instant time. 
+ * + * @param context HoodieEngineContext + * @param instantTime Instant Time for scheduling clustering + * @param extraMetadata additional metadata to write into plan + * @return HoodieClusteringPlan, if there is enough data for clustering. + */ + public abstract Option scheduleClustering(HoodieEngineContext context, + String instantTime, + Option> extraMetadata); + + /** + * Execute Clustering on the table. Clustering re-arranges the data so that it is optimized for data access. + * + * @param context HoodieEngineContext + * @param clusteringInstantTime Instant Time + */ + public abstract HoodieWriteMetadata cluster(HoodieEngineContext context, String clusteringInstantTime); + /** * Perform metadata/full bootstrap of a Hudi table. * @param context HoodieEngineContext diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java new file mode 100644 index 000000000..c8329b87f --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.cluster; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.BaseActionExecutor; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +public abstract class BaseClusteringPlanActionExecutor extends BaseActionExecutor> { + + private final Option> extraMetadata; + + public BaseClusteringPlanActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + Option> extraMetadata) { + super(context, config, table, instantTime); + this.extraMetadata = extraMetadata; + } + + protected abstract Option createClusteringPlan(); + + @Override + public Option execute() { + Option planOption = createClusteringPlan(); + if (planOption.isPresent()) { + HoodieInstant clusteringInstant = + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime); + try { + HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() + .setOperationType(WriteOperationType.CLUSTER.name()) + .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap())) + .setClusteringPlan(planOption.get()) + .build(); + table.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, + TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata)); + } catch (IOException ioe) { + throw new HoodieIOException("Exception scheduling clustering", ioe); + } + } + return planOption; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java new file mode 100644 index 000000000..27a5b23e2 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.avro.Schema; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.Serializable; +import java.util.Map; + +/** + * Pluggable implementation for writing data into new file groups based on ClusteringPlan. + */ +public abstract class ClusteringExecutionStrategy implements Serializable { + private static final Logger LOG = LogManager.getLogger(ClusteringExecutionStrategy.class); + + private final HoodieTable hoodieTable; + private final HoodieEngineContext engineContext; + private final HoodieWriteConfig writeConfig; + + public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + this.writeConfig = writeConfig; + this.hoodieTable = table; + this.engineContext = engineContext; + } + + /** + * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters. The number of new + * file groups created is bounded by numOutputGroups. + * Note that commit is not done as part of strategy. commit is callers responsibility. + */ + public abstract O performClustering(final I inputRecords, final int numOutputGroups, final String instantTime, + final Map strategyParams, final Schema schema); + + protected HoodieTable getHoodieTable() { + return this.hoodieTable; + } + + protected HoodieEngineContext getEngineContext() { + return this.engineContext; + } + + protected HoodieWriteConfig getWriteConfig() { + return this.writeConfig; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java new file mode 100644 index 000000000..31f566cb3 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieSliceInfo; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.client.utils.FileSliceMetricUtils; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Pluggable implementation for scheduling clustering and creating ClusteringPlan. + */ +public abstract class ClusteringPlanStrategy implements Serializable { + private static final Logger LOG = LogManager.getLogger(ClusteringPlanStrategy.class); + + public static final int CLUSTERING_PLAN_VERSION_1 = 1; + + private final HoodieTable hoodieTable; + private final transient HoodieEngineContext engineContext; + private final HoodieWriteConfig writeConfig; + + public ClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + this.writeConfig = writeConfig; + this.hoodieTable = table; + this.engineContext = engineContext; + } + + /** + * Generate metadata for grouping eligible files and create a plan. Note that data is not moved around + * as part of this step. + * + * If there is no data available to cluster, return None. + */ + public abstract Option generateClusteringPlan(); + + /** + * Return file slices eligible for clustering. FileIds in pending clustering/compaction are not eligible for clustering. + */ + protected Stream getFileSlicesEligibleForClustering(String partition) { + SyncableFileSystemView fileSystemView = (SyncableFileSystemView) getHoodieTable().getSliceView(); + Set fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations() + .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) + .collect(Collectors.toSet()); + fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet())); + + return hoodieTable.getSliceView().getLatestFileSlices(partition) + // file ids already in clustering are not eligible + .filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId())); + } + + /** + * Get parameters specific to strategy. These parameters are passed from 'schedule clustering' step to + * 'execute clustering' step. 'execute clustering' step is typically async. So these params help with passing any required + * context from schedule to run step. + */ + protected abstract Map getStrategyParams(); + + /** + * Returns any specific parameters to be stored as part of clustering metadata. + */ + protected Map getExtraMetadata() { + return Collections.emptyMap(); + } + + /** + * Version to support future changes for plan. 
+ */ + protected int getPlanVersion() { + return CLUSTERING_PLAN_VERSION_1; + } + + /** + * Transform {@link FileSlice} to {@link HoodieSliceInfo}. + */ + protected static List getFileSliceInfo(List slices) { + return slices.stream().map(slice -> new HoodieSliceInfo().newBuilder() + .setPartitionPath(slice.getPartitionPath()) + .setFileId(slice.getFileId()) + .setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse(StringUtils.EMPTY_STRING)) + .setDeltaFilePaths(slice.getLogFiles().map(f -> f.getPath().toString()).collect(Collectors.toList())) + .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(bbf -> bbf.getPath()).orElse(StringUtils.EMPTY_STRING)).orElse(StringUtils.EMPTY_STRING)) + .build()).collect(Collectors.toList()); + } + + /** + * Generate metrics for the data to be clustered. + */ + protected Map buildMetrics(List fileSlices) { + Map metrics = new HashMap<>(); + FileSliceMetricUtils.addFileSliceCommonMetrics(fileSlices, metrics, getWriteConfig().getParquetMaxFileSize()); + return metrics; + } + + protected HoodieTable getHoodieTable() { + return this.hoodieTable; + } + + protected HoodieEngineContext getEngineContext() { + return this.engineContext; + } + + protected HoodieWriteConfig getWriteConfig() { + return this.writeConfig; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java new file mode 100644 index 000000000..404cc0258 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.cluster.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieClusteringStrategy; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Scheduling strategy with restriction that clustering groups can only contain files from same partition. + */ +public abstract class PartitionAwareClusteringPlanStrategy extends ClusteringPlanStrategy { + private static final Logger LOG = LogManager.getLogger(PartitionAwareClusteringPlanStrategy.class); + + public PartitionAwareClusteringPlanStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + /** + * Create Clustering group based on files eligible for clustering in the partition. + */ + protected abstract Stream buildClusteringGroupsForPartition(String partitionPath, + List fileSlices); + + /** + * Return list of partition paths to be considered for clustering. + */ + protected List filterPartitionPaths(List partitionPaths) { + return partitionPaths; + } + + @Override + public Option generateClusteringPlan() { + try { + HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient(); + LOG.info("Scheduling clustering for " + metaClient.getBasePath()); + List partitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), + getWriteConfig().shouldAssumeDatePartitioning()); + + // filter the partition paths if needed to reduce list status + partitionPaths = filterPartitionPaths(partitionPaths); + + if (partitionPaths.isEmpty()) { + // In case no partitions could be picked, return no clustering plan + return Option.empty(); + } + + List clusteringGroups = getEngineContext().flatMap(partitionPaths, + partitionPath -> { + List fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList()); + return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups()); + }, + partitionPaths.size()) + .stream().limit(getWriteConfig().getClusteringMaxNumGroups()).collect(Collectors.toList()); + + if (clusteringGroups.isEmpty()) { + LOG.info("No data available to cluster"); + return Option.empty(); + } + + HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder() + .setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass()) + .setStrategyParams(getStrategyParams()) + .build(); + + return Option.of(HoodieClusteringPlan.newBuilder() + .setStrategy(strategy) + .setInputGroups(clusteringGroups) + .setExtraMetadata(getExtraMetadata()) + .setVersion(getPlanVersion()) + .build()); + } catch (IOException e) { + throw new HoodieIOException("Unable to create clustering plan", e); + } + } +} diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java index 3ead7a07d..d2aa8627b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractBulkInsertHelper.java @@ -27,8 +27,21 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; public abstract class AbstractBulkInsertHelper { + /** + * Mark instant as inflight, write input records, update index and return result. + */ public abstract HoodieWriteMetadata bulkInsert(I inputRecords, String instantTime, HoodieTable table, HoodieWriteConfig config, BaseCommitActionExecutor executor, boolean performDedupe, Option> userDefinedBulkInsertPartitioner); + + /** + * Only write input records. Does not change timeline/index. Return information about new files created. + */ + public abstract O bulkInsert(I inputRecords, String instantTime, + HoodieTable table, HoodieWriteConfig config, + boolean performDedupe, + Option> userDefinedBulkInsertPartitioner, + boolean addMetadataFields, + int parallelism); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java index 6c631c462..ff2dfbd4c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java @@ -20,14 +20,13 @@ package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.util.CompactionUtils; -import org.apache.hudi.common.util.Option; +import org.apache.hudi.client.utils.FileSliceMetricUtils; import org.apache.hudi.config.HoodieWriteConfig; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -50,32 +49,14 @@ public abstract class CompactionStrategy implements Serializable { * Callback hook when a HoodieCompactionOperation is created. Individual strategies can capture the metrics they need * to decide on the priority. * - * @param dataFile - Base file to compact - * @param partitionPath - Partition path - * @param logFiles - List of log files to compact with the base file + * @param writeConfig write configuration. + * @param slice fileSlice to capture metrics for. 
* @return Map[String, Object] - metrics captured */ - public Map captureMetrics(HoodieWriteConfig writeConfig, Option dataFile, - String partitionPath, List logFiles) { + public Map captureMetrics(HoodieWriteConfig writeConfig, FileSlice slice) { Map metrics = new HashMap<>(); long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize(); - // Total size of all the log files - Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0) - .reduce(Long::sum).orElse(0L); - // Total read will be the base file + all the log files - Long totalIORead = - FSUtils.getSizeInMB((dataFile.isPresent() ? dataFile.get().getFileSize() : 0L) + totalLogFileSize); - // Total write will be similar to the size of the base file - Long totalIOWrite = - FSUtils.getSizeInMB(dataFile.isPresent() ? dataFile.get().getFileSize() : defaultMaxParquetFileSize); - // Total IO will the the IO for read + write - long totalIO = totalIORead + totalIOWrite; - // Save these metrics and we will use during the filter - metrics.put(TOTAL_IO_READ_MB, totalIORead.doubleValue()); - metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite.doubleValue()); - metrics.put(TOTAL_IO_MB, (double) totalIO); - metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue()); - metrics.put(TOTAL_LOG_FILES, (double) logFiles.size()); + FileSliceMetricUtils.addFileSliceCommonMetrics(Collections.singletonList(slice), metrics, defaultMaxParquetFileSize); return metrics; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java index c9a811a1c..fe4b47459 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java @@ -20,14 +20,10 @@ package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; /** @@ -40,21 +36,6 @@ import java.util.stream.Collectors; public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrategy implements Comparator { - private static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILE_SIZE"; - - @Override - public Map captureMetrics(HoodieWriteConfig config, Option dataFile, - String partitionPath, List logFiles) { - Map metrics = super.captureMetrics(config, dataFile, partitionPath, logFiles); - - // Total size of all the log files - Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0) - .reduce(Long::sum).orElse(0L); - // save the metrics needed during the order - metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue()); - return metrics; - } - @Override public List orderAndFilter(HoodieWriteConfig writeConfig, List operations, List pendingCompactionPlans) { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestConcatenatingIterator.java 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestConcatenatingIterator.java new file mode 100644 index 000000000..af4c4fbfc --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestConcatenatingIterator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utils; + +import org.apache.hudi.client.utils.ConcatenatingIterator; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.fail; + +public class TestConcatenatingIterator { + + // Simple test for iterator concatenation + @Test + public void testConcatBasic() { + Iterator i1 = Arrays.asList(5, 3, 2, 1).iterator(); + Iterator i2 = new ArrayList().iterator(); // empty iterator + Iterator i3 = Arrays.asList(3).iterator(); + + ConcatenatingIterator ci = new ConcatenatingIterator<>(Arrays.asList(i1, i2, i3)); + List allElements = new ArrayList<>(); + while (ci.hasNext()) { + allElements.add(ci.next()); + } + + assertEquals(5, allElements.size()); + assertEquals(Arrays.asList(5, 3, 2, 1, 3), allElements); + } + + @Test + public void testConcatError() { + Iterator i1 = new ArrayList().iterator(); // empty iterator + + ConcatenatingIterator ci = new ConcatenatingIterator<>(Arrays.asList(i1)); + assertFalse(ci.hasNext()); + try { + ci.next(); + fail("expected error for empty iterator"); + } catch (IllegalArgumentException e) { + // + } + } +} \ No newline at end of file diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index f975406e4..3e8952e0c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -188,6 +188,11 @@ public class HoodieFlinkWriteClient extends throw new HoodieNotSupportedException("Compaction is not supported yet"); } + @Override + public HoodieWriteMetadata> cluster(final String clusteringInstant, final boolean shouldComplete) { + throw new HoodieNotSupportedException("Clustering is not supported yet"); + } + @Override protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index b8ae370f1..3c4d7fb95 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -121,6 +122,16 @@ public class HoodieFlinkCopyOnWriteTable extends throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); } + @Override + public Option scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option> extraMetadata) { + throw new HoodieNotSupportedException("Clustering is not supported on a Flink CopyOnWrite table"); + } + + @Override + public HoodieWriteMetadata> cluster(final HoodieEngineContext context, final String clusteringInstantTime) { + throw new HoodieNotSupportedException("Clustering is not supported on a Flink CopyOnWrite table"); + } + @Override public HoodieBootstrapWriteMetadata> bootstrap(HoodieEngineContext context, Option> extraMetadata) { throw new HoodieNotSupportedException("Bootstrap is not supported yet"); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index 67a607159..71a85deff 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -212,6 +212,11 @@ public class HoodieJavaWriteClient extends throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaClient"); } + @Override + public HoodieWriteMetadata> cluster(final String clusteringInstant, final boolean shouldComplete) { + throw new HoodieNotSupportedException("Cluster is not supported in HoodieJavaClient"); + } + @Override protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 7c45b75fb..7f658898b 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -135,6 +136,16 @@ public class HoodieJavaCopyOnWriteTable extends H throw new HoodieNotSupportedException("Compact is not supported yet"); } + @Override + public Option scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option> extraMetadata) { + throw new 
HoodieNotSupportedException("Clustering is not supported yet"); + } + + @Override + public HoodieWriteMetadata> cluster(final HoodieEngineContext context, final String clusteringInstantTime) { + throw new HoodieNotSupportedException("Clustering is not supported yet"); + } + @Override public HoodieBootstrapWriteMetadata> bootstrap(HoodieEngineContext context, Option> extraMetadata) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 5010a5f76..18f5309ee 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.common.HoodieEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; @@ -25,6 +27,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -34,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndex; @@ -43,15 +47,13 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.compact.SparkCompactHelpers; import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade; - -import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.List; import java.util.Map; @@ -314,6 +316,57 @@ public class SparkRDDWriteClient extends return statuses; } + @Override + public HoodieWriteMetadata> cluster(String clusteringInstant, boolean shouldComplete) { + HoodieSparkTable table = HoodieSparkTable.create(config, context); + HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); + HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant); + if (pendingClusteringTimeline.containsInstant(inflightInstant)) { + rollbackInflightClustering(inflightInstant, table); + table.getMetaClient().reloadActiveTimeline(); + } + clusteringTimer = metrics.getClusteringCtx(); + LOG.info("Starting clustering at " + clusteringInstant); + HoodieWriteMetadata> clusteringMetadata = table.cluster(context, clusteringInstant); + JavaRDD statuses = clusteringMetadata.getWriteStatuses(); + if 
(shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) { + completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), statuses, table, clusteringInstant); + } + return clusteringMetadata; + } + + protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD writeStatuses, + HoodieTable>, JavaRDD, JavaRDD> table, + String clusteringCommitTime) { + + List writeStats = writeStatuses.map(WriteStatus::getStat).collect(); + if (!writeStatuses.filter(WriteStatus::hasErrors).isEmpty()) { + throw new HoodieClusteringException("Clustering failed to write to files:" + + writeStatuses.filter(WriteStatus::hasErrors).map(WriteStatus::getFileId).collect()); + } + finalizeWrite(table, clusteringCommitTime, writeStats); + try { + LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata); + table.getActiveTimeline().transitionReplaceInflightToComplete( + HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime), + Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch (IOException e) { + throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); + } + + if (clusteringTimer != null) { + long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); + try { + metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(clusteringCommitTime).getTime(), + durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); + } catch (ParseException e) { + throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + + config.getBasePath() + " at time " + clusteringCommitTime, e); + } + } + LOG.info("Clustering successfully on commit " + clusteringCommitTime); + } + @Override protected HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java new file mode 100644 index 000000000..f1382ac3c --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.clustering.plan.strategy; + +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; +import org.apache.hudi.table.HoodieSparkMergeOnReadTable; +import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY; + +/** + * Clustering Strategy based on following. + * 1) Only looks at latest 'daybased.lookback.partitions' partitions. + * 2) Excludes files that are greater than 'small.file.limit' from clustering plan. + */ +public class SparkRecentDaysClusteringPlanStrategy> + extends PartitionAwareClusteringPlanStrategy>, JavaRDD, JavaRDD> { + private static final Logger LOG = LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class); + + public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + protected Stream buildClusteringGroupsForPartition(String partitionPath, List fileSlices) { + List, Integer>> fileSliceGroups = new ArrayList<>(); + List currentGroup = new ArrayList<>(); + int totalSizeSoFar = 0; + for (FileSlice currentSlice : fileSlices) { + // assume each filegroup size is ~= parquet.max.file.size + totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize(); + // check if max size is reached and create new group, if needed. 
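+ // Illustrative sizing only: with getWriteConfig().getClusteringMaxBytesInGroup() at e.g. 2GB and getClusteringTargetFileMaxBytes() at e.g. 1GB, a group is closed once roughly 2GB of slices accumulate and is later rewritten into ceil(2GB / 1GB) = 2 output file groups (see getNumberOfOutputFileGroups below).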
+ if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) { + fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()))); + currentGroup = new ArrayList<>(); + totalSizeSoFar = 0; + } + currentGroup.add(currentSlice); + } + if (!currentGroup.isEmpty()) { + fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes()))); + } + + return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder() + .setSlices(getFileSliceInfo(fileSliceGroup.getLeft())) + .setNumOutputFileGroups(fileSliceGroup.getRight()) + .setMetrics(buildMetrics(fileSliceGroup.getLeft())) + .build()); + } + + @Override + protected Map getStrategyParams() { + Map params = new HashMap<>(); + if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) { + params.put(CLUSTERING_SORT_COLUMNS_PROPERTY, getWriteConfig().getClusteringSortColumns()); + } + return params; + } + + @Override + protected List filterPartitionPaths(List partitionPaths) { + int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering(); + return partitionPaths.stream() + .sorted(Comparator.reverseOrder()) + .limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size()) + .collect(Collectors.toList()); + } + + @Override + protected Stream getFileSlicesEligibleForClustering(final String partition) { + return super.getFileSlicesEligibleForClustering(partition) + // Only files that have basefile size smaller than small file size are eligible. + .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit()); + } + + private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) { + return (int) Math.ceil(groupSize / (double) targetFileSize); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java new file mode 100644 index 000000000..cfbc2ec22 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.clustering.run.strategy; + +import org.apache.avro.Schema; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; +import org.apache.hudi.table.HoodieSparkMergeOnReadTable; +import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; +import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +import java.util.Map; +import java.util.Properties; + +import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY; + +/** + * Clustering Strategy based on following. + * 1) Spark execution engine. + * 2) Uses bulk_insert to write data into new files. + */ +public class SparkSortAndSizeExecutionStrategy> + extends ClusteringExecutionStrategy>, JavaRDD, JavaRDD> { + private static final Logger LOG = LogManager.getLogger(SparkSortAndSizeExecutionStrategy.class); + + public SparkSortAndSizeExecutionStrategy(HoodieSparkCopyOnWriteTable table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + public SparkSortAndSizeExecutionStrategy(HoodieSparkMergeOnReadTable table, + HoodieSparkEngineContext engineContext, + HoodieWriteConfig writeConfig) { + super(table, engineContext, writeConfig); + } + + @Override + public JavaRDD performClustering(final JavaRDD> inputRecords, final int numOutputGroups, + final String instantTime, final Map strategyParams, final Schema schema) { + Properties props = getWriteConfig().getProps(); + props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM, String.valueOf(numOutputGroups)); + // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files. + props.put(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, Boolean.FALSE.toString()); + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build(); + return (JavaRDD) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, + false, getPartitioner(strategyParams, schema), true, numOutputGroups); + } + + /** + * Create BulkInsertPartitioner based on strategy params. 
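+ * For example, when CLUSTERING_SORT_COLUMNS_PROPERTY is present (e.g. set to "_hoodie_record_key"), records are globally sorted on those columns via RDDCustomColumnsSortPartitioner; otherwise Option.empty() is returned and bulk insert falls back to the partitioner selected by BulkInsertInternalPartitionerFactory from the configured sort mode.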
+ */ + protected Option> getPartitioner(Map strategyParams, Schema schema) { + if (strategyParams.containsKey(CLUSTERING_SORT_COLUMNS_PROPERTY)) { + return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(CLUSTERING_SORT_COLUMNS_PROPERTY).split(","), + HoodieAvroUtils.addMetadataFields(schema))); + } else { + return Option.empty(); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java index ec90ef88e..27b5482b2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java @@ -19,6 +19,7 @@ package org.apache.hudi.execution; import org.apache.avro.Schema; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; @@ -34,14 +35,18 @@ import java.util.List; public class SparkLazyInsertIterable extends HoodieLazyInsertIterable { + private boolean useWriterSchema; + public SparkLazyInsertIterable(Iterator> recordItr, boolean areRecordsSorted, HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, String idPrefix, - TaskContextSupplier taskContextSupplier) { + TaskContextSupplier taskContextSupplier, + boolean useWriterSchema) { super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier); + this.useWriterSchema = useWriterSchema; } public SparkLazyInsertIterable(Iterator> recordItr, @@ -53,6 +58,7 @@ public class SparkLazyInsertIterable extends Hood TaskContextSupplier taskContextSupplier, WriteHandleFactory writeHandleFactory) { super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory); + this.useWriterSchema = false; } @Override @@ -61,7 +67,10 @@ public class SparkLazyInsertIterable extends Hood BoundedInMemoryExecutor, HoodieInsertValueGenResult, List> bufferedIteratorExecutor = null; try { - final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); + Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); + if (useWriterSchema) { + schema = HoodieAvroUtils.addMetadataFields(schema); + } bufferedIteratorExecutor = new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema)); final List result = bufferedIteratorExecutor.execute(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java index db73a9c3e..96da3969e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java @@ -41,20 +41,22 @@ public class BulkInsertMapFunction private HoodieWriteConfig config; private HoodieTable hoodieTable; private List fileIDPrefixes; + private boolean useWriterSchema; public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted, HoodieWriteConfig config, HoodieTable hoodieTable, - List fileIDPrefixes) { + List fileIDPrefixes, boolean useWriterSchema) { this.instantTime = instantTime; 
this.areRecordsSorted = areRecordsSorted; this.config = config; this.hoodieTable = hoodieTable; this.fileIDPrefixes = fileIDPrefixes; + this.useWriterSchema = useWriterSchema; } @Override public Iterator> call(Integer partition, Iterator> recordItr) { return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable, - fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier()); + fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java new file mode 100644 index 000000000..65c20004c --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.SerializableSchema; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.api.java.JavaRDD; + +import java.io.IOException; + +/** + * A partitioner that does sorting based on specified column values for each RDD partition. 
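+ * The values of the given columns are concatenated into a single string key, so records end up sorted lexicographically on that combined value (for example, new String[] {"rider"} together with the table schema sorts by the "rider" field).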
+ * + * @param HoodieRecordPayload type + */ +public class RDDCustomColumnsSortPartitioner + implements BulkInsertPartitioner>> { + + private final String[] sortColumnNames; + private final SerializableSchema serializableSchema; + + public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) { + this.sortColumnNames = columnNames; + this.serializableSchema = new SerializableSchema(schema); + } + + @Override + public JavaRDD> repartitionRecords(JavaRDD> records, + int outputSparkPartitions) { + final String[] sortColumns = this.sortColumnNames; + final SerializableSchema schema = this.serializableSchema; + return records.sortBy(record -> getRecordSortColumnValues(record, sortColumns, schema), + true, outputSparkPartitions); + } + + @Override + public boolean arePartitionRecordsSorted() { + return true; + } + + private static String getRecordSortColumnValues(HoodieRecord record, + String[] sortColumns, + SerializableSchema schema) { + try { + GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get(); + StringBuilder sb = new StringBuilder(); + for (String col : sortColumns) { + sb.append(genericRecord.get(col)); + } + + return sb.toString(); + } catch (IOException e) { + throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 99a8f1f3c..71085a232 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -19,6 +19,7 @@ package org.apache.hudi.table; import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -44,12 +45,14 @@ import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor; import org.apache.hudi.table.action.clean.SparkCleanActionExecutor; -import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor; -import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor; +import org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor; +import org.apache.hudi.table.action.cluster.SparkClusteringPlanActionExecutor; import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor; +import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor; +import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkMergeHelper; import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor; @@ -57,7 +60,6 @@ import 
org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecuto import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor; import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor; import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -145,6 +147,19 @@ public class HoodieSparkCopyOnWriteTable extends throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); } + @Override + public Option scheduleClustering(HoodieEngineContext context, + String instantTime, + Option> extraMetadata) { + return new SparkClusteringPlanActionExecutor<>(context, config,this, instantTime, extraMetadata).execute(); + } + + @Override + public HoodieWriteMetadata> cluster(HoodieEngineContext context, + String clusteringInstantTime) { + return new SparkExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute(); + } + @Override public HoodieBootstrapWriteMetadata> bootstrap(HoodieEngineContext context, Option> extraMetadata) { return new SparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java new file mode 100644 index 000000000..78510ba1c --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkClusteringPlanActionExecutor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.cluster; + +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; + +import java.util.Map; + +@SuppressWarnings("checkstyle:LineLength") +public class SparkClusteringPlanActionExecutor extends + BaseClusteringPlanActionExecutor>, JavaRDD, JavaRDD> { + + private static final Logger LOG = LogManager.getLogger(SparkClusteringPlanActionExecutor.class); + + public SparkClusteringPlanActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable>, JavaRDD, JavaRDD> table, + String instantTime, + Option> extraMetadata) { + super(context, config, table, instantTime, extraMetadata); + } + + @Override + protected Option createClusteringPlan() { + LOG.info("Checking if clustering needs to be run on " + config.getBasePath()); + Option lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant(); + + int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() + .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE) + .countInstants(); + if (config.getInlineClusterMaxCommits() > commitsSinceLastClustering) { + LOG.info("Not scheduling clustering as only " + commitsSinceLastClustering + + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for " + + config.getInlineClusterMaxCommits()); + return Option.empty(); + } + + LOG.info("Generating clustering plan for table " + config.getBasePath()); + ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) + ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config); + return strategy.generateClusteringPlan(); + } + +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java new file mode 100644 index 000000000..a8044edcf --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.cluster; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieClusteringGroup; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.client.utils.ConcatenatingIterator; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.log.HoodieFileSliceReader; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.io.IOUtils; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy; +import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class SparkExecuteClusteringCommitActionExecutor> + extends BaseSparkCommitActionExecutor { + + private static final Logger LOG = LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class); + private final HoodieClusteringPlan clusteringPlan; + + public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime) { + super(context, config, table, instantTime, WriteOperationType.CLUSTER); + this.clusteringPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime)) + .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException("Unable to read 
clustering plan for instant: " + instantTime)); + } + + @Override + public HoodieWriteMetadata> execute() { + HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime); + // Mark instant as clustering inflight + table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty()); + table.getMetaClient().reloadActiveTimeline(); + + JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(context); + // execute clustering for each group async and collect WriteStatus + JavaRDD writeStatusRDD = clusteringPlan.getInputGroups().stream() + .map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams())) + .map(CompletableFuture::join) + .reduce((rdd1, rdd2) -> rdd1.union(rdd2)).orElse(engineContext.emptyRDD()); + if (writeStatusRDD.isEmpty()) { + throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime + " #groups: " + clusteringPlan.getInputGroups().size()); + } + + HoodieWriteMetadata> writeMetadata = buildWriteMetadata(writeStatusRDD); + updateIndexAndCommitIfNeeded(writeStatusRDD, writeMetadata); + if (!writeMetadata.getCommitMetadata().isPresent()) { + HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(), + extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); + writeMetadata.setCommitMetadata(Option.of(commitMetadata)); + } + return writeMetadata; + } + + /** + * Submit job to execute clustering for the group. + */ + private CompletableFuture> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map strategyParams) { + CompletableFuture> writeStatusesFuture = CompletableFuture.supplyAsync(() -> { + JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); + JavaRDD> inputRecords = readRecordsForGroup(jsc, clusteringGroup); + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + return ((ClusteringExecutionStrategy>, JavaRDD, JavaRDD>) + ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), table, context, config)) + .performClustering(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema); + }); + + return writeStatusesFuture; + } + + @Override + protected String getCommitActionType() { + return HoodieTimeline.REPLACE_COMMIT_ACTION; + } + + @Override + protected Map> getPartitionToReplacedFileIds(JavaRDD writeStatuses) { + return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan).collect( + Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList()))); + } + + /** + * Get RDD of all records for the group. This includes all records from file slice (Apply updates from log files, if any). + */ + private JavaRDD> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup) { + List clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList()); + boolean hasLogFiles = clusteringOps.stream().filter(op -> op.getDeltaFilePaths().size() > 0).findAny().isPresent(); + if (hasLogFiles) { + // if there are log files, we read all records into memory for a file group and apply updates. + return readRecordsForGroupWithLogs(jsc, clusteringOps); + } else { + // We want to optimize reading records for case there are no log files. 
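+ // Base-file-only groups can be streamed directly from the base file readers and converted to HoodieRecords lazily, instead of loading a merged log record scanner per file group.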
+ return readRecordsForGroupBaseFiles(jsc, clusteringOps); + } + } + + /** + * Read records from baseFiles, apply updates and convert to RDD. + */ + private JavaRDD> readRecordsForGroupWithLogs(JavaSparkContext jsc, + List clusteringOps) { + return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> { + List>> recordIterators = new ArrayList<>(); + clusteringOpsPartition.forEachRemaining(clusteringOp -> { + long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config.getProps()); + LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction); + try { + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(table.getMetaClient().getFs(), + table.getMetaClient().getBasePath(), clusteringOp.getDeltaFilePaths(), readerSchema, instantTime, + maxMemoryPerCompaction, config.getCompactionLazyBlockReadEnabled(), + config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(), + config.getSpillableMapBasePath()); + + recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, + table.getMetaClient().getTableConfig().getPayloadClass())); + } catch (IOException e) { + throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + + " and " + clusteringOp.getDeltaFilePaths(), e); + } + }); + + return new ConcatenatingIterator<>(recordIterators); + }); + } + + /** + * Read records from baseFiles and convert to RDD. + */ + private JavaRDD> readRecordsForGroupBaseFiles(JavaSparkContext jsc, + List clusteringOps) { + return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> { + List> iteratorsForPartition = new ArrayList<>(); + clusteringOpsPartition.forEachRemaining(clusteringOp -> { + try { + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); + HoodieFileReader baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())); + iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema)); + } catch (IOException e) { + throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath() + + " and " + clusteringOp.getDeltaFilePaths(), e); + } + }); + + return new ConcatenatingIterator<>(iteratorsForPartition); + }).map(this::transform); + } + + /** + * Transform IndexedRecord into HoodieRecord. 
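+ * The record key and partition path are read back from the _hoodie_record_key and _hoodie_partition_path metadata fields of the stored record, and the payload is re-instantiated from the table's configured payload class.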
+ */ + private HoodieRecord transform(IndexedRecord indexedRecord) { + GenericRecord record = (GenericRecord) indexedRecord; + String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); + HoodieKey hoodieKey = new HoodieKey(key, partition); + + HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(table.getMetaClient().getTableConfig().getPayloadClass(), + new Object[] {Option.of(record)}, Option.class); + HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload); + return hoodieRecord; + } + + private HoodieWriteMetadata> buildWriteMetadata(JavaRDD writeStatusJavaRDD) { + HoodieWriteMetadata> result = new HoodieWriteMetadata<>(); + result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeStatusJavaRDD)); + result.setWriteStatuses(writeStatusJavaRDD); + result.setWriteStats(writeStatusJavaRDD.map(WriteStatus::getStat).collect()); + result.setCommitMetadata(Option.empty()); + result.setCommitted(false); + return result; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java index fb8b5f9cd..2b00d47b0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java @@ -18,8 +18,6 @@ package org.apache.hudi.table.action.commit; -import java.util.Map; - import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.model.HoodieRecord; @@ -28,12 +26,13 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieInsertException; -import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.BulkInsertPartitioner; - +import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.spark.api.java.JavaRDD; +import java.util.Map; + public class SparkBulkInsertCommitActionExecutor> extends BaseSparkCommitActionExecutor { private final JavaRDD> inputRecordsRDD; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 9ccd66b2c..66fd68e15 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -31,7 +31,6 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; - import org.apache.spark.api.java.JavaRDD; import java.util.List; @@ -59,25 +58,45 @@ public class SparkBulkInsertHelper extends Abs } @Override - public HoodieWriteMetadata> bulkInsert(JavaRDD> inputRecords, - String instantTime, - HoodieTable>, JavaRDD, JavaRDD> table, - HoodieWriteConfig config, - BaseCommitActionExecutor>, JavaRDD, 
JavaRDD, R> executor, - boolean performDedupe, - Option> userDefinedBulkInsertPartitioner) { + public HoodieWriteMetadata> bulkInsert(final JavaRDD> inputRecords, + final String instantTime, + final HoodieTable>, JavaRDD, JavaRDD> table, + final HoodieWriteConfig config, + final BaseCommitActionExecutor>, JavaRDD, JavaRDD, R> executor, + final boolean performDedupe, + final Option> userDefinedBulkInsertPartitioner) { HoodieWriteMetadata result = new HoodieWriteMetadata(); + //transition bulk_insert state to inflight + table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED, + table.getMetaClient().getCommitActionType(), instantTime), Option.empty(), + config.shouldAllowMultiWriteOnSameInstant()); + // write new files + JavaRDD writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism()); + //update index + ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); + return result; + } + + @Override + public JavaRDD bulkInsert(JavaRDD> inputRecords, + String instantTime, + HoodieTable>, JavaRDD, JavaRDD> table, + HoodieWriteConfig config, + boolean performDedupe, + Option> userDefinedBulkInsertPartitioner, + boolean useWriterSchema, + int parallelism) { + // De-dupe/merge if needed JavaRDD> dedupedRecords = inputRecords; if (performDedupe) { dedupedRecords = (JavaRDD>) SparkWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, - config.getBulkInsertShuffleParallelism(), table); + parallelism, table); } final JavaRDD> repartitionedRecords; - final int parallelism = config.getBulkInsertShuffleParallelism(); BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() ? 
userDefinedBulkInsertPartitioner.get() : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); @@ -87,16 +106,11 @@ public class SparkBulkInsertHelper extends Abs final List fileIDPrefixes = IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList()); - table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED, - table.getMetaClient().getCommitActionType(), instantTime), Option.empty(), - config.shouldAllowMultiWriteOnSameInstant()); - JavaRDD writeStatusRDD = repartitionedRecords .mapPartitionsWithIndex(new BulkInsertMapFunction(instantTime, - partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes), true) + partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema), true) .flatMap(List::iterator); - ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatusRDD, result); - return result; + return writeStatusRDD; } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java index 65cefc9b9..96d52a160 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java @@ -18,6 +18,9 @@ package org.apache.hudi.table.action.compact; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.avro.model.HoodieCompactionPlan; @@ -49,10 +52,6 @@ import org.apache.hudi.io.IOUtils; import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; - -import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -179,7 +178,8 @@ public class HoodieSparkMergeOnReadTableCompactor @Override public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context, HoodieTable>, JavaRDD, JavaRDD> hoodieTable, - HoodieWriteConfig config, String compactionCommitTime, Set fgIdsInPendingCompactions) + HoodieWriteConfig config, String compactionCommitTime, + Set fgIdsInPendingCompactionAndClustering) throws IOException { JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); totalLogFiles = new LongAccumulator(); @@ -213,7 +213,7 @@ public class HoodieSparkMergeOnReadTableCompactor List operations = context.flatMap(partitionPaths, partitionPath -> { return fileSystemView .getLatestFileSlices(partitionPath) - .filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId())) + .filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId())) .map(s -> { List logFiles = s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); @@ -224,7 +224,7 @@ public class HoodieSparkMergeOnReadTableCompactor // into meta files. 
Option dataFile = s.getBaseFile(); return new CompactionOperation(dataFile, partitionPath, logFiles, - config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles)); + config.getCompactionStrategy().captureMetrics(config, s)); }) .filter(c -> !c.getDeltaFileNames().isEmpty()); }, partitionPaths.size()).stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList()); @@ -239,9 +239,9 @@ public class HoodieSparkMergeOnReadTableCompactor CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList())); ValidationUtils.checkArgument( compactionPlan.getOperations().stream().noneMatch( - op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), + op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. " - + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions + + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering + ", Selected workload :" + compactionPlan); if (compactionPlan.getOperations().isEmpty()) { LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java index c5f6c1692..96d76d356 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java @@ -21,22 +21,24 @@ package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCompactionException; import org.apache.hudi.table.HoodieTable; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; @SuppressWarnings("checkstyle:LineLength") @@ -75,10 +77,13 @@ public class SparkScheduleCompactionActionExecutor instantTimeOpPair.getValue().getFileGroupId()) - .collect(Collectors.toSet())); + SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView(); + Set fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations() + .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId()) + .collect(Collectors.toSet()); + // exclude files in pending clustering from compaction. 
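+ // A file group that is already part of a pending clustering plan will be rewritten by that clustering's replacecommit, so also selecting it for compaction would schedule two conflicting rewrites of the same file group.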
+ fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet())); + return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering); } catch (IOException e) { throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index d278b08f3..d9a396d0f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; @@ -42,6 +43,7 @@ import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieStorageConfig; @@ -56,6 +58,7 @@ import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.commit.SparkWriteHelper; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestUtils; @@ -82,6 +85,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0; import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths; @@ -938,6 +942,65 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar); } + @Test + public void testSimpleClustering() throws Exception { + // setup clustering config + HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + testClustering(clusteringConfig); + } + + @Test + public void testClusteringWithSortColumns() throws Exception { + // setup clustering config + HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringSortColumns("_hoodie_record_key") + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + testClustering(clusteringConfig); + } + + private void testClustering(HoodieClusteringConfig clusteringConfig) throws Exception { + // create config to not update small files. 
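+ // Small file handling is effectively disabled (10 byte small-file limit) so each insert batch below creates brand new file groups; the later assertions rely on the two batches producing disjoint file ids that are all replaced by the clustering commit.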
+ HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false, 10); + SparkRDDWriteClient client = getHoodieWriteClient(config, false); + dataGen = new HoodieTestDataGenerator(); + String commitTime = "100"; + List records1 = dataGen.generateInserts(commitTime, 200); + List statuses1 = writeAndVerifyBatch(client, records1, commitTime); + Set fileIds1 = getFileGroupIdsFromWriteStatus(statuses1); + + commitTime = "200"; + List records2 = dataGen.generateInserts(commitTime, 200); + List statuses2 = writeAndVerifyBatch(client, records2, commitTime); + Set fileIds2 = getFileGroupIdsFromWriteStatus(statuses2); + //verify new files are created for 2nd write + Set fileIdIntersection = new HashSet<>(fileIds1); + fileIdIntersection.retainAll(fileIds2); + assertEquals(0, fileIdIntersection.size()); + + config = getConfigBuilder().withClusteringConfig(clusteringConfig).build(); + + // create client with new config. + client = getHoodieWriteClient(config, false); + String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString(); + HoodieWriteMetadata> clusterMetadata = client.cluster(clusteringCommitTime, true); + List allRecords = Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()); + verifyRecordsWritten(clusteringCommitTime, allRecords, clusterMetadata.getWriteStatuses().collect()); + Set insertedFileIds = new HashSet<>(); + insertedFileIds.addAll(fileIds1); + insertedFileIds.addAll(fileIds2); + + Set replacedFileIds = new HashSet<>(); + clusterMetadata.getPartitionToReplaceFileIds().entrySet().forEach(partitionFiles -> + partitionFiles.getValue().stream().forEach(file -> + replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file)))); + assertEquals(insertedFileIds, replacedFileIds); + } + + private Set getFileGroupIdsFromWriteStatus(List statuses) { + return statuses.stream().map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet()); + } + /** * Test scenario of writing more file groups than existing number of file groups in partition. */ @@ -975,14 +1038,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); // Do Inserts - String commitTime1 = "001"; - client.startCommitWithTime(commitTime1); - List inserts1 = dataGen.generateInserts(commitTime1, batch1RecordsCount); - JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 2); - List statuses = client.upsert(insertRecordsRDD1, commitTime1).collect(); - assertNoWriteErrors(statuses); - Set batch1Buckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet()); - verifyRecordsWritten(commitTime1, inserts1, statuses); + String commit1 = "001"; + List statuses = writeAndVerifyBatch(client, dataGen.generateInserts(commit1, batch1RecordsCount), commit1); + Set batch1Buckets = getFileIdsFromWriteStatus(statuses); // Do Insert Overwrite String commitTime2 = "002"; @@ -999,6 +1057,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { verifyRecordsWritten(commitTime2, inserts2, statuses); } + private Set getFileIdsFromWriteStatus(List statuses) { + return statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet()); + } + /** * Verify data in parquet files matches expected records and commit time. 
*/ @@ -1019,6 +1081,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { } } + private List writeAndVerifyBatch(SparkRDDWriteClient client, List inserts, String commitTime) { + client.startCommitWithTime(commitTime); + JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts, 2); + List statuses = client.upsert(insertRecordsRDD1, commitTime).collect(); + assertNoWriteErrors(statuses); + verifyRecordsWritten(commitTime, inserts, statuses); + return statuses; + } + private Pair, List> testUpdates(String instantTime, SparkRDDWriteClient client, int sizeToInsertAndUpdate, int expectedTotalRecords) throws IOException { @@ -1386,11 +1457,23 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { * Build Hoodie Write Config for small data file sizes. */ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) { - HoodieWriteConfig.Builder builder = getConfigBuilder(useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA); + return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150)); + } + + /** + * Build Hoodie Write Config for specified small file sizes. + */ + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize) { + String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA; + return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize); + } + + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) { + HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr); return builder .withCompactionConfig( HoodieCompactionConfig.newBuilder() - .compactionSmallFileSize(dataGen.getEstimatedFileSizeInBytes(150)) + .compactionSmallFileSize(smallFileSize) .insertSplitSize(insertSplitSize).build()) .withStorageConfig( HoodieStorageConfig.newBuilder() diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index 834229b68..81effaa2a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -18,17 +18,23 @@ package org.apache.hudi.execution.bulkinsert; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.testutils.HoodieClientTestBase; - import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -40,6 +46,8 @@ import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; public class 
TestBulkInsertInternalPartitioner extends HoodieClientTestBase { + private static final Comparator> KEY_COMPARATOR = + Comparator.comparing(o -> (o.getPartitionPath() + "+" + o.getRecordKey())); public static JavaRDD generateTestRecordsForBulkInsert(JavaSparkContext jsc) { HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); @@ -69,9 +77,10 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { return Stream.of(data).map(Arguments::of); } - private void verifyRecordAscendingOrder(List records) { - List expectedRecords = new ArrayList<>(records); - Collections.sort(expectedRecords, Comparator.comparing(o -> (o.getPartitionPath() + "+" + o.getRecordKey()))); + private void verifyRecordAscendingOrder(List> records, + Option>> comparator) { + List> expectedRecords = new ArrayList<>(records); + Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR)); assertEquals(expectedRecords, records); } @@ -79,19 +88,28 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { JavaRDD records, boolean isGloballySorted, boolean isLocallySorted, Map expectedPartitionNumRecords) { + testBulkInsertInternalPartitioner(partitioner, records, isGloballySorted, isLocallySorted, expectedPartitionNumRecords, Option.empty()); + } + + private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, + JavaRDD records, + boolean isGloballySorted, boolean isLocallySorted, + Map expectedPartitionNumRecords, + Option>> comparator) { int numPartitions = 2; - JavaRDD actualRecords = (JavaRDD) partitioner.repartitionRecords(records, numPartitions); + JavaRDD> actualRecords = + (JavaRDD>) partitioner.repartitionRecords(records, numPartitions); assertEquals(numPartitions, actualRecords.getNumPartitions()); - List collectedActualRecords = actualRecords.collect(); + List> collectedActualRecords = actualRecords.collect(); if (isGloballySorted) { // Verify global order - verifyRecordAscendingOrder(collectedActualRecords); + verifyRecordAscendingOrder(collectedActualRecords, comparator); } else if (isLocallySorted) { // Verify local order actualRecords.mapPartitions(partition -> { - List partitionRecords = new ArrayList<>(); + List> partitionRecords = new ArrayList<>(); partition.forEachRemaining(partitionRecords::add); - verifyRecordAscendingOrder(partitionRecords); + verifyRecordAscendingOrder(partitionRecords, comparator); return Collections.emptyList().iterator(); }).collect(); } @@ -118,4 +136,35 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase { testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode), records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2)); } + + @Test + public void testCustomColumnSortPartitioner() throws Exception { + String[] sortColumns = new String[] {"rider"}; + Comparator> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns); + + JavaRDD records1 = generateTestRecordsForBulkInsert(jsc); + JavaRDD records2 = generateTripleTestRecordsForBulkInsert(jsc); + testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA), + records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA), + records2, true, true, generateExpectedPartitionNumRecords(records2), 
Option.of(columnComparator)); + } + + private Comparator> getCustomColumnComparator(Schema schema, String[] sortColumns) { + Comparator> comparator = Comparator.comparing(record -> { + try { + GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema).get(); + StringBuilder sb = new StringBuilder(); + for (String col : sortColumns) { + sb.append(genericRecord.get(col)); + } + + return sb.toString(); + } catch (IOException e) { + throw new HoodieIOException("Unable to read value for " + String.join(",", sortColumns)); + } + }); + + return comparator; + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 8b47fa3d4..42584b125 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -18,6 +18,11 @@ package org.apache.hudi.table; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -46,6 +51,8 @@ import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.Transformations; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieStorageConfig; @@ -66,12 +73,6 @@ import org.apache.hudi.testutils.HoodieClientTestUtils; import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils; import org.apache.hudi.testutils.HoodieWriteableTestTable; import org.apache.hudi.testutils.MetadataMergeWriteStatus; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.JobConf; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -228,6 +229,81 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { } } + @Test + public void testSimpleClusteringNoUpdates() throws Exception { + testClustering(false); + } + + @Test + public void testSimpleClusteringWithUpdates() throws Exception { + testClustering(true); + } + + private void testClustering(boolean doUpdates) throws Exception { + // set a low compaction small file size to generate more file groups.
+ HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + HoodieWriteConfig cfg = getConfigBuilder(true, 10L, clusteringConfig).build(); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { + + /** + * Write 1 (only inserts) + */ + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 400); + insertAndGetFilePaths(records.subList(0, 200), client, cfg, newCommitTime); + + /** + * Write 2 (more inserts to create new files) + */ + // we already set small file size to small number to force inserts to go into new file. + newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + insertAndGetFilePaths(records.subList(200, 400), client, cfg, newCommitTime); + + if (doUpdates) { + /** + * Write 3 (updates) + */ + newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUpdates(newCommitTime, 100); + updateAndGetFilePaths(records, client, cfg, newCommitTime); + } + + HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); + FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable); + // expect 2 base files for each partition + assertEquals(dataGen.getPartitionPaths().length * 2, allFiles.length); + + String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString(); + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = HoodieSparkTable.create(cfg, context, metaClient); + // verify all files are included in clustering plan. + assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count()); + + // Do the clustering and validate + client.cluster(clusteringCommitTime, true); + + metaClient = HoodieTableMetaClient.reload(metaClient); + final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context, metaClient); + Stream dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths()) + .flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p)); + // verify there should be only one base file per partition after clustering. 
+ assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count()); + + HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants(); + assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(), + "Expecting a single commit."); + assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp()); + assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction()); + assertEquals(400, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"), + "Must contain 400 records"); + } + } + // test incremental read does not go past compaction instant for RO views // For RT views, incremental read can go past compaction @Test @@ -1469,17 +1545,27 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness { return getConfigBuilder(autoCommit, false, indexType); } + protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) { + return getConfigBuilder(autoCommit, false, IndexType.BLOOM, compactionSmallFileSize, clusteringConfig); + } + protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) { + return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build()); + } + + protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType, + long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2) .withAutoCommit(autoCommit).withAssumeDatePartitioning(true) - .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024) + .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize) .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build()) .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table") .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() .withEnableBackupForRemoteFileSystemView(false).build()) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) + .withClusteringConfig(clusteringConfig) .withRollbackUsingMarkers(rollbackUsingMarkers); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index d140b1183..faf7e7d43 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -20,14 +20,13 @@ package org.apache.hudi.table.action.compact.strategy; import org.apache.hudi.avro.model.HoodieCompactionOperation; import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; - -import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.Test; import java.text.SimpleDateFormat; @@ -257,10 +256,13 @@ public class TestHoodieCompactionStrategy { HoodieBaseFile df = TestHoodieBaseFile.newDataFile(k); String partitionPath = keyToPartitionMap.get(k); List logFiles = v.stream().map(TestHoodieLogFile::newLogFile).collect(Collectors.toList()); + FileSlice slice = new FileSlice(new HoodieFileGroupId(partitionPath, df.getFileId()), df.getCommitTime()); + slice.setBaseFile(df); + logFiles.stream().forEach(f -> slice.addLogFile(f)); operations.add(new HoodieCompactionOperation(df.getCommitTime(), logFiles.stream().map(s -> s.getPath().toString()).collect(Collectors.toList()), df.getPath(), df.getFileId(), partitionPath, - config.getCompactionStrategy().captureMetrics(config, Option.of(df), partitionPath, logFiles), + config.getCompactionStrategy().captureMetrics(config, slice), df.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null)) ); }); @@ -303,10 +305,11 @@ public class TestHoodieCompactionStrategy { public static class TestHoodieLogFile extends HoodieLogFile { + private static int version = 0; private final long size; public TestHoodieLogFile(long size) { - super("/tmp/.ce481ee7-9e53-4a2e-9992-f9e295fa79c0_20180919184844.log.1"); + super("/tmp/.ce481ee7-9e53-4a2e-999-f9e295fa79c0_20180919184844.log." + version++); this.size = size; } @@ -314,11 +317,6 @@ public class TestHoodieCompactionStrategy { return new TestHoodieLogFile(size); } - @Override - public Path getPath() { - return new Path("/tmp/test-log"); - } - @Override public long getFileSize() { return size; diff --git a/hudi-common/src/main/avro/HoodieClusteringGroup.avsc b/hudi-common/src/main/avro/HoodieClusteringGroup.avsc index fb41f6ef5..b2444be84 100644 --- a/hudi-common/src/main/avro/HoodieClusteringGroup.avsc +++ b/hudi-common/src/main/avro/HoodieClusteringGroup.avsc @@ -40,6 +40,11 @@ }], "default": null }, + { + "name":"numOutputFileGroups", + "type":["int", "null"], + "default": 1 + }, { "name":"version", "type":["int", "null"], diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java new file mode 100644 index 000000000..8f6da7084 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/SerializableSchema.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.config; + +import org.apache.avro.Schema; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * A wrapped Schema which can be serialized. + */ +public class SerializableSchema implements Serializable { + + private transient Schema schema; + + public SerializableSchema() { + } + + public SerializableSchema(Schema schema) { + this.schema = newCopy(schema); + } + + public SerializableSchema(SerializableSchema serializableSchema) { + this(serializableSchema.schema); + } + + public static Schema newCopy(Schema schemaObject) { + return new Schema.Parser().parse(schemaObject.toString()); + } + + public Schema get() { + return schema; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + writeObjectTo(out); + } + + private void readObject(ObjectInputStream in) throws IOException { + readObjectFrom(in); + } + + // create a public write method for unit test + public void writeObjectTo(ObjectOutputStream out) throws IOException { + out.writeUTF(schema.toString()); + } + + // create a public read method for unit test + public void readObjectFrom(ObjectInputStream in) throws IOException { + schema = new Schema.Parser().parse(in.readUTF()); + } + + @Override + public String toString() { + return schema.toString(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/ClusteringOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/ClusteringOperation.java new file mode 100644 index 000000000..3d732fc7f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/ClusteringOperation.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.avro.model.HoodieSliceInfo; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Encapsulates all the needed information about a clustering file slice. This is needed because spark serialization + * does not work with avro objects. 
+ */ +public class ClusteringOperation implements Serializable { + + private String dataFilePath; + private List deltaFilePaths; + private String fileId; + private String partitionPath; + private String bootstrapFilePath; + private int version; + + public static ClusteringOperation create(HoodieSliceInfo sliceInfo) { + return new ClusteringOperation(sliceInfo.getDataFilePath(), new ArrayList<>(sliceInfo.getDeltaFilePaths()), sliceInfo.getFileId(), + sliceInfo.getPartitionPath(), sliceInfo.getBootstrapFilePath(), sliceInfo.getVersion()); + } + + // Only for serialization/de-serialization + @Deprecated + public ClusteringOperation() {} + + private ClusteringOperation(final String dataFilePath, final List deltaFilePaths, final String fileId, + final String partitionPath, final String bootstrapFilePath, final int version) { + this.dataFilePath = dataFilePath; + this.deltaFilePaths = deltaFilePaths; + this.fileId = fileId; + this.partitionPath = partitionPath; + this.bootstrapFilePath = bootstrapFilePath; + this.version = version; + } + + public String getDataFilePath() { + return this.dataFilePath; + } + + public void setDataFilePath(final String dataFilePath) { + this.dataFilePath = dataFilePath; + } + + public List getDeltaFilePaths() { + return this.deltaFilePaths; + } + + public void setDeltaFilePaths(final List deltaFilePaths) { + this.deltaFilePaths = deltaFilePaths; + } + + public String getFileId() { + return this.fileId; + } + + public void setFileId(final String fileId) { + this.fileId = fileId; + } + + public String getPartitionPath() { + return this.partitionPath; + } + + public void setPartitionPath(final String partitionPath) { + this.partitionPath = partitionPath; + } + + public String getBootstrapFilePath() { + return this.bootstrapFilePath; + } + + public void setBootstrapFilePath(final String bootstrapFilePath) { + this.bootstrapFilePath = bootstrapFilePath; + } + + public int getVersion() { + return this.version; + } + + public void setVersion(final int version) { + this.version = version; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ClusteringOperation that = (ClusteringOperation) o; + return getVersion() == that.getVersion() + && Objects.equals(getDataFilePath(), that.getDataFilePath()) + && Objects.equals(getDeltaFilePaths(), that.getDeltaFilePaths()) + && Objects.equals(getFileId(), that.getFileId()) + && Objects.equals(getPartitionPath(), that.getPartitionPath()) + && Objects.equals(getBootstrapFilePath(), that.getBootstrapFilePath()); + } + + @Override + public int hashCode() { + return Objects.hash(getDataFilePath(), getDeltaFilePaths(), getFileId(), getPartitionPath(), getBootstrapFilePath(), getVersion()); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java index 5f328a9bc..39f0f62ca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -76,6 +76,8 @@ public enum WriteOperationType { return INSERT_OVERWRITE; case "insert_overwrite_table": return INSERT_OVERWRITE_TABLE; + case "cluster": + return CLUSTER; default: throw new HoodieException("Invalid value of Type."); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java new file mode 100644 index 000000000..3b73f41cb --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.log; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.SpillableMapUtils; +import org.apache.hudi.io.storage.HoodieFileReader; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice. + */ +public class HoodieFileSliceReader implements Iterator> { + private Iterator> recordsIterator; + + public static HoodieFileSliceReader getFileSliceReader( + HoodieFileReader baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass) throws IOException { + Iterator baseIterator = baseFileReader.getRecordIterator(schema); + while (baseIterator.hasNext()) { + GenericRecord record = (GenericRecord) baseIterator.next(); + HoodieRecord hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass); + scanner.processNextRecord(hoodieRecord); + } + return new HoodieFileSliceReader(scanner.iterator()); + } + + private HoodieFileSliceReader(Iterator> recordsItr) { + this.recordsIterator = recordsItr; + } + + @Override + public boolean hasNext() { + return recordsIterator.hasNext(); + } + + @Override + public HoodieRecord next() { + return recordsIterator.next(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 99fd793e3..918d568b9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -306,7 +306,23 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } /** - * Transition Clean State from inflight to Committed. + * Transition replace requested file to replace inflight. 
+ * + * @param requestedInstant Requested instant + * @param data Extra Metadata + * @return inflight instant + */ + public HoodieInstant transitionReplaceRequestedToInflight(HoodieInstant requestedInstant, Option data) { + ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)); + ValidationUtils.checkArgument(requestedInstant.isRequested()); + HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, REPLACE_COMMIT_ACTION, requestedInstant.getTimestamp()); + // Then write to timeline + transitionState(requestedInstant, inflightInstant, data); + return inflightInstant; + } + + /** + * Transition replace inflight to Committed. * * @param inflightInstant Inflight instant * @param data Extra Metadata @@ -321,6 +337,26 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return commitInstant; } + /** + * Revert replace commit State from inflight to requested. + * + * @param inflightInstant Inflight Instant + * @return requested instant + */ + public HoodieInstant revertReplaceCommitInflightToRequested(HoodieInstant inflightInstant) { + ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)); + ValidationUtils.checkArgument(inflightInstant.isInflight()); + HoodieInstant requestedInstant = + new HoodieInstant(State.REQUESTED, REPLACE_COMMIT_ACTION, inflightInstant.getTimestamp()); + if (metaClient.getTimelineLayoutVersion().isNullVersion()) { + // Pass empty data since the plan is read from the corresponding requested instant file + transitionState(inflightInstant, requestedInstant, Option.empty()); + } else { + deleteInflight(inflightInstant); + } + return requestedInstant; + } + private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data) { transitionState(fromInstant, toInstant, data, false); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index ff251e314..06fe9619d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -299,6 +299,14 @@ public interface HoodieTimeline extends Serializable { return new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, timestamp); } + static HoodieInstant getReplaceCommitRequestedInstant(final String timestamp) { + return new HoodieInstant(State.REQUESTED, REPLACE_COMMIT_ACTION, timestamp); + } + + static HoodieInstant getReplaceCommitInflightInstant(final String timestamp) { + return new HoodieInstant(State.INFLIGHT, REPLACE_COMMIT_ACTION, timestamp); + } + /** * Returns the inflight instant corresponding to the instant being passed. Takes care of changes in action names * between inflight and completed instants (compaction <=> commit).
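The HoodieActiveTimeline transitions and HoodieTimeline helpers added above give a replace commit the same requested -> inflight -> completed lifecycle that compaction already has, plus an explicit revert path for failed clustering attempts. The snippet below is a minimal sketch of how a caller might drive that lifecycle, not code from this patch; activeTimeline, clusteringInstantTime and commitMetadataBytes are hypothetical placeholders, and the completion call assumes the pre-existing transitionReplaceInflightToComplete method shown in the surrounding hunk.

    // Sketch only: assumes the clustering plan was already written into the requested replacecommit instant.
    void executeReplaceCommit(HoodieActiveTimeline activeTimeline, String clusteringInstantTime,
                              byte[] commitMetadataBytes) throws Exception {
      HoodieInstant requested = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
      HoodieInstant inflight = activeTimeline.transitionReplaceRequestedToInflight(requested, Option.empty());
      try {
        // ... rewrite the planned file groups, then mark the replace commit complete ...
        activeTimeline.transitionReplaceInflightToComplete(inflight, Option.of(commitMetadataBytes));
      } catch (Exception e) {
        // Move the instant back to requested so the pending clustering can be retried or rolled back.
        activeTimeline.revertReplaceCommitInflightToRequested(inflight);
        throw e;
      }
    }
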
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index c0c88c04a..5cdb6fc19 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -114,7 +114,11 @@ public class ClusteringUtils { new AbstractMap.SimpleEntry<>(entry.getLeft(), entry.getRight())); } - private static Stream getFileGroupsFromClusteringGroup(HoodieClusteringGroup group) { + public static Stream getFileGroupsFromClusteringPlan(HoodieClusteringPlan clusteringPlan) { + return clusteringPlan.getInputGroups().stream().flatMap(ClusteringUtils::getFileGroupsFromClusteringGroup); + } + + public static Stream getFileGroupsFromClusteringGroup(HoodieClusteringGroup group) { return group.getSlices().stream().map(slice -> new HoodieFileGroupId(slice.getPartitionPath(), slice.getFileId())); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java index 49f107550..326bf0527 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/StringUtils.java @@ -25,6 +25,8 @@ import javax.annotation.Nullable; */ public class StringUtils { + public static final String EMPTY_STRING = ""; + /** *

* Joins the elements of the provided array into a single String containing the provided list of elements. diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java new file mode 100644 index 000000000..72843a453 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializableSchema.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import org.apache.avro.Schema; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.config.SerializableSchema; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests serializable schema. + */ +public class TestSerializableSchema { + + @Test + public void testSerDeser() throws IOException { + verifySchema(HoodieTestDataGenerator.AVRO_TRIP_SCHEMA); + verifySchema(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_TRIP_SCHEMA)); + verifySchema(HoodieTestDataGenerator.AVRO_SHORT_TRIP_SCHEMA); + verifySchema(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SHORT_TRIP_SCHEMA)); + verifySchema(HoodieTestDataGenerator.FLATTENED_AVRO_SCHEMA); + verifySchema(HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.FLATTENED_AVRO_SCHEMA)); + verifySchema(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS); + } + + private void verifySchema(Schema schema) throws IOException { + SerializableSchema serializableSchema = new SerializableSchema(schema); + assertEquals(schema, serializableSchema.get()); + assertTrue(schema != serializableSchema.get()); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + serializableSchema.writeObjectTo(oos); + oos.flush(); + oos.close(); + + byte[] bytesWritten = baos.toByteArray(); + SerializableSchema newSchema = new SerializableSchema(); + newSchema.readObjectFrom(new ObjectInputStream(new ByteArrayInputStream(bytesWritten))); + assertEquals(schema, newSchema.get()); + } +}
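
Taken together with the write-client changes earlier in this patch, the tests above exercise a two-step clustering API: schedule a plan, then execute it as a replace commit. The following is a rough usage sketch of that surface rather than code from the patch; engineContext, basePath and schemaStr stand in for the caller's own setup and are not defined by this change.

    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
        .withClusteringMaxNumGroups(10)
        .withClusteringTargetPartitions(0)
        .withClusteringSortColumns("_hoodie_record_key")  // optional: sort records within rewritten file groups
        .build();
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(schemaStr)
        .withClusteringConfig(clusteringConfig)
        .build();

    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
      // Step 1: write a clustering plan into a requested replacecommit instant.
      Option<String> clusteringInstant = client.scheduleClustering(Option.empty());
      // Step 2: rewrite the planned file groups; 'true' commits the replace commit on success.
      clusteringInstant.ifPresent(instant -> client.cluster(instant, true));
    }
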