[HUDI-2332] Add clustering and compaction in Kafka Connect Sink (#3857)

* [HUDI-2332] Add clustering and compaction in Kafka Connect Sink

* Disable validation check on instant time for compaction and adjust configs

* Add javadocs

* Add clustering and compaction config

* Fix transaction causing missing records in the target table

* Add debugging logs

* Fix kafka offset sync in participant

* Adjust how clustering and compaction are configured in kafka-connect

* Fix clustering strategy

* Remove irrelevant changes from other published PRs

* Update clustering logic and others

* Update README

* Fix test failures

* Fix indentation

* Fix clustering config

* Add JavaCustomColumnsSortPartitioner and make async compaction enabled by default

* Add test for JavaCustomColumnsSortPartitioner

* Add more changes after IDE sync

* Update README with clarification

* Fix clustering logic after rebasing

* Remove unrelated changes
Author: Y Ethan Guo
Date: 2021-11-23 00:53:28 -08:00
Committed by: GitHub
Parent: 9ed28b1570
Commit: ca9bfa2a40
27 changed files with 1358 additions and 93 deletions


@@ -22,7 +22,9 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import java.io.File;
import java.io.FileReader;
@@ -41,6 +43,14 @@ public class HoodieClusteringConfig extends HoodieConfig {
// Any strategy specific params can be saved with this prefix
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
// Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix
public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";
@@ -59,7 +69,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static final ConfigProperty<String> PLAN_STRATEGY_CLASS_NAME = ConfigProperty
.key("hoodie.clustering.plan.strategy.class")
.defaultValue("org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy")
.defaultValue(SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY)
.sinceVersion("0.7.0")
.withDocumentation("Config to provide a strategy class (subclass of ClusteringPlanStrategy) to create clustering plan "
+ "i.e select what file groups are being clustered. Default strategy, looks at the clustering small file size limit (determined by "
@@ -67,7 +77,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static final ConfigProperty<String> EXECUTION_STRATEGY_CLASS_NAME = ConfigProperty
.key("hoodie.clustering.execution.strategy.class")
.defaultValue("org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
.defaultValue(SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY)
.sinceVersion("0.7.0")
.withDocumentation("Config to provide a strategy class (subclass of RunClusteringStrategy) to define how the "
+ " clustering plan is executed. By default, we sort the file groups in th plan by the specified columns, while "
@@ -336,6 +346,12 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static class Builder {
private final HoodieClusteringConfig clusteringConfig = new HoodieClusteringConfig();
private EngineType engineType = EngineType.SPARK;
public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
return this;
}
public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
@@ -455,9 +471,37 @@ public class HoodieClusteringConfig extends HoodieConfig {
}
public HoodieClusteringConfig build() {
clusteringConfig.setDefaultValue(
PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
clusteringConfig.setDefaultValue(
EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType));
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
return clusteringConfig;
}
private String getDefaultPlanStrategyClassName(EngineType engineType) {
switch (engineType) {
case SPARK:
return SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
case FLINK:
case JAVA:
return JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}
private String getDefaultExecutionStrategyClassName(EngineType engineType) {
switch (engineType) {
case SPARK:
return SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
case FLINK:
case JAVA:
return JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY;
default:
throw new HoodieNotSupportedException("Unsupported engine " + engineType);
}
}
}
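
For reference, a minimal sketch (not part of this commit's diff) of how the engine type drives the defaults resolved in build() above; the base path is hypothetical:

import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.config.HoodieWriteConfig;

public class EngineDefaultsExample {
  public static void main(String[] args) {
    // No strategy classes are set explicitly; the defaults follow the engine type,
    // so the Java client (used by the Kafka Connect sink) gets the Java strategies.
    HoodieWriteConfig javaConfig = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.JAVA)
        .withPath("/tmp/hudi-table")  // hypothetical base path
        .build();

    // Expected: the Java size-based plan strategy and sort-and-size execution strategy.
    System.out.println(javaConfig.getClusteringPlanStrategyClass());
    System.out.println(javaConfig.getClusteringExecutionStrategyClass());
  }
}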
/**


@@ -2182,7 +2182,8 @@ public class HoodieWriteConfig extends HoodieConfig {
writeConfig.setDefaultOnCondition(!isCompactionConfigSet,
HoodieCompactionConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isClusteringConfigSet,
HoodieClusteringConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
HoodieClusteringConfig.newBuilder().withEngineType(engineType)
.fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties(
writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isBootstrapConfigSet,


@@ -27,10 +27,15 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
@@ -38,6 +43,8 @@ import java.util.Map;
public abstract class BaseClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {
private static final Logger LOG = LogManager.getLogger(BaseClusteringPlanActionExecutor.class);
private final Option<Map<String, String>> extraMetadata;
public BaseClusteringPlanActionExecutor(HoodieEngineContext context,
@@ -49,7 +56,32 @@ public abstract class BaseClusteringPlanActionExecutor<T extends HoodieRecordPay
this.extraMetadata = extraMetadata;
}
protected abstract Option<HoodieClusteringPlan> createClusteringPlan();
protected Option<HoodieClusteringPlan> createClusteringPlan() {
LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
.countInstants();
if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getInlineClusterMaxCommits());
return Option.empty();
}
if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getAsyncClusterMaxCommits());
return Option.empty();
}
LOG.info("Generating clustering plan for table " + config.getBasePath());
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config);
return strategy.generateClusteringPlan();
}
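
A standalone sketch (not part of this commit's diff) of the gating logic above: clustering is only scheduled once the number of completed commits since the last replace commit reaches the configured inline/async max-commits threshold; the numbers are hypothetical:

public class ClusteringGateExample {

  // Mirrors the check in createClusteringPlan: schedule only when enough commits
  // have accumulated since the last completed clustering (replace commit).
  static boolean shouldScheduleClustering(boolean clusteringEnabled,
                                          int commitsSinceLastClustering,
                                          int maxCommitsBeforeClustering) {
    return clusteringEnabled && commitsSinceLastClustering >= maxCommitsBeforeClustering;
  }

  public static void main(String[] args) {
    // e.g. async clustering configured to run every 4 commits
    System.out.println(shouldScheduleClustering(true, 2, 4)); // false -> plan stays Option.empty()
    System.out.println(shouldScheduleClustering(true, 4, 4)); // true  -> the plan strategy is invoked
  }
}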
@Override
public Option<HoodieClusteringPlan> execute() {


@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -68,12 +69,15 @@ public class ScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I,
public Option<HoodieCompactionPlan> execute() {
if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
&& !config.getFailedWritesCleanPolicy().isLazy()) {
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
// TODO(yihua): this validation is removed for Java client used by kafka-connect. Need to revisit this.
if (config.getEngineType() != EngineType.JAVA) {
// if there are inflight writes, their instantTime must not be less than that of compaction instant time
table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction().firstInstant()
.ifPresent(earliestInflight -> ValidationUtils.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
"Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime));
}
// Committed and pending compaction instants should have strictly lower timestamps
List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
.getWriteTimeline().filterCompletedAndCompactionInstants().getInstants()
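
A standalone sketch (not part of this commit's diff) of the precondition that the Java engine now skips: any inflight write instant must be strictly later than the compaction instant being scheduled; the instant times are hypothetical:

import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class CompactionInstantCheckExample {
  public static void main(String[] args) {
    String compactionInstantTime = "20211123005300";  // hypothetical instants
    String earliestInflight = "20211123005400";

    // Same comparison the validation uses: the earliest inflight instant must be
    // later than the compaction instant, otherwise scheduling would fail the check.
    boolean ok = HoodieTimeline.compareTimestamps(
        earliestInflight, HoodieTimeline.GREATER_THAN, compactionInstantTime);
    System.out.println(ok); // true in this example
  }
}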


@@ -20,7 +20,6 @@ package org.apache.hudi.config;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.config.HoodieWriteConfig.Builder;
import org.apache.hudi.index.HoodieIndex;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -81,6 +80,52 @@ public class TestHoodieWriteConfig {
assertEquals(HoodieIndex.IndexType.INMEMORY, writeConfig.getIndexType());
}
@Test
public void testDefaultClusteringPlanStrategyClassAccordingToEngineType() {
// Default (as Spark)
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
writeConfig.getClusteringPlanStrategyClass());
// Spark
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
writeConfig.getClusteringPlanStrategyClass());
// Flink and Java
for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) {
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
writeConfig.getClusteringPlanStrategyClass());
}
}
@Test
public void testDefaultClusteringExecutionStrategyClassAccordingToEngineType() {
// Default (as Spark)
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY,
writeConfig.getClusteringExecutionStrategyClass());
// Spark
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(EngineType.SPARK).withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY,
writeConfig.getClusteringExecutionStrategyClass());
// Flink and Java
for (EngineType engineType : new EngineType[] {EngineType.FLINK, EngineType.JAVA}) {
writeConfig = HoodieWriteConfig.newBuilder().withEngineType(engineType).withPath("/tmp").build();
assertEquals(
HoodieClusteringConfig.JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY,
writeConfig.getClusteringExecutionStrategyClass());
}
}
private ByteArrayOutputStream saveParamsIntoOutputStream(Map<String, String> params) throws IOException {
Properties properties = new Properties();
properties.putAll(params);


@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* Clustering Strategy that only looks at latest 'daybased.lookback.partitions' partitions
* for Java engine.
*/
public class JavaRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends JavaSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(JavaRecentDaysClusteringPlanStrategy.class);
public JavaRecentDaysClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T> table,
HoodieJavaEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
public JavaRecentDaysClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T> table,
HoodieJavaEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering();
return partitionPaths.stream()
.sorted(Comparator.reverseOrder())
.skip(Math.max(skipPartitionsFromLatestForClustering, 0))
.limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
.collect(Collectors.toList());
}
}
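
A standalone sketch (not part of this commit's diff) of the same stream pipeline on hypothetical date partitions, skipping the newest partition and keeping the next two:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class RecentPartitionsExample {
  public static void main(String[] args) {
    List<String> partitionPaths =
        Arrays.asList("2021/11/19", "2021/11/20", "2021/11/21", "2021/11/22", "2021/11/23");

    int skipPartitionsFromLatest = 1;   // hypothetical: skip the newest partition
    int targetPartitions = 2;           // hypothetical: cluster at most 2 partitions

    List<String> selected = partitionPaths.stream()
        .sorted(Comparator.reverseOrder())               // newest first
        .skip(Math.max(skipPartitionsFromLatest, 0))     // drop the newest N
        .limit(targetPartitions > 0 ? targetPartitions : partitionPaths.size())
        .collect(Collectors.toList());

    System.out.println(selected); // [2021/11/22, 2021/11/21]
  }
}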


@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
import org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
/**
* Clustering Strategy for Java engine based on following.
* 1) Creates clustering groups based on max size allowed per group.
* 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
*/
public class JavaSizeBasedClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends PartitionAwareClusteringPlanStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(JavaSizeBasedClusteringPlanStrategy.class);
public JavaSizeBasedClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T> table,
HoodieJavaEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
public JavaSizeBasedClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T> table,
HoodieJavaEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
List<FileSlice> currentGroup = new ArrayList<>();
long totalSizeSoFar = 0;
HoodieWriteConfig writeConfig = getWriteConfig();
for (FileSlice currentSlice : fileSlices) {
// assume each filegroup size is ~= parquet.max.file.size
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
// check if max size is reached and create new group, if needed.
if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
currentGroup = new ArrayList<>();
totalSizeSoFar = 0;
}
currentGroup.add(currentSlice);
// totalSizeSoFar could be 0 when new group was created in the previous conditional block.
// reset to the size of current slice, otherwise the number of output file group will become 0 even though current slice is present.
if (totalSizeSoFar == 0) {
totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
}
}
if (!currentGroup.isEmpty()) {
int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+ writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
}
return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
.setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
.setNumOutputFileGroups(fileSliceGroup.getRight())
.setMetrics(buildMetrics(fileSliceGroup.getLeft()))
.build());
}
@Override
protected Map<String, String> getStrategyParams() {
Map<String, String> params = new HashMap<>();
if (!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
params.put(PLAN_STRATEGY_SORT_COLUMNS.key(), getWriteConfig().getClusteringSortColumns());
}
return params;
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
return partitionPaths;
}
@Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String partition) {
return super.getFileSlicesEligibleForClustering(partition)
// Only files that have basefile size smaller than small file size are eligible.
.filter(slice -> slice.getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) < getWriteConfig().getClusteringSmallFileLimit());
}
private int getNumberOfOutputFileGroups(long groupSize, long targetFileSize) {
return (int) Math.ceil(groupSize / (double) targetFileSize);
}
}
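
A standalone, simplified sketch (not part of this commit's diff) of the bin-packing above with plain numbers: slices are packed into a group until the max-bytes-per-group limit is reached, and each group's output file count is roughly ceil(groupSize / targetFileSize). All sizes are hypothetical, and the output count is computed from the final group sizes rather than at flush time as the strategy does:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SizeBasedGroupingExample {
  public static void main(String[] args) {
    // Hypothetical base file sizes (bytes) for the slices of one partition.
    List<Long> sliceSizes = Arrays.asList(40L, 60L, 30L, 50L, 20L);
    long maxBytesPerGroup = 100L;   // stand-in for the max-bytes-per-group config
    long targetFileSize = 60L;      // stand-in for the target-file-max-bytes config

    List<List<Long>> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long totalSoFar = 0;
    for (long size : sliceSizes) {
      totalSoFar += size;
      if (totalSoFar >= maxBytesPerGroup && !current.isEmpty()) {
        groups.add(current);            // close the current group once the limit is hit
        current = new ArrayList<>();
        totalSoFar = 0;
      }
      current.add(size);
      if (totalSoFar == 0) {
        totalSoFar = size;              // reset to the current slice size, as in the strategy above
      }
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }

    for (List<Long> group : groups) {
      long groupSize = group.stream().mapToLong(Long::longValue).sum();
      int outputFileGroups = (int) Math.ceil(groupSize / (double) targetFileSize);
      System.out.println(group + " -> " + outputFileGroups + " output file group(s)");
    }
  }
}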


@@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.JavaTaskContextSupplier;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.common.table.log.HoodieFileSliceReader.getFileSliceReader;
import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
/**
* Clustering strategy for Java engine.
*/
public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
extends ClusteringExecutionStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(JavaExecutionStrategy.class);
public JavaExecutionStrategy(
HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> performClustering(
HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
// execute clustering for each group and collect WriteStatus
List<WriteStatus> writeStatusList = new ArrayList<>();
clusteringPlan.getInputGroups().forEach(
inputGroup -> writeStatusList.addAll(runClusteringForGroup(
inputGroup, clusteringPlan.getStrategy().getStrategyParams(),
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
instantTime)));
HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
writeMetadata.setWriteStatuses(writeStatusList);
return writeMetadata;
}
/**
* Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
* The number of new file groups created is bounded by numOutputGroups.
* Note that commit is not done as part of strategy. commit is callers responsibility.
*
* @param inputRecords List of {@link HoodieRecord}.
* @param numOutputGroups Number of output file groups.
* @param instantTime Clustering (replace commit) instant time.
* @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
* @param schema Schema of the data including metadata fields.
* @param fileGroupIdList File group id corresponding to each out group.
* @param preserveHoodieMetadata Whether to preserve commit metadata while clustering.
* @return List of {@link WriteStatus}.
*/
public abstract List<WriteStatus> performClusteringWithRecordList(
final List<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime,
final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);
/**
* Create {@link BulkInsertPartitioner} based on strategy params.
*
* @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
* @param schema Schema of the data including metadata fields.
* @return empty for now.
*/
protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new JavaCustomColumnsSortPartitioner(
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema)));
} else {
return Option.empty();
}
}
/**
* Executes clustering for the group.
*/
private List<WriteStatus> runClusteringForGroup(
HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
boolean preserveHoodieMetadata, String instantTime) {
List<HoodieRecord<T>> inputRecords = readRecordsForGroup(clusteringGroup, instantTime);
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
.map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
.collect(Collectors.toList());
return performClusteringWithRecordList(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, preserveHoodieMetadata);
}
/**
* Get a list of all records for the group. This includes all records from file slice
* (Apply updates from log files, if any).
*/
private List<HoodieRecord<T>> readRecordsForGroup(HoodieClusteringGroup clusteringGroup, String instantTime) {
List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
if (hasLogFiles) {
// if there are log files, we read all records into memory for a file group and apply updates.
return readRecordsForGroupWithLogs(clusteringOps, instantTime);
} else {
// We want to optimize reading records for case there are no log files.
return readRecordsForGroupBaseFiles(clusteringOps);
}
}
/**
* Read records from baseFiles and apply updates.
*/
private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps,
String instantTime) {
HoodieWriteConfig config = getWriteConfig();
HoodieTable table = getHoodieTable();
List<HoodieRecord<T>> records = new ArrayList<>();
clusteringOps.forEach(clusteringOp -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(table.getMetaClient().getFs())
.withBasePath(table.getMetaClient().getBasePath())
.withLogFilePaths(clusteringOp.getDeltaFilePaths())
.withReaderSchema(readerSchema)
.withLatestInstantTime(instantTime)
.withMaxMemorySizeInBytes(maxMemoryPerCompaction)
.withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
.withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.withPartition(clusteringOp.getPartitionPath())
.build();
Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
Iterator<HoodieRecord<T>> fileSliceReader = getFileSliceReader(baseFileReader, scanner, readerSchema,
tableConfig.getPayloadClass(),
tableConfig.getPreCombineField(),
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
tableConfig.getPartitionFieldProp())));
fileSliceReader.forEachRemaining(records::add);
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
});
return records;
}
/**
* Read records from baseFiles.
*/
private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
List<HoodieRecord<T>> records = new ArrayList<>();
clusteringOps.forEach(clusteringOp -> {
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
Iterator<IndexedRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
recordIterator.forEachRemaining(record -> records.add(transform(record)));
} catch (IOException e) {
throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+ " and " + clusteringOp.getDeltaFilePaths(), e);
}
});
return records;
}
/**
* Transform IndexedRecord into HoodieRecord.
*/
private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
GenericRecord record = (GenericRecord) indexedRecord;
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
String key = KeyGenUtils.getRecordKeyFromGenericRecord(record, keyGeneratorOpt);
String partition = KeyGenUtils.getPartitionPathFromGenericRecord(record, keyGeneratorOpt);
HoodieKey hoodieKey = new HoodieKey(key, partition);
HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);
HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
return hoodieRecord;
}
}
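
For reference, a minimal sketch (not part of this commit's diff) of the strategy params that make getPartitioner above return a JavaCustomColumnsSortPartitioner; the column names are hypothetical:

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.config.HoodieClusteringConfig;

public class SortColumnsParamsExample {
  public static void main(String[] args) {
    Map<String, String> strategyParams = new HashMap<>();
    // Hypothetical sort columns; with this entry present, getPartitioner(...) above would
    // wrap them in a JavaCustomColumnsSortPartitioner, otherwise it returns Option.empty().
    strategyParams.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), "rider,driver");
    System.out.println(strategyParams);
  }
}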


@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Clustering Strategy based on following.
* 1) Java execution engine.
* 2) Uses bulk_insert to write data into new files.
*/
public class JavaSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
extends JavaExecutionStrategy<T> {
private static final Logger LOG = LogManager.getLogger(JavaSortAndSizeExecutionStrategy.class);
public JavaSortAndSizeExecutionStrategy(HoodieTable table,
HoodieEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}
@Override
public List<WriteStatus> performClusteringWithRecordList(
final List<HoodieRecord<T>> inputRecords, final int numOutputGroups,
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
Properties props = getWriteConfig().getProps();
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
// We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
return (List<WriteStatus>) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
}
}


@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.Schema;
import java.util.List;
import java.util.stream.Collectors;
/**
* A partitioner that does sorting based on specified column values for Java client.
*
* @param <T> HoodieRecordPayload type
*/
public class JavaCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
private final String[] sortColumnNames;
private final Schema schema;
public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema) {
this.sortColumnNames = columnNames;
this.schema = schema;
}
@Override
public List<HoodieRecord<T>> repartitionRecords(
List<HoodieRecord<T>> records, int outputSparkPartitions) {
return records.stream().sorted((o1, o2) -> {
Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema);
Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema);
return values1.toString().compareTo(values2.toString());
}).collect(Collectors.toList());
}
@Override
public boolean arePartitionRecordsSorted() {
return true;
}
}
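
A standalone sketch (not part of this commit's diff) of the same idea on a plain record type: concatenate the chosen column values and sort by their string form. The record class and values are hypothetical:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ColumnSortSketch {

  // Hypothetical stand-in for a record exposing two sort columns.
  static class Row {
    final String rider;
    final String driver;
    Row(String rider, String driver) {
      this.rider = rider;
      this.driver = driver;
    }
    String sortKey() {
      // Same spirit as the partitioner above: combine column values and compare as strings.
      return rider + "," + driver;
    }
    @Override
    public String toString() {
      return "(" + rider + "," + driver + ")";
    }
  }

  public static void main(String[] args) {
    List<Row> rows = Arrays.asList(
        new Row("rider-B", "driver-2"),
        new Row("rider-A", "driver-9"),
        new Row("rider-A", "driver-1"));

    List<Row> sorted = rows.stream()
        .sorted((a, b) -> a.sortKey().compareTo(b.sortKey()))
        .collect(Collectors.toList());

    System.out.println(sorted); // [(rider-A,driver-1), (rider-A,driver-9), (rider-B,driver-2)]
  }
}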


@@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,17 +40,24 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
import org.apache.hudi.table.action.cluster.JavaClusteringPlanActionExecutor;
import org.apache.hudi.table.action.cluster.JavaExecuteClusteringCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaMergeHelper;
import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
@@ -57,10 +65,20 @@ import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieJavaTable<T> {
public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
extends HoodieJavaTable<T> implements HoodieCompactionHandler<T> {
private static final Logger LOG = LoggerFactory.getLogger(HoodieJavaCopyOnWriteTable.class);
protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
@@ -160,23 +178,23 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("ScheduleCompaction is not supported yet");
throw new HoodieNotSupportedException("ScheduleCompaction is not supported on a CopyOnWrite table");
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext context,
String compactionInstantTime) {
throw new HoodieNotSupportedException("Compact is not supported yet");
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
}
@Override
public Option<HoodieClusteringPlan> scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Clustering is not supported yet");
return new JavaClusteringPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> cluster(final HoodieEngineContext context, final String clusteringInstantTime) {
throw new HoodieNotSupportedException("Clustering is not supported yet");
return new JavaExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();
}
@Override
@@ -235,4 +253,53 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
return new CopyOnWriteRestoreActionExecutor(
context, config, this, restoreInstantTime, instantToRestore).execute();
}
@Override
public Iterator<List<WriteStatus>> handleUpdate(
String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile)
throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
return handleUpdateInternal(upsertHandle, instantTime, fileId);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String instantTime,
String fileId) throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
JavaMergeHelper.newInstance().runMerge(this, upsertHandle);
}
// TODO(yihua): This needs to be revisited
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.writeStatuses());
}
return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
if (requireSortedRecords()) {
return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, Option.empty());
} else {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, Option.empty());
}
}
@Override
public Iterator<List<WriteStatus>> handleInsert(
String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
HoodieCreateHandle<?, ?, ?, ?> createHandle =
new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
createHandle.write();
return Collections.singletonList(createHandle.close()).iterator();
}
}
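
With scheduleClustering and cluster now wired to the Java executors, a Java-engine caller (such as the Kafka Connect sink's coordinator) can drive clustering directly against the table. A hedged sketch, not part of this commit's diff; table/context construction and instant-time generation are omitted:

import java.util.List;
import java.util.Map;

import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

public class ClusterTableSketch {
  // Sketch only: the table, engine context and instant time are assumed to exist already.
  static void clusterOnce(HoodieJavaCopyOnWriteTable<?> table,
                          HoodieEngineContext context,
                          String instantTime) {
    Option<HoodieClusteringPlan> plan =
        table.scheduleClustering(context, instantTime, Option.<Map<String, String>>empty());
    if (plan.isPresent()) {
      HoodieWriteMetadata<List<WriteStatus>> result = table.cluster(context, instantTime);
      // The caller is responsible for committing the replace commit from `result`.
    }
  }
}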


@@ -18,6 +18,7 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -29,9 +30,13 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.compact.HoodieJavaMergeOnReadTableCompactor;
import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.deltacommit.JavaUpsertPreppedDeltaCommitActionExecutor;
import java.util.List;
import java.util.Map;
public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieJavaCopyOnWriteTable<T> {
protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
@@ -60,4 +65,21 @@ public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends H
return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
}
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
context, config, this, instantTime, extraMetadata,
new HoodieJavaMergeOnReadTableCompactor());
return scheduleCompactionExecutor.execute();
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> compact(
HoodieEngineContext context, String compactionInstantTime) {
RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
context, config, this, compactionInstantTime, new HoodieJavaMergeOnReadTableCompactor(),
new HoodieJavaCopyOnWriteTable(config, context, getMetaClient()));
return convertMetadata(compactionExecutor.execute());
}
}
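
Likewise for compaction on the merge-on-read table. A hedged sketch, not part of this commit's diff; table/context construction and instant-time generation are omitted:

import java.util.List;
import java.util.Map;

import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

public class CompactTableSketch {
  // Sketch only: the table, engine context and instant time are assumed to exist already.
  static void compactOnce(HoodieJavaMergeOnReadTable<?> table,
                          HoodieEngineContext context,
                          String compactionInstantTime) {
    Option<HoodieCompactionPlan> plan =
        table.scheduleCompaction(context, compactionInstantTime, Option.<Map<String, String>>empty());
    if (plan.isPresent()) {
      HoodieWriteMetadata<List<WriteStatus>> result = table.compact(context, compactionInstantTime);
      // The caller is responsible for committing the compaction from `result`.
    }
  }
}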


@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,9 +32,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.JavaHoodieIndexFactory;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List;
import static org.apache.hudi.common.data.HoodieList.getList;
public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
@@ -61,6 +65,11 @@ public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
}
}
public static HoodieWriteMetadata<List<WriteStatus>> convertMetadata(
HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
return metadata.clone(getList(metadata.getWriteStatuses()));
}
@Override
protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return JavaHoodieIndexFactory.createIndex(config);


@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.cluster;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.util.List;
import java.util.Map;
public class JavaClusteringPlanActionExecutor<T extends HoodieRecordPayload> extends
BaseClusteringPlanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
public JavaClusteringPlanActionExecutor(
HoodieEngineContext context, HoodieWriteConfig config,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String instantTime, Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
}
}


@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.cluster;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;
import org.apache.avro.Schema;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class JavaExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseJavaCommitActionExecutor<T> {
private final HoodieClusteringPlan clusteringPlan;
public JavaExecuteClusteringCommitActionExecutor(
HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime) {
super(context, config, table, instantTime, WriteOperationType.CLUSTER);
this.clusteringPlan = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
.map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
"Unable to read clustering plan for instant: " + instantTime));
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieWriteMetadata<List<WriteStatus>> writeMetadata = (
(ClusteringExecutionStrategy<T, List<HoodieRecord<? extends HoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>>)
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
.performClustering(clusteringPlan, schema, instantTime);
List<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
List<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
validateWriteResult(writeMetadata);
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeMetadata.setCommitMetadata(Option.of(commitMetadata));
}
return writeMetadata;
}
/**
* Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
* But we can extend this to add more validation. E.g. number of records read = number of records written etc.
* We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
*/
private void validateWriteResult(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
if (writeMetadata.getWriteStatuses().isEmpty()) {
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
+ " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+ clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ " write statuses");
}
}
@Override
protected String getCommitActionType() {
return HoodieTimeline.REPLACE_COMMIT_ACTION;
}
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
.map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
.filter(fg -> !newFilesWritten.contains(fg))
.collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
}
}

View File

@@ -126,13 +126,14 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
return result;
}
protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
protected List<WriteStatus> updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
Instant indexStartTime = Instant.now();
// Update the index back
List<WriteStatus> statuses = HoodieList.getList(
table.getIndex().updateLocation(HoodieList.of(writeStatuses), context, table));
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
return statuses;
}
@Override

View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.table.action.compact;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import java.util.List;
/**
 * Compacts a Hoodie table with merge-on-read storage using the Java engine. Computes all possible
 * compactions, passes them through a CompactionFilter, executes the compactions, writes a new
 * version of the base files, and makes a normal commit.
*/
public class HoodieJavaMergeOnReadTableCompactor<T extends HoodieRecordPayload>
extends HoodieCompactor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
@Override
public void preCompact(
HoodieTable table, HoodieTimeline pendingCompactionTimeline, String compactionInstantTime) {
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
table.rollbackInflightCompaction(inflightInstant);
table.getMetaClient().reloadActiveTimeline();
}
}
@Override
public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
// No OP
}
}

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.testutils.HoodieJavaClientTestBase;
import org.apache.avro.Schema;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestJavaBulkInsertInternalPartitioner extends HoodieJavaClientTestBase {
private static final Comparator<HoodieRecord> KEY_COMPARATOR =
Comparator.comparing(o -> (o.getPartitionPath() + "+" + o.getRecordKey()));
public static List<HoodieRecord> generateTestRecordsForBulkInsert(int numRecords) {
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
List<HoodieRecord> records = dataGenerator.generateInserts("0", numRecords);
return records;
}
public static Map<String, Long> generatePartitionNumRecords(List<HoodieRecord> records) {
return records.stream().map(record -> record.getPartitionPath())
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
}
@ParameterizedTest
@ValueSource(strings = {"rider", "rider,driver"})
public void testCustomColumnSortPartitioner(String sortColumnString) throws Exception {
String[] sortColumns = sortColumnString.split(",");
Comparator<HoodieRecord> columnComparator =
getCustomColumnComparator(HoodieTestDataGenerator.AVRO_SCHEMA, sortColumns);
List<HoodieRecord> records = generateTestRecordsForBulkInsert(1000);
testBulkInsertInternalPartitioner(
new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA),
records, true, generatePartitionNumRecords(records), Option.of(columnComparator));
}
private Comparator<HoodieRecord> getCustomColumnComparator(Schema schema, String[] sortColumns) {
return Comparator.comparing(
record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema).toString());
}
private void verifyRecordAscendingOrder(List<HoodieRecord> records,
Option<Comparator<HoodieRecord>> comparator) {
List<HoodieRecord> expectedRecords = new ArrayList<>(records);
Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
assertEquals(expectedRecords, records);
}
private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
List<HoodieRecord> records,
boolean isSorted,
Map<String, Long> expectedPartitionNumRecords,
Option<Comparator<HoodieRecord>> comparator) {
List<HoodieRecord> actualRecords =
(List<HoodieRecord>) partitioner.repartitionRecords(records, 1);
if (isSorted) {
// Verify global order
verifyRecordAscendingOrder(actualRecords, comparator);
}
// Verify number of records per partition path
assertEquals(expectedPartitionNumRecords, generatePartitionNumRecords(actualRecords));
}
}

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -49,6 +50,7 @@ public class SparkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload
super(table, engineContext, writeConfig);
}
@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering();

View File

@@ -18,18 +18,15 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaRDD;
/**
* A partitioner that does sorting based on specified column values for each RDD partition.
@@ -57,7 +54,8 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
int outputSparkPartitions) {
final String[] sortColumns = this.sortColumnNames;
final SerializableSchema schema = this.serializableSchema;
return records.sortBy(record -> getRecordSortColumnValues(record, sortColumns, schema),
return records.sortBy(
record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema),
true, outputSparkPartitions);
}
@@ -66,26 +64,6 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
return true;
}
private static Object getRecordSortColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
String[] sortColumns,
SerializableSchema schema) {
try {
GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema.get()).get();
if (sortColumns.length == 1) {
return HoodieAvroUtils.getNestedFieldVal(genericRecord, sortColumns[0], true);
} else {
StringBuilder sb = new StringBuilder();
for (String col : sortColumns) {
sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true));
}
return sb.toString();
}
} catch (IOException e) {
throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
}
}
private String[] getSortColumnName(HoodieWriteConfig config) {
return config.getUserDefinedBulkInsertPartitionerSortColumns().split(",");
}

View File

@@ -18,20 +18,14 @@
package org.apache.hudi.table.action.cluster;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.util.Map;
@@ -40,8 +34,6 @@ import java.util.Map;
public class SparkClusteringPlanActionExecutor<T extends HoodieRecordPayload> extends
BaseClusteringPlanActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(SparkClusteringPlanActionExecutor.class);
public SparkClusteringPlanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
@@ -49,33 +41,4 @@ public class SparkClusteringPlanActionExecutor<T extends HoodieRecordPayload> ex
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, extraMetadata);
}
@Override
protected Option<HoodieClusteringPlan> createClusteringPlan() {
LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getCompletedReplaceTimeline().lastInstant();
int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
.countInstants();
if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
LOG.info("Not scheduling inline clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getInlineClusterMaxCommits());
return Option.empty();
}
if (config.isAsyncClusteringEnabled() && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
LOG.info("Not scheduling async clustering as only " + commitsSinceLastClustering
+ " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
+ config.getAsyncClusterMaxCommits());
return Option.empty();
}
LOG.info("Generating clustering plan for table " + config.getBasePath());
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), table, context, config);
return strategy.generateClusteringPlan();
}
}

View File

@@ -18,8 +18,10 @@
package org.apache.hudi.avro;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -229,10 +231,10 @@ public class HoodieAvroUtils {
public static Schema removeFields(Schema schema, List<String> fieldsToRemove) {
List<Schema.Field> filteredFields = schema.getFields()
.stream()
.filter(field -> !fieldsToRemove.contains(field.name()))
.map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
.collect(Collectors.toList());
.stream()
.filter(field -> !fieldsToRemove.contains(field.name()))
.map(field -> new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()))
.collect(Collectors.toList());
Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
filteredSchema.setFields(filteredFields);
return filteredSchema;
@@ -289,7 +291,7 @@ public class HoodieAvroUtils {
}
public static GenericRecord addHoodieKeyToRecord(GenericRecord record, String recordKey, String partitionPath,
String fileName) {
String fileName) {
record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileName);
record.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);
record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
@@ -551,7 +553,7 @@ public class HoodieAvroUtils {
} else if (fieldSchema.getType() == Schema.Type.BYTES) {
ByteBuffer byteBuffer = (ByteBuffer) fieldValue;
BigDecimal convertedValue = decimalConversion.fromBytes(byteBuffer, fieldSchema,
LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
LogicalTypes.decimal(dc.getPrecision(), dc.getScale()));
byteBuffer.rewind();
return convertedValue;
}
@@ -570,9 +572,51 @@ public class HoodieAvroUtils {
* @return sanitized name
*/
public static String sanitizeName(String name) {
if (name.substring(0,1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
}
return name.replaceAll(INVALID_AVRO_CHARS_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
}
/**
* Gets record column values into one object.
*
* @param record Hoodie record.
* @param columns Names of the columns to get values.
* @param schema {@link Schema} instance.
 * @return Column value if a single column, or the concatenated String of all column values otherwise.
*/
public static Object getRecordColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
String[] columns,
Schema schema) {
try {
GenericRecord genericRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
if (columns.length == 1) {
return HoodieAvroUtils.getNestedFieldVal(genericRecord, columns[0], true);
} else {
StringBuilder sb = new StringBuilder();
for (String col : columns) {
sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true));
}
return sb.toString();
}
} catch (IOException e) {
throw new HoodieIOException("Unable to read record with key:" + record.getKey(), e);
}
}
/**
* Gets record column values into one object.
*
* @param record Hoodie record.
* @param columns Names of the columns to get values.
* @param schema {@link SerializableSchema} instance.
 * @return Column value if a single column, or the concatenated String of all column values otherwise.
*/
public static Object getRecordColumnValues(HoodieRecord<? extends HoodieRecordPayload> record,
String[] columns,
SerializableSchema schema) {
return getRecordColumnValues(record, columns, schema.get());
}
}
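As a rough, illustrative sketch (not part of this change), a caller could use the new helper to order records by one or more column values, mirroring what the new test and `JavaCustomColumnsSortPartitioner` do; the class name, column names, and record list below are hypothetical.
```
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;

import java.util.Comparator;
import java.util.List;

public class SortByColumnsExample {
  /**
   * Sorts the given records in place by the values of the sort columns, using
   * HoodieAvroUtils.getRecordColumnValues to extract the value(s) from each record.
   */
  static void sortByColumns(List<HoodieRecord> records, String[] sortColumns, Schema schema) {
    Comparator<HoodieRecord> byColumns = Comparator.comparing(
        record -> HoodieAvroUtils.getRecordColumnValues(record, sortColumns, schema).toString());
    records.sort(byColumns);
  }
}
```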

View File

@@ -106,7 +106,7 @@ to generate, with each batch containing a number of messages and idle time betwe
bash setupKafka.sh -n <num_kafka_messages_per_batch> -b <num_batches>
```
### 4 - Run the Sink connector worker (multiple workers can be run)
### 5 - Run the Sink connector worker (multiple workers can be run)
Kafka Connect is a distributed platform that can run one or more workers (each running multiple tasks)
to process the records from the Kafka partitions of the same topic in parallel. We provide a properties file with
@@ -120,7 +120,7 @@ cd $KAFKA_HOME
./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
```
### 5- To add the Hudi Sink to the Connector (delete it if you want to re-configure)
### 6 - To add the Hudi Sink to the Connector (delete it if you want to re-configure)
Once the Connector has started, it will not run the Sink until the Hudi Sink is added using the web API. The following
curl APIs can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for the Hudi Sink,
@@ -170,4 +170,132 @@ total 5168
-rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
```
### 7 - Run async compaction and clustering if scheduled
When the table type is Merge-On-Read (MOR), async compaction and clustering can be scheduled while the Sink is
running. Inline compaction and clustering are disabled by default for performance reasons. Async compaction
scheduling is enabled by default; you can disable it by setting `hoodie.kafka.compaction.async.enable` to `false`.
Async clustering scheduling is disabled by default; you can enable it by setting `hoodie.clustering.async.enabled`
to `true` (see the fragment below).
For performance, the Sink only schedules compaction and clustering when needed; it does not execute them. You need
to execute the scheduled compaction and clustering using separate Spark jobs or the Hudi CLI.
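For reference, a Sink configuration fragment with both schedulers turned on might look like this; only the two keys above come from this change, and the surrounding values are illustrative, following the demo connector config:
```
"hoodie.table.type": "MERGE_ON_READ",
"hoodie.kafka.compaction.async.enable": "true",
"hoodie.clustering.async.enabled": "true"
```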
After the compaction is scheduled, you can see the requested compaction instant (`20211111111410.compaction.requested`)
below:
```
ls -l /tmp/hoodie/hudi-test-topic/.hoodie
total 280
-rw-r--r-- 1 user wheel 21172 Nov 11 11:09 20211111110807.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:08 20211111110807.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:08 20211111110807.deltacommit.requested
-rw-r--r-- 1 user wheel 22458 Nov 11 11:11 20211111110940.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:09 20211111110940.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:09 20211111110940.deltacommit.requested
-rw-r--r-- 1 user wheel 21445 Nov 11 11:13 20211111111110.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:11 20211111111110.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:11 20211111111110.deltacommit.requested
-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.requested
-rw-r--r-- 1 user wheel 9885 Nov 11 11:14 20211111111410.compaction.requested
-rw-r--r-- 1 user wheel 21192 Nov 11 11:15 20211111111411.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.requested
-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.requested
drwxr-xr-x 2 user wheel 64 Nov 11 11:08 archived
-rw-r--r-- 1 user wheel 387 Nov 11 11:08 hoodie.properties
```
Then you can run the async compaction job with `HoodieCompactor` via `spark-submit`:
```
spark-submit \
--class org.apache.hudi.utilities.HoodieCompactor \
hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar \
--base-path /tmp/hoodie/hudi-test-topic \
--table-name hudi-test-topic \
--schema-file /Users/user/repo/hudi/docker/demo/config/schema.avsc \
--instant-time 20211111111410 \
--parallelism 2 \
--spark-memory 1g
```
Note that you don't have to provide the instant time through `--instant-time`; in that case, the earliest scheduled
compaction is executed.
Alternatively, you can use Hudi CLI to execute compaction:
```
hudi-> connect --path /tmp/hoodie/hudi-test-topic
hudi:hudi-test-topic-> compactions show all
╔═════════════════════════╤═══════════╤═══════════════════════════════╗
║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║
╠═════════════════════════╪═══════════╪═══════════════════════════════╣
║ 20211111111410 │ REQUESTED │ 12 ║
╚═════════════════════════╧═══════════╧═══════════════════════════════╝
compaction validate --instant 20211111111410
compaction run --compactionInstant 20211111111410 --parallelism 2 --schemaFilePath /Users/user/repo/hudi/docker/demo/config/schema.avsc
```
Similarly, you can see the requested clustering instant (`20211111111813.replacecommit.requested`) after it is scheduled
by the Sink:
```
ls -l /tmp/hoodie/hudi-test-topic/.hoodie
total 736
-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.requested
-rw-r--r-- 1 user wheel 18681 Nov 11 11:17 20211111111410.commit
-rw-r--r-- 1 user wheel 0 Nov 11 11:17 20211111111410.compaction.inflight
-rw-r--r-- 1 user wheel 9885 Nov 11 11:14 20211111111410.compaction.requested
-rw-r--r-- 1 user wheel 21192 Nov 11 11:15 20211111111411.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.requested
-rw-r--r-- 1 user wheel 22460 Nov 11 11:17 20211111111530.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.requested
-rw-r--r-- 1 user wheel 21357 Nov 11 11:18 20211111111711.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:17 20211111111711.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:17 20211111111711.deltacommit.requested
-rw-r--r-- 1 user wheel 6516 Nov 11 11:18 20211111111813.replacecommit.requested
-rw-r--r-- 1 user wheel 26070 Nov 11 11:20 20211111111815.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:18 20211111111815.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:18 20211111111815.deltacommit.requested
```
Then you can run the async clustering job with `HoodieClusteringJob` via `spark-submit`:
```
spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.10.0-SNAPSHOT.jar \
--props clusteringjob.properties \
--mode execute \
--base-path /tmp/hoodie/hudi-test-topic \
--table-name sample_table \
--instant-time 20211111111813 \
--spark-memory 1g
```
Sample `clusteringjob.properties`:
```
hoodie.datasource.write.recordkey.field=volume
hoodie.datasource.write.partitionpath.field=date
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/hudi-test-topic/versions/latest
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=volume
hoodie.write.concurrency.mode=single_writer
```
Note that you don't have to provide the instant time through `--instant-time`; in that case, the earliest scheduled
clustering is executed.

View File

@@ -10,6 +10,7 @@
"topics": "hudi-test-topic",
"hoodie.table.name": "hudi-test-topic",
"hoodie.table.type": "MERGE_ON_READ",
"hoodie.metadata.enable": "false",
"hoodie.base.path": "file:///tmp/hoodie/hudi-test-topic",
"hoodie.datasource.write.recordkey.field": "volume",
"hoodie.datasource.write.partitionpath.field": "date",

View File

@@ -73,6 +73,11 @@ public class KafkaConnectConfigs extends HoodieConfig {
+ "the coordinator will wait for the write statuses from all the partitions"
+ "to ignore the current commit and start a new commit.");
public static final ConfigProperty<String> ASYNC_COMPACT_ENABLE = ConfigProperty
.key("hoodie.kafka.compaction.async.enable")
.defaultValue("true")
.withDocumentation("Controls whether async compaction should be turned on for MOR table writing.");
public static final ConfigProperty<String> META_SYNC_ENABLE = ConfigProperty
.key("hoodie.meta.sync.enable")
.defaultValue("false")
@@ -121,6 +126,10 @@ public class KafkaConnectConfigs extends HoodieConfig {
return getString(KAFKA_VALUE_CONVERTER);
}
public Boolean isAsyncCompactEnabled() {
return getBoolean(ASYNC_COMPACT_ENABLE);
}
public Boolean isMetaSyncEnabled() {
return getBoolean(META_SYNC_ENABLE);
}

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -54,6 +55,8 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
private final Option<HoodieTableMetaClient> tableMetaClient;
private final Configuration hadoopConf;
private final HoodieWriteConfig writeConfig;
private final KafkaConnectConfigs connectConfigs;
private final String tableBasePath;
private final String tableName;
private final HoodieEngineContext context;
@@ -61,8 +64,11 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
private final HoodieJavaWriteClient<HoodieAvroPayload> javaClient;
public KafkaConnectTransactionServices(KafkaConnectConfigs connectConfigs) throws HoodieException {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withProperties(connectConfigs.getProps()).build();
this.connectConfigs = connectConfigs;
this.writeConfig = HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.JAVA)
.withProperties(connectConfigs.getProps())
.build();
tableBasePath = writeConfig.getBasePath();
tableName = writeConfig.getTableName();
@@ -95,6 +101,7 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
}
}
@Override
public String startCommit() {
String newCommitTime = javaClient.startCommit();
javaClient.transitionInflight(newCommitTime);
@@ -102,11 +109,23 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
return newCommitTime;
}
@Override
public void endCommit(String commitTime, List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
javaClient.commit(commitTime, writeStatuses, Option.of(extraMetadata));
LOG.info("Ending Hudi commit " + commitTime);
// Schedule clustering and compaction as needed.
if (writeConfig.isAsyncClusteringEnabled()) {
javaClient.scheduleClustering(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled clustering at instant time:" + instantTs));
}
if (isAsyncCompactionEnabled()) {
javaClient.scheduleCompaction(Option.empty()).ifPresent(
instantTs -> LOG.info("Scheduled compaction at instant time:" + instantTs));
}
}
@Override
public Map<String, String> fetchLatestExtraCommitMetadata() {
if (tableMetaClient.isPresent()) {
Option<HoodieCommitMetadata> metadata = KafkaConnectUtils.getCommitMetadataForLatestInstant(tableMetaClient.get());
@@ -119,4 +138,10 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
}
throw new HoodieException("Fatal error retrieving Hoodie Extra Metadata since Table Meta Client is absent");
}
private boolean isAsyncCompactionEnabled() {
return tableMetaClient.isPresent()
&& HoodieTableType.MERGE_ON_READ.equals(tableMetaClient.get().getTableType())
&& connectConfigs.isAsyncCompactEnabled();
}
}

View File

@@ -164,7 +164,7 @@ public class HoodieCompactor {
// Get schema.
SparkRDDWriteClient client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.of(cfg.strategyClassName), props);
if (cfg.compactionInstantTime == null) {
if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
throw new IllegalArgumentException("No instant time is provided for scheduling compaction. "
+ "Please specify the compaction instant time by using --instant-time.");
}