[HUDI-3838] Implemented drop partition column feature for delta streamer code path (#5294)
* [HUDI-3838] Implemented drop partition column feature for delta streamer code path * Ensure drop partition table config is updated in hoodie.props Co-authored-by: Sagar Sumit <sagarsumit09@gmail.com>
This commit is contained in:
committed by
GitHub
parent
101b82a679
commit
d16740976e
@@ -441,6 +441,17 @@ public class HoodieAvroUtils {
|
|||||||
return records.stream().map(r -> rewriteRecord(r, newSchema)).collect(Collectors.toList());
|
return records.stream().map(r -> rewriteRecord(r, newSchema)).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given an Avro record and list of columns to remove, this method removes the list of columns from
|
||||||
|
* the given avro record using rewriteRecord method.
|
||||||
|
* <p>
|
||||||
|
* To better understand how it removes please check {@link #rewriteRecord(GenericRecord, Schema)}
|
||||||
|
*/
|
||||||
|
public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) {
|
||||||
|
Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
|
||||||
|
return rewriteRecord(record, newSchema);
|
||||||
|
}
|
||||||
|
|
||||||
private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
|
private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
|
||||||
Schema oldSchema = oldRecord.getSchema();
|
Schema oldSchema = oldRecord.getSchema();
|
||||||
Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name());
|
Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name());
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ import java.io.IOException;
|
|||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@@ -227,6 +228,35 @@ public class TestHoodieAvroUtils {
|
|||||||
assertEquals(NUM_FIELDS_IN_EXAMPLE_SCHEMA, schemaWithoutMetaCols.getFields().size());
|
assertEquals(NUM_FIELDS_IN_EXAMPLE_SCHEMA, schemaWithoutMetaCols.getFields().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveFields() {
|
||||||
|
// partitioned table test.
|
||||||
|
String schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
|
||||||
|
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"}]},";
|
||||||
|
Schema expectedSchema = new Schema.Parser().parse(schemaStr);
|
||||||
|
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
||||||
|
rec.put("_row_key", "key1");
|
||||||
|
rec.put("non_pii_col", "val1");
|
||||||
|
rec.put("pii_col", "val2");
|
||||||
|
rec.put("timestamp", 3.5);
|
||||||
|
GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col"));
|
||||||
|
assertEquals("key1", rec1.get("_row_key"));
|
||||||
|
assertEquals("val1", rec1.get("non_pii_col"));
|
||||||
|
assertEquals(3.5, rec1.get("timestamp"));
|
||||||
|
assertNull(rec1.get("pii_col"));
|
||||||
|
assertEquals(expectedSchema, rec1.getSchema());
|
||||||
|
|
||||||
|
// non-partitioned table test with empty list of fields.
|
||||||
|
schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
|
||||||
|
+ "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"pii_col\", \"type\": \"string\"}]},";
|
||||||
|
expectedSchema = new Schema.Parser().parse(schemaStr);
|
||||||
|
rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList(""));
|
||||||
|
assertEquals(expectedSchema, rec1.getSchema());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetNestedFieldVal() {
|
public void testGetNestedFieldVal() {
|
||||||
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
||||||
|
|||||||
@@ -105,6 +105,7 @@ import java.util.stream.Collectors;
|
|||||||
import scala.collection.JavaConversions;
|
import scala.collection.JavaConversions;
|
||||||
|
|
||||||
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
|
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
|
||||||
|
import static org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
|
||||||
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
|
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
|
||||||
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
|
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
|
||||||
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
|
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
|
||||||
@@ -280,6 +281,7 @@ public class DeltaSync implements Serializable {
|
|||||||
.setPreCombineField(cfg.sourceOrderingField)
|
.setPreCombineField(cfg.sourceOrderingField)
|
||||||
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
|
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
|
||||||
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
|
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
|
||||||
|
.setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
|
||||||
.initTable(new Configuration(jssc.hadoopConfiguration()),
|
.initTable(new Configuration(jssc.hadoopConfiguration()),
|
||||||
cfg.targetBasePath);
|
cfg.targetBasePath);
|
||||||
}
|
}
|
||||||
@@ -375,6 +377,7 @@ public class DeltaSync implements Serializable {
|
|||||||
SimpleKeyGenerator.class.getName()))
|
SimpleKeyGenerator.class.getName()))
|
||||||
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
|
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
|
||||||
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
|
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
|
||||||
|
.setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
|
||||||
.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
|
.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -478,13 +481,14 @@ public class DeltaSync implements Serializable {
|
|||||||
|
|
||||||
boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
|
boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
|
||||||
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
|
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
|
||||||
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
|
JavaRDD<HoodieRecord> records = avroRDD.map(record -> {
|
||||||
|
GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, getPartitionColumns(keyGenerator, props)) : record;
|
||||||
HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
|
HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
|
||||||
(Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
|
(Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
|
||||||
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
|
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
|
||||||
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
|
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
|
||||||
: DataSourceUtils.createPayload(cfg.payloadClassName, gr);
|
: DataSourceUtils.createPayload(cfg.payloadClassName, gr);
|
||||||
return new HoodieAvroRecord<>(keyGenerator.getKey(gr), payload);
|
return new HoodieAvroRecord<>(keyGenerator.getKey(record), payload);
|
||||||
});
|
});
|
||||||
|
|
||||||
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
|
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
|
||||||
@@ -727,6 +731,9 @@ public class DeltaSync implements Serializable {
|
|||||||
|
|
||||||
private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
|
private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
|
||||||
LOG.info("Setting up new Hoodie Write Client");
|
LOG.info("Setting up new Hoodie Write Client");
|
||||||
|
if (isDropPartitionColumns()) {
|
||||||
|
targetSchema = HoodieAvroUtils.removeFields(targetSchema, getPartitionColumns(keyGenerator, props));
|
||||||
|
}
|
||||||
registerAvroSchemas(sourceSchema, targetSchema);
|
registerAvroSchemas(sourceSchema, targetSchema);
|
||||||
HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema);
|
HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema);
|
||||||
if (hoodieCfg.isEmbeddedTimelineServerEnabled()) {
|
if (hoodieCfg.isEmbeddedTimelineServerEnabled()) {
|
||||||
@@ -898,4 +905,24 @@ public class DeltaSync implements Serializable {
|
|||||||
return Option.empty();
|
return Option.empty();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set based on hoodie.datasource.write.drop.partition.columns config.
|
||||||
|
* When set to true, will not write the partition columns into the table.
|
||||||
|
*/
|
||||||
|
private Boolean isDropPartitionColumns() {
|
||||||
|
return props.getBoolean(DROP_PARTITION_COLUMNS.key(), DROP_PARTITION_COLUMNS.defaultValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the list of partition columns as a list of strings.
|
||||||
|
*
|
||||||
|
* @param keyGenerator KeyGenerator
|
||||||
|
* @param props TypedProperties
|
||||||
|
* @return List of partition columns.
|
||||||
|
*/
|
||||||
|
private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
|
||||||
|
String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
|
||||||
|
return Arrays.asList(partitionColumns.split(","));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user