1
0

[HUDI-912] Refactor and relocate KeyGenerator to support more engines (#2200)

* [HUDI-912] Refactor and relocate KeyGenerator to support more engines

* Rename KeyGenerators
This commit is contained in:
wangxianghu
2020-11-03 05:12:51 +08:00
committed by GitHub
parent 59f995a3f5
commit d160abb437
38 changed files with 1038 additions and 524 deletions

View File

@@ -50,6 +50,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<!-- Logging -->
<dependency>
<groupId>log4j</groupId>

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.exception;
/**
* Exception thrown for any higher level errors when {@link org.apache.hudi.keygen.KeyGeneratorInterface} is generating
* a {@link org.apache.hudi.common.model.HoodieKey}.
*/
public class HoodieKeyGeneratorException extends HoodieException {
public HoodieKeyGeneratorException(String msg, Throwable e) {
super(msg, e);
}
public HoodieKeyGeneratorException(String msg) {
super(msg);
}
}

View File

@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.util.List;
import java.util.stream.Collectors;
public abstract class BaseKeyGenerator extends KeyGenerator {
protected List<String> recordKeyFields;
protected List<String> partitionPathFields;
protected final boolean encodePartitionPath;
protected final boolean hiveStylePartitioning;
protected BaseKeyGenerator(TypedProperties config) {
super(config);
this.encodePartitionPath = config.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY,
Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL));
this.hiveStylePartitioning = config.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY,
Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL));
}
/**
* Generate a record Key out of provided generic record.
*/
public abstract String getRecordKey(GenericRecord record);
/**
* Generate a partition path out of provided generic record.
*/
public abstract String getPartitionPath(GenericRecord record);
/**
* Generate a Hoodie Key out of provided generic record.
*/
@Override
public final HoodieKey getKey(GenericRecord record) {
if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
}
return new HoodieKey(getRecordKey(record), getPartitionPath(record));
}
@Override
public final List<String> getRecordKeyFieldNames() {
// For nested columns, pick top level column name
return getRecordKeyFields().stream().map(k -> {
int idx = k.indexOf('.');
return idx > 0 ? k.substring(0, idx) : k;
}).collect(Collectors.toList());
}
public List<String> getRecordKeyFields() {
return recordKeyFields;
}
public List<String> getPartitionPathFields() {
return partitionPathFields;
}
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
* Avro complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";
public ComplexAvroKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
}
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
}
@Override
public String getPartitionPath(GenericRecord record) {
return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath);
}
}

View File

@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;
/**
* This is a generic implementation of KeyGenerator where users can configure record key as a single field or a combination of fields. Similarly partition path can be configured to have multiple
* fields or only one field. This class expects value for prop "hoodie.datasource.write.partitionpath.field" in a specific format. For example:
* <p>
* properties.put("hoodie.datasource.write.partitionpath.field", "field1:PartitionKeyType1,field2:PartitionKeyType2").
* <p>
* The complete partition path is created as <value for field1 basis PartitionKeyType1>/<value for field2 basis PartitionKeyType2> and so on.
* <p>
* Few points to consider: 1. If you want to customize some partition path field on a timestamp basis, you can use field1:timestampBased 2. If you simply want to have the value of your configured
* field in the partition path, use field1:simple 3. If you want your table to be non partitioned, simply leave it as blank.
* <p>
* RecordKey is internally generated using either SimpleKeyGenerator or ComplexKeyGenerator.
*/
public class CustomAvroKeyGenerator extends BaseKeyGenerator {
private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
private static final String SPLIT_REGEX = ":";
/**
* Used as a part of config in CustomKeyGenerator.java.
*/
public enum PartitionKeyType {
SIMPLE, TIMESTAMP
}
public CustomAvroKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
}
@Override
public String getPartitionPath(GenericRecord record) {
if (getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for partition path in cfg");
}
String partitionPathField;
StringBuilder partitionPath = new StringBuilder();
//Corresponds to no partition case
if (getPartitionPathFields().size() == 1 && getPartitionPathFields().get(0).isEmpty()) {
return "";
}
for (String field : getPartitionPathFields()) {
String[] fieldWithType = field.split(SPLIT_REGEX);
if (fieldWithType.length != 2) {
throw new HoodieKeyException("Unable to find field names for partition path in proper format");
}
partitionPathField = fieldWithType[0];
PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
switch (keyType) {
case SIMPLE:
partitionPath.append(new SimpleAvroKeyGenerator(config, partitionPathField).getPartitionPath(record));
break;
case TIMESTAMP:
try {
partitionPath.append(new TimestampBasedAvroKeyGenerator(config, partitionPathField).getPartitionPath(record));
} catch (IOException e) {
throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class");
}
break;
default:
throw new HoodieKeyGeneratorException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
}
partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
}
partitionPath.deleteCharAt(partitionPath.length() - 1);
return partitionPath.toString();
}
@Override
public String getRecordKey(GenericRecord record) {
validateRecordKeyFields();
return getRecordKeyFields().size() == 1
? new SimpleAvroKeyGenerator(config).getRecordKey(record)
: new ComplexAvroKeyGenerator(config).getRecordKey(record);
}
private void validateRecordKeyFields() {
if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
throw new HoodieKeyException("Unable to find field names for record key in cfg");
}
}
public String getDefaultPartitionPathSeparator() {
return DEFAULT_PARTITION_PATH_SEPARATOR;
}
public String getSplitRegex() {
return SPLIT_REGEX;
}
}

View File

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Avro Key generator for deletes using global indices. Global index deletes do not require partition value so this key generator
* avoids using partition value for generating HoodieKey.
*/
public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
private static final String EMPTY_PARTITION = "";
public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
super(config);
this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(","));
}
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
}
@Override
public String getPartitionPath(GenericRecord record) {
return EMPTY_PARTITION;
}
@Override
public List<String> getPartitionPathFields() {
return new ArrayList<>();
}
public String getEmptyPartition() {
return EMPTY_PARTITION;
}
}

View File

@@ -18,14 +18,20 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyException;
public class KeyGenUtils {
@@ -111,4 +117,23 @@ public class KeyGenUtils {
}
return partitionPath;
}
/**
* Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed.
*/
public static AbstractHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
try {
return (AbstractHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
} catch (Throwable e) {
throw new IOException("Could not load date time parser class " + parserClass, e);
}
}
public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
checkPropNames.forEach(prop -> {
if (!props.containsKey(prop)) {
throw new HoodieNotSupportedException("Required property " + prop + " is missing");
}
});
}
}

View File

@@ -19,30 +19,22 @@
package org.apache.hudi.keygen;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionHelper;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import scala.Function1;
import java.io.Serializable;
import java.util.List;
/**
* Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record.
*/
@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
public abstract class KeyGenerator implements Serializable, SparkKeyGeneratorInterface {
private static final String STRUCT_NAME = "hoodieRowTopLevelField";
private static final String NAMESPACE = "hoodieRow";
public abstract class KeyGenerator implements KeyGeneratorInterface {
protected TypedProperties config;
private transient Function1<Object, Object> converterFn = null;
protected KeyGenerator(TypedProperties config) {
this.config = config;
@@ -64,32 +56,4 @@ public abstract class KeyGenerator implements Serializable, SparkKeyGeneratorInt
throw new UnsupportedOperationException("Bootstrap not supported for key generator. "
+ "Please override this method in your custom key generator.");
}
/**
* Fetch record key from {@link Row}.
* @param row instance of {@link Row} from which record key is requested.
* @return the record key of interest from {@link Row}.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
if (null == converterFn) {
converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
}
GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
return getKey(genericRecord).getRecordKey();
}
/**
* Fetch partition path from {@link Row}.
* @param row instance of {@link Row} from which partition path is requested
* @return the partition path of interest from {@link Row}.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getPartitionPath(Row row) {
if (null == converterFn) {
converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
}
GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
return getKey(genericRecord).getPartitionPath();
}
}

View File

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import java.util.ArrayList;
import java.util.List;
/**
* Avro simple Key generator for unpartitioned Hive Tables.
*/
public class NonpartitionedAvroKeyGenerator extends SimpleAvroKeyGenerator {
private static final String EMPTY_PARTITION = "";
private static final List<String> EMPTY_PARTITION_FIELD_LIST = new ArrayList<>();
public NonpartitionedAvroKeyGenerator(TypedProperties props) {
super(props);
}
@Override
public String getPartitionPath(GenericRecord record) {
return EMPTY_PARTITION;
}
@Override
public List<String> getPartitionPathFields() {
return EMPTY_PARTITION_FIELD_LIST;
}
public String getEmptyPartition() {
return EMPTY_PARTITION;
}
}

View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.util.Collections;
/**
* Avro simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
public SimpleAvroKeyGenerator(TypedProperties props) {
this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
}
SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
this(props, null, partitionPathField);
}
SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
super(props);
this.recordKeyFields = recordKeyField == null
? Collections.emptyList()
: Collections.singletonList(recordKeyField);
this.partitionPathFields = Collections.singletonList(partitionPathField);
}
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
}
@Override
public String getPartitionPath(GenericRecord record) {
return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath);
}
}

View File

@@ -1,13 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +17,16 @@
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@@ -46,15 +42,11 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
* Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
* Avro Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
*/
public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
public enum TimestampType implements Serializable {
UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR
}
@@ -96,19 +88,19 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
static final String DATE_TIME_PARSER_PROP = "hoodie.deltastreamer.keygen.datetime.parser.class";
}
public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
this(config, config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()),
config.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()));
public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException {
this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
}
TimestampBasedKeyGenerator(TypedProperties config, String partitionPathField) throws IOException {
TimestampBasedAvroKeyGenerator(TypedProperties config, String partitionPathField) throws IOException {
this(config, null, partitionPathField);
}
TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
TimestampBasedAvroKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
super(config, recordKeyField, partitionPathField);
String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName());
this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
this.parser = KeyGenUtils.createDateTimeParser(config, dateTimeParserClass);
this.inputDateTimeZone = parser.getInputDateTimeZone();
this.outputDateTimeZone = parser.getOutputDateTimeZone();
this.outputDateFormat = parser.getOutputDateFormat();
@@ -128,8 +120,8 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
default:
timeUnit = null;
}
this.encodePartitionPath = config.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(),
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL()));
this.encodePartitionPath = config.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY,
Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL));
}
@Override
@@ -141,14 +133,14 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
try {
return getPartitionPath(partitionVal);
} catch (Exception e) {
throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, e);
throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + partitionVal, e);
}
}
/**
* Set default value to partitionVal if the input value of partitionPathField is null.
*/
private Object getDefaultPartitionVal() {
public Object getDefaultPartitionVal() {
Object result = 1L;
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
// since partitionVal is null, we can set a default value of any format as TIMESTAMP_INPUT_DATE_FORMAT_PROP
@@ -191,7 +183,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
* @param partitionVal partition path object value fetched from record/row
* @return the parsed partition path based on data type
*/
private String getPartitionPath(Object partitionVal) {
public String getPartitionPath(Object partitionVal) {
initIfNeeded();
long timeMs;
if (partitionVal instanceof Double) {
@@ -202,7 +194,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
timeMs = convertLongTimeToMillis((Long) partitionVal);
} else if (partitionVal instanceof CharSequence) {
if (!inputFormatter.isPresent()) {
throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
}
DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
if (this.outputDateTimeZone == null) {
@@ -235,27 +227,4 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
return MILLISECONDS.convert(partitionVal, timeUnit);
}
@Override
public String getRecordKey(Row row) {
buildFieldPositionMapIfNeeded(row.schema());
return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false);
}
@Override
public String getPartitionPath(Row row) {
Object fieldVal = null;
buildFieldPositionMapIfNeeded(row.schema());
Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
try {
if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
|| partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
fieldVal = getDefaultPartitionVal();
} else {
fieldVal = partitionPathFieldVal;
}
return getPartitionPath(fieldVal);
} catch (Exception e) {
throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + fieldVal, e);
}
}
}

View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen.constant;
public class KeyGeneratorOptions {
/**
* Flag to indicate whether to use Hive style partitioning.
* If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.
* By default false (the names of partition folders are only partition values)
*/
public static final String URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode";
public static final String DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false";
public static final String HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning";
public static final String DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false";
/**
* Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
* will be obtained by invoking .toString() on the field value. Nested fields can be specified using
* the dot notation eg: `a.b.c`
*/
public static final String RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field";
public static final String PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field";
}

View File

@@ -19,7 +19,7 @@ package org.apache.hudi.keygen.parser;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
@@ -36,7 +36,7 @@ public abstract class AbstractHoodieDateTimeParser implements Serializable {
}
private String initInputDateFormatDelimiter() {
String inputDateFormatDelimiter = config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter;
return inputDateFormatDelimiter;
}
@@ -45,7 +45,7 @@ public abstract class AbstractHoodieDateTimeParser implements Serializable {
* Returns the output date format in which the partition paths will be created for the hudi dataset.
*/
public String getOutputDateFormat() {
return config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
return config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
}
/**

View File

@@ -17,11 +17,11 @@
package org.apache.hudi.keygen.parser;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config;
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config;
import org.apache.hudi.keygen.KeyGenUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -42,7 +42,7 @@ public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
public HoodieDateTimeParserImpl(TypedProperties config) {
super(config);
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
KeyGenUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
this.inputDateTimeZone = getInputDateTimeZone();
}
@@ -79,7 +79,7 @@ public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
public Option<DateTimeFormatter> getInputFormatter() {
TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
DataSourceUtils.checkRequiredProperties(config,
KeyGenUtils.checkRequiredProperties(config,
Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
return Option.of(getInputDateFormatter());

View File

@@ -31,6 +31,13 @@
<packaging>jar</packaging>
<dependencies>
<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>

View File

@@ -18,70 +18,67 @@
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionHelper;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import scala.Function1;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Base class for all the built-in key generators. Contains methods structured for
* Base class for the built-in key generators. Contains methods structured for
* code reuse amongst them.
*/
public abstract class BuiltinKeyGenerator extends KeyGenerator {
public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
protected List<String> recordKeyFields;
protected List<String> partitionPathFields;
protected final boolean encodePartitionPath;
protected final boolean hiveStylePartitioning;
private static final String STRUCT_NAME = "hoodieRowTopLevelField";
private static final String NAMESPACE = "hoodieRow";
private transient Function1<Object, Object> converterFn = null;
protected StructType structType;
protected Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
protected Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
protected StructType structType;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
this.encodePartitionPath = config.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(),
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL()));
this.hiveStylePartitioning = config.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
}
/**
* Generate a record Key out of provided generic record.
* Fetch record key from {@link Row}.
* @param row instance of {@link Row} from which record key is requested.
* @return the record key of interest from {@link Row}.
*/
public abstract String getRecordKey(GenericRecord record);
/**
* Generate a partition path out of provided generic record.
*/
public abstract String getPartitionPath(GenericRecord record);
/**
* Generate a Hoodie Key out of provided generic record.
*/
public final HoodieKey getKey(GenericRecord record) {
if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
}
return new HoodieKey(getRecordKey(record), getPartitionPath(record));
}
@Override
public final List<String> getRecordKeyFieldNames() {
// For nested columns, pick top level column name
return getRecordKeyFields().stream().map(k -> {
int idx = k.indexOf('.');
return idx > 0 ? k.substring(0, idx) : k;
}).collect(Collectors.toList());
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getRecordKey(Row row) {
if (null == converterFn) {
converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
}
GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
return getKey(genericRecord).getRecordKey();
}
/**
* Fetch partition path from {@link Row}.
* @param row instance of {@link Row} from which partition path is requested
* @return the partition path of interest from {@link Row}.
*/
@Override
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
public String getPartitionPath(Row row) {
if (null == converterFn) {
converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
}
GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
return getKey(genericRecord).getPartitionPath();
}
void buildFieldPositionMapIfNeeded(StructType structType) {
@@ -119,12 +116,5 @@ public abstract class BuiltinKeyGenerator extends KeyGenerator {
this.structType = structType;
}
}
public List<String> getRecordKeyFields() {
return recordKeyFields;
}
public List<String> getPartitionPathFields() {
return partitionPathFields;
}
}

View File

@@ -1,13 +1,12 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,38 +17,38 @@
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.spark.sql.Row;
/**
* Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
public class ComplexKeyGenerator extends BuiltinKeyGenerator {
public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";
private final ComplexAvroKeyGenerator complexAvroKeyGenerator;
public ComplexKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY())
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY())
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
}
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
return complexAvroKeyGenerator.getRecordKey(record);
}
@Override
public String getPartitionPath(GenericRecord record) {
return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath);
return complexAvroKeyGenerator.getPartitionPath(record);
}
@Override
@@ -64,4 +63,5 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
hiveStylePartitioning, partitionPathPositions);
}
}
}

View File

@@ -18,13 +18,12 @@
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import java.io.IOException;
@@ -46,20 +45,31 @@ import java.util.stream.Collectors;
*/
public class CustomKeyGenerator extends BuiltinKeyGenerator {
private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
private static final String SPLIT_REGEX = ":";
/**
* Used as a part of config in CustomKeyGenerator.java.
*/
public enum PartitionKeyType {
SIMPLE, TIMESTAMP
}
private final CustomAvroKeyGenerator customAvroKeyGenerator;
public CustomKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
}
@Override
public String getRecordKey(GenericRecord record) {
return customAvroKeyGenerator.getRecordKey(record);
}
@Override
public String getPartitionPath(GenericRecord record) {
return customAvroKeyGenerator.getPartitionPath(record);
}
@Override
public String getRecordKey(Row row) {
validateRecordKeyFields();
return getRecordKeyFields().size() == 1
? new SimpleKeyGenerator(config).getRecordKey(row)
: new ComplexKeyGenerator(config).getRecordKey(row);
}
@Override
@@ -67,11 +77,6 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
return getPartitionPath(Option.empty(), Option.of(row));
}
@Override
public String getPartitionPath(GenericRecord record) {
return getPartitionPath(Option.of(record), Option.empty());
}
private String getPartitionPath(Option<GenericRecord> record, Option<Row> row) {
if (getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for partition path in cfg");
@@ -85,13 +90,13 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
return "";
}
for (String field : getPartitionPathFields()) {
String[] fieldWithType = field.split(SPLIT_REGEX);
String[] fieldWithType = field.split(customAvroKeyGenerator.getSplitRegex());
if (fieldWithType.length != 2) {
throw new HoodieKeyException("Unable to find field names for partition path in proper format");
throw new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format");
}
partitionPathField = fieldWithType[0];
PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
CustomAvroKeyGenerator.PartitionKeyType keyType = CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
switch (keyType) {
case SIMPLE:
if (record.isPresent()) {
@@ -108,38 +113,23 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(row.get()));
}
} catch (IOException ioe) {
throw new HoodieDeltaStreamerException("Unable to initialise TimestampBasedKeyGenerator class");
throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class");
}
break;
default:
throw new HoodieDeltaStreamerException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
throw new HoodieKeyGeneratorException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
}
partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
partitionPath.append(customAvroKeyGenerator.getDefaultPartitionPathSeparator());
}
partitionPath.deleteCharAt(partitionPath.length() - 1);
return partitionPath.toString();
}
@Override
public String getRecordKey(GenericRecord record) {
validateRecordKeyFields();
return getRecordKeyFields().size() == 1
? new SimpleKeyGenerator(config).getRecordKey(record)
: new ComplexKeyGenerator(config).getRecordKey(record);
}
@Override
public String getRecordKey(Row row) {
validateRecordKeyFields();
return getRecordKeyFields().size() == 1
? new SimpleKeyGenerator(config).getRecordKey(row)
: new ComplexKeyGenerator(config).getRecordKey(row);
}
private void validateRecordKeyFields() {
if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
throw new HoodieKeyException("Unable to find field names for record key in cfg");
}
}
}

View File

@@ -18,10 +18,9 @@
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
@@ -34,21 +33,21 @@ import java.util.List;
*/
public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
private static final String EMPTY_PARTITION = "";
private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator;
public GlobalDeleteKeyGenerator(TypedProperties config) {
super(config);
this.recordKeyFields = Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(","));
globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config);
}
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
return globalAvroDeleteKeyGenerator.getRecordKey(record);
}
@Override
public String getPartitionPath(GenericRecord record) {
return EMPTY_PARTITION;
return globalAvroDeleteKeyGenerator.getPartitionPath(record);
}
@Override
@@ -64,6 +63,7 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getPartitionPath(Row row) {
return EMPTY_PARTITION;
return globalAvroDeleteKeyGenerator.getEmptyPartition();
}
}

View File

@@ -18,12 +18,10 @@
package org.apache.hudi.keygen;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
import java.util.List;
/**
@@ -31,25 +29,27 @@ import java.util.List;
*/
public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
private static final String EMPTY_PARTITION = "";
private static final List<String> EMPTY_PARTITION_FIELD_LIST = new ArrayList<>();
private final NonpartitionedAvroKeyGenerator nonpartitionedAvroKeyGenerator;
public NonpartitionedKeyGenerator(TypedProperties props) {
super(props);
public NonpartitionedKeyGenerator(TypedProperties config) {
super(config);
nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(config);
}
@Override
public String getPartitionPath(GenericRecord record) {
return EMPTY_PARTITION;
return nonpartitionedAvroKeyGenerator.getPartitionPath(record);
}
@Override
public List<String> getPartitionPathFields() {
return EMPTY_PARTITION_FIELD_LIST;
return nonpartitionedAvroKeyGenerator.getPartitionPathFields();
}
@Override
public String getPartitionPath(Row row) {
return EMPTY_PARTITION;
return nonpartitionedAvroKeyGenerator.getEmptyPartition();
}
}

View File

@@ -19,10 +19,10 @@
package org.apache.hudi.keygen;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Option;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,8 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import scala.Option;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;

View File

@@ -18,10 +18,9 @@
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import java.util.Collections;
@@ -31,9 +30,11 @@ import java.util.Collections;
*/
public class SimpleKeyGenerator extends BuiltinKeyGenerator {
private final SimpleAvroKeyGenerator simpleAvroKeyGenerator;
public SimpleKeyGenerator(TypedProperties props) {
this(props, props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()),
props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()));
this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
}
SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
@@ -46,16 +47,17 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
? Collections.emptyList()
: Collections.singletonList(recordKeyField);
this.partitionPathFields = Collections.singletonList(partitionPathField);
simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
}
@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
return simpleAvroKeyGenerator.getRecordKey(record);
}
@Override
public String getPartitionPath(GenericRecord record) {
return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath);
return simpleAvroKeyGenerator.getPartitionPath(record);
}
@Override

View File

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import java.io.IOException;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
* Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
*/
public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
}
TimestampBasedKeyGenerator(TypedProperties config, String partitionPathField) throws IOException {
this(config, null, partitionPathField);
}
TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
super(config, recordKeyField, partitionPathField);
timestampBasedAvroKeyGenerator = new TimestampBasedAvroKeyGenerator(config, recordKeyField, partitionPathField);
}
@Override
public String getPartitionPath(GenericRecord record) {
return timestampBasedAvroKeyGenerator.getPartitionPath(record);
}
@Override
public String getRecordKey(Row row) {
buildFieldPositionMapIfNeeded(row.schema());
return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false);
}
@Override
public String getPartitionPath(Row row) {
Object fieldVal = null;
buildFieldPositionMapIfNeeded(row.schema());
Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
try {
if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
|| partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
fieldVal = timestampBasedAvroKeyGenerator.getDefaultPartitionVal();
} else {
fieldVal = partitionPathFieldVal;
}
return timestampBasedAvroKeyGenerator.getPartitionPath(fieldVal);
} catch (Exception e) {
throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + fieldVal, e);
}
}
}

View File

@@ -1,12 +1,13 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,10 +24,10 @@ import java.util
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericData.{Fixed, Record}
import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.spark.sql.Row
import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.GenericRow

View File

@@ -1,12 +1,13 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +18,14 @@
package org.apache.hudi
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.common.model.HoodieKey
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.HoodieKey
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import scala.collection.JavaConverters._

View File

@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static junit.framework.TestCase.assertEquals;
public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getCommonProps(boolean getComplexRecordKey) {
TypedProperties properties = new TypedProperties();
if (getComplexRecordKey) {
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col");
} else {
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
}
properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
return properties;
}
private TypedProperties getPropertiesWithoutPartitionPathProp() {
return getCommonProps(false);
}
private TypedProperties getPropertiesWithoutRecordKeyProp() {
TypedProperties properties = new TypedProperties();
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
return properties;
}
private TypedProperties getWrongRecordKeyFieldProps() {
TypedProperties properties = new TypedProperties();
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key");
return properties;
}
private TypedProperties getProps() {
TypedProperties properties = getCommonProps(true);
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp,ts_ms");
return properties;
}
@Test
public void testNullPartitionPathFields() {
Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
}
@Test
public void testNullRecordKeyFields() {
Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
}
@Test
public void testWrongRecordKeyField() {
ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps());
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
}
@Test
public void testHappyFlow() {
ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=2020-03-21");
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21");
}
@Test
public void testSingleValueKeyGenerator() {
TypedProperties properties = new TypedProperties();
properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 1);
assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
String rowKey = record.get("_row_key").toString();
String partitionPath = record.get("timestamp").toString();
HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey());
assertEquals(partitionPath, hoodieKey.getPartitionPath());
}
@Test
public void testMultipleValueKeyGenerator() {
TypedProperties properties = new TypedProperties();
properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,timestamp");
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "rider,driver");
ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
String rowKey =
"_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
+ "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
String partitionPath = record.get("rider").toString() + "/" + record.get("driver").toString();
HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
assertEquals(rowKey, hoodieKey.getRecordKey());
assertEquals(partitionPath, hoodieKey.getPartitionPath());
}
@Test
public void testMultipleValueKeyGeneratorNonPartitioned() {
TypedProperties properties = new TypedProperties();
properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,timestamp");
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "");
ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
String rowKey =
"_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
+ "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
String partitionPath = "";
HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
assertEquals(rowKey, hoodieKey.getRecordKey());
assertEquals(partitionPath, hoodieKey.getPartitionPath());
}
}

View File

@@ -18,11 +18,11 @@
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
@@ -33,48 +33,48 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getCommonProps(boolean getComplexRecordKey) {
TypedProperties properties = new TypedProperties();
if (getComplexRecordKey) {
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col");
} else {
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
}
properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
return properties;
}
private TypedProperties getPropertiesForSimpleKeyGen() {
TypedProperties properties = getCommonProps(false);
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple");
return properties;
}
private TypedProperties getImproperPartitionFieldFormatProp() {
TypedProperties properties = getCommonProps(false);
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
return properties;
}
private TypedProperties getInvalidPartitionKeyTypeProps() {
TypedProperties properties = getCommonProps(false);
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:dummy");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:dummy");
return properties;
}
private TypedProperties getComplexRecordKeyWithSimplePartitionProps() {
TypedProperties properties = getCommonProps(true);
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple");
return properties;
}
private TypedProperties getComplexRecordKeyAndPartitionPathProps() {
TypedProperties properties = getCommonProps(true);
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple,ts_ms:timestamp");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple,ts_ms:timestamp");
populateNecessaryPropsForTimestampBasedKeyGen(properties);
return properties;
}
private TypedProperties getPropsWithoutRecordKeyFieldProps() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple");
return properties;
}
@@ -86,20 +86,20 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getPropertiesForTimestampBasedKeyGen() {
TypedProperties properties = getCommonProps(false);
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "ts_ms:timestamp");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "ts_ms:timestamp");
populateNecessaryPropsForTimestampBasedKeyGen(properties);
return properties;
}
private TypedProperties getPropertiesForNonPartitionedKeyGen() {
TypedProperties properties = getCommonProps(false);
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "");
return properties;
}
@Test
public void testSimpleKeyGenerator() {
KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "key1");
@@ -111,7 +111,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testTimestampBasedKeyGenerator() {
KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "key1");
@@ -123,7 +123,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testNonPartitionedKeyGenerator() {
KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "key1");
@@ -136,28 +136,28 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testInvalidPartitionKeyType() {
try {
KeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
keyGenerator.getKey(getRecord());
Assertions.fail("should fail when invalid PartitionKeyType is provided!");
} catch (Exception e) {
Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY"));
Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
}
try {
KeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
GenericRecord record = getRecord();
Row row = KeyGeneratorTestUtilities.getRow(record);
keyGenerator.getPartitionPath(row);
Assertions.fail("should fail when invalid PartitionKeyType is provided!");
} catch (Exception e) {
Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY"));
Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
}
}
@Test
public void testNoRecordKeyFieldProp() {
try {
KeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
keyGenerator.getKey(getRecord());
Assertions.fail("should fail when record key field is not provided!");
} catch (Exception e) {
@@ -165,7 +165,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
}
try {
KeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
GenericRecord record = getRecord();
Row row = KeyGeneratorTestUtilities.getRow(record);
keyGenerator.getRecordKey(row);
@@ -178,7 +178,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testPartitionFieldsInImproperFormat() {
try {
KeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
keyGenerator.getKey(getRecord());
Assertions.fail("should fail when partition key field is provided in improper format!");
} catch (Exception e) {
@@ -186,7 +186,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
}
try {
KeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
GenericRecord record = getRecord();
Row row = KeyGeneratorTestUtilities.getRow(record);
keyGenerator.getPartitionPath(row);
@@ -198,7 +198,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testComplexRecordKeyWithSimplePartitionPath() {
KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
@@ -211,7 +211,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testComplexRecordKeysWithComplexPartitionPath() {
KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps());
BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");

View File

@@ -18,12 +18,12 @@
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
@@ -34,29 +34,29 @@ public class TestGlobalDeleteKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getCommonProps(boolean getComplexRecordKey) {
TypedProperties properties = new TypedProperties();
if (getComplexRecordKey) {
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,pii_col");
} else {
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
}
properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
return properties;
}
private TypedProperties getPropertiesWithoutRecordKeyProp() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
return properties;
}
private TypedProperties getWrongRecordKeyFieldProps() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key");
return properties;
}
private TypedProperties getProps() {
TypedProperties properties = getCommonProps(true);
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp,ts_ms");
return properties;
}

View File

@@ -18,12 +18,12 @@
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
@@ -33,8 +33,8 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getCommonProps() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
return properties;
}
@@ -44,34 +44,34 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getPropertiesWithoutRecordKeyProp() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
return properties;
}
private TypedProperties getWrongRecordKeyFieldProps() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key");
return properties;
}
private TypedProperties getWrongPartitionPathFieldProps() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "_wrong_partition_path");
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "_wrong_partition_path");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
return properties;
}
private TypedProperties getComplexRecordKeyProp() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,pii_col");
return properties;
}
private TypedProperties getProps() {
TypedProperties properties = getCommonProps();
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
return properties;
}

View File

@@ -20,14 +20,14 @@ package org.apache.hudi.keygen;
import org.apache.hudi.AvroConversionHelper;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
@@ -58,15 +58,15 @@ public class TestTimestampBasedKeyGenerator {
.generateAvroRecordFromJson(schema, 1, "001", "f1");
baseRow = genericRecordToRow(baseRecord);
properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "field1");
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "createTime");
properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "false");
properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1");
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "createTime");
properties.setProperty(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "false");
}
private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone, String scalarType) {
properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat);
properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone);
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat);
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone);
if (scalarType != null) {
properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit", scalarType);
@@ -88,22 +88,22 @@ public class TestTimestampBasedKeyGenerator {
private TypedProperties getBaseKeyConfig(String timestampType, String inputFormatList, String inputFormatDelimiterRegex, String inputTimezone, String outputFormat, String outputTimezone) {
if (timestampType != null) {
properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
}
if (inputFormatList != null) {
properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList);
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList);
}
if (inputFormatDelimiterRegex != null) {
properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex);
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex);
}
if (inputTimezone != null) {
properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone);
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone);
}
if (outputFormat != null) {
properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat);
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat);
}
if (outputTimezone != null) {
properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone);
properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone);
}
return properties;
}
@@ -213,7 +213,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"GMT");
KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040113", hk1.getPartitionPath());
@@ -231,7 +231,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"");
KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040113", hk1.getPartitionPath());
@@ -249,7 +249,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040113", hk1.getPartitionPath());
@@ -267,7 +267,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040113", hk1.getPartitionPath());
@@ -285,7 +285,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040118", hk1.getPartitionPath());
@@ -303,7 +303,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040118", hk1.getPartitionPath());
@@ -321,7 +321,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"EST");
KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040109", hk1.getPartitionPath());
@@ -339,11 +339,11 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
Assertions.assertThrows(HoodieDeltaStreamerException.class, () -> keyGen.getKey(baseRecord));
BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
Assertions.assertThrows(HoodieKeyGeneratorException.class, () -> keyGen.getKey(baseRecord));
baseRow = genericRecordToRow(baseRecord);
Assertions.assertThrows(HoodieDeltaStreamerException.class, () -> keyGen.getPartitionPath(baseRow));
Assertions.assertThrows(HoodieKeyGeneratorException.class, () -> keyGen.getPartitionPath(baseRow));
}
@Test
@@ -356,7 +356,7 @@ public class TestTimestampBasedKeyGenerator {
"UTC",
"MM/dd/yyyy",
"UTC");
KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("04/01/2020", hk1.getPartitionPath());

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -72,7 +72,7 @@ public class HoodieDatasetBulkInsertHelper {
TypedProperties properties = new TypedProperties();
properties.putAll(config.getProps());
String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
StructType structTypeForUDF = rows.schema();
sqlContext.udf().register(RECORD_KEY_UDF_FN, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.hive.HiveSyncTool
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.log4j.LogManager
/**
@@ -213,14 +214,14 @@ object DataSourceWriteOptions {
* the dot notation eg: `a.b.c`
*
*/
val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
val RECORDKEY_FIELD_OPT_KEY = KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY
val DEFAULT_RECORDKEY_FIELD_OPT_VAL = "uuid"
/**
* Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`. Actual
* value ontained by invoking .toString()
*/
val PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field"
val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY
val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = "partitionpath"
/**
@@ -228,10 +229,10 @@ object DataSourceWriteOptions {
* If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.
* By default false (the names of partition folders are only partition values)
*/
val HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning"
val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false"
val URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode"
val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false"
val HIVE_STYLE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY
val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = KeyGeneratorOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL
val URL_ENCODE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY
val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL
/**
* Key generator class, that implements will extract the key out of incoming record
*

View File

@@ -23,7 +23,6 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration

View File

@@ -1,86 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static junit.framework.TestCase.assertEquals;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.keygen.ComplexKeyGenerator;
import org.junit.jupiter.api.Test;
public class TestComplexKeyGenerator {
@Test
public void testSingleValueKeyGenerator() {
TypedProperties properties = new TypedProperties();
properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 1);
assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
String rowKey = record.get("_row_key").toString();
String partitionPath = record.get("timestamp").toString();
HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey());
assertEquals(partitionPath, hoodieKey.getPartitionPath());
}
@Test
public void testMultipleValueKeyGenerator() {
TypedProperties properties = new TypedProperties();
properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,timestamp");
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "rider,driver");
ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
String rowKey =
"_row_key" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
+ "timestamp" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
String partitionPath = record.get("rider").toString() + "/" + record.get("driver").toString();
HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
assertEquals(rowKey, hoodieKey.getRecordKey());
assertEquals(partitionPath, hoodieKey.getPartitionPath());
}
@Test
public void testMultipleValueKeyGeneratorNonPartitioned() {
TypedProperties properties = new TypedProperties();
properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,timestamp");
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
String rowKey =
"_row_key" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
+ "timestamp" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
String partitionPath = "";
HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
assertEquals(rowKey, hoodieKey.getRecordKey());
assertEquals(partitionPath, hoodieKey.getPartitionPath());
}
}

View File

@@ -1,96 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.keygen;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getCommonProps(boolean getComplexRecordKey) {
TypedProperties properties = new TypedProperties();
if (getComplexRecordKey) {
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col");
} else {
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
}
properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
return properties;
}
private TypedProperties getPropertiesWithoutPartitionPathProp() {
return getCommonProps(false);
}
private TypedProperties getPropertiesWithoutRecordKeyProp() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
return properties;
}
private TypedProperties getWrongRecordKeyFieldProps() {
TypedProperties properties = new TypedProperties();
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
return properties;
}
private TypedProperties getProps() {
TypedProperties properties = getCommonProps(true);
properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms");
return properties;
}
@Test
public void testNullPartitionPathFields() {
Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
}
@Test
public void testNullRecordKeyFields() {
Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
}
@Test
public void testWrongRecordKeyField() {
ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps());
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
}
@Test
public void testHappyFlow() {
ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=2020-03-21");
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21");
}
}

View File

@@ -22,7 +22,6 @@ import java.time.LocalDate
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.scalatest.{FunSuite, Matchers}
@@ -43,7 +42,7 @@ class TestAvroConversionHelper extends FunSuite with Matchers {
test("Logical type: date") {
val schema = new Schema.Parser().parse(dateSchema)
val convertor = AvroConversionHelper.createConverterToRow(schema, convertAvroSchemaToStructType(schema))
val convertor = AvroConversionHelper.createConverterToRow(schema, AvroConversionUtils.convertAvroSchemaToStructType(schema))
val dateOutputData = dateInputData.map(x => {
val record = new GenericData.Record(schema) {{ put("date", x) }}

View File

@@ -17,8 +17,6 @@
package org.apache.hudi
import java.util
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.TypedProperties
@@ -236,14 +234,29 @@ class TestDataSourceDefaults {
assertEquals("name1", keyGen.getPartitionPath(baseRow))
}
class UserDefinedKeyGenerator(props: TypedProperties) extends KeyGenerator(props) {
class UserDefinedKeyGenerator(props: TypedProperties) extends KeyGenerator(props) with SparkKeyGeneratorInterface {
val recordKeyProp: String = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
val partitionPathProp: String = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY)
val STRUCT_NAME: String = "hoodieRowTopLevelField"
val NAMESPACE: String = "hoodieRow"
var converterFn: Function1[Any, Any] = _
override def getKey(record: GenericRecord): HoodieKey = {
new HoodieKey(HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyProp, true),
HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathProp, true))
}
override def getRecordKey(row: Row): String = {
if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
getKey(genericRecord).getRecordKey
}
override def getPartitionPath(row: Row): String = {
if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
getKey(genericRecord).getPartitionPath
}
}
@Test def testComplexKeyGenerator() = {