1. Minor changes to fix compaction 2. Adding 2 compaction policies
This commit is contained in:
committed by
vinoth chandar
parent
d1d33f725e
commit
2577014617
@@ -29,6 +29,7 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
|
||||
public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
|
||||
public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
|
||||
public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
|
||||
public static final String HBASE_ZK_ZNODEPARENT = "hoodie.index.hbase.zknode.path";
|
||||
/**
|
||||
* Note that if HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP is set to true, this batch size will not
|
||||
* be honored for HBase Puts
|
||||
@@ -133,6 +134,11 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder hbaseZkZnodeParent(String zkZnodeParent) {
|
||||
props.setProperty(HBASE_ZK_ZNODEPARENT, zkZnodeParent);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Method to set maximum QPS allowed per Region Server. This should be same across various
|
||||
|
||||
@@ -185,6 +185,10 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
|
||||
// filter the partition paths if needed to reduce list status
|
||||
partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
|
||||
|
||||
if (partitionPaths.isEmpty()) {
|
||||
// In case no partitions could be picked, return no compaction plan
|
||||
return null;
|
||||
}
|
||||
TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
|
||||
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
|
||||
List<HoodieCompactionOperation> operations =
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.io.compact.strategy;
|
||||
|
||||
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
|
||||
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
|
||||
/**
|
||||
* This strategy ensures that the last N partitions are picked up even if there are later partitions created for the
|
||||
* dataset. lastNPartitions is defined as the N partitions before the currentDate.
|
||||
* currentDay = 2018/01/01
|
||||
* The dataset has partitions for 2018/02/02 and 2018/03/03 beyond the currentDay
|
||||
* This strategy will pick up the following partitions for compaction :
|
||||
* (2018/01/01, allPartitionsInRange[(2018/01/01 - lastNPartitions) to 2018/01/01), 2018/02/02, 2018/03/03)
|
||||
*/
|
||||
public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionStrategy {
|
||||
|
||||
SimpleDateFormat dateFormat = new SimpleDateFormat(datePartitionFormat);
|
||||
|
||||
@Override
|
||||
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
||||
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
|
||||
// The earliest partition to compact - current day minus the target partitions limit
|
||||
String earliestPartitionPathToCompact = dateFormat.format(DateUtils.addDays(new Date(), -1 * writeConfig
|
||||
.getTargetPartitionsPerDayBasedCompaction()));
|
||||
// Filter out all partitions greater than earliestPartitionPathToCompact
|
||||
List<HoodieCompactionOperation> eligibleCompactionOperations = operations.stream()
|
||||
.collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet().stream()
|
||||
.sorted(Map.Entry.comparingByKey(comparator))
|
||||
.filter(e -> comparator.compare(earliestPartitionPathToCompact, e.getKey()) >= 0)
|
||||
.flatMap(e -> e.getValue().stream())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return eligibleCompactionOperations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
|
||||
// The earliest partition to compact - current day minus the target partitions limit
|
||||
String earliestPartitionPathToCompact = dateFormat.format(DateUtils.addDays(new Date(), -1 * writeConfig
|
||||
.getTargetPartitionsPerDayBasedCompaction()));
|
||||
// Get all partitions and sort them
|
||||
List<String> filteredPartitionPaths = partitionPaths.stream().map(partition -> partition.replace("/", "-"))
|
||||
.sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
|
||||
.filter(e -> comparator.compare(earliestPartitionPathToCompact, e) >= 0)
|
||||
.collect(Collectors.toList());
|
||||
return filteredPartitionPaths;
|
||||
}
|
||||
}
|
||||
@@ -38,9 +38,9 @@ import java.util.stream.Collectors;
|
||||
public class DayBasedCompactionStrategy extends CompactionStrategy {
|
||||
|
||||
// For now, use SimpleDateFormat as default partition format
|
||||
private static String datePartitionFormat = "yyyy/MM/dd";
|
||||
protected static String datePartitionFormat = "yyyy/MM/dd";
|
||||
// Sorts compaction in LastInFirstCompacted order
|
||||
private static Comparator<String> comparator = (String leftPartition,
|
||||
protected static Comparator<String> comparator = (String leftPartition,
|
||||
String rightPartition) -> {
|
||||
try {
|
||||
Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH)
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.uber.hoodie.io.compact.strategy;
|
||||
|
||||
import com.uber.hoodie.avro.model.HoodieCompactionOperation;
|
||||
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* UnBoundedPartitionAwareCompactionStrategy is a custom UnBounded Strategy.
|
||||
* This will filter all the partitions that are eligible to be compacted by a
|
||||
* {@link BoundedPartitionAwareCompactionStrategy} and return the result.
|
||||
* This is done so that a long running UnBoundedPartitionAwareCompactionStrategy does not step over partitions
|
||||
* in a shorter running BoundedPartitionAwareCompactionStrategy. Essentially, this is an inverse of the
|
||||
* partitions chosen in BoundedPartitionAwareCompactionStrategy
|
||||
*
|
||||
* @see CompactionStrategy
|
||||
*/
|
||||
public class UnBoundedPartitionAwareCompactionStrategy extends CompactionStrategy {
|
||||
|
||||
@Override
|
||||
public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig config,
|
||||
final List<HoodieCompactionOperation> operations, final List<HoodieCompactionPlan> pendingCompactionWorkloads) {
|
||||
BoundedPartitionAwareCompactionStrategy boundedPartitionAwareCompactionStrategy
|
||||
= new BoundedPartitionAwareCompactionStrategy();
|
||||
List<HoodieCompactionOperation> operationsToExclude = boundedPartitionAwareCompactionStrategy
|
||||
.orderAndFilter(config, operations, pendingCompactionWorkloads);
|
||||
List<HoodieCompactionOperation> allOperations = new ArrayList<>(operations);
|
||||
allOperations.removeAll(operationsToExclude);
|
||||
return allOperations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitionPaths) {
|
||||
List<String> allPartitionPaths = partitionPaths.stream().map(partition -> partition.replace("/", "-"))
|
||||
.sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
|
||||
.collect(Collectors.toList());
|
||||
BoundedPartitionAwareCompactionStrategy boundedPartitionAwareCompactionStrategy
|
||||
= new BoundedPartitionAwareCompactionStrategy();
|
||||
List<String> partitionsToExclude = boundedPartitionAwareCompactionStrategy.filterPartitionPaths(writeConfig,
|
||||
partitionPaths);
|
||||
allPartitionPaths.removeAll(partitionsToExclude);
|
||||
return allPartitionPaths;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user