[HUDI-1860] Add INSERT_OVERWRITE and INSERT_OVERWRITE_TABLE support to DeltaStreamer (#3184)
This commit is contained in:
@@ -1723,6 +1723,40 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertOverwrite() throws Exception {
|
||||
testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite", WriteOperationType.INSERT_OVERWRITE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInsertOverwriteTable() throws Exception {
|
||||
testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
|
||||
}
|
||||
|
||||
void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
|
||||
// Initial insert
|
||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
|
||||
new HoodieDeltaStreamer(cfg, jsc).sync();
|
||||
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
|
||||
|
||||
// setting the operationType
|
||||
cfg.operation = operationType;
|
||||
// No new data => no commits.
|
||||
cfg.sourceLimit = 0;
|
||||
new HoodieDeltaStreamer(cfg, jsc).sync();
|
||||
TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
TestHelpers.assertDistanceCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
|
||||
|
||||
cfg.sourceLimit = 1000;
|
||||
new HoodieDeltaStreamer(cfg, jsc).sync();
|
||||
TestHelpers.assertRecordCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
TestHelpers.assertDistanceCount(1950, tableBasePath + "/*/*.parquet", sqlContext);
|
||||
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* UDF to calculate Haversine distance.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user