[HUDI-1075] Implement simple clustering strategies to create ClusteringPlan and to run the plan

This commit is contained in:
Satish Kotha
2020-11-07 17:08:55 -08:00
parent e4e2fbc3bb
commit 6dc03b65bf
47 changed files with 2383 additions and 128 deletions
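The diff below wires a schedule-then-execute clustering flow into the Spark write client. As a rough usage sketch (only scheduleClustering()/cluster() and the HoodieClusteringConfig builder calls are confirmed by this commit; the write-config plumbing such as withPath/withSchema and the SparkRDDWriteClient constructor are assumptions from typical Hudi client setup):

HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
    .withClusteringMaxNumGroups(10)
    .withClusteringTargetPartitions(0)
    .withClusteringSortColumns("_hoodie_record_key")
    .withInlineClusteringNumCommits(1)
    .build();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath)                        // assumed: base path and schema supplied by the caller
    .withSchema(schemaStr)
    .withClusteringConfig(clusteringConfig)
    .build();
SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig);  // assumed constructor
// Schedule a clustering plan (written as a requested replacecommit) ...
String clusteringInstant = client.scheduleClustering(Option.empty()).get().toString();
// ... then execute it; shouldComplete=true also transitions the replacecommit to complete.
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = client.cluster(clusteringInstant, true);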

View File

@@ -18,6 +18,8 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
@@ -25,6 +27,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -34,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndex;
@@ -43,15 +47,13 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.SparkCompactHelpers;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
@@ -314,6 +316,57 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
return statuses;
}
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
rollbackInflightClustering(inflightInstant, table);
table.getMetaClient().reloadActiveTimeline();
}
clusteringTimer = metrics.getClusteringCtx();
LOG.info("Starting clustering at " + clusteringInstant);
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = table.cluster(context, clusteringInstant);
JavaRDD<WriteStatus> statuses = clusteringMetadata.getWriteStatuses();
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), statuses, table, clusteringInstant);
}
return clusteringMetadata;
}
protected void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String clusteringCommitTime) {
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
if (!writeStatuses.filter(WriteStatus::hasErrors).isEmpty()) {
throw new HoodieClusteringException("Clustering failed to write to files:"
+ writeStatuses.filter(WriteStatus::hasErrors).map(WriteStatus::getFileId).collect());
}
finalizeWrite(table, clusteringCommitTime, writeStats);
try {
LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
table.getActiveTimeline().transitionReplaceInflightToComplete(
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e);
}
if (clusteringTimer != null) {
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(clusteringCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ config.getBasePath() + " at time " + clusteringCommitTime, e);
}
}
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}
@Override
protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);

View File

@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY;
/**
* Clustering strategy based on the following:
* 1) Only looks at the latest 'daybased.lookback.partitions' partitions.
* 2) Excludes files larger than 'small.file.limit' from the clustering plan.
*/
public class SparkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends PartitionAwareClusteringPlanStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class);
public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
int totalSizeSoFar = 0;
for (FileSlice currentSlice : fileSlices) {
// assume each filegroup size is ~= parquet.max.file.size
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize();
// check if max size is reached and create new group, if needed.
if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes())));
currentGroup = new ArrayList<>();
totalSizeSoFar = 0;
}
currentGroup.add(currentSlice);
}
if (!currentGroup.isEmpty()) {
fileSliceGroups.add(Pair.of(currentGroup, getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes())));
}
return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
}
@Override
protected Map<String, String> getStrategyParams() {
Map<String, String> params = new HashMap<>();
if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
params.put(CLUSTERING_SORT_COLUMNS_PROPERTY, getWriteConfig().getClusteringSortColumns());
}
return params;
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
return partitionPaths.stream()
.sorted(Comparator.reverseOrder())
.limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
.collect(Collectors.toList());
}
@Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
return super.getFileSlicesEligibleForClustering(partition)
// Only files with base file size smaller than the small file limit are eligible.
.filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
}
private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
return (int) Math.ceil(groupSize / (double) targetFileSize);
}
}
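For intuition on the sizing math above, a quick illustrative check of getNumberOfOutputFileGroups (the numbers are made up, not from the commit):

// A group that accumulated ~2.5 GB of small file slices, with a 1 GB target file size,
// is asked to produce ceil(2.5) = 3 output file groups.
long groupSize = 2_560L * 1024 * 1024;          // ~2.5 GB accumulated across the group's slices
long targetFileSize = 1024L * 1024 * 1024;      // clustering target file max bytes
int numOutputFileGroups = (int) Math.ceil(groupSize / (double) targetFileSize);  // -> 3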

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.Map;
import java.util.Properties;
import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY;
/**
* Clustering strategy based on the following:
* 1) Runs on the Spark execution engine.
* 2) Uses bulk_insert to write data into new files.
*/
public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(SparkSortAndSizeExecutionStrategy.class);
public SparkSortAndSizeExecutionStrategy(HoodieSparkCopyOnWriteTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
public SparkSortAndSizeExecutionStrategy(HoodieSparkMergeOnReadTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
public JavaRDD<WriteStatus> performClustering(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
final String instantTime, final Map<String, String> strategyParams, final Schema schema) {
Properties props = getWriteConfig().getProps();
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM, String.valueOf(numOutputGroups));
// We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
props.put(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, Boolean.FALSE.toString());
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups);
}
/**
* Create BulkInsertPartitioner based on strategy params.
*/
protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (strategyParams.containsKey(CLUSTERING_SORT_COLUMNS_PROPERTY)) {
return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(CLUSTERING_SORT_COLUMNS_PROPERTY).split(","),
HoodieAvroUtils.addMetadataFields(schema)));
} else {
return Option.empty();
}
}
}

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.execution;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
@@ -34,14 +35,18 @@ import java.util.List;
public class SparkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
private boolean useWriterSchema;
public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
boolean areRecordsSorted,
HoodieWriteConfig config,
String instantTime,
HoodieTable hoodieTable,
String idPrefix,
TaskContextSupplier taskContextSupplier) {
TaskContextSupplier taskContextSupplier,
boolean useWriterSchema) {
super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier);
this.useWriterSchema = useWriterSchema;
}
public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
@@ -53,6 +58,7 @@ public class SparkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
TaskContextSupplier taskContextSupplier,
WriteHandleFactory writeHandleFactory) {
super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory);
this.useWriterSchema = false;
}
@Override
@@ -61,7 +67,10 @@ public class SparkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
null;
try {
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
if (useWriterSchema) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
bufferedIteratorExecutor =
new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(), getTransformFunction(schema));
final List<WriteStatus> result = bufferedIteratorExecutor.execute();

View File

@@ -41,20 +41,22 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
private HoodieWriteConfig config;
private HoodieTable hoodieTable;
private List<String> fileIDPrefixes;
private boolean useWriterSchema;
public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
HoodieWriteConfig config, HoodieTable hoodieTable,
List<String> fileIDPrefixes) {
List<String> fileIDPrefixes, boolean useWriterSchema) {
this.instantTime = instantTime;
this.areRecordsSorted = areRecordsSorted;
this.config = config;
this.hoodieTable = hoodieTable;
this.fileIDPrefixes = fileIDPrefixes;
this.useWriterSchema = useWriterSchema;
}
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) {
return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier());
fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema);
}
}

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.execution.bulkinsert;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
/**
* A partitioner that does sorting based on specified column values for each RDD partition.
*
* @param <T> HoodieRecordPayload type
*/
public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
private final String[] sortColumnNames;
private final SerializableSchema serializableSchema;
public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
this.sortColumnNames = columnNames;
this.serializableSchema = new SerializableSchema(schema);
}
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
int outputSparkPartitions) {
final String[] sortColumns = this.sortColumnNames;
final SerializableSchema schema = this.serializableSchema;
return records.sortBy(record -> getRecordSortColumnValues(record, sortColumns, schema),
true, outputSparkPartitions);
}
@Override
public boolean arePartitionRecordsSorted() {
return true;
}
private static String getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
String[] sortColumns,
SerializableSchema schema) {
try {
GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get();
StringBuilder sb = new StringBuilder();
for (String col : sortColumns) {
sb.append(genericRecord.get(col));
}
return sb.toString();
} catch (IOException e) {
throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
}
}
}
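A minimal standalone sketch of the partitioner above (the 'rider' column, the input records RDD, and MyPayload — any HoodieRecordPayload implementation — are assumptions for illustration; adding metadata fields to the schema mirrors how SparkSortAndSizeExecutionStrategy builds it):

// Sort records by the "rider" column across 4 Spark partitions before bulk insert.
Schema writerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(avroSchemaStr));
RDDCustomColumnsSortPartitioner<MyPayload> partitioner =
    new RDDCustomColumnsSortPartitioner<>(new String[] {"rider"}, writerSchema);
JavaRDD<HoodieRecord<MyPayload>> sorted = partitioner.repartitionRecords(records, 4);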

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -44,12 +45,14 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
import org.apache.hudi.table.action.clean.SparkCleanActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
import org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor;
import org.apache.hudi.table.action.cluster.SparkClusteringPlanActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkMergeHelper;
import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
@@ -57,7 +60,6 @@ import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecuto
import org.apache.hudi.table.action.restore.SparkCopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.SparkCopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -145,6 +147,19 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload> extends
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
}
@Override
public Option<HoodieClusteringPlan> scheduleClustering(HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata) {
return new SparkClusteringPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
}
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(HoodieEngineContext context,
String clusteringInstantTime) {
return new SparkExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();
}
@Override
public HoodieBootstrapWriteMetadata<JavaRDD<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
return new SparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();

View File

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.cluster;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.Map;
@SuppressWarnings("checkstyle:LineLength")
public class SparkClusteringPlanActionExecutor<T extends HoodieRecordPayload> extends
BaseClusteringPlanActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(SparkClusteringPlanActionExecutor.class);
public SparkClusteringPlanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String instantTime,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
}
@Override
protected Option<HoodieClusteringPlan> createClusteringPlan() {
LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
.countInstants();
if (config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
LOG.info("Not scheduling clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getInlineClusterMaxCommits());
return Option.empty();
}
LOG.info("Generating clustering plan for table " + config.getBasePath());
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config);
return strategy.generateClusteringPlan();
}
}
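To make the scheduling gate above concrete (illustrative numbers only, not part of the commit):

// With config.getInlineClusterMaxCommits() = 4:
//   commitsSinceLastClustering = 3  -> createClusteringPlan() returns Option.empty(), nothing is scheduled
//   commitsSinceLastClustering = 4+ -> the configured ClusteringPlanStrategy is loaded reflectively and generates the plan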

View File

@@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.table.action.cluster;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
private static final Logger LOG = LogManager.getLogger(SparkExecuteClusteringCommitActionExecutor.class);
private final HoodieClusteringPlan clusteringPlan;
public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime) {
super(context, config, table, instantTime, WriteOperationType.CLUSTER);
this.clusteringPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
.map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException("Unable to read clustering plan for instant: " + instantTime));
}
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(context);
// execute clustering for each group async and collect WriteStatus
JavaRDD<WriteStatus> writeStatusRDD = clusteringPlan.getInputGroups().stream()
.map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams()))
.map(CompletableFuture::join)
.reduce((rdd1, rdd2) -> rdd1.union(rdd2)).orElse(engineContext.emptyRDD());
if (writeStatusRDD.isEmpty()) {
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime + " #groups: " + clusteringPlan.getInputGroups().size());
}
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = buildWriteMetadata(writeStatusRDD);
updateIndexAndCommitIfNeeded(writeStatusRDD, writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeMetadata.setCommitMetadata(Option.of(commitMetadata));
}
return writeMetadata;
}
/**
* Submit job to execute clustering for the group.
*/
private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams) {
CompletableFuture<JavaRDD<WriteStatus>> writeStatusesFuture = CompletableFuture.supplyAsync(() -> {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> inputRecords = readRecordsForGroup(jsc, clusteringGroup);
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
return ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), table, context, config))
.performClustering(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema);
});
return writeStatusesFuture;
}
@Override
protected String getCommitActionType() {
return HoodieTimeline.REPLACE_COMMIT_ACTION;
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan).collect(
Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
}
/**
* Get an RDD of all records for the group. This includes all records from the file slices, applying updates from log files, if any.
*/
private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup) {
List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
boolean hasLogFiles = clusteringOps.stream().filter(op -> op.getDeltaFilePaths().size() > 0).findAny().isPresent();
if (hasLogFiles) {
// if there are log files, we read all records into memory for a file group and apply updates.
return readRecordsForGroupWithLogs(jsc, clusteringOps);
} else {
// Optimize reading records for the case where there are no log files.
return readRecordsForGroupBaseFiles(jsc, clusteringOps);
}
}
/**
* Read records from base files, apply log updates and convert to an RDD.
*/
private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
List<ClusteringOperation> clusteringOps) {
return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
List<Iterator<HoodieRecord<? extends HoodieRecordPayload>>> recordIterators = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config.getProps());
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), clusteringOp.getDeltaFilePaths(), readerSchema, instantTime,
maxMemoryPerCompaction, config.getCompactionLazyBlockReadEnabled(),
config.getCompactionReverseLogReadEnabled(), config.getMaxDFSStreamBufferSize(),
config.getSpillableMapBasePath());
recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
table.getMetaClient().getTableConfig().getPayloadClass()));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
});
return new ConcatenatingIterator<>(recordIterators);
});
}
/**
* Read records from base files and convert to an RDD.
*/
private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
List<ClusteringOperation> clusteringOps) {
return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
});
return new ConcatenatingIterator<>(iteratorsForPartition);
}).map(this::transform);
}
/**
* Transform IndexedRecord into HoodieRecord.
*/
private HoodieRecord<? extends HoodieRecordPayload> transform(IndexedRecord indexedRecord) {
GenericRecord record = (GenericRecord) indexedRecord;
String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
HoodieKey hoodieKey = new HoodieKey(key, partition);
HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(table.getMetaClient().getTableConfig().getPayloadClass(),
new Object[] {Option.of(record)}, Option.class);
HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
return hoodieRecord;
}
private HoodieWriteMetadata<JavaRDD<WriteStatus>> buildWriteMetadata(JavaRDD<WriteStatus> writeStatusJavaRDD) {
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeStatusJavaRDD));
result.setWriteStatuses(writeStatusJavaRDD);
result.setWriteStats(writeStatusJavaRDD.map(WriteStatus::getStat).collect());
result.setCommitMetadata(Option.empty());
result.setCommitted(false);
return result;
}
}

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.table.action.commit;
import java.util.Map;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
@@ -28,12 +26,13 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import java.util.Map;
public class SparkBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;

View File

@@ -31,7 +31,6 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
@@ -59,25 +58,45 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abs
}
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(JavaRDD<HoodieRecord<T>> inputRecords,
String instantTime,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
HoodieWriteConfig config,
BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor,
boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(final JavaRDD<HoodieRecord<T>> inputRecords,
final String instantTime,
final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
final HoodieWriteConfig config,
final BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor,
final boolean performDedupe,
final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
// transition bulk_insert state to inflight
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
// write new files
JavaRDD<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism());
// update index
((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
}
@Override
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> inputRecords,
String instantTime,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism) {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords = inputRecords;
if (performDedupe) {
dedupedRecords = (JavaRDD<HoodieRecord<T>>) SparkWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
config.getBulkInsertShuffleParallelism(), table);
parallelism, table);
}
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
final int parallelism = config.getBulkInsertShuffleParallelism();
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
? userDefinedBulkInsertPartitioner.get()
: BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
@@ -87,16 +106,11 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abs
final List<String> fileIDPrefixes =
IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime,
partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes), true)
partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema), true)
.flatMap(List::iterator);
((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatusRDD, result);
return result;
return writeStatusRDD;
}
}

View File

@@ -18,6 +18,9 @@
package org.apache.hudi.table.action.compact;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -49,10 +52,6 @@ import org.apache.hudi.io.IOUtils;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -179,7 +178,8 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
@Override
public HoodieCompactionPlan generateCompactionPlan(HoodieEngineContext context,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable,
HoodieWriteConfig config, String compactionCommitTime, Set<HoodieFileGroupId> fgIdsInPendingCompactions)
HoodieWriteConfig config, String compactionCommitTime,
Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering)
throws IOException {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
totalLogFiles = new LongAccumulator();
@@ -213,7 +213,7 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> {
return fileSystemView
.getLatestFileSlices(partitionPath)
.filter(slice -> !fgIdsInPendingCompactions.contains(slice.getFileGroupId()))
.filter(slice -> !fgIdsInPendingCompactionAndClustering.contains(slice.getFileGroupId()))
.map(s -> {
List<HoodieLogFile> logFiles =
s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
@@ -224,7 +224,7 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
// into meta files.
Option<HoodieBaseFile> dataFile = s.getBaseFile();
return new CompactionOperation(dataFile, partitionPath, logFiles,
config.getCompactionStrategy().captureMetrics(config, dataFile, partitionPath, logFiles));
config.getCompactionStrategy().captureMetrics(config, s));
})
.filter(c -> !c.getDeltaFileNames().isEmpty());
}, partitionPaths.size()).stream().map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
@@ -239,9 +239,9 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
ValidationUtils.checkArgument(
compactionPlan.getOperations().stream().noneMatch(
op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
"Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
+ "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions
+ "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
+ ", Selected workload :" + compactionPlan);
if (compactionPlan.getOperations().isEmpty()) {
LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());

View File

@@ -21,22 +21,24 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
@@ -75,10 +77,13 @@ public class SparkScheduleCompactionActionExecutor<T extends HoodieRecordPayload
LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
HoodieSparkMergeOnReadTableCompactor compactor = new HoodieSparkMergeOnReadTableCompactor();
try {
return compactor.generateCompactionPlan(context, table, config, instantTime,
((SyncableFileSystemView) table.getSliceView()).getPendingCompactionOperations()
.map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
.collect(Collectors.toSet()));
SyncableFileSystemView fileSystemView = (SyncableFileSystemView) table.getSliceView();
Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
.map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
.collect(Collectors.toSet());
// exclude files in pending clustering from compaction.
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
} catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -42,6 +43,7 @@ import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
@@ -56,6 +58,7 @@ import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.SparkWriteHelper;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -82,6 +85,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
import static org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
@@ -938,6 +942,65 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar);
}
@Test
public void testSimpleClustering() throws Exception {
// setup clustering config
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
testClustering(clusteringConfig);
}
@Test
public void testClusteringWithSortColumns() throws Exception {
// setup clustering config
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns("_hoodie_record_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
testClustering(clusteringConfig);
}
private void testClustering(HoodieClusteringConfig clusteringConfig) throws Exception {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, false, 10);
SparkRDDWriteClient client = getHoodieWriteClient(config, false);
dataGen = new HoodieTestDataGenerator();
String commitTime = "100";
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime);
Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1);
commitTime = "200";
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime);
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
// verify new files are created for 2nd write
Set<HoodieFileGroupId> fileIdIntersection = new HashSet<>(fileIds1);
fileIdIntersection.retainAll(fileIds2);
assertEquals(0, fileIdIntersection.size());
config = getConfigBuilder().withClusteringConfig(clusteringConfig).build();
// create client with new config.
client = getHoodieWriteClient(config, false);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, true);
List<HoodieRecord> allRecords = Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList());
verifyRecordsWritten(clusteringCommitTime, allRecords, clusterMetadata.getWriteStatuses().collect());
Set<HoodieFileGroupId> insertedFileIds = new HashSet<>();
insertedFileIds.addAll(fileIds1);
insertedFileIds.addAll(fileIds2);
Set<HoodieFileGroupId> replacedFileIds = new HashSet<>();
clusterMetadata.getPartitionToReplaceFileIds().entrySet().forEach(partitionFiles ->
partitionFiles.getValue().stream().forEach(file ->
replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file))));
assertEquals(insertedFileIds, replacedFileIds);
}
private Set<HoodieFileGroupId> getFileGroupIdsFromWriteStatus(List<WriteStatus> statuses) {
return statuses.stream().map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
}
/**
* Test scenario of writing more file groups than existing number of file groups in partition.
*/
@@ -975,14 +1038,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
// Do Inserts
String commitTime1 = "001";
client.startCommitWithTime(commitTime1);
List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, batch1RecordsCount);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
Set<String> batch1Buckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
verifyRecordsWritten(commitTime1, inserts1, statuses);
String commit1 = "001";
List<WriteStatus> statuses = writeAndVerifyBatch(client, dataGen.generateInserts(commit1, batch1RecordsCount), commit1);
Set<String> batch1Buckets = getFileIdsFromWriteStatus(statuses);
// Do Insert Overwrite
String commitTime2 = "002";
@@ -999,6 +1057,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
verifyRecordsWritten(commitTime2, inserts2, statuses);
}
private Set<String> getFileIdsFromWriteStatus(List<WriteStatus> statuses) {
return statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
}
/**
* Verify data in parquet files matches expected records and commit time.
*/
@@ -1019,6 +1081,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
}
private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime) {
client.startCommitWithTime(commitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
assertNoWriteErrors(statuses);
verifyRecordsWritten(commitTime, inserts, statuses);
return statuses;
}
private Pair<Set<String>, List<HoodieRecord>> testUpdates(String instantTime, SparkRDDWriteClient client,
int sizeToInsertAndUpdate, int expectedTotalRecords)
throws IOException {
@@ -1386,11 +1457,23 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
* Build Hoodie Write Config for small data file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) {
HoodieWriteConfig.Builder builder = getConfigBuilder(useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA);
return getSmallInsertWriteConfig(insertSplitSize, useNullSchema, dataGen.getEstimatedFileSizeInBytes(150));
}
/**
* Build Hoodie Write Config for specified small file sizes.
*/
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema, long smallFileSize) {
String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA;
return getSmallInsertWriteConfig(insertSplitSize, schemaStr, smallFileSize);
}
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize) {
HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
return builder
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(dataGen.getEstimatedFileSizeInBytes(150))
.compactionSmallFileSize(smallFileSize)
.insertSplitSize(insertSplitSize).build())
.withStorageConfig(
HoodieStorageConfig.newBuilder()
View File
@@ -18,17 +18,23 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -40,6 +46,8 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
private static final Comparator<HoodieRecord<? extends HoodieRecordPayload>> KEY_COMPARATOR =
Comparator.comparing(o -> (o.getPartitionPath() + "+" + o.getRecordKey()));
public static JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert(JavaSparkContext jsc) {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
@@ -69,9 +77,10 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
return Stream.of(data).map(Arguments::of);
}
private void verifyRecordAscendingOrder(List<HoodieRecord> records) {
List<HoodieRecord> expectedRecords = new ArrayList<>(records);
Collections.sort(expectedRecords, Comparator.comparing(o -> (o.getPartitionPath() + "+" + o.getRecordKey())));
private void verifyRecordAscendingOrder(List<HoodieRecord<? extends HoodieRecordPayload>> records,
Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) {
List<HoodieRecord<? extends HoodieRecordPayload>> expectedRecords = new ArrayList<>(records);
Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
assertEquals(expectedRecords, records);
}
@@ -79,19 +88,28 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
JavaRDD<HoodieRecord> records,
boolean isGloballySorted, boolean isLocallySorted,
Map<String, Long> expectedPartitionNumRecords) {
testBulkInsertInternalPartitioner(partitioner, records, isGloballySorted, isLocallySorted, expectedPartitionNumRecords, Option.empty());
}
private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
JavaRDD<HoodieRecord> records,
boolean isGloballySorted, boolean isLocallySorted,
Map<String, Long> expectedPartitionNumRecords,
Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) {
int numPartitions = 2;
JavaRDD<HoodieRecord> actualRecords = (JavaRDD<HoodieRecord>) partitioner.repartitionRecords(records, numPartitions);
JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords =
(JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>) partitioner.repartitionRecords(records, numPartitions);
assertEquals(numPartitions, actualRecords.getNumPartitions());
List<HoodieRecord> collectedActualRecords = actualRecords.collect();
List<HoodieRecord<? extends HoodieRecordPayload>> collectedActualRecords = actualRecords.collect();
if (isGloballySorted) {
// Verify global order
verifyRecordAscendingOrder(collectedActualRecords);
verifyRecordAscendingOrder(collectedActualRecords, comparator);
} else if (isLocallySorted) {
// Verify local order
actualRecords.mapPartitions(partition -> {
List<HoodieRecord> partitionRecords = new ArrayList<>();
List<HoodieRecord<? extends HoodieRecordPayload>> partitionRecords = new ArrayList<>();
partition.forEachRemaining(partitionRecords::add);
verifyRecordAscendingOrder(partitionRecords);
verifyRecordAscendingOrder(partitionRecords, comparator);
return Collections.emptyList().iterator();
}).collect();
}
@@ -118,4 +136,35 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode),
records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2));
}
@Test
public void testCustomColumnSortPartitioner() throws Exception {
String[] sortColumns = new String[] {"rider"};
Comparator<HoodieRecord<? extends HoodieRecordPayload>> columnComparator = getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
}
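// Hedged sketch (illustrative only): using the new RDDCustomColumnsSortPartitioner outside this test,
// assuming "records" is a JavaRDD<HoodieRecord> whose payload schema contains a "rider" field.
private JavaRDD<HoodieRecord> sortByRiderSketch(JavaRDD<HoodieRecord> records) {
  BulkInsertPartitioner partitioner =
      new RDDCustomColumnsSortPartitioner(new String[] {"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA);
  // Repartition into 2 Spark partitions; records come back globally sorted by the "rider" value.
  return (JavaRDD<HoodieRecord>) partitioner.repartitionRecords(records, 2);
}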
private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, String[] sortColumns) {
Comparator<HoodieRecord<? extends HoodieRecordPayload>> comparator = Comparator.comparing(record -> {
try {
GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
StringBuilder sb = new StringBuilder();
for (String col : sortColumns) {
sb.append(genericRecord.get(col));
}
return sb.toString();
} catch (IOException e) {
throw new HoodieIOException("unable to read value for " + sortColumns);
}
});
return comparator;
}
}
View File
@@ -18,6 +18,11 @@
package org.apache.hudi.table;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -46,6 +51,8 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
@@ -66,12 +73,6 @@ import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -228,6 +229,81 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
}
@Test
public void testSimpleClusteringNoUpdates() throws Exception {
testClustering(false);
}
@Test
public void testSimpleClusteringWithUpdates() throws Exception {
testClustering(true);
}
private void testClustering(boolean doUpdates) throws Exception {
// Set a low compaction small-file size to generate more file groups.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
HoodieWriteConfig cfg = getConfigBuilder(true, 10L, clusteringConfig).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
/**
* Write 1 (only inserts)
*/
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
insertAndGetFilePaths(records.subList(0, 200), client, cfg, newCommitTime);
/**
* Write 2 (more inserts to create new files)
*/
// We already set the small file size to a low value to force inserts into new files.
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
insertAndGetFilePaths(records.subList(200, 400), client, cfg, newCommitTime);
if (doUpdates) {
/**
* Write 3 (updates)
*/
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateAndGetFilePaths(records, client, cfg, newCommitTime);
}
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
// expect 2 base files for each partition
assertEquals(dataGen.getPartitionPaths().length * 2, allFiles.length);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
// verify all files are included in clustering plan.
assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());
// Do the clustering and validate
client.cluster(clusteringCommitTime, true);
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context, metaClient);
Stream<HoodieBaseFile> dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths())
.flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p));
// Verify that there is only one base file per partition after clustering.
assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count());
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction());
assertEquals(400, HoodieClientTestUtils.countRecordsSince(jsc, basePath, sqlContext, timeline, "000"),
"Must contain 200 records");
}
}
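// Hedged sketch (not part of the commit): the standalone client-side clustering flow exercised by the
// test above, assuming "client" was built with a HoodieWriteConfig carrying a HoodieClusteringConfig.
private void runClusteringSketch(SparkRDDWriteClient client) throws Exception {
  // Schedule a clustering plan over the eligible file groups; the returned Option holds the
  // instant time of the new pending replacecommit.
  String clusteringInstant = client.scheduleClustering(Option.empty()).get().toString();
  // Execute the plan and, with shouldComplete = true, finish the replacecommit on the timeline.
  client.cluster(clusteringInstant, true);
}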
// test incremental read does not go past compaction instant for RO views
// For RT views, incremental read can go past compaction
@Test
@@ -1469,17 +1545,27 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
return getConfigBuilder(autoCommit, false, indexType);
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
return getConfigBuilder(autoCommit, false, IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
.withClusteringConfig(clusteringConfig)
.withRollbackUsingMarkers(rollbackUsingMarkers);
}
View File
@@ -20,14 +20,13 @@ package org.apache.hudi.table.action.compact.strategy;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import java.text.SimpleDateFormat;
@@ -257,10 +256,13 @@ public class TestHoodieCompactionStrategy {
HoodieBaseFile df = TestHoodieBaseFile.newDataFile(k);
String partitionPath = keyToPartitionMap.get(k);
List<HoodieLogFile> logFiles = v.stream().map(TestHoodieLogFile::newLogFile).collect(Collectors.toList());
FileSlice slice = new FileSlice(new HoodieFileGroupId(partitionPath, df.getFileId()), df.getCommitTime());
slice.setBaseFile(df);
logFiles.stream().forEach(f -> slice.addLogFile(f));
operations.add(new HoodieCompactionOperation(df.getCommitTime(),
logFiles.stream().map(s -> s.getPath().toString()).collect(Collectors.toList()), df.getPath(), df.getFileId(),
partitionPath,
config.getCompactionStrategy().captureMetrics(config, Option.of(df), partitionPath, logFiles),
config.getCompactionStrategy().captureMetrics(config, slice),
df.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null))
);
});
@@ -303,10 +305,11 @@ public class TestHoodieCompactionStrategy {
public static class TestHoodieLogFile extends HoodieLogFile {
private static int version = 0;
private final long size;
public TestHoodieLogFile(long size) {
super("/tmp/.ce481ee7-9e53-4a2e-9992-f9e295fa79c0_20180919184844.log.1");
super("/tmp/.ce481ee7-9e53-4a2e-999-f9e295fa79c0_20180919184844.log." + version++);
this.size = size;
}
@@ -314,11 +317,6 @@ public class TestHoodieCompactionStrategy {
return new TestHoodieLogFile(size);
}
@Override
public Path getPath() {
return new Path("/tmp/test-log");
}
@Override
public long getFileSize() {
return size;