[HUDI-4303] Use Hive sentinel value as partition default to avoid type cast issues (#5954)
This commit is contained in:
@@ -45,6 +45,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
|
||||
import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS;
|
||||
import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION;
|
||||
import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION;
|
||||
@@ -81,7 +82,7 @@ public class FlinkOptions extends HoodieConfig {
|
||||
public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
|
||||
.key("partition.default_name")
|
||||
.stringType()
|
||||
.defaultValue("default") // keep sync with hoodie style
|
||||
.defaultValue(DEFAULT_PARTITION_PATH) // keep sync with hoodie style
|
||||
.withDescription("The default partition name in case the dynamic partition"
|
||||
+ " column value is null/empty string");
|
||||
|
||||
|
||||
@@ -20,7 +20,6 @@ package org.apache.hudi.sink.bulk;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.exception.HoodieKeyException;
|
||||
@@ -39,6 +38,9 @@ import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
|
||||
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName;
|
||||
|
||||
/**
|
||||
* Key generator for {@link RowData}.
|
||||
*/
|
||||
@@ -52,7 +54,6 @@ public class RowDataKeyGen implements Serializable {
|
||||
private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
|
||||
private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
|
||||
|
||||
private static final String DEFAULT_PARTITION_PATH = "default";
|
||||
private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
|
||||
|
||||
private final String[] recordKeyFields;
|
||||
@@ -192,7 +193,7 @@ public class RowDataKeyGen implements Serializable {
|
||||
: DEFAULT_PARTITION_PATH);
|
||||
} else {
|
||||
if (encodePartitionPath) {
|
||||
partValue = PartitionPathEncodeUtils.escapePathName(partValue);
|
||||
partValue = escapePathName(partValue);
|
||||
}
|
||||
partitionPath.append(hiveStylePartitioning ? partField + "=" + partValue : partValue);
|
||||
}
|
||||
@@ -227,7 +228,7 @@ public class RowDataKeyGen implements Serializable {
|
||||
partitionPath = DEFAULT_PARTITION_PATH;
|
||||
}
|
||||
if (encodePartitionPath) {
|
||||
partitionPath = PartitionPathEncodeUtils.escapePathName(partitionPath);
|
||||
partitionPath = escapePathName(partitionPath);
|
||||
}
|
||||
if (hiveStylePartitioning) {
|
||||
partitionPath = partField + "=" + partitionPath;
|
||||
|
||||
@@ -39,6 +39,7 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
|
||||
import static org.apache.hudi.configuration.FlinkOptions.PARTITION_FORMAT_DAY;
|
||||
|
||||
/**
|
||||
@@ -178,7 +179,7 @@ public class FlinkStreamerConfig extends Configuration {
|
||||
|
||||
@Parameter(names = {"--partition-default-name"},
|
||||
description = "The default partition name in case the dynamic partition column value is null/empty string")
|
||||
public String partitionDefaultName = "default";
|
||||
public String partitionDefaultName = DEFAULT_PARTITION_PATH;
|
||||
|
||||
@Parameter(names = {"--index-bootstrap-enabled"},
|
||||
description = "Whether to bootstrap the index state from existing hoodie table, default false")
|
||||
|
||||
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
|
||||
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
|
||||
import static org.apache.hudi.utils.TestData.insertRow;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
@@ -54,19 +55,19 @@ public class TestRowDataKeyGen {
|
||||
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23,
|
||||
TimestampData.fromEpochMillis(1), null);
|
||||
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
|
||||
assertThat(keyGen1.getPartitionPath(rowData2), is("default"));
|
||||
assertThat(keyGen1.getPartitionPath(rowData2), is(DEFAULT_PARTITION_PATH));
|
||||
// empty record key and partition path
|
||||
final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString("Danny"), 23,
|
||||
TimestampData.fromEpochMillis(1), StringData.fromString(""));
|
||||
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3));
|
||||
assertThat(keyGen1.getPartitionPath(rowData3), is("default"));
|
||||
assertThat(keyGen1.getPartitionPath(rowData3), is(DEFAULT_PARTITION_PATH));
|
||||
|
||||
// hive style partitioning
|
||||
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
|
||||
final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
|
||||
assertThat(keyGen2.getPartitionPath(rowData1), is("partition=par1"));
|
||||
assertThat(keyGen2.getPartitionPath(rowData2), is("partition=default"));
|
||||
assertThat(keyGen2.getPartitionPath(rowData3), is("partition=default"));
|
||||
assertThat(keyGen2.getPartitionPath(rowData1), is(String.format("partition=%s", "par1")));
|
||||
assertThat(keyGen2.getPartitionPath(rowData2), is(String.format("partition=%s", DEFAULT_PARTITION_PATH)));
|
||||
assertThat(keyGen2.getPartitionPath(rowData3), is(String.format("partition=%s", DEFAULT_PARTITION_PATH)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -83,19 +84,19 @@ public class TestRowDataKeyGen {
|
||||
// null record key and partition path
|
||||
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, null, 23, null, null);
|
||||
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
|
||||
assertThat(keyGen1.getPartitionPath(rowData2), is("default/default"));
|
||||
assertThat(keyGen1.getPartitionPath(rowData2), is(String.format("%s/%s", DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH)));
|
||||
// empty record key and partition path
|
||||
final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString(""), 23,
|
||||
TimestampData.fromEpochMillis(1), StringData.fromString(""));
|
||||
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3));
|
||||
assertThat(keyGen1.getPartitionPath(rowData3), is("default/1970-01-01T00:00:00.001"));
|
||||
assertThat(keyGen1.getPartitionPath(rowData3), is(String.format("%s/1970-01-01T00:00:00.001", DEFAULT_PARTITION_PATH)));
|
||||
|
||||
// hive style partitioning
|
||||
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
|
||||
final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
|
||||
assertThat(keyGen2.getPartitionPath(rowData1), is("partition=par1/ts=1970-01-01T00:00:00.001"));
|
||||
assertThat(keyGen2.getPartitionPath(rowData2), is("partition=default/ts=default"));
|
||||
assertThat(keyGen2.getPartitionPath(rowData3), is("partition=default/ts=1970-01-01T00:00:00.001"));
|
||||
assertThat(keyGen2.getPartitionPath(rowData1), is(String.format("partition=%s/ts=%s", "par1", "1970-01-01T00:00:00.001")));
|
||||
assertThat(keyGen2.getPartitionPath(rowData2), is(String.format("partition=%s/ts=%s", DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH)));
|
||||
assertThat(keyGen2.getPartitionPath(rowData3), is(String.format("partition=%s/ts=%s", DEFAULT_PARTITION_PATH, "1970-01-01T00:00:00.001")));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -19,7 +19,6 @@
|
||||
package org.apache.hudi.source;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
|
||||
import org.apache.hudi.utils.TestConfigurations;
|
||||
import org.apache.hudi.utils.TestData;
|
||||
@@ -40,6 +39,11 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.configuration.FlinkOptions.HIVE_STYLE_PARTITIONING;
|
||||
import static org.apache.hudi.configuration.FlinkOptions.KEYGEN_CLASS_NAME;
|
||||
import static org.apache.hudi.configuration.FlinkOptions.METADATA_ENABLED;
|
||||
import static org.apache.hudi.configuration.FlinkOptions.PARTITION_DEFAULT_NAME;
|
||||
import static org.apache.hudi.configuration.FlinkOptions.PARTITION_PATH_FIELD;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
@@ -55,12 +59,12 @@ public class TestFileIndex {
|
||||
@ValueSource(booleans = {true, false})
|
||||
void testFileListingUsingMetadata(boolean hiveStylePartitioning) throws Exception {
|
||||
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
|
||||
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
|
||||
conf.setBoolean(METADATA_ENABLED, true);
|
||||
conf.setBoolean(HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
|
||||
TestData.writeData(TestData.DATA_SET_INSERT, conf);
|
||||
FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE);
|
||||
List<String> partitionKeys = Collections.singletonList("partition");
|
||||
List<Map<String, String>> partitions = fileIndex.getPartitions(partitionKeys, "default", hiveStylePartitioning);
|
||||
List<Map<String, String>> partitions = fileIndex.getPartitions(partitionKeys, PARTITION_DEFAULT_NAME.defaultValue(), hiveStylePartitioning);
|
||||
assertTrue(partitions.stream().allMatch(m -> m.size() == 1));
|
||||
String partitionPaths = partitions.stream()
|
||||
.map(Map::values).flatMap(Collection::stream).sorted().collect(Collectors.joining(","));
|
||||
@@ -75,13 +79,13 @@ public class TestFileIndex {
|
||||
@Test
|
||||
void testFileListingUsingMetadataNonPartitionedTable() throws Exception {
|
||||
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "");
|
||||
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName());
|
||||
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
|
||||
conf.setString(PARTITION_PATH_FIELD, "");
|
||||
conf.setString(KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName());
|
||||
conf.setBoolean(METADATA_ENABLED, true);
|
||||
TestData.writeData(TestData.DATA_SET_INSERT, conf);
|
||||
FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE);
|
||||
List<String> partitionKeys = Collections.singletonList("");
|
||||
List<Map<String, String>> partitions = fileIndex.getPartitions(partitionKeys, "default", false);
|
||||
List<Map<String, String>> partitions = fileIndex.getPartitions(partitionKeys, PARTITION_DEFAULT_NAME.defaultValue(), false);
|
||||
assertThat(partitions.size(), is(0));
|
||||
|
||||
FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
|
||||
@@ -93,10 +97,10 @@ public class TestFileIndex {
|
||||
@ValueSource(booleans = {true, false})
|
||||
void testFileListingEmptyTable(boolean enableMetadata) {
|
||||
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
conf.setBoolean(FlinkOptions.METADATA_ENABLED, enableMetadata);
|
||||
conf.setBoolean(METADATA_ENABLED, enableMetadata);
|
||||
FileIndex fileIndex = FileIndex.instance(new Path(tempFile.getAbsolutePath()), conf, TestConfigurations.ROW_TYPE);
|
||||
List<String> partitionKeys = Collections.singletonList("partition");
|
||||
List<Map<String, String>> partitions = fileIndex.getPartitions(partitionKeys, "default", false);
|
||||
List<Map<String, String>> partitions = fileIndex.getPartitions(partitionKeys, PARTITION_DEFAULT_NAME.defaultValue(), false);
|
||||
assertThat(partitions.size(), is(0));
|
||||
|
||||
FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
|
||||
|
||||
@@ -20,7 +20,6 @@ package org.apache.hudi.source;
|
||||
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||
import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.configuration.HadoopConfigurations;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
|
||||
@@ -58,6 +57,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.configuration.FlinkOptions.PARTITION_DEFAULT_NAME;
|
||||
import static org.apache.hudi.configuration.FlinkOptions.TABLE_TYPE;
|
||||
import static org.apache.hudi.configuration.FlinkOptions.TABLE_TYPE_MERGE_ON_READ;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
@@ -84,7 +86,7 @@ public class TestStreamReadOperator {
|
||||
public void before() throws Exception {
|
||||
final String basePath = tempFile.getAbsolutePath();
|
||||
conf = TestConfigurations.getDefaultConf(basePath);
|
||||
conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
|
||||
conf.setString(TABLE_TYPE, TABLE_TYPE_MERGE_ON_READ);
|
||||
|
||||
StreamerUtil.initTableIfNotExists(conf);
|
||||
}
|
||||
@@ -266,7 +268,7 @@ public class TestStreamReadOperator {
|
||||
.config(conf)
|
||||
.tableState(hoodieTableState)
|
||||
.fieldTypes(rowDataType.getChildren())
|
||||
.defaultPartName("default").limit(1000L)
|
||||
.defaultPartName(PARTITION_DEFAULT_NAME.defaultValue()).limit(1000L)
|
||||
.emitDelete(true)
|
||||
.build();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user