diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 1055bd522..9367e23dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -441,6 +441,17 @@ public class HoodieAvroUtils {
return records.stream().map(r -> rewriteRecord(r, newSchema)).collect(Collectors.toList());
}
+  /**
+   * Removes the given list of top-level fields from an Avro record by rewriting it
+   * against a schema that omits those fields.
+   *
+   * <p>See {@link #rewriteRecord(GenericRecord, Schema)} for how the rewrite is performed.
+   */
+  public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) {
+    Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
+    return rewriteRecord(record, newSchema);
+  }
+
private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
Schema oldSchema = oldRecord.getSchema();
Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name());
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 8c57dc84d..246d74411 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -227,6 +228,35 @@ public class TestHoodieAvroUtils {
assertEquals(NUM_FIELDS_IN_EXAMPLE_SCHEMA, schemaWithoutMetaCols.getFields().size());
}
+  @Test
+  public void testRemoveFields() {
+    // Partitioned-table case: dropping "pii_col" should yield a schema without it.
+    String schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+        + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+        + "{\"name\": \"non_pii_col\", \"type\": \"string\"}]}";
+    Schema expectedSchema = new Schema.Parser().parse(schemaStr);
+    GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
+    rec.put("_row_key", "key1");
+    rec.put("non_pii_col", "val1");
+    rec.put("pii_col", "val2");
+    rec.put("timestamp", 3.5);
+    GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col"));
+    assertEquals("key1", rec1.get("_row_key"));
+    assertEquals("val1", rec1.get("non_pii_col"));
+    assertEquals(3.5, rec1.get("timestamp"));
+    assertNull(rec1.get("pii_col"));
+    assertEquals(expectedSchema, rec1.getSchema());
+
+    // Non-partitioned-table case: an empty field name matches no field, so all fields remain.
+    schemaStr = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+        + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+        + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+        + "{\"name\": \"pii_col\", \"type\": \"string\"}]}";
+    expectedSchema = new Schema.Parser().parse(schemaStr);
+    rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList(""));
+    assertEquals(expectedSchema, rec1.getSchema());
+  }
+
@Test
public void testGetNestedFieldVal() {
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 0e57bd379..7a08d1542 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -105,6 +105,7 @@ import java.util.stream.Collectors;
import scala.collection.JavaConversions;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
@@ -280,6 +281,7 @@ public class DeltaSync implements Serializable {
.setPreCombineField(cfg.sourceOrderingField)
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
+ .setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
.initTable(new Configuration(jssc.hadoopConfiguration()),
cfg.targetBasePath);
}
@@ -375,6 +377,7 @@ public class DeltaSync implements Serializable {
SimpleKeyGenerator.class.getName()))
.setPartitionMetafileUseBaseFormat(props.getBoolean(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue()))
+ .setDropPartitionColumnsWhenWrite(isDropPartitionColumns())
.initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath);
}
@@ -478,13 +481,14 @@ public class DeltaSync implements Serializable {
boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
JavaRDD avroRDD = avroRDDOptional.get();
- JavaRDD records = avroRDD.map(gr -> {
+ JavaRDD records = avroRDD.map(record -> {
+ GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, getPartitionColumns(keyGenerator, props)) : record;
HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
(Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
: DataSourceUtils.createPayload(cfg.payloadClassName, gr);
- return new HoodieAvroRecord<>(keyGenerator.getKey(gr), payload);
+ return new HoodieAvroRecord<>(keyGenerator.getKey(record), payload);
});
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
@@ -727,6 +731,9 @@ public class DeltaSync implements Serializable {
private void reInitWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
LOG.info("Setting up new Hoodie Write Client");
+ if (isDropPartitionColumns()) {
+ targetSchema = HoodieAvroUtils.removeFields(targetSchema, getPartitionColumns(keyGenerator, props));
+ }
registerAvroSchemas(sourceSchema, targetSchema);
HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema);
if (hoodieCfg.isEmbeddedTimelineServerEnabled()) {
@@ -898,4 +905,24 @@ public class DeltaSync implements Serializable {
return Option.empty();
}
}
+
+  /**
+   * Whether partition columns should be dropped from records before writing, driven by the
+   * {@code hoodie.datasource.write.drop.partition.columns} config ({@code DROP_PARTITION_COLUMNS}).
+   */
+  private boolean isDropPartitionColumns() {
+    return props.getBoolean(DROP_PARTITION_COLUMNS.key(), DROP_PARTITION_COLUMNS.defaultValue());
+  }
+
+  /**
+   * Resolves the table's partition column names from the key generator and writer props.
+   *
+   * @param keyGenerator KeyGenerator used for the write
+   * @param props TypedProperties holding the writer configs
+   * @return names of the partition columns (split from the comma-separated config value)
+   */
+  private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
+    String partitionColumns = HoodieSparkUtils.getPartitionColumns(keyGenerator, props);
+    return Arrays.asList(partitionColumns.split(","));
+  }
}