diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java
index 48e8031c1..3c6f1cf8a 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieHBaseIndexConfig.java
@@ -29,6 +29,7 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
public static final String HBASE_ZKPORT_PROP = "hoodie.index.hbase.zkport";
public static final String HBASE_TABLENAME_PROP = "hoodie.index.hbase.table";
public static final String HBASE_GET_BATCH_SIZE_PROP = "hoodie.index.hbase.get.batch.size";
+ public static final String HBASE_ZK_ZNODEPARENT = "hoodie.index.hbase.zknode.path";
/**
* Note that if HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP is set to true, this batch size will not
* be honored for HBase Puts
@@ -133,6 +134,11 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder hbaseZkZnodeParent(String zkZnodeParent) {
+ props.setProperty(HBASE_ZK_ZNODEPARENT, zkZnodeParent);
+ return this;
+ }
+
/**
*
* Method to set maximum QPS allowed per Region Server. This should be same across various
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index 9848631aa..b4abbca73 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -185,6 +185,10 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
// filter the partition paths if needed to reduce list status
partitionPaths = config.getCompactionStrategy().filterPartitionPaths(config, partitionPaths);
+ if (partitionPaths.isEmpty()) {
+ // In case no partitions could be picked, return no compaction plan
+ return null;
+ }
TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
List operations =
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedPartitionAwareCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
new file mode 100644
index 000000000..5a87a4e91
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io.compact.strategy;
+
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.time.DateUtils;
+
+/**
+ * This strategy ensures that the last N partitions are picked up even if there are later partitions created for the
+ * dataset. lastNPartitions is defined as the N partitions before the currentDate.
+ * currentDay = 2018/01/01
+ * The dataset has partitions for 2018/02/02 and 2018/03/03 beyond the currentDay
+ * This strategy will pick up the following partitions for compaction :
+ * (2018/01/01, allPartitionsInRange[(2018/01/01 - lastNPartitions) to 2018/01/01), 2018/02/02, 2018/03/03)
+ */
+public class BoundedPartitionAwareCompactionStrategy extends DayBasedCompactionStrategy {
+
+ SimpleDateFormat dateFormat = new SimpleDateFormat(datePartitionFormat);
+
+ @Override
+ public List orderAndFilter(HoodieWriteConfig writeConfig,
+ List operations, List pendingCompactionPlans) {
+ // The earliest partition to compact - current day minus the target partitions limit
+ String earliestPartitionPathToCompact = dateFormat.format(DateUtils.addDays(new Date(), -1 * writeConfig
+ .getTargetPartitionsPerDayBasedCompaction()));
+ // Filter out all partitions greater than earliestPartitionPathToCompact
+ List eligibleCompactionOperations = operations.stream()
+ .collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet().stream()
+ .sorted(Map.Entry.comparingByKey(comparator))
+ .filter(e -> comparator.compare(earliestPartitionPathToCompact, e.getKey()) >= 0)
+ .flatMap(e -> e.getValue().stream())
+ .collect(Collectors.toList());
+
+ return eligibleCompactionOperations;
+ }
+
+ @Override
+ public List filterPartitionPaths(HoodieWriteConfig writeConfig, List partitionPaths) {
+ // The earliest partition to compact - current day minus the target partitions limit
+ String earliestPartitionPathToCompact = dateFormat.format(DateUtils.addDays(new Date(), -1 * writeConfig
+ .getTargetPartitionsPerDayBasedCompaction()));
+ // Get all partitions and sort them
+ List filteredPartitionPaths = partitionPaths.stream().map(partition -> partition.replace("/", "-"))
+ .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
+ .filter(e -> comparator.compare(earliestPartitionPathToCompact, e) >= 0)
+ .collect(Collectors.toList());
+ return filteredPartitionPaths;
+ }
+}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java
index ee4d9a5ac..574ad9657 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/DayBasedCompactionStrategy.java
@@ -38,9 +38,9 @@ import java.util.stream.Collectors;
public class DayBasedCompactionStrategy extends CompactionStrategy {
// For now, use SimpleDateFormat as default partition format
- private static String datePartitionFormat = "yyyy/MM/dd";
+ protected static String datePartitionFormat = "yyyy/MM/dd";
// Sorts compaction in LastInFirstCompacted order
- private static Comparator comparator = (String leftPartition,
+ protected static Comparator comparator = (String leftPartition,
String rightPartition) -> {
try {
Date left = new SimpleDateFormat(datePartitionFormat, Locale.ENGLISH)
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
new file mode 100644
index 000000000..c4c81de79
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io.compact.strategy;
+
+import com.uber.hoodie.avro.model.HoodieCompactionOperation;
+import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * UnBoundedPartitionAwareCompactionStrategy is a custom UnBounded Strategy.
+ * This will filter all the partitions that are eligible to be compacted by a
+ * {@link BoundedPartitionAwareCompactionStrategy} and return the result.
+ * This is done so that a long running UnBoundedPartitionAwareCompactionStrategy does not step over partitions
+ * in a shorter running BoundedPartitionAwareCompactionStrategy. Essentially, this is an inverse of the
+ * partitions chosen in BoundedPartitionAwareCompactionStrategy
+ *
+ * @see CompactionStrategy
+ */
+public class UnBoundedPartitionAwareCompactionStrategy extends CompactionStrategy {
+
+ @Override
+ public List orderAndFilter(HoodieWriteConfig config,
+ final List operations, final List pendingCompactionWorkloads) {
+ BoundedPartitionAwareCompactionStrategy boundedPartitionAwareCompactionStrategy
+ = new BoundedPartitionAwareCompactionStrategy();
+ List operationsToExclude = boundedPartitionAwareCompactionStrategy
+ .orderAndFilter(config, operations, pendingCompactionWorkloads);
+ List allOperations = new ArrayList<>(operations);
+ allOperations.removeAll(operationsToExclude);
+ return allOperations;
+ }
+
+ @Override
+ public List filterPartitionPaths(HoodieWriteConfig writeConfig, List partitionPaths) {
+ List allPartitionPaths = partitionPaths.stream().map(partition -> partition.replace("/", "-"))
+ .sorted(Comparator.reverseOrder()).map(partitionPath -> partitionPath.replace("-", "/"))
+ .collect(Collectors.toList());
+ BoundedPartitionAwareCompactionStrategy boundedPartitionAwareCompactionStrategy
+ = new BoundedPartitionAwareCompactionStrategy();
+ List partitionsToExclude = boundedPartitionAwareCompactionStrategy.filterPartitionPaths(writeConfig,
+ partitionPaths);
+ allPartitionPaths.removeAll(partitionsToExclude);
+ return allPartitionPaths;
+ }
+}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java
index e3d96b292..9dfe5ba2e 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/strategy/TestHoodieCompactionStrategy.java
@@ -29,15 +29,20 @@ import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.io.compact.strategy.BoundedIOCompactionStrategy;
+import com.uber.hoodie.io.compact.strategy.BoundedPartitionAwareCompactionStrategy;
import com.uber.hoodie.io.compact.strategy.DayBasedCompactionStrategy;
import com.uber.hoodie.io.compact.strategy.LogFileSizeBasedCompactionStrategy;
import com.uber.hoodie.io.compact.strategy.UnBoundedCompactionStrategy;
+import com.uber.hoodie.io.compact.strategy.UnBoundedPartitionAwareCompactionStrategy;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.time.DateUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -109,7 +114,7 @@ public class TestHoodieCompactionStrategy {
}
@Test
- public void testPartitionAwareCompactionSimple() {
+ public void testDayBasedCompactionSimple() {
Map> sizesMap = Maps.newHashMap();
sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
sizesMap.put(110 * MB, Lists.newArrayList());
@@ -141,6 +146,96 @@ public class TestHoodieCompactionStrategy {
assertTrue("DayBasedCompactionStrategy should sort partitions in descending order", comparision >= 0);
}
+ @Test
+ public void testBoundedPartitionAwareCompactionSimple() {
+ Map> sizesMap = Maps.newHashMap();
+ sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
+ sizesMap.put(110 * MB, Lists.newArrayList());
+ sizesMap.put(100 * MB, Lists.newArrayList(MB));
+ sizesMap.put(70 * MB, Lists.newArrayList(MB));
+ sizesMap.put(80 * MB, Lists.newArrayList(MB));
+ sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+
+ SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd");
+ Date today = new Date();
+ String currentDay = format.format(today);
+ String currentDayMinus1 = format.format(DateUtils.addDays(today, -1));
+ String currentDayMinus2 = format.format(DateUtils.addDays(today, -2));
+ String currentDayMinus3 = format.format(DateUtils.addDays(today, -3));
+ String currentDayPlus1 = format.format(DateUtils.addDays(today, 1));
+ String currentDayPlus5 = format.format(DateUtils.addDays(today, 5));
+
+ Map keyToPartitionMap = new ImmutableMap.Builder()
+ .put(120 * MB, currentDay)
+ .put(110 * MB, currentDayMinus1)
+ .put(100 * MB, currentDayMinus2)
+ .put(80 * MB, currentDayMinus3)
+ .put(90 * MB, currentDayPlus1)
+ .put(70 * MB, currentDayPlus5)
+ .build();
+
+ BoundedPartitionAwareCompactionStrategy strategy = new BoundedPartitionAwareCompactionStrategy();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+ HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
+ .withTargetPartitionsPerDayBasedCompaction(2)
+ .build()).build();
+ List operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
+ List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+
+ assertTrue("BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions",
+ returned.size() < operations.size());
+ Assert.assertEquals("BoundedPartitionAwareCompactionStrategy should have resulted in fewer compactions",
+ returned.size(), 5);
+
+ int comparision = strategy.getComparator().compare(returned.get(returned.size() - 1).getPartitionPath(), returned
+ .get(0).getPartitionPath());
+ // Either the partition paths are sorted in descending order or they are equal
+ assertTrue("BoundedPartitionAwareCompactionStrategy should sort partitions in descending order", comparision >= 0);
+ }
+
+ @Test
+ public void testUnboundedPartitionAwareCompactionSimple() {
+ Map> sizesMap = Maps.newHashMap();
+ sizesMap.put(120 * MB, Lists.newArrayList(60 * MB, 10 * MB, 80 * MB));
+ sizesMap.put(110 * MB, Lists.newArrayList());
+ sizesMap.put(100 * MB, Lists.newArrayList(MB));
+ sizesMap.put(80 * MB, Lists.newArrayList(MB));
+ sizesMap.put(70 * MB, Lists.newArrayList(MB));
+ sizesMap.put(90 * MB, Lists.newArrayList(1024 * MB));
+
+ SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd");
+ Date today = new Date();
+ String currentDay = format.format(today);
+ String currentDayMinus1 = format.format(DateUtils.addDays(today, -1));
+ String currentDayMinus2 = format.format(DateUtils.addDays(today, -2));
+ String currentDayMinus3 = format.format(DateUtils.addDays(today, -3));
+ String currentDayPlus1 = format.format(DateUtils.addDays(today, 1));
+ String currentDayPlus5 = format.format(DateUtils.addDays(today, 5));
+
+ Map keyToPartitionMap = new ImmutableMap.Builder()
+ .put(120 * MB, currentDay)
+ .put(110 * MB, currentDayMinus1)
+ .put(100 * MB, currentDayMinus2)
+ .put(80 * MB, currentDayMinus3)
+ .put(90 * MB, currentDayPlus1)
+ .put(70 * MB, currentDayPlus5)
+ .build();
+
+ UnBoundedPartitionAwareCompactionStrategy strategy = new UnBoundedPartitionAwareCompactionStrategy();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+ HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy)
+ .withTargetPartitionsPerDayBasedCompaction(2)
+ .build()).build();
+ List operations = createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
+ List returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+
+ assertTrue("UnBoundedPartitionAwareCompactionStrategy should not include last " + writeConfig
+ .getTargetPartitionsPerDayBasedCompaction() + " partitions or later partitions from today",
+ returned.size() < operations.size());
+ Assert.assertEquals("BoundedPartitionAwareCompactionStrategy should have resulted in 1 compaction",
+ returned.size(), 1);
+ }
+
private List createCompactionOperations(HoodieWriteConfig config,
Map> sizesMap) {
Map keyToPartitionMap = sizesMap.entrySet().stream().map(e ->
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
index 0aee2e86c..969de46c7 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java
@@ -136,7 +136,7 @@ public class HoodieCommitMetadata implements Serializable {
long totalFilesInsert = 0;
for (List stats : partitionToWriteStats.values()) {
for (HoodieWriteStat stat : stats) {
- if (stat.getPrevCommit() != null && stat.getPrevCommit().equals("null")) {
+ if (stat.getPrevCommit() != null && stat.getPrevCommit().equalsIgnoreCase("null")) {
totalFilesInsert++;
}
}
@@ -148,7 +148,7 @@ public class HoodieCommitMetadata implements Serializable {
long totalFilesUpdated = 0;
for (List stats : partitionToWriteStats.values()) {
for (HoodieWriteStat stat : stats) {
- if (stat.getPrevCommit() != null && !stat.getPrevCommit().equals("null")) {
+ if (stat.getPrevCommit() != null && !stat.getPrevCommit().equalsIgnoreCase("null")) {
totalFilesUpdated++;
}
}
@@ -170,8 +170,8 @@ public class HoodieCommitMetadata implements Serializable {
long totalInsertRecordsWritten = 0;
for (List stats : partitionToWriteStats.values()) {
for (HoodieWriteStat stat : stats) {
- if (stat.getPrevCommit() != null && stat.getPrevCommit().equals("null")) {
- totalInsertRecordsWritten += stat.getNumWrites();
+ if (stat.getPrevCommit() != null && stat.getPrevCommit().equalsIgnoreCase("null")) {
+ totalInsertRecordsWritten += stat.getNumInserts();
}
}
}