1
0

[HUDI-1468] Support custom clustering strategies and preserve commit metadata as part of clustering (#3419)

Co-authored-by: Satish Kotha <satishkotha@uber.com>
This commit is contained in:
Sagar Sumit
2021-08-07 08:23:08 +05:30
committed by GitHub
parent 9ce548edb1
commit 70b6bd485f
34 changed files with 1150 additions and 343 deletions

View File

@@ -125,7 +125,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Enable running of clustering service, asynchronously as inserts happen on the table.");
private HoodieClusteringConfig() {
public static final ConfigProperty<Boolean> CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA = ConfigProperty
.key("hoodie.clustering.preserve.commit.metadata")
.defaultValue(false)
.sinceVersion("0.9.0")
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
public HoodieClusteringConfig() {
super();
}
@@ -214,6 +220,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
public Builder withPreserveHoodieCommitMetadata(Boolean preserveHoodieCommitMetadata) {
clusteringConfig.setValue(CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA, String.valueOf(preserveHoodieCommitMetadata));
return this;
}
public HoodieClusteringConfig build() {
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
return clusteringConfig;

View File

@@ -700,6 +700,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE);
}
public boolean isPreserveHoodieCommitMetadata() {
return getBoolean(HoodieClusteringConfig.CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA);
}
public boolean isClusteringEnabled() {
// TODO: future support async clustering
return inlineClusteringEnabled() || isAsyncClusteringEnabled();

View File

@@ -25,12 +25,22 @@ import org.apache.hudi.table.HoodieTable;
public class CreateHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
private boolean preserveMetadata = false;
public CreateHandleFactory() {
this(false);
}
public CreateHandleFactory(boolean preserveMetadata) {
this.preserveMetadata = preserveMetadata;
}
@Override
public HoodieWriteHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
final String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
final String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
getNextFileId(fileIdPrefix), taskContextSupplier);
getNextFileId(fileIdPrefix), taskContextSupplier, preserveMetadata);
}
}
}

View File

@@ -59,18 +59,33 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
protected long recordsDeleted = 0;
private Map<String, HoodieRecord<T>> recordMap;
private boolean useWriterSchema = false;
private boolean preserveHoodieMetadata = false;
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
taskContextSupplier);
taskContextSupplier, false);
}
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
boolean preserveHoodieMetadata) {
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
taskContextSupplier, preserveHoodieMetadata);
}
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Option<Schema> overriddenSchema,
TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, overriddenSchema, taskContextSupplier, false);
}
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Option<Schema> overriddenSchema,
TaskContextSupplier taskContextSupplier, boolean preserveHoodieMetadata) {
super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema,
taskContextSupplier);
this.preserveHoodieMetadata = preserveHoodieMetadata;
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.setStat(new HoodieWriteStat());
@@ -119,7 +134,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
if (preserveHoodieMetadata) {
fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
} else {
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
}
// update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));

View File

@@ -0,0 +1,51 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.io;

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

/**
 * A HoodieCreateHandle which writes all data into a single file.
 * <p>
 * Unlike the base handle, {@link #canWrite(HoodieRecord)} never reports the file as full,
 * so every record routed to this handle lands in the same data file.
 * <p>
 * Please use this with caution. This can end up creating very large files if not used correctly.
 */
public class HoodieUnboundedCreateHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieCreateHandle<T, I, K, O> {

  /**
   * @param config                 write config for the table
   * @param instantTime            commit instant this handle writes under
   * @param hoodieTable            table being written to
   * @param partitionPath          partition the file belongs to
   * @param fileId                 exact file id to write (no prefix expansion)
   * @param taskContextSupplier    engine task context
   * @param preserveHoodieMetadata when true, existing hoodie commit metadata on records is kept as-is
   */
  public HoodieUnboundedCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                                     String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
                                     boolean preserveHoodieMetadata) {
    // No overridden schema: Option.empty() defers to the table/write-config schema.
    super(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
        taskContextSupplier, preserveHoodieMetadata);
  }

  /**
   * Always true: size-based rollover is intentionally disabled for this handle.
   */
  @Override
  public boolean canWrite(HoodieRecord record) {
    return true;
  }
}

View File

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.io;

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * A SingleFileHandleCreateFactory is used to write all data in the spark partition into a single data file.
 * <p>
 * The factory is single-use: {@link #create} may be invoked exactly once; a second invocation throws.
 * <p>
 * Please use this with caution. This can end up creating very large files if not used correctly.
 */
public class SingleFileHandleCreateFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {

  // Flips to true on the first create() call; guards against creating a second handle.
  private final AtomicBoolean isHandleCreated = new AtomicBoolean(false);
  // Exact file id to write to; the fileIdPrefix passed to create() is intentionally ignored.
  private final String fileId;
  // When true, existing hoodie commit metadata on rewritten records is preserved.
  private final boolean preserveHoodieMetadata;

  public SingleFileHandleCreateFactory(String fileId, boolean preserveHoodieMetadata) {
    super();
    this.fileId = fileId;
    this.preserveHoodieMetadata = preserveHoodieMetadata;
  }

  /**
   * Creates the single unbounded handle backing this factory.
   *
   * @throws HoodieIOException if a handle was already created by a previous call
   */
  @Override
  public HoodieWriteHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
                                              final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
                                              final String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
    if (isHandleCreated.compareAndSet(false, true)) {
      // Diamond keeps the handle typed as <T, I, K, O> rather than the raw type.
      return new HoodieUnboundedCreateHandle<>(hoodieConfig, commitTime, hoodieTable, partitionPath,
          fileId, // ignore idPfx, always use same fileId
          taskContextSupplier, preserveHoodieMetadata);
    }
    throw new HoodieIOException("Fixed handle create is only expected to be invoked once");
  }
}

View File

@@ -18,25 +18,27 @@
package org.apache.hudi.table.action.cluster.strategy;
import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.Serializable;
import java.util.Map;
/**
* Pluggable implementation for writing data into new file groups based on ClusteringPlan.
*/
public abstract class ClusteringExecutionStrategy<T extends HoodieRecordPayload,I,K,O> implements Serializable {
public abstract class ClusteringExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable {
private static final Logger LOG = LogManager.getLogger(ClusteringExecutionStrategy.class);
private final HoodieTable<T,I,K,O> hoodieTable;
private final HoodieEngineContext engineContext;
private final HoodieTable<T, I, K, O> hoodieTable;
private final transient HoodieEngineContext engineContext;
private final HoodieWriteConfig writeConfig;
public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
@@ -50,10 +52,9 @@ public abstract class ClusteringExecutionStrategy<T extends HoodieRecordPayload,
* file groups created is bounded by numOutputGroups.
* Note that commit is not done as part of strategy. commit is callers responsibility.
*/
public abstract O performClustering(final I inputRecords, final int numOutputGroups, final String instantTime,
final Map<String, String> strategyParams, final Schema schema);
public abstract HoodieWriteMetadata<O> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime);
protected HoodieTable<T,I,K, O> getHoodieTable() {
protected HoodieTable<T, I, K, O> getHoodieTable() {
return this.hoodieTable;
}

View File

@@ -97,6 +97,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
.setInputGroups(clusteringGroups)
.setExtraMetadata(getExtraMetadata())
.setVersion(getPlanVersion())
.setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadata())
.build());
}
}

View File

@@ -43,5 +43,6 @@ public abstract class AbstractBulkInsertHelper<T extends HoodieRecordPayload, I,
boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean addMetadataFields,
int parallelism);
int parallelism,
boolean preserveMetadata);
}

View File

@@ -216,7 +216,7 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
}
}
protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
return Collections.emptyMap();
}
@@ -330,7 +330,7 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses));
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
commitOnAutoCommit(result);
}
}

View File

@@ -71,7 +71,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abst
table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
// write new files
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism());
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false);
//update index
((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -85,7 +85,8 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abst
boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism) {
int parallelism,
boolean preserveHoodieMetadata) {
// De-dupe/merge if needed
List<HoodieRecord<T>> dedupedRecords = inputRecords;

View File

@@ -64,9 +64,9 @@ public class JavaInsertOverwriteCommitActionExecutor<T extends HoodieRecordPaylo
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeResult) {
return context.mapToPair(
writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toList()),
writeResult.getWriteStatuses().stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toList()),
partitionPath ->
Pair.of(partitionPath, getAllExistingFileIds(partitionPath)), 1
);

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.HashMap;
import java.util.List;
@@ -48,7 +49,7 @@ public class JavaInsertOverwriteTableCommitActionExecutor<T extends HoodieRecord
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeResult) {
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), config.useFileListingMetadata(),

View File

@@ -70,6 +70,7 @@ import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@@ -357,11 +358,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String clusteringCommitTime) {
List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
e.getValue().stream()).collect(Collectors.toList());
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
if (!writeStatuses.filter(WriteStatus::hasErrors).isEmpty()) {
if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) {
throw new HoodieClusteringException("Clustering failed to write to files:"
+ writeStatuses.filter(WriteStatus::hasErrors).map(WriteStatus::getFileId).collect());
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
}
finalizeWrite(table, clusteringCommitTime, writeStats);
try {

View File

@@ -18,41 +18,23 @@
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY;
/**
* Clustering Strategy based on following.
* 1) Only looks at latest 'daybased.lookback.partitions' partitions.
* 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
* Clustering Strategy that only looks at latest 'daybased.lookback.partitions' partitions.
*/
public class SparkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends PartitionAwareClusteringPlanStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
extends SparkSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class);
public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> table,
@@ -67,49 +49,6 @@ public class SparkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload
super(table, engineContext, writeConfig);
}
@Override
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
long totalSizeSoFar = 0;
for (FileSlice currentSlice : fileSlices) {
// assume each filegroup size is ~= parquet.max.file.size
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize();
// check if max size is reached and create new group, if needed.
if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
+ getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
currentGroup = new ArrayList<>();
totalSizeSoFar = 0;
}
currentGroup.add(currentSlice);
}
if (!currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+ getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
}
return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
}
@Override
protected Map<String, String> getStrategyParams() {
Map<String, String> params = new HashMap<>();
if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
params.put(CLUSTERING_SORT_COLUMNS_PROPERTY.key(), getWriteConfig().getClusteringSortColumns());
}
return params;
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
return partitionPaths.stream()
@@ -117,15 +56,4 @@ public class SparkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload
.limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
.collect(Collectors.toList());
}
@Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
return super.getFileSlicesEligibleForClustering(partition)
// Only files that have basefile size smaller than small file size are eligible.
.filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
}
private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
return (int) Math.ceil(groupSize / (double) targetFileSize);
}
}

View File

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.client.clustering.plan.strategy;

import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;

/**
 * Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive.
 * <p>
 * The bounds are read from write-config properties {@link #CONF_BEGIN_PARTITION} and {@link #CONF_END_PARTITION};
 * partition paths are compared lexicographically against them.
 */
public class SparkSelectedPartitionsClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
    extends SparkSizeBasedClusteringPlanStrategy<T> {
  private static final Logger LOG = LogManager.getLogger(SparkSelectedPartitionsClusteringPlanStrategy.class);

  public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
  public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";

  public SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> table,
                                                       HoodieSparkEngineContext engineContext,
                                                       HoodieWriteConfig writeConfig) {
    super(table, engineContext, writeConfig);
  }

  public SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> table,
                                                       HoodieSparkEngineContext engineContext,
                                                       HoodieWriteConfig writeConfig) {
    super(table, engineContext, writeConfig);
  }

  /**
   * Keeps only partition paths lexicographically within [begin, end], both inclusive.
   *
   * @throws IllegalArgumentException if either bound is not configured (previously this
   *                                  surfaced as an uninformative NullPointerException)
   */
  @Override
  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
    String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
    String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
    if (beginPartition == null || endPartition == null) {
      throw new IllegalArgumentException("Both " + CONF_BEGIN_PARTITION + " and " + CONF_END_PARTITION
          + " must be set to use " + SparkSelectedPartitionsClusteringPlanStrategy.class.getSimpleName());
    }
    List<String> filteredPartitions = partitionPaths.stream()
        .filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0)
        .collect(Collectors.toList());
    LOG.info("Filtered to the following partitions: " + filteredPartitions);
    return filteredPartitions;
  }
}

View File

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.client.clustering.plan.strategy;

import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY;

/**
 * Clustering Strategy based on following.
 * 1) Creates clustering groups based on max size allowed per group.
 * 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
 */
public class SparkSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
    extends PartitionAwareClusteringPlanStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
  private static final Logger LOG = LogManager.getLogger(SparkSizeBasedClusteringPlanStrategy.class);

  public SparkSizeBasedClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> table,
                                              HoodieSparkEngineContext engineContext,
                                              HoodieWriteConfig writeConfig) {
    super(table, engineContext, writeConfig);
  }

  public SparkSizeBasedClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> table,
                                              HoodieSparkEngineContext engineContext,
                                              HoodieWriteConfig writeConfig) {
    super(table, engineContext, writeConfig);
  }

  /**
   * Greedily bin-packs the given file slices (in iteration order) into clustering groups,
   * capping each group's estimated input size at clustering.max.bytes.per.group and sizing
   * each group's output file-group count from clustering.target.file.max.bytes.
   */
  @Override
  protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
    // Each pair is (slices in the group, number of output file groups for that group).
    List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
    List<FileSlice> currentGroup = new ArrayList<>();
    long totalSizeSoFar = 0;
    for (FileSlice currentSlice : fileSlices) {
      // assume each filegroup size is ~= parquet.max.file.size
      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : getWriteConfig().getParquetMaxFileSize();
      // check if max size is reached and create new group, if needed.
      // NOTE(review): the current slice's size is counted toward the group that is sealed here,
      // but the slice itself is added to the NEXT group (it is appended after the reset below).
      if (totalSizeSoFar >= getWriteConfig().getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
        LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
            + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
        currentGroup = new ArrayList<>();
        totalSizeSoFar = 0;
      }
      currentGroup.add(currentSlice);
    }
    // Flush the trailing (partially filled) group, if any.
    if (!currentGroup.isEmpty()) {
      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, getWriteConfig().getClusteringTargetFileMaxBytes());
      LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
          + getWriteConfig().getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
    }
    return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
        .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
        .setNumOutputFileGroups(fileSliceGroup.getRight())
        .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
        .build());
  }

  /**
   * Strategy params passed into the clustering plan; currently only the optional sort columns.
   */
  @Override
  protected Map<String, String> getStrategyParams() {
    Map<String, String> params = new HashMap<>();
    if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
      params.put(CLUSTERING_SORT_COLUMNS_PROPERTY.key(), getWriteConfig().getClusteringSortColumns());
    }
    return params;
  }

  /**
   * Size-based strategy clusters all partitions; subclasses narrow this (e.g. recent days, selected range).
   */
  @Override
  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
    return partitionPaths;
  }

  @Override
  protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
    return super.getFileSlicesEligibleForClustering(partition)
        // Only files that have basefile size smaller than small file size are eligible.
        .filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
  }

  // Ceiling division: enough output file groups so each is ~<= targetFileSize.
  private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
    return (int) Math.ceil(groupSize / (double) targetFileSize);
  }
}

View File

@@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Clustering execution strategy that submits one Spark job per clustering group,
 * runs the groups asynchronously, and unions the per-group write statuses into a
 * single result RDD.
 */
public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPayload<T>>
    extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
  private static final Logger LOG = LogManager.getLogger(MultipleSparkJobExecutionStrategy.class);

  public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
    super(table, engineContext, writeConfig);
  }

  @Override
  public HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
    JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
    // Execute clustering for each group async and collect WriteStatus.
    // preserveHoodieMetadata defaults to false when an (older) plan does not carry the flag.
    Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = clusteringPlan.getInputGroups().stream()
        .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
            clusteringPlan.getStrategy().getStrategyParams(),
            Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
            instantTime))
        .map(CompletableFuture::join);
    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusRDDStream);
    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
    writeMetadata.setWriteStatuses(writeStatusRDD);
    return writeMetadata;
  }

  /**
   * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters. The number of new
   * file groups created is bounded by numOutputGroups.
   * Note that commit is not done as part of strategy. commit is callers responsibility.
   */
  public abstract JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime,
                                                                       final Map<String, String> strategyParams, final Schema schema,
                                                                       final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);

  /**
   * Submit an async job to execute clustering for the given group.
   */
  private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
                                                                             boolean preserveHoodieMetadata, String instantTime) {
    // Return the future directly rather than assigning it to a throwaway local.
    return CompletableFuture.supplyAsync(() -> {
      JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
      JavaRDD<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
      Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
      List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
          .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
          .collect(Collectors.toList());
      return performClusteringWithRecordsRDD(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata);
    });
  }

  /**
   * Get RDD of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
   */
  private JavaRDD<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
    List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
    // anyMatch replaces the former filter(...).findAny().isPresent() idiom.
    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
    if (hasLogFiles) {
      // if there are log files, we read all records into memory for a file group and apply updates.
      return readRecordsForGroupWithLogs(jsc, clusteringOps, instantTime);
    } else {
      // We want to optimize reading records for case there are no log files.
      return readRecordsForGroupBaseFiles(jsc, clusteringOps);
    }
  }

  /**
   * Read records from baseFiles, apply updates from log files, and convert to RDD.
   */
  private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
                                                               List<ClusteringOperation> clusteringOps,
                                                               String instantTime) {
    HoodieWriteConfig config = getWriteConfig();
    HoodieTable table = getHoodieTable();
    return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
      List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
      clusteringOpsPartition.forEachRemaining(clusteringOp -> {
        long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
        LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
        try {
          Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
          HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
          // Merge-read the log files on top of the base file for this slice.
          HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
              .withFileSystem(table.getMetaClient().getFs())
              .withBasePath(table.getMetaClient().getBasePath())
              .withLogFilePaths(clusteringOp.getDeltaFilePaths())
              .withReaderSchema(readerSchema)
              .withLatestInstantTime(instantTime)
              .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
              .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
              .withReverseReader(config.getCompactionReverseLogReadEnabled())
              .withBufferSize(config.getMaxDFSStreamBufferSize())
              .withSpillableMapBasePath(config.getSpillableMapBasePath())
              .build();
          HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
          // When meta fields are disabled, pass the record-key/partition fields so keys can be re-derived.
          recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
              tableConfig.getPayloadClass(),
              tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
                  tableConfig.getPartitionFieldProp()))));
        } catch (IOException e) {
          throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
              + " and " + clusteringOp.getDeltaFilePaths(), e);
        }
      });
      return new ConcatenatingIterator<>(recordIterators);
    });
  }

  /**
   * Read records from baseFiles only (no log files) and convert to RDD.
   */
  private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                List<ClusteringOperation> clusteringOps) {
    return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
      List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
      clusteringOpsPartition.forEachRemaining(clusteringOp -> {
        try {
          Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
          HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
          iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
        } catch (IOException e) {
          throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
              + " and " + clusteringOp.getDeltaFilePaths(), e);
        }
      });
      return new ConcatenatingIterator<>(iteratorsForPartition);
    }).map(this::transform);
  }

  /**
   * Stream to array conversion with generic type is not straightforward.
   * Implement a utility method to abstract high level logic. This needs to be improved in future
   */
  @SuppressWarnings("unchecked")
  private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>> writeStatusRDDStream) {
    Object[] writeStatusObjects = writeStatusRDDStream.toArray(Object[]::new);
    JavaRDD<WriteStatus>[] writeStatusRDDArray = new JavaRDD[writeStatusObjects.length];
    for (int i = 0; i < writeStatusObjects.length; i++) {
      writeStatusRDDArray[i] = (JavaRDD<WriteStatus>) writeStatusObjects[i];
    }
    return writeStatusRDDArray;
  }

  /**
   * Transform IndexedRecord into HoodieRecord, re-deriving the key via the configured
   * key generator when meta columns are disabled.
   */
  private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
    GenericRecord record = (GenericRecord) indexedRecord;
    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
    if (!getWriteConfig().populateMetaFields()) {
      try {
        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps())));
      } catch (IOException e) {
        throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
      }
    }
    String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt);
    String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt);
    HoodieKey hoodieKey = new HoodieKey(key, partition);
    HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
    return new HoodieRecord(hoodieKey, avroPayload);
  }
}

View File

@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.ClusteringGroupInfo;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * Clustering strategy to submit single spark jobs.
 * MultipleSparkJobExecution strategy is not ideal for use cases that require large number of clustering groups
 */
public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayload<T>>
    extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
  private static final Logger LOG = LogManager.getLogger(SingleSparkJobExecutionStrategy.class);

  public SingleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
    super(table, engineContext, writeConfig);
  }

  @Override
  public HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
    JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
    final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
    // Schema must be serializable because it is captured by the mapPartitions closure below.
    final SerializableSchema serializableSchema = new SerializableSchema(schema);
    // Convert plan groups into serializable descriptors shipped to executors.
    final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(clusteringGroup ->
        ClusteringGroupInfo.create(clusteringGroup)).collect(Collectors.toList());
    // Broadcast the driver-side umask so executors see the same value.
    String umask = engineContext.hadoopConfiguration().get("fs.permissions.umask-mode");
    Broadcast<String> umaskBroadcastValue = engineContext.broadcast(umask);
    // One Spark partition per clustering group, all handled within a single job.
    JavaRDD<ClusteringGroupInfo> groupInfoJavaRDD = engineContext.parallelize(clusteringGroupInfos, clusteringGroupInfos.size());
    LOG.info("number of partitions for clustering " + groupInfoJavaRDD.getNumPartitions());
    JavaRDD<WriteStatus> writeStatusRDD = groupInfoJavaRDD
        .mapPartitions(clusteringOps -> {
          // NOTE(review): this Configuration is populated with the broadcast umask but does not
          // appear to be used afterwards in this method — confirm whether it is still needed.
          Configuration configuration = new Configuration();
          configuration.set("fs.permissions.umask-mode", umaskBroadcastValue.getValue());
          Iterable<ClusteringGroupInfo> clusteringOpsIterable = () -> clusteringOps;
          // Materialize the iterator so each group can be processed via the stream below.
          List<ClusteringGroupInfo> groupsInPartition = StreamSupport.stream(clusteringOpsIterable.spliterator(), false).collect(Collectors.toList());
          return groupsInPartition.stream().flatMap(clusteringOp ->
              runClusteringForGroup(clusteringOp, clusteringPlan.getStrategy().getStrategyParams(),
                  Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
                  serializableSchema, taskContextSupplier, instantTime)
          ).iterator();
        });
    HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
    writeMetadata.setWriteStatuses(writeStatusRDD);
    return writeMetadata;
  }

  /**
   * Submit job to execute clustering for the group, executor-side.
   * Reads the group's base files, rewrites them, and flattens the resulting
   * write-status lists into a single stream.
   */
  private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo clusteringOps, Map<String, String> strategyParams,
                                                    boolean preserveHoodieMetadata, SerializableSchema schema,
                                                    TaskContextSupplier taskContextSupplier, String instantTime) {
    List<HoodieFileGroupId> inputFileIds = clusteringOps.getOperations().stream()
        .map(op -> new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))
        .collect(Collectors.toList());
    Iterator<HoodieRecord<T>> inputRecords = readRecordsForGroupBaseFiles(clusteringOps.getOperations());
    Iterator<List<WriteStatus>> writeStatuses = performClusteringWithRecordsIterator(inputRecords, clusteringOps.getNumOutputGroups(), instantTime,
        strategyParams, schema.get(), inputFileIds, preserveHoodieMetadata, taskContextSupplier);
    Iterable<List<WriteStatus>> writestatusIterable = () -> writeStatuses;
    return StreamSupport.stream(writestatusIterable.spliterator(), false)
        .flatMap(writeStatusList -> writeStatusList.stream());
  }

  /**
   * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
   * The number of new file groups created is bounded by numOutputGroups.
   * Note that commit is not done as part of strategy. commit is callers responsibility.
   */
  public abstract Iterator<List<WriteStatus>> performClusteringWithRecordsIterator(final Iterator<HoodieRecord<T>> records, final int numOutputGroups,
                                                                                  final String instantTime,
                                                                                  final Map<String, String> strategyParams, final Schema schema,
                                                                                  final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata,
                                                                                  final TaskContextSupplier taskContextSupplier);

  /**
   * Read records from baseFiles and get iterator.
   * Note: this path reads base files only; log files (if any) are not merged here.
   */
  private Iterator<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
    List<Iterator<HoodieRecord<T>>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
      Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
      // Lazily open the file reader; any IO failure surfaces as HoodieClusteringException.
      Iterable<IndexedRecord> indexedRecords = () -> {
        try {
          return HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath())).getRecordIterator(readerSchema);
        } catch (IOException e) {
          throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
              + " and " + clusteringOp.getDeltaFilePaths(), e);
        }
      };
      return StreamSupport.stream(indexedRecords.spliterator(), false).map(record -> transform(record)).iterator();
    }).collect(Collectors.toList());
    return new ConcatenatingIterator<>(iteratorsForPartition);
  }

  /**
   * Transform IndexedRecord into HoodieRecord.
   * When meta columns are disabled, the record key and partition path are re-derived
   * via the configured key generator.
   */
  private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
    GenericRecord record = (GenericRecord) indexedRecord;
    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
    if (!getWriteConfig().populateMetaFields()) {
      try {
        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps())));
      } catch (IOException e) {
        throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
      }
    }
    String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt);
    String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt);
    HoodieKey hoodieKey = new HoodieKey(key, partition);
    HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
    HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
    return hoodieRecord;
  }
}

View File

@@ -21,8 +21,8 @@ package org.apache.hudi.client.clustering.run.strategy;
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
@@ -30,14 +30,13 @@ import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -49,24 +48,19 @@ import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_SORT_COLU
* 2) Uses bulk_insert to write data into new files.
*/
public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
extends MultipleSparkJobExecutionStrategy<T> {
private static final Logger LOG = LogManager.getLogger(SparkSortAndSizeExecutionStrategy.class);
public SparkSortAndSizeExecutionStrategy(HoodieSparkCopyOnWriteTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
public SparkSortAndSizeExecutionStrategy(HoodieSparkMergeOnReadTable<T> table,
HoodieSparkEngineContext engineContext,
public SparkSortAndSizeExecutionStrategy(HoodieTable table,
HoodieEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
public JavaRDD<WriteStatus> performClustering(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
final String instantTime, final Map<String, String> strategyParams, final Schema schema) {
public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
Properties props = getWriteConfig().getProps();
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM.key(), String.valueOf(numOutputGroups));
@@ -75,7 +69,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
props.put(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups);
false, getPartitioner(strategyParams, schema), true, numOutputGroups, preserveHoodieMetadata);
}
/**

View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.clustering.update.strategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.spark.api.java.JavaRDD;
import java.util.HashSet;
/**
 * Update strategy that permits ingestion commits to proceed while a clustering job
 * is pending: tagged update records are passed through without any rerouting or rejection.
 */
public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {

  public SparkAllowUpdateStrategy(HoodieSparkEngineContext engineContext,
                                  HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
    super(engineContext, fileGroupsInPendingClustering);
  }

  @Override
  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> records) {
    // Identity: updates are allowed even against file groups under pending clustering.
    return records;
  }
}

View File

@@ -57,8 +57,20 @@ public class SparkLazyInsertIterable<T extends HoodieRecordPayload> extends Hood
String idPrefix,
TaskContextSupplier taskContextSupplier,
WriteHandleFactory writeHandleFactory) {
this(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, false, writeHandleFactory);
}
public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
boolean areRecordsSorted,
HoodieWriteConfig config,
String instantTime,
HoodieTable hoodieTable,
String idPrefix,
TaskContextSupplier taskContextSupplier,
boolean useWriterSchema,
WriteHandleFactory writeHandleFactory) {
super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory);
this.useWriterSchema = false;
this.useWriterSchema = useWriterSchema;
}
@Override

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.function.Function2;
@@ -42,21 +43,25 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
private HoodieTable hoodieTable;
private List<String> fileIDPrefixes;
private boolean useWriterSchema;
private boolean preserveMetadata;
public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
HoodieWriteConfig config, HoodieTable hoodieTable,
List<String> fileIDPrefixes, boolean useWriterSchema) {
List<String> fileIDPrefixes, boolean useWriterSchema,
boolean preserveMetadata) {
this.instantTime = instantTime;
this.areRecordsSorted = areRecordsSorted;
this.config = config;
this.hoodieTable = hoodieTable;
this.fileIDPrefixes = fileIDPrefixes;
this.useWriterSchema = useWriterSchema;
this.preserveMetadata = preserveMetadata;
}
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) {
return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema);
fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
new CreateHandleFactory(preserveMetadata));
}
}

View File

@@ -21,20 +21,14 @@ package org.apache.hudi.table.action.cluster;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.ConcatenatingIterator;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
@@ -44,32 +38,21 @@ import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
@@ -92,46 +75,28 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(context);
// execute clustering for each group async and collect WriteStatus
Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = clusteringPlan.getInputGroups().stream()
.map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams()))
.map(CompletableFuture::join);
JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusRDDStream);
JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = buildWriteMetadata(writeStatusRDD);
final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
.performClustering(clusteringPlan, schema, instantTime);
JavaRDD<WriteStatus> writeStatusRDD = writeMetadata.getWriteStatuses();
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
// validate clustering action before committing result
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
validateWriteResult(writeMetadata);
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatusRDD.map(WriteStatus::getStat).collect(), writeMetadata.getPartitionToReplaceFileIds(),
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeMetadata.setCommitMetadata(Option.of(commitMetadata));
}
return writeMetadata;
}
/**
 * Collects a stream of {@link JavaRDD} handles into a typed array.
 *
 * <p>Java forbids generic array creation, so the array is created with the raw
 * {@code JavaRDD} component type via {@link Stream#toArray(java.util.function.IntFunction)}
 * and the resulting unchecked conversion is suppressed. This is safe because the stream
 * only ever contains {@code JavaRDD<WriteStatus>} elements.
 *
 * @param writeStatusRDDStream stream of per-group write-status RDDs
 * @return the stream's elements as an array, in encounter order
 */
@SuppressWarnings("unchecked")
private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>> writeStatusRDDStream) {
  return writeStatusRDDStream.toArray(JavaRDD[]::new);
}
/**
* Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
* But we can extend this to add more validation. E.g. number of records read = number of records written etc.
*
* We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
*/
private void validateWriteResult(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
@@ -143,134 +108,18 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
}
}
/**
 * Asynchronously executes clustering for a single clustering group.
 *
 * <p>The group's records are read (merging log files where present), then handed to the
 * configured {@link ClusteringExecutionStrategy}, which is instantiated reflectively from
 * {@code hoodie.clustering.execution.strategy.class}.
 *
 * @param clusteringGroup group of file slices to cluster together
 * @param strategyParams  strategy parameters carried in the clustering plan
 * @return future resolving to the write statuses produced for this group
 */
private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams) {
  return CompletableFuture.supplyAsync(() -> {
    JavaSparkContext sparkContext = HoodieSparkEngineContext.getSparkContext(context);
    JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> groupRecords = readRecordsForGroup(sparkContext, clusteringGroup);
    // Read with the writer schema plus the Hudi meta fields.
    Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
    ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> strategy =
        (ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
            ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), table, context, config);
    return strategy.performClustering(groupRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, schemaWithMetaFields);
  });
}
@Override
protected String getCommitActionType() {
  // Clustering results are recorded on the timeline as replace-commits,
  // since the new file groups replace the clustered input file groups.
  return HoodieTimeline.REPLACE_COMMIT_ACTION;
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
  // Every file group named in the clustering plan is replaced; group their
  // file ids by partition path. The write statuses are not consulted here.
  return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
      .collect(Collectors.groupingBy(
          HoodieFileGroupId::getPartitionPath,
          Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
}
/**
 * Gets an RDD of all records for the clustering group, with updates from log files
 * (if any) merged into the base-file records.
 *
 * @param jsc             spark context used to parallelize the read
 * @param clusteringGroup group whose file slices are to be read
 * @return RDD of merged records for the group
 */
private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup) {
  List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
  // anyMatch short-circuits; clearer and cheaper than filter(...).findAny().isPresent()
  boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> !op.getDeltaFilePaths().isEmpty());
  if (hasLogFiles) {
    // if there are log files, we read all records into memory for a file group and apply updates.
    return readRecordsForGroupWithLogs(jsc, clusteringOps);
  } else {
    // We want to optimize reading records for case there are no log files.
    return readRecordsForGroupBaseFiles(jsc, clusteringOps);
  }
}
/**
 * Reads all records for the given clustering operations, merging each base file with its
 * log files, and returns them as an RDD.
 *
 * <p>Each clustering operation becomes its own Spark partition (parallelism equals the
 * number of operations), so each file slice is scanned independently on an executor.
 *
 * @param jsc           spark context used to parallelize the read
 * @param clusteringOps file-slice level operations to read (each may carry log files)
 * @return RDD of records with log-file updates applied
 */
private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
                                                                                         List<ClusteringOperation> clusteringOps) {
  return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
    List<Iterator<HoodieRecord<? extends HoodieRecordPayload>>> recordIterators = new ArrayList<>();
    clusteringOpsPartition.forEachRemaining(clusteringOp -> {
      // Reuse the compaction memory budget to bound the merged log scanner's in-memory map.
      long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
      LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
      try {
        // Read with the writer schema plus the Hudi meta fields.
        Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
        HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
        // Scanner merges all log-file updates for this slice, up to the clustering instant;
        // spills to disk beyond maxMemoryPerCompaction.
        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
            .withFileSystem(table.getMetaClient().getFs())
            .withBasePath(table.getMetaClient().getBasePath())
            .withLogFilePaths(clusteringOp.getDeltaFilePaths())
            .withReaderSchema(readerSchema)
            .withLatestInstantTime(instantTime)
            .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
            .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
            .withReverseReader(config.getCompactionReverseLogReadEnabled())
            .withBufferSize(config.getMaxDFSStreamBufferSize())
            .withSpillableMapBasePath(config.getSpillableMapBasePath())
            .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
            .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
            .build();
        HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
        // When meta fields are not populated, pass the record key / partition path field
        // names so the slice reader can derive keys from the data columns.
        recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,
            tableConfig.getPayloadClass(),
            tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
                tableConfig.getPartitionFieldProp()))));
      } catch (IOException e) {
        throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
            + " and " + clusteringOp.getDeltaFilePaths(), e);
      }
    });
    // Chain per-slice iterators into a single iterator for this Spark partition.
    return new ConcatenatingIterator<>(recordIterators);
  });
}
/**
 * Reads records straight from the base files of the given clustering operations
 * (fast path: no log files to merge) and converts them to an RDD of HoodieRecords.
 *
 * @param jsc           spark context used to parallelize the read
 * @param clusteringOps file-slice level operations to read (base files only)
 * @return RDD of records read from the base files
 */
private JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                                          List<ClusteringOperation> clusteringOps) {
  // One Spark partition per clustering operation.
  return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(opsInPartition -> {
    List<Iterator<IndexedRecord>> baseFileIterators = new ArrayList<>();
    while (opsInPartition.hasNext()) {
      ClusteringOperation op = opsInPartition.next();
      try {
        // Schema is parsed inside the closure: only strings are captured and shipped to executors.
        Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
        HoodieFileReader<IndexedRecord> reader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(op.getDataFilePath()));
        baseFileIterators.add(reader.getRecordIterator(schemaWithMetaFields));
      } catch (IOException e) {
        throw new HoodieClusteringException("Error reading input data for " + op.getDataFilePath()
            + " and " + op.getDeltaFilePaths(), e);
      }
    }
    return new ConcatenatingIterator<>(baseFileIterators);
  }).map(this::transform);
}
/**
 * Converts a raw Avro {@link IndexedRecord} read from a base file into a {@link HoodieRecord}
 * wrapping the table's configured payload class.
 *
 * @param indexedRecord record as read from the base file
 * @return HoodieRecord with key/partition derived via the key generator (if configured)
 */
private HoodieRecord<? extends HoodieRecordPayload> transform(IndexedRecord indexedRecord) {
  final GenericRecord genericRecord = (GenericRecord) indexedRecord;
  final HoodieKey hoodieKey = new HoodieKey(
      KeyGenUtils.getRecordKeyFromGenericRecord(genericRecord, keyGeneratorOpt),
      KeyGenUtils.getPartitionPathFromGenericRecord(genericRecord, keyGeneratorOpt));
  // Payload class is instantiated reflectively from the table config, wrapping the Avro record.
  final HoodieRecordPayload payload = ReflectionUtils.loadPayload(
      table.getMetaClient().getTableConfig().getPayloadClass(), new Object[] {Option.of(genericRecord)}, Option.class);
  return new HoodieRecord(hoodieKey, payload);
}
private HoodieWriteMetadata<JavaRDD<WriteStatus>> buildWriteMetadata(JavaRDD<WriteStatus> writeStatusJavaRDD) {
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeStatusJavaRDD));
result.setWriteStatuses(writeStatusJavaRDD);
result.setCommitMetadata(Option.empty());
result.setCommitted(false);
return result;
/**
 * Computes the file ids replaced by this clustering action, grouped by partition path.
 *
 * <p>All file groups named in the clustering plan are replaced, except any that were
 * rewritten in place (i.e. also appear in the new write stats) — those must not be
 * marked replaced or the just-written data would be filtered out.
 *
 * @param writeMetadata write metadata whose write stats identify newly written file groups
 * @return map of partition path to the file ids replaced in that partition
 */
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
  // Collect directly to a typed Set (no raw HashSet / intermediate List).
  Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
      .map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId()))
      .collect(Collectors.toSet());
  return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
      .filter(fg -> !newFilesWritten.contains(fg))
      .collect(Collectors.groupingBy(
          HoodieFileGroupId::getPartitionPath,
          Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
}
}

View File

@@ -237,12 +237,12 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
JavaRDD<WriteStatus> statuses = table.getIndex().updateLocation(writeStatusRDD, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses));
return statuses;
}
protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
  // 1) Update the index with the new write locations (populates result's write statuses).
  updateIndex(writeStatusRDD, result);
  // 2) Record which file ids this action replaces, per partition (empty for plain commits).
  result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
  // 3) Commit immediately when auto-commit is enabled.
  commitOnAutoCommit(result);
}
@@ -281,7 +281,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
}
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeStatuses) {
return Collections.emptyMap();
}

View File

@@ -72,7 +72,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abs
table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
// write new files
JavaRDD<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism());
JavaRDD<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false);
//update index
((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -86,7 +86,8 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abs
boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism) {
int parallelism,
boolean preserveMetadata) {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords = inputRecords;
@@ -108,7 +109,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abs
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime,
partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema), true)
partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, preserveMetadata), true)
.flatMap(List::iterator);
return writeStatusRDD;

View File

@@ -72,8 +72,8 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
return writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
return writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
}

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
@@ -45,7 +46,7 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);

View File

@@ -98,6 +98,7 @@ import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -165,6 +166,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
}
/**
 * Parameter source: cartesian product of {populateMetaFields} x {preserveCommitMetadata}.
 */
private static Stream<Arguments> populateMetaFieldsAndPreserveMetadataParams() {
  return Stream.of(
      Arguments.of(true, true),
      Arguments.of(false, true),
      Arguments.of(true, false),
      Arguments.of(false, false));
}
private static Stream<Arguments> rollbackFailedCommitsParams() {
return Stream.of(
Arguments.of(HoodieFailedWritesCleaningPolicy.LAZY, true),
@@ -1292,21 +1302,23 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testSimpleClustering(boolean populateMetaFields) throws Exception {
@MethodSource("populateMetaFieldsAndPreserveMetadataParams")
public void testSimpleClustering(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testClusteringWithSortColumns(boolean populateMetaFields) throws Exception {
@MethodSource("populateMetaFieldsAndPreserveMetadataParams")
public void testClusteringWithSortColumns(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
.withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@@ -1401,37 +1413,37 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields,
boolean completeClustering, String validatorClasses,
String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception {
List<HoodieRecord> allRecords = testInsertTwoBatches(populateMetaFields);
Pair<List<HoodieRecord>, List<String>> allRecords = testInsertTwoBatches(populateMetaFields);
testClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
return allRecords;
return allRecords.getLeft();
}
private List<HoodieRecord> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
private Pair<List<HoodieRecord>, List<String>> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
populateMetaFields ? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
String commitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime, populateMetaFields);
String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime1, populateMetaFields);
Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1);
commitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime, populateMetaFields);
String commitTime2 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields);
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
//verify new files are created for 2nd write
Set<HoodieFileGroupId> fileIdIntersection = new HashSet<>(fileIds1);
fileIdIntersection.retainAll(fileIds2);
assertEquals(0, fileIdIntersection.size());
return Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList());
return Pair.of(Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()), Arrays.asList(commitTime1, commitTime2));
}
private String testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering,
String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
List<HoodieRecord> allRecords) throws IOException {
private void testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering,
String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
Pair<List<HoodieRecord>, List<String>> allRecords) throws IOException {
HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
.withClusteringConfig(clusteringConfig)
@@ -1442,10 +1454,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
if (completeClustering) {
String clusteringCommitTime = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
.getReverseOrderedInstants().findFirst().get().getTimestamp();
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config);
return clusteringCommitTime;
} else {
return "";
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
}
}
@@ -1454,7 +1463,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
boolean completeClustering,
String validatorClasses,
String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
List<HoodieRecord> allRecords) throws IOException {
Pair<List<HoodieRecord>, List<String>> allRecords) throws IOException {
HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
.withPreCommitValidator(StringUtils.nullToEmpty(validatorClasses))
.withPrecommitValidatorEqualitySqlQueries(sqlQueryForEqualityValidation)
@@ -1470,7 +1479,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
SparkRDDWriteClient client = getHoodieWriteClient(config);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config);
if (config.isPreserveHoodieCommitMetadata() && config.populateMetaFields()) {
verifyRecordsWrittenWithPreservedMetadata(new HashSet<>(allRecords.getRight()), allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect());
} else {
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
}
Set<HoodieFileGroupId> replacedFileIds = new HashSet<>();
clusterMetadata.getPartitionToReplaceFileIds().entrySet().forEach(partitionFiles ->
@@ -1663,13 +1676,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private void verifyRecordsWritten(String commitTime, boolean populateMetadataField,
List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, HoodieWriteConfig config) throws IOException {
List<GenericRecord> records = new ArrayList<>();
for (WriteStatus status : allStatus) {
Path filePath = new Path(basePath, status.getStat().getPath());
records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(), filePath));
}
Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
assertEquals(records.size(), expectedKeys.size());
Set<String> expectedKeys = verifyRecordKeys(expectedRecords, allStatus, records);
if (config.populateMetaFields()) {
for (GenericRecord record : records) {
String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -1689,6 +1696,29 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
}
// Reads back every record written by the given write statuses into the caller-supplied
// 'records' list (mutated in place as an out-parameter), asserts the written record count
// matches the expected record set, and returns the expected record keys.
@NotNull
private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, List<GenericRecord> records) {
  for (WriteStatus status : allStatus) {
    // Each write status points at one written base file under the table base path.
    Path filePath = new Path(basePath, status.getStat().getPath());
    records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(), filePath));
  }
  Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
  assertEquals(records.size(), expectedKeys.size());
  return expectedKeys;
}
/**
 * Verifies that clustering preserved the original {@code _hoodie_commit_time} meta field:
 * every commit time stamped on the rewritten records must be one of the given original
 * commit times, and every written record key must belong to the expected record set.
 */
private void verifyRecordsWrittenWithPreservedMetadata(Set<String> commitTimes, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus) {
  List<GenericRecord> writtenRecords = new ArrayList<>();
  Set<String> expectedKeys = verifyRecordKeys(expectedRecords, allStatus, writtenRecords);
  // Bucket the written records by the commit time carried in their meta field.
  Map<String, List<GenericRecord>> recordsByCommitTime = writtenRecords.stream()
      .collect(Collectors.groupingBy(r -> r.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString()));
  assertTrue(commitTimes.containsAll(recordsByCommitTime.keySet()));
  for (GenericRecord writtenRecord : writtenRecords) {
    String writtenKey = writtenRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
    assertTrue(expectedKeys.contains(writtenKey));
  }
}
private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) throws IOException {
client.startCommitWithTime(commitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);

View File

@@ -152,6 +152,15 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
}
/**
 * Parameter source: all combinations of populateMetaFields and preserveCommitMetadata.
 */
private static Stream<Arguments> populateMetaFieldsAndPreserveMetadataParams() {
  return Stream.of(
      Arguments.of(true, true),
      Arguments.of(false, true),
      Arguments.of(true, false),
      Arguments.of(false, false));
}
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testSimpleInsertAndUpdate(boolean populateMetaFields) throws Exception {
@@ -254,25 +263,25 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testSimpleClusteringNoUpdates(boolean populateMetaFields) throws Exception {
@MethodSource("populateMetaFieldsAndPreserveMetadataParams")
public void testSimpleClusteringNoUpdates(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
clean();
init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields);
testClustering(false, populateMetaFields);
testClustering(false, populateMetaFields, preserveCommitMetadata);
}
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testSimpleClusteringWithUpdates(boolean populateMetaFields) throws Exception {
@MethodSource("populateMetaFieldsAndPreserveMetadataParams")
public void testSimpleClusteringWithUpdates(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
clean();
init(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue(), populateMetaFields);
testClustering(true, populateMetaFields);
testClustering(true, populateMetaFields, preserveCommitMetadata);
}
private void testClustering(boolean doUpdates, boolean populateMetaFields) throws Exception {
private void testClustering(boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
// set low compaction small File Size to generate more file groups.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, 10L, clusteringConfig);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();

View File

@@ -45,6 +45,11 @@
"name":"version",
"type":["int", "null"],
"default": 1
},
{
"name":"preserveHoodieMetadata",
"type":["null", "boolean"],
"default": null
}
]
}

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.model;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
 * Plain-Java, serializable view of a clustering group. Avro-generated classes do not
 * serialize well with Spark, so the relevant fields of {@link HoodieClusteringGroup}
 * are copied into this class before being shipped to executors.
 */
public class ClusteringGroupInfo implements Serializable {

  // Clustering operations (one per file slice) belonging to this group.
  private List<ClusteringOperation> operations;
  // Number of output file groups the execution strategy should produce for this group.
  private int numOutputGroups;

  /**
   * Builds a {@link ClusteringGroupInfo} from the avro clustering-plan group.
   */
  public static ClusteringGroupInfo create(HoodieClusteringGroup clusteringGroup) {
    final List<ClusteringOperation> ops = clusteringGroup.getSlices().stream()
        .map(ClusteringOperation::create)
        .collect(Collectors.toList());
    return new ClusteringGroupInfo(ops, clusteringGroup.getNumOutputFileGroups());
  }

  // Only for serialization/de-serialization
  @Deprecated
  public ClusteringGroupInfo() {}

  private ClusteringGroupInfo(final List<ClusteringOperation> operations, final int numOutputGroups) {
    this.operations = operations;
    this.numOutputGroups = numOutputGroups;
  }

  public List<ClusteringOperation> getOperations() {
    return this.operations;
  }

  public void setOperations(final List<ClusteringOperation> operations) {
    this.operations = operations;
  }

  public int getNumOutputGroups() {
    return this.numOutputGroups;
  }

  public void setNumOutputGroups(final int numOutputGroups) {
    this.numOutputGroups = numOutputGroups;
  }

  @Override
  public boolean equals(final Object other) {
    if (other == this) {
      return true;
    }
    if (other == null || other.getClass() != getClass()) {
      return false;
    }
    // Two groups are equal when they cover the same data files (numOutputGroups excluded).
    return Objects.equals(getFilePathsInGroup(), ((ClusteringGroupInfo) other).getFilePathsInGroup());
  }

  @Override
  public int hashCode() {
    return Objects.hash(getFilePathsInGroup());
  }

  // Comma-joined data-file paths; identity used by equals/hashCode.
  private String getFilePathsInGroup() {
    return getOperations().stream()
        .map(ClusteringOperation::getDataFilePath)
        .collect(Collectors.joining(","));
  }
}

View File

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.model;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.Option;
import java.io.IOException;
/**
 * Default payload used for rewrite use cases (e.g. clustering) where the schema does not
 * change. The wrapped Avro record is returned as-is, so the payload never needs to
 * serialize/deserialize the record bytes.
 */
public class RewriteAvroPayload implements HoodieRecordPayload<RewriteAvroPayload> {

  // Held as-is and handed back on read; final because it is never reassigned.
  private final GenericRecord record;

  public RewriteAvroPayload(GenericRecord record) {
    this.record = record;
  }

  @Override
  public RewriteAvroPayload preCombine(RewriteAvroPayload another) {
    // Rewrites never merge two versions of the same key; combining is a programming error.
    throw new UnsupportedOperationException("precombine is not expected for rewrite payload");
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    // An update simply replaces the current value with the rewritten record.
    return getInsertValue(schema);
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    return Option.of(record);
  }
}

View File

@@ -35,16 +35,16 @@ import java.util.Iterator;
/**
* Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
*/
public class HoodieFileSliceReader implements Iterator<HoodieRecord<? extends HoodieRecordPayload>> {
private Iterator<HoodieRecord<? extends HoodieRecordPayload>> recordsIterator;
public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Iterator<HoodieRecord<T>> {
private Iterator<HoodieRecord<T>> recordsIterator;
public static <R extends IndexedRecord, T extends HoodieRecordPayload> HoodieFileSliceReader getFileSliceReader(
public static <R extends IndexedRecord, T> HoodieFileSliceReader getFileSliceReader(
HoodieFileReader<R> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
Option<Pair<String,String>> simpleKeyGenFieldsOpt) throws IOException {
Iterator<R> baseIterator = baseFileReader.getRecordIterator(schema);
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
HoodieRecord<T> hoodieRecord = simpleKeyGenFieldsOpt.isPresent()
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, simpleKeyGenFieldsOpt.get())
: SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass);
scanner.processNextRecord(hoodieRecord);
@@ -52,7 +52,7 @@ public class HoodieFileSliceReader implements Iterator<HoodieRecord<? extends Ho
return new HoodieFileSliceReader(scanner.iterator());
}
private HoodieFileSliceReader(Iterator<HoodieRecord<? extends HoodieRecordPayload>> recordsItr) {
private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {
this.recordsIterator = recordsItr;
}
@@ -62,7 +62,7 @@ public class HoodieFileSliceReader implements Iterator<HoodieRecord<? extends Ho
}
@Override
public HoodieRecord<? extends HoodieRecordPayload> next() {
public HoodieRecord<T> next() {
return recordsIterator.next();
}
}