|
|
|
|
@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.DFSPropertiesConfiguration;
|
|
|
|
|
import org.apache.hudi.common.util.Option;
|
|
|
|
|
import org.apache.hudi.common.util.TypedProperties;
|
|
|
|
|
import org.apache.hudi.config.HoodieCompactionConfig;
|
|
|
|
|
import org.apache.hudi.exception.DatasetNotFoundException;
|
|
|
|
|
import org.apache.hudi.exception.TableNotFoundException;
|
|
|
|
|
import org.apache.hudi.exception.HoodieException;
|
|
|
|
|
import org.apache.hudi.hive.HiveSyncConfig;
|
|
|
|
|
import org.apache.hudi.hive.HoodieHiveClient;
|
|
|
|
|
@@ -219,49 +219,49 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|
|
|
|
return cfg;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void assertRecordCount(long expected, String datasetPath, SQLContext sqlContext) {
|
|
|
|
|
long recordCount = sqlContext.read().format("org.apache.hudi").load(datasetPath).count();
|
|
|
|
|
static void assertRecordCount(long expected, String tablePath, SQLContext sqlContext) {
|
|
|
|
|
long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).count();
|
|
|
|
|
assertEquals(expected, recordCount);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static List<Row> countsPerCommit(String datasetPath, SQLContext sqlContext) {
|
|
|
|
|
return sqlContext.read().format("org.apache.hudi").load(datasetPath).groupBy("_hoodie_commit_time").count()
|
|
|
|
|
static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
|
|
|
|
|
return sqlContext.read().format("org.apache.hudi").load(tablePath).groupBy("_hoodie_commit_time").count()
|
|
|
|
|
.sort("_hoodie_commit_time").collectAsList();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void assertDistanceCount(long expected, String datasetPath, SQLContext sqlContext) {
|
|
|
|
|
sqlContext.read().format("org.apache.hudi").load(datasetPath).registerTempTable("tmp_trips");
|
|
|
|
|
static void assertDistanceCount(long expected, String tablePath, SQLContext sqlContext) {
|
|
|
|
|
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
|
|
|
|
|
long recordCount =
|
|
|
|
|
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance is not NULL").count();
|
|
|
|
|
assertEquals(expected, recordCount);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void assertDistanceCountWithExactValue(long expected, String datasetPath, SQLContext sqlContext) {
|
|
|
|
|
sqlContext.read().format("org.apache.hudi").load(datasetPath).registerTempTable("tmp_trips");
|
|
|
|
|
static void assertDistanceCountWithExactValue(long expected, String tablePath, SQLContext sqlContext) {
|
|
|
|
|
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
|
|
|
|
|
long recordCount =
|
|
|
|
|
sqlContext.sparkSession().sql("select * from tmp_trips where haversine_distance = 1.0").count();
|
|
|
|
|
assertEquals(expected, recordCount);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void assertAtleastNCompactionCommits(int minExpected, String datasetPath, FileSystem fs) {
|
|
|
|
|
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
|
|
|
|
|
static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) {
|
|
|
|
|
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
|
|
|
|
|
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
|
|
|
|
|
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
|
|
|
|
int numCompactionCommits = (int) timeline.getInstants().count();
|
|
|
|
|
assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void assertAtleastNDeltaCommits(int minExpected, String datasetPath, FileSystem fs) {
|
|
|
|
|
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
|
|
|
|
|
static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
|
|
|
|
|
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
|
|
|
|
|
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
|
|
|
|
|
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
|
|
|
|
|
int numDeltaCommits = (int) timeline.getInstants().count();
|
|
|
|
|
assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static String assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits)
|
|
|
|
|
static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits)
|
|
|
|
|
throws IOException {
|
|
|
|
|
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
|
|
|
|
|
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
|
|
|
|
|
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
|
|
|
|
HoodieInstant lastInstant = timeline.lastInstant().get();
|
|
|
|
|
HoodieCommitMetadata commitMetadata =
|
|
|
|
|
@@ -302,9 +302,9 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|
|
|
|
@Test
|
|
|
|
|
public void testPropsWithInvalidKeyGenerator() throws Exception {
|
|
|
|
|
try {
|
|
|
|
|
String datasetBasePath = dfsBasePath + "/test_dataset";
|
|
|
|
|
String tableBasePath = dfsBasePath + "/test_table";
|
|
|
|
|
HoodieDeltaStreamer deltaStreamer =
|
|
|
|
|
new HoodieDeltaStreamer(TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT,
|
|
|
|
|
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
|
|
|
|
TripsWithDistanceTransformer.class.getName(), PROPS_FILENAME_TEST_INVALID, false), jsc);
|
|
|
|
|
deltaStreamer.sync();
|
|
|
|
|
fail("Should error out when setting the key generator class property to an invalid value");
|
|
|
|
|
@@ -316,45 +316,45 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testDatasetCreation() throws Exception {
|
|
|
|
|
public void testTableCreation() throws Exception {
|
|
|
|
|
try {
|
|
|
|
|
dfs.mkdirs(new Path(dfsBasePath + "/not_a_dataset"));
|
|
|
|
|
dfs.mkdirs(new Path(dfsBasePath + "/not_a_table"));
|
|
|
|
|
HoodieDeltaStreamer deltaStreamer =
|
|
|
|
|
new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_dataset", Operation.BULK_INSERT), jsc);
|
|
|
|
|
new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + "/not_a_table", Operation.BULK_INSERT), jsc);
|
|
|
|
|
deltaStreamer.sync();
|
|
|
|
|
fail("Should error out when pointed out at a dir thats not a dataset");
|
|
|
|
|
} catch (DatasetNotFoundException e) {
|
|
|
|
|
fail("Should error out when pointed out at a dir thats not a table");
|
|
|
|
|
} catch (TableNotFoundException e) {
|
|
|
|
|
// expected
|
|
|
|
|
LOG.error("Expected error during dataset creation", e);
|
|
|
|
|
LOG.error("Expected error during table creation", e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testBulkInsertsAndUpserts() throws Exception {
|
|
|
|
|
String datasetBasePath = dfsBasePath + "/test_dataset";
|
|
|
|
|
String tableBasePath = dfsBasePath + "/test_table";
|
|
|
|
|
|
|
|
|
|
// Initial bulk insert
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT);
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT);
|
|
|
|
|
new HoodieDeltaStreamer(cfg, jsc).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
|
|
|
|
|
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
|
|
|
|
|
|
|
|
|
|
// No new data => no commits.
|
|
|
|
|
cfg.sourceLimit = 0;
|
|
|
|
|
new HoodieDeltaStreamer(cfg, jsc).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
|
|
|
|
|
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
|
|
|
|
|
|
|
|
|
|
// upsert() #1
|
|
|
|
|
cfg.sourceLimit = 2000;
|
|
|
|
|
cfg.operation = Operation.UPSERT;
|
|
|
|
|
new HoodieDeltaStreamer(cfg, jsc).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
|
|
|
|
|
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
|
|
|
|
|
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -369,12 +369,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
|
|
|
|
|
String datasetBasePath = dfsBasePath + "/" + tempDir;
|
|
|
|
|
String tableBasePath = dfsBasePath + "/" + tempDir;
|
|
|
|
|
// Keep it higher than batch-size to test continuous mode
|
|
|
|
|
int totalRecords = 3000;
|
|
|
|
|
|
|
|
|
|
// Initial bulk insert
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.UPSERT);
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT);
|
|
|
|
|
cfg.continuousMode = true;
|
|
|
|
|
cfg.storageType = tableType.name();
|
|
|
|
|
cfg.configs.add(String.format("%s=%d", TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
|
|
|
|
@@ -390,13 +390,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|
|
|
|
|
|
|
|
|
TestHelpers.waitTillCondition((r) -> {
|
|
|
|
|
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
|
|
|
|
|
TestHelpers.assertAtleastNDeltaCommits(5, datasetBasePath, dfs);
|
|
|
|
|
TestHelpers.assertAtleastNCompactionCommits(2, datasetBasePath, dfs);
|
|
|
|
|
TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs);
|
|
|
|
|
TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
|
|
|
|
|
} else {
|
|
|
|
|
TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs);
|
|
|
|
|
TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
|
|
|
|
|
}
|
|
|
|
|
TestHelpers.assertRecordCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertRecordCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(totalRecords + 200, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
return true;
|
|
|
|
|
}, 180);
|
|
|
|
|
ds.shutdownGracefully();
|
|
|
|
|
@@ -410,71 +410,71 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|
|
|
|
*/
|
|
|
|
|
@Test
|
|
|
|
|
public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws Exception {
|
|
|
|
|
String datasetBasePath = dfsBasePath + "/test_dataset2";
|
|
|
|
|
String downstreamDatasetBasePath = dfsBasePath + "/test_downstream_dataset2";
|
|
|
|
|
String tableBasePath = dfsBasePath + "/test_table2";
|
|
|
|
|
String downstreamTableBasePath = dfsBasePath + "/test_downstream_table2";
|
|
|
|
|
|
|
|
|
|
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(datasetBasePath, "hive_trips");
|
|
|
|
|
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
|
|
|
|
|
|
|
|
|
|
// Initial bulk insert to ingest to first hudi table
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT,
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
|
|
|
|
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true);
|
|
|
|
|
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
String lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
|
|
|
|
|
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
String lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
|
|
|
|
|
|
|
|
|
|
// Now incrementally pull from the above hudi table and ingest to second table
|
|
|
|
|
HoodieDeltaStreamer.Config downstreamCfg =
|
|
|
|
|
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.BULK_INSERT,
|
|
|
|
|
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, Operation.BULK_INSERT,
|
|
|
|
|
true, null);
|
|
|
|
|
new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 1);
|
|
|
|
|
TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, dfs, 1);
|
|
|
|
|
|
|
|
|
|
// No new data => no commits for upstream table
|
|
|
|
|
cfg.sourceLimit = 0;
|
|
|
|
|
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
|
|
|
|
|
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
|
|
|
|
|
|
|
|
|
|
// with no change in upstream table, no change in downstream too when pulled.
|
|
|
|
|
HoodieDeltaStreamer.Config downstreamCfg1 =
|
|
|
|
|
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath,
|
|
|
|
|
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
|
|
|
|
|
Operation.BULK_INSERT, true, DummySchemaProvider.class.getName());
|
|
|
|
|
new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 1);
|
|
|
|
|
TestHelpers.assertRecordCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, dfs, 1);
|
|
|
|
|
|
|
|
|
|
// upsert() #1 on upstream hudi table
|
|
|
|
|
cfg.sourceLimit = 2000;
|
|
|
|
|
cfg.operation = Operation.UPSERT;
|
|
|
|
|
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1950, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
|
|
|
|
|
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(1950, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
|
|
|
|
|
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
|
|
|
|
|
|
|
|
|
|
// Incrementally pull changes in upstream hudi table and apply to downstream table
|
|
|
|
|
downstreamCfg =
|
|
|
|
|
TestHelpers.makeConfigForHudiIncrSrc(datasetBasePath, downstreamDatasetBasePath, Operation.UPSERT,
|
|
|
|
|
TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath, Operation.UPSERT,
|
|
|
|
|
false, null);
|
|
|
|
|
downstreamCfg.sourceLimit = 2000;
|
|
|
|
|
new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(2000, downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertRecordCount(2000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCount(2000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertDistanceCountWithExactValue(2000, downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
String finalInstant =
|
|
|
|
|
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 2);
|
|
|
|
|
counts = TestHelpers.countsPerCommit(downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, dfs, 2);
|
|
|
|
|
counts = TestHelpers.countsPerCommit(downstreamTableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
assertEquals(2000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
|
|
|
|
|
|
|
|
|
|
// Test Hive integration
|
|
|
|
|
@@ -488,8 +488,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testNullSchemaProvider() throws Exception {
|
|
|
|
|
String dataSetBasePath = dfsBasePath + "/test_dataset";
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, Operation.BULK_INSERT,
|
|
|
|
|
String tableBasePath = dfsBasePath + "/test_table";
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT,
|
|
|
|
|
SqlQueryBasedTransformer.class.getName(), PROPS_FILENAME_TEST_SOURCE, true,
|
|
|
|
|
false);
|
|
|
|
|
try {
|
|
|
|
|
@@ -503,37 +503,37 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testFilterDupes() throws Exception {
|
|
|
|
|
String datasetBasePath = dfsBasePath + "/test_dupes_dataset";
|
|
|
|
|
String tableBasePath = dfsBasePath + "/test_dupes_table";
|
|
|
|
|
|
|
|
|
|
// Initial bulk insert
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.BULK_INSERT);
|
|
|
|
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.BULK_INSERT);
|
|
|
|
|
new HoodieDeltaStreamer(cfg, jsc).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(1000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00000", datasetBasePath, dfs, 1);
|
|
|
|
|
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
|
|
|
|
|
|
|
|
|
|
// Generate the same 1000 records + 1000 new ones for upsert
|
|
|
|
|
cfg.filterDupes = true;
|
|
|
|
|
cfg.sourceLimit = 2000;
|
|
|
|
|
cfg.operation = Operation.UPSERT;
|
|
|
|
|
new HoodieDeltaStreamer(cfg, jsc).sync();
|
|
|
|
|
TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
|
|
|
|
|
TestHelpers.assertRecordCount(2000, tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
|
|
|
|
|
// 1000 records for commit 00000 & 1000 for commit 00001
|
|
|
|
|
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
|
|
|
|
|
assertEquals(1000, counts.get(0).getLong(1));
|
|
|
|
|
assertEquals(1000, counts.get(1).getLong(1));
|
|
|
|
|
|
|
|
|
|
// Test with empty commits
|
|
|
|
|
HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), datasetBasePath, true);
|
|
|
|
|
HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
|
|
|
|
|
HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
|
|
|
|
|
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(datasetBasePath, Operation.UPSERT);
|
|
|
|
|
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, Operation.UPSERT);
|
|
|
|
|
cfg2.filterDupes = true;
|
|
|
|
|
cfg2.sourceLimit = 2000;
|
|
|
|
|
cfg2.operation = Operation.UPSERT;
|
|
|
|
|
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
|
|
|
|
|
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
|
|
|
|
|
ds2.sync();
|
|
|
|
|
mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), datasetBasePath, true);
|
|
|
|
|
mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
|
|
|
|
|
HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
|
|
|
|
|
Assert.assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), lastFinished.getTimestamp(),
|
|
|
|
|
HoodieTimeline.GREATER));
|
|
|
|
|
@@ -599,7 +599,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Return empty dataset.
|
|
|
|
|
* Return empty table.
|
|
|
|
|
*/
|
|
|
|
|
public static class DropAllTransformer implements Transformer {
|
|
|
|
|
|
|
|
|
|
|