[HUDI-2189] Adding delete partitions support to DeltaStreamer (#4787)
This commit is contained in:
committed by
GitHub
parent
7e1ea06eb9
commit
14dbbdf4c7
@@ -585,6 +585,10 @@ public class DeltaSync implements Serializable {
|
|||||||
case INSERT_OVERWRITE_TABLE:
|
case INSERT_OVERWRITE_TABLE:
|
||||||
writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
|
writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
|
||||||
break;
|
break;
|
||||||
|
case DELETE_PARTITION:
|
||||||
|
List<String> partitions = records.map(record -> record.getPartitionPath()).distinct().collect();
|
||||||
|
writeStatusRDD = writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
|
throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -173,7 +173,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
|
|||||||
props.setProperty("include", "sql-transformer.properties");
|
props.setProperty("include", "sql-transformer.properties");
|
||||||
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
|
props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
|
||||||
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
|
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
|
||||||
props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
|
props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
|
||||||
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
|
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
|
||||||
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
|
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
|
||||||
|
|
||||||
|
|||||||
@@ -272,6 +272,19 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
assertEquals(expected, recordCount);
|
assertEquals(expected, recordCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) {
|
||||||
|
sqlContext.clearCache();
|
||||||
|
List<Row> rows = sqlContext.read().format("org.apache.hudi").load(basePath).groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count().collectAsList();
|
||||||
|
Map<String, Long> partitionRecordCount = new HashMap<>();
|
||||||
|
rows.stream().forEach(row -> partitionRecordCount.put(row.getString(0), row.getLong(1)));
|
||||||
|
return partitionRecordCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) {
|
||||||
|
sqlContext.clearCache();
|
||||||
|
assertEquals(0, sqlContext.read().format("org.apache.hudi").load(basePath).filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = " + partitionToValidate).count());
|
||||||
|
}
|
||||||
|
|
||||||
static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
|
static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
|
||||||
sqlContext.clearCache();
|
sqlContext.clearCache();
|
||||||
long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
|
long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
|
||||||
@@ -1378,6 +1391,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
|
|
||||||
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
|
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
|
||||||
String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
|
String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
|
||||||
|
prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
|
||||||
|
"not_there");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
|
||||||
|
String propsFileName, String parquetSourceRoot, boolean addCommonProps,
|
||||||
|
String partitionPath) throws IOException {
|
||||||
// Properties used for testing delta-streamer with Parquet source
|
// Properties used for testing delta-streamer with Parquet source
|
||||||
TypedProperties parquetProps = new TypedProperties();
|
TypedProperties parquetProps = new TypedProperties();
|
||||||
|
|
||||||
@@ -1388,7 +1408,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
parquetProps.setProperty("include", "base.properties");
|
parquetProps.setProperty("include", "base.properties");
|
||||||
parquetProps.setProperty("hoodie.embed.timeline.server", "false");
|
parquetProps.setProperty("hoodie.embed.timeline.server", "false");
|
||||||
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
|
parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
|
||||||
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
|
parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
|
||||||
if (useSchemaProvider) {
|
if (useSchemaProvider) {
|
||||||
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
|
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
|
||||||
if (hasTransformer) {
|
if (hasTransformer) {
|
||||||
@@ -1855,6 +1875,31 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
|
testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeletePartitions() throws Exception {
|
||||||
|
prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
|
||||||
|
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
|
||||||
|
String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
|
||||||
|
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
|
||||||
|
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
|
||||||
|
null, PROPS_FILENAME_TEST_PARQUET, false,
|
||||||
|
false, 100000, false, null, null, "timestamp", null), jsc);
|
||||||
|
deltaStreamer.sync();
|
||||||
|
TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
|
||||||
|
testNum++;
|
||||||
|
|
||||||
|
prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
|
||||||
|
prepareParquetDFSSource(false, false);
|
||||||
|
// set write operation to DELETE_PARTITION and add transformer to filter only for records with partition HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION
|
||||||
|
deltaStreamer = new HoodieDeltaStreamer(
|
||||||
|
TestHelpers.makeConfig(tableBasePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(),
|
||||||
|
Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), PROPS_FILENAME_TEST_PARQUET, false,
|
||||||
|
false, 100000, false, null, null, "timestamp", null), jsc);
|
||||||
|
deltaStreamer.sync();
|
||||||
|
// No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
|
||||||
|
TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
|
||||||
|
}
|
||||||
|
|
||||||
void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
|
void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
|
||||||
// Initial insert
|
// Initial insert
|
||||||
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
|
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
|
||||||
@@ -2001,6 +2046,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class TestSpecificPartitionTransformer implements Transformer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
|
||||||
|
TypedProperties properties) {
|
||||||
|
Dataset<Row> toReturn = rowDataset.filter("partition_path == '" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'");
|
||||||
|
return toReturn;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add new field evoluted_optional_union_field with value of the field rider.
|
* Add new field evoluted_optional_union_field with value of the field rider.
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user