
Add --filter-dupes to DeltaStreamer

- Optionally filter out duplicates before inserting data
- Unit tests
Vinoth Chandar
2018-10-03 18:02:09 +01:00
committed by vinoth chandar
parent 0a200c32e5
commit 1fca9b21cc
3 changed files with 65 additions and 1 deletion

DataSourceUtils.java

@@ -26,6 +26,7 @@ import com.uber.hoodie.common.util.TypedProperties;
import com.uber.hoodie.config.HoodieCompactionConfig;
import com.uber.hoodie.config.HoodieIndexConfig;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.index.HoodieIndex;
@@ -142,4 +143,19 @@ public class DataSourceUtils {
    HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
    return new HoodieRecord<>(hKey, payload);
  }
  @SuppressWarnings("unchecked")
  public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
      JavaRDD<HoodieRecord> incomingHoodieRecords, HoodieWriteConfig writeConfig) throws Exception {
    try {
      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
      // records whose current location is known already exist in the dataset; keep only the rest
      return client.tagLocation(incomingHoodieRecords)
          .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
    } catch (DatasetNotFoundException e) {
      // no hoodie dataset exists yet, so there are no duplicates to drop
      return incomingHoodieRecords;
    }
  }
}
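A minimal usage sketch of the new helper (hypothetical driver code, not part of this commit): tagLocation consults the index to attach a current location to any incoming record that already exists in the dataset, and the filter keeps only records with no known location, i.e. genuinely new ones.

// Hedged sketch: drop records that already exist, ahead of an insert/bulk-insert.
// `jssc`, `incoming`, and `writeConfig` are assumed to be set up by the caller.
JavaRDD<HoodieRecord> deduped = DataSourceUtils.dropDuplicates(jssc, incoming, writeConfig);
// `deduped` now contains only records whose keys are not yet in the dataset,
// so a plain INSERT cannot re-introduce existing keys.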

HoodieDeltaStreamer.java

@@ -179,8 +179,20 @@ public class HoodieDeltaStreamer implements Serializable {
      return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
    });

    // filter dupes if needed
    HoodieWriteConfig hoodieCfg = getHoodieClientConfig();
    if (cfg.filterDupes) {
      // turn upserts into inserts, since pre-existing records are dropped up front
      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
      records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg);
    }

    if (records.isEmpty()) {
      log.info("No new data, nothing to commit.");
      return;
    }

    // Perform the write
    HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg);
    String commitTime = client.startCommit();
    log.info("Starting commit : " + commitTime);
@@ -285,6 +297,10 @@ public class HoodieDeltaStreamer implements Serializable {
        converter = OperationConvertor.class)
    public Operation operation = Operation.UPSERT;

    @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out "
        + "before insert/bulk-insert")
    public Boolean filterDupes = false;

    @Parameter(names = {"--spark-master"}, description = "spark master to use.")
    public String sparkMaster = "local[2]";
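For illustration, a sketch of driving the new flag programmatically, mirroring the test below; it assumes the remaining required Config fields, such as the target base path and source class, are populated elsewhere, as TestHelpers.makeConfig does in the test.

// Hedged sketch: enable duplicate filtering on a DeltaStreamer config.
// `cfg` and `jsc` are assumed to be initialized elsewhere.
cfg.filterDupes = true;           // equivalent to passing --filter-dupes
cfg.operation = Operation.UPSERT; // sync() coerces this to INSERT once dupes are filtered
new HoodieDeltaStreamer(cfg, jsc).sync();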

TestHoodieDeltaStreamer.java

@@ -32,10 +32,12 @@ import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import com.uber.hoodie.utilities.sources.TestDataSource;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.junit.After;
import org.junit.AfterClass;
@@ -103,6 +105,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
      assertEquals(expected, recordCount);
    }

    static List<Row> countsPerCommit(String datasetPath, SQLContext sqlContext) {
      return sqlContext.read().format("com.uber.hoodie").load(datasetPath).groupBy("_hoodie_commit_time").count()
          .sort("_hoodie_commit_time").collectAsList();
    }

    static void assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits)
        throws IOException {
      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
@@ -159,5 +166,30 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
    new HoodieDeltaStreamer(cfg, jsc).sync();
    TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
    TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
    List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
    assertEquals(2000, counts.get(0).getLong(1));
  }

  @Test
  public void testFilterDupes() throws Exception {
    String datasetBasePath = dfsBasePath + "/test_dupes_dataset";

    // Initial bulk insert
    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT);
    new HoodieDeltaStreamer(cfg, jsc).sync();
    TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
    TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);

    // Generate the same 1000 records + 1000 new ones for upsert
    cfg.filterDupes = true;
    cfg.sourceLimit = 2000;
    cfg.operation = Operation.UPSERT;
    new HoodieDeltaStreamer(cfg, jsc).sync();
    TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
    TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);

    // 1000 records for commit 00000 & 1000 for commit 00001
    List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
    assertEquals(1000, counts.get(0).getLong(1));
    assertEquals(1000, counts.get(1).getLong(1));
  }
}
}