1
0

[HUDI-2161] Adding support to disable meta columns with bulk insert operation (#3247)

This commit is contained in:
Sivabalan Narayanan
2021-07-19 20:43:48 -04:00
committed by GitHub
parent 2099bf41db
commit d5026e9a24
53 changed files with 1063 additions and 269 deletions

View File

@@ -62,6 +62,10 @@ public class HoodieInternalWriteStatus implements Serializable {
totalRecords++;
}
public void markSuccess() {
totalRecords++;
}
public void markFailure(String recordKey, Throwable t) {
if (failedRecordKeys.isEmpty() || (random.nextDouble() <= failureFraction)) {
failedRecordKeys.add(Pair.of(recordKey, t));

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -602,6 +603,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return engineType;
}
public boolean populateMetaFields() {
return Boolean.parseBoolean(getStringOrDefault(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS,
HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()));
}
/**
* compaction properties.
*/

View File

@@ -403,6 +403,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
upgradeDowngrade.run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
}
}
metaClient.validateTableProperties(config.getProps(), operationType);
return getTableAndInitCtx(metaClient, operationType, instantTime);
}

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.io.storage;
package org.apache.hudi.io.storage.row;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -33,12 +33,19 @@ public interface HoodieInternalRowFileWriter {
boolean canWrite();
/**
* Writes an {@link InternalRow} to the HoodieInternalRowFileWriter.
* Writes an {@link InternalRow} to the HoodieInternalRowFileWriter. Also takes in associated record key to be added to bloom filter if required.
*
* @throws IOException on any exception while writing.
*/
void writeRow(String key, InternalRow row) throws IOException;
/**
* Writes an {@link InternalRow} to the HoodieInternalRowFileWriter.
*
* @throws IOException on any exception while writing.
*/
void writeRow(InternalRow row) throws IOException;
/**
* Closes the {@link HoodieInternalRowFileWriter} and may not take in any more writes.
*/

View File

@@ -16,21 +16,22 @@
* limitations under the License.
*/
package org.apache.hudi.io.storage;
package org.apache.hudi.io.storage.row;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.types.StructType;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
import java.io.IOException;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
/**
* Factory to assist in instantiating a new {@link HoodieInternalRowFileWriter}.
*/
@@ -76,4 +77,29 @@ public class HoodieInternalRowFileWriterFactory {
writeSupport.getHadoopConf(),
writeConfig.getParquetCompressionRatio()));
}
public static HoodieInternalRowFileWriter getInternalRowFileWriterWithoutMetaFields(
Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
throws IOException {
if (PARQUET.getFileExtension().equals(hoodieTable.getBaseFileExtension())) {
return newParquetInternalRowFileWriterWithoutMetaFields(path, config, schema, hoodieTable);
}
throw new HoodieIOException(hoodieTable.getBaseFileExtension() + " format not supported yet in row writer path");
}
private static HoodieInternalRowFileWriter newParquetInternalRowFileWriterWithoutMetaFields(
Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table)
throws IOException {
HoodieRowParquetWriteSupport writeSupport =
new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, null);
return new HoodieInternalRowParquetWriter(
path, new HoodieRowParquetConfig(
writeSupport,
writeConfig.getParquetCompressionCodec(),
writeConfig.getParquetBlockSize(),
writeConfig.getParquetPageSize(),
writeConfig.getParquetMaxFileSize(),
writeSupport.getHadoopConf(),
writeConfig.getParquetCompressionRatio()));
}
}

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.io.storage;
package org.apache.hudi.io.storage.row;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
@@ -65,6 +65,11 @@ public class HoodieInternalRowParquetWriter extends ParquetWriter<InternalRow>
writeSupport.add(key);
}
@Override
public void writeRow(InternalRow row) throws IOException {
super.write(row);
}
@Override
public void close() throws IOException {
super.close();

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.io;
package org.apache.hudi.io.storage.row;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.client.model.HoodieInternalRow;
@@ -30,8 +30,6 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
@@ -61,12 +59,12 @@ public class HoodieRowCreateHandle implements Serializable {
private final long taskEpochId;
private final HoodieTable table;
private final HoodieWriteConfig writeConfig;
private final HoodieInternalRowFileWriter fileWriter;
protected final HoodieInternalRowFileWriter fileWriter;
private final String partitionPath;
private final Path path;
private final String fileId;
private final FileSystem fs;
private final HoodieInternalWriteStatus writeStatus;
protected final HoodieInternalWriteStatus writeStatus;
private final HoodieTimer currTimer;
public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId,
@@ -197,7 +195,7 @@ public class HoodieRowCreateHandle implements Serializable {
return taskPartitionId + "-" + taskId + "-" + taskEpochId;
}
private HoodieInternalRowFileWriter createNewFileWriter(
protected HoodieInternalRowFileWriter createNewFileWriter(
Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
throws IOException {
return HoodieInternalRowFileWriterFactory.getInternalRowFileWriter(

View File

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage.row;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
/**
* RowCreateHandle to be used when meta fields are disabled.
*/
public class HoodieRowCreateHandleWithoutMetaFields extends HoodieRowCreateHandle {
public HoodieRowCreateHandleWithoutMetaFields(HoodieTable table, HoodieWriteConfig writeConfig, String partitionPath, String fileId, String instantTime,
int taskPartitionId, long taskId, long taskEpochId, StructType structType) {
super(table, writeConfig, partitionPath, fileId, instantTime, taskPartitionId, taskId, taskEpochId, structType);
}
/**
* Write the incoming InternalRow as is.
*
* @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
* @throws IOException
*/
@Override
public void write(InternalRow record) throws IOException {
try {
fileWriter.writeRow(record);
writeStatus.markSuccess();
} catch (Throwable ge) {
writeStatus.setGlobalError(ge);
throw new HoodieException("Exception thrown while writing spark InternalRows to file ", ge);
}
}
protected HoodieInternalRowFileWriter createNewFileWriter(
Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
throws IOException {
return HoodieInternalRowFileWriterFactory.getInternalRowFileWriterWithoutMetaFields(
path, hoodieTable, config, schema);
}
}

View File

@@ -16,7 +16,9 @@
* limitations under the License.
*/
package org.apache.hudi.io.storage;
package org.apache.hudi.io.storage.row;
import org.apache.hudi.io.storage.HoodieBaseParquetConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.io.storage;
package org.apache.hudi.io.storage.row;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.bloom.BloomFilter;

View File

@@ -18,21 +18,28 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.AvroConversionHelper;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.client.utils.SparkRowSerDe;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import scala.Function1;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import scala.Function1;
/**
* Base class for the built-in key generators. Contains methods structured for
* code reuse amongst them.
@@ -42,10 +49,12 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
private static final String STRUCT_NAME = "hoodieRowTopLevelField";
private static final String NAMESPACE = "hoodieRow";
private transient Function1<Object, Object> converterFn = null;
private SparkRowSerDe sparkRowSerDe;
protected StructType structType;
protected Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
protected Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
protected Map<String, List<DataType>> partitionPathDataTypes = null;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
@@ -81,6 +90,29 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
return getKey(genericRecord).getPartitionPath();
}
/**
* Fetch partition path from {@link InternalRow}.
*
* @param internalRow {@link InternalRow} instance from which partition path needs to be fetched from.
* @param structType schema of the internalRow.
* @return the partition path.
*/
public String getPartitionPath(InternalRow internalRow, StructType structType) {
try {
initDeserializer(structType);
Row row = sparkRowSerDe.deserializeRow(internalRow);
return getPartitionPath(row);
} catch (Exception e) {
throw new HoodieIOException("Conversion of InternalRow to Row failed with exception " + e);
}
}
private void initDeserializer(StructType structType) {
if (sparkRowSerDe == null) {
sparkRowSerDe = HoodieSparkUtils.getDeserializer(structType);
}
}
void buildFieldPositionMapIfNeeded(StructType structType) {
if (this.structType == null) {
// parse simple fields
@@ -116,5 +148,39 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
this.structType = structType;
}
}
protected String getPartitionPathInternal(InternalRow row, StructType structType) {
buildFieldDataTypesMapIfNeeded(structType);
validatePartitionFieldsForInternalRow();
return RowKeyGeneratorHelper.getPartitionPathFromInternalRow(row, getPartitionPathFields(),
hiveStylePartitioning, partitionPathPositions, partitionPathDataTypes);
}
protected void validatePartitionFieldsForInternalRow() {
partitionPathPositions.entrySet().forEach(entry -> {
if (entry.getValue().size() > 1) {
throw new IllegalArgumentException("Nested column for partitioning is not supported with disabling meta columns");
}
});
}
void buildFieldDataTypesMapIfNeeded(StructType structType) {
buildFieldPositionMapIfNeeded(structType);
if (this.partitionPathDataTypes == null) {
this.partitionPathDataTypes = new HashMap<>();
if (getPartitionPathFields() != null) {
// populating simple fields are good enough
getPartitionPathFields().stream().filter(f -> !f.isEmpty()).filter(f -> !(f.contains(".")))
.forEach(f -> {
if (structType.getFieldIndex(f).isDefined()) {
partitionPathDataTypes.put(f,
Collections.singletonList((structType.fields()[structType.fieldIndex(f)].dataType())));
} else {
partitionPathDataTypes.put(f, Collections.singletonList(null));
}
});
}
}
}
}

View File

@@ -17,10 +17,13 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.stream.Collectors;
@@ -64,4 +67,9 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
hiveStylePartitioning, partitionPathPositions);
}
@Override
public String getPartitionPath(InternalRow row, StructType structType) {
return getPartitionPathInternal(row, structType);
}
}

View File

@@ -18,13 +18,17 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.util.Arrays;
@@ -74,10 +78,15 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getPartitionPath(Row row) {
return getPartitionPath(Option.empty(), Option.of(row));
return getPartitionPath(Option.empty(), Option.of(row), Option.empty());
}
private String getPartitionPath(Option<GenericRecord> record, Option<Row> row) {
@Override
public String getPartitionPath(InternalRow row, StructType structType) {
return getPartitionPath(Option.empty(), Option.empty(), Option.of(Pair.of(row, structType)));
}
private String getPartitionPath(Option<GenericRecord> record, Option<Row> row, Option<Pair<InternalRow, StructType>> internalRowStructTypePair) {
if (getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for partition path in cfg");
}
@@ -101,16 +110,22 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
case SIMPLE:
if (record.isPresent()) {
partitionPath.append(new SimpleKeyGenerator(config, partitionPathField).getPartitionPath(record.get()));
} else {
} else if (row.isPresent()) {
partitionPath.append(new SimpleKeyGenerator(config, partitionPathField).getPartitionPath(row.get()));
} else {
partitionPath.append(new SimpleKeyGenerator(config, partitionPathField).getPartitionPath(internalRowStructTypePair.get().getKey(),
internalRowStructTypePair.get().getValue()));
}
break;
case TIMESTAMP:
try {
if (record.isPresent()) {
partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(record.get()));
} else {
} else if (row.isPresent()) {
partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(row.get()));
} else {
partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(internalRowStructTypePair.get().getKey(),
internalRowStructTypePair.get().getValue()));
}
} catch (IOException ioe) {
throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class", ioe);

View File

@@ -18,10 +18,13 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.Arrays;
@@ -65,5 +68,10 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
public String getPartitionPath(Row row) {
return globalAvroDeleteKeyGenerator.getEmptyPartition();
}
@Override
public String getPartitionPath(InternalRow row, StructType structType) {
return globalAvroDeleteKeyGenerator.getEmptyPartition();
}
}

View File

@@ -22,6 +22,8 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.Collections;
@@ -63,5 +65,9 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
return nonpartitionedAvroKeyGenerator.getEmptyPartition();
}
@Override
public String getPartitionPath(InternalRow internalRow, StructType structType) {
return nonpartitionedAvroKeyGenerator.getEmptyPartition();
}
}

View File

@@ -19,10 +19,13 @@
package org.apache.hudi.keygen;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Option;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,6 +36,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import scala.Option;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
@@ -121,6 +126,54 @@ public class RowKeyGeneratorHelper {
}).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
}
public static String getPartitionPathFromInternalRow(InternalRow row, List<String> partitionPathFields, boolean hiveStylePartitioning,
Map<String, List<Integer>> partitionPathPositions,
Map<String, List<DataType>> partitionPathDataTypes) {
return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
String field = partitionPathFields.get(idx);
String val = null;
List<Integer> fieldPositions = partitionPathPositions.get(field);
if (fieldPositions.size() == 1) { // simple
Integer fieldPos = fieldPositions.get(0);
// for partition path, if field is not found, index will be set to -1
if (fieldPos == -1 || row.isNullAt(fieldPos)) {
val = DEFAULT_PARTITION_PATH;
} else {
Object value = row.get(fieldPos, partitionPathDataTypes.get(field).get(0));
if (value == null || value.toString().isEmpty()) {
val = DEFAULT_PARTITION_PATH;
} else {
val = value.toString();
}
}
if (hiveStylePartitioning) {
val = field + "=" + val;
}
} else { // nested
throw new IllegalArgumentException("Nested partitioning is not supported with disabling meta columns.");
}
return val;
}).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
}
public static Object getFieldValFromInternalRow(InternalRow internalRow,
Integer partitionPathPosition,
DataType partitionPathDataType) {
Object val = null;
if (internalRow.isNullAt(partitionPathPosition)) {
return DEFAULT_PARTITION_PATH;
} else {
Object value = partitionPathDataType == DataTypes.StringType ? internalRow.getString(partitionPathPosition) : internalRow.get(partitionPathPosition, partitionPathDataType);
if (value == null || value.toString().isEmpty()) {
val = DEFAULT_PARTITION_PATH;
} else {
val = value;
}
}
return val;
}
/**
* Fetch the field value located at the positions requested for.
*

View File

@@ -18,10 +18,13 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import java.util.Collections;
@@ -72,4 +75,9 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
hiveStylePartitioning, partitionPathPositions);
}
@Override
public String getPartitionPath(InternalRow row, StructType structType) {
return getPartitionPathInternal(row, structType);
}
}

View File

@@ -18,11 +18,14 @@
package org.apache.hudi.keygen;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
@@ -64,9 +67,23 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
@Override
public String getPartitionPath(Row row) {
Object fieldVal = null;
buildFieldPositionMapIfNeeded(row.schema());
Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
return getTimestampBasedPartitionPath(partitionPathFieldVal);
}
@Override
public String getPartitionPath(InternalRow internalRow, StructType structType) {
buildFieldDataTypesMapIfNeeded(structType);
validatePartitionFieldsForInternalRow();
Object partitionPathFieldVal = RowKeyGeneratorHelper.getFieldValFromInternalRow(internalRow,
partitionPathPositions.get(getPartitionPathFields().get(0)).get(0),
partitionPathDataTypes.get(getPartitionPathFields().get(0)).get(0));
return getTimestampBasedPartitionPath(partitionPathFieldVal);
}
private String getTimestampBasedPartitionPath(Object partitionPathFieldVal) {
Object fieldVal = null;
try {
if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
|| partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {

View File

@@ -21,21 +21,24 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.JavaConverters._
import scala.collection.JavaConverters.asScalaBufferConverter
object HoodieSparkUtils extends SparkAdapterSupport {
def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")
def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
StructField(col, StringType, nullable = true)
@@ -110,6 +113,11 @@ object HoodieSparkUtils extends SparkAdapterSupport {
}
}
def getDeserializer(structType: StructType) : SparkRowSerDe = {
val encoder = RowEncoder.apply(structType).resolveAndBind()
sparkAdapter.createSparkRowSerDe(encoder)
}
/**
* Convert Filters to Catalyst Expressions and joined by And. If convert success return an
* Non-Empty Option[Expression],or else return None.

View File

@@ -1,12 +1,13 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +18,7 @@
package org.apache.hudi
import org.apache.spark.sql.hudi.{HoodieSqlUtils, SparkAdapter}
import org.apache.spark.sql.hudi.SparkAdapter
/**
* Use the SparkAdapterSupport trait to get the SparkAdapter when we
@@ -26,7 +27,7 @@ import org.apache.spark.sql.hudi.{HoodieSqlUtils, SparkAdapter}
trait SparkAdapterSupport {
lazy val sparkAdapter: SparkAdapter = {
val adapterClass = if (HoodieSqlUtils.isSpark3) {
val adapterClass = if (HoodieSparkUtils.isSpark3) {
"org.apache.spark.sql.adapter.Spark3Adapter"
} else {
"org.apache.spark.sql.adapter.Spark2Adapter"

View File

@@ -1,12 +1,13 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,21 +19,21 @@
package org.apache.spark.sql.hudi
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Row, SparkSession}
/**
* An interface to adapter the difference between spark2 and spark3
* in some spark related class.
*/
* An interface to adapter the difference between spark2 and spark3
* in some spark related class.
*/
trait SparkAdapter extends Serializable {
/**

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.io.storage;
package org.apache.hudi.io.storage.row;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;

View File

@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hudi.io;
package org.apache.hudi.io.storage.row;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieRecord;

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.keygen;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -27,6 +28,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -95,6 +97,9 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21");
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686/ts_ms=2020-03-21");
}
@Test
@@ -112,6 +117,12 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey());
assertEquals(partitionPath, hoodieKey.getPartitionPath());
Row row = KeyGeneratorTestUtilities.getRow(record, HoodieTestDataGenerator.AVRO_SCHEMA,
AvroConversionUtils.convertAvroSchemaToStructType(HoodieTestDataGenerator.AVRO_SCHEMA));
Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(row), partitionPath);
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(internalRow, row.schema()), partitionPath);
}
@Test
@@ -131,6 +142,13 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
assertEquals(rowKey, hoodieKey.getRecordKey());
assertEquals(partitionPath, hoodieKey.getPartitionPath());
Row row = KeyGeneratorTestUtilities.getRow(record, HoodieTestDataGenerator.AVRO_SCHEMA,
AvroConversionUtils.convertAvroSchemaToStructType(HoodieTestDataGenerator.AVRO_SCHEMA));
Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(row), partitionPath);
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(internalRow, row.schema()), partitionPath);
}
@Test
@@ -150,5 +168,12 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
assertEquals(rowKey, hoodieKey.getRecordKey());
assertEquals(partitionPath, hoodieKey.getPartitionPath());
Row row = KeyGeneratorTestUtilities.getRow(record, HoodieTestDataGenerator.AVRO_SCHEMA,
AvroConversionUtils.convertAvroSchemaToStructType(HoodieTestDataGenerator.AVRO_SCHEMA));
Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(row), partitionPath);
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(compositeKeyGenerator.getPartitionPath(internalRow, row.schema()), partitionPath);
}
}

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -141,6 +142,8 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "key1");
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686");
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686");
}
@Test
@@ -164,6 +167,8 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "key1");
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "ts_ms=20200321");
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "ts_ms=20200321");
}
@Test
@@ -187,6 +192,9 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "key1");
Assertions.assertTrue(keyGenerator.getPartitionPath(row).isEmpty());
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertTrue(keyGenerator.getPartitionPath(internalRow, row.schema()).isEmpty());
}
@Test
@@ -335,6 +343,9 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686");
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686");
}
@Test
@@ -359,5 +370,8 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=20200321");
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686/ts_ms=20200321");
}
}

View File

@@ -21,11 +21,12 @@ package org.apache.hudi.keygen;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -130,6 +131,9 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
Row row = KeyGeneratorTestUtilities.getRow(record);
Assertions.assertEquals(keyGenerator.getRecordKey(row), "key1");
Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686");
InternalRow internalRow = KeyGeneratorTestUtilities.getInternalRow(row);
Assertions.assertEquals(keyGenerator.getPartitionPath(internalRow, row.schema()), "timestamp=4357686");
}
private static Stream<GenericRecord> nestedColTestRecords() {

View File

@@ -28,7 +28,10 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Assertions;
@@ -50,6 +53,7 @@ public class TestTimestampBasedKeyGenerator {
private Schema schema;
private StructType structType;
private Row baseRow;
private InternalRow internalRow;
@BeforeEach
public void initialize() throws IOException {
@@ -58,6 +62,7 @@ public class TestTimestampBasedKeyGenerator {
baseRecord = SchemaTestUtil
.generateAvroRecordFromJson(schema, 1, "001", "f1");
baseRow = genericRecordToRow(baseRecord);
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY.key(), "field1");
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY.key(), "createTime");
@@ -77,7 +82,7 @@ public class TestTimestampBasedKeyGenerator {
}
private Row genericRecordToRow(GenericRecord baseRecord) {
Function1<Object, Object> convertor = AvroConversionHelper.createConverterToRow(schema, structType);
Function1<Object, Object> convertor = AvroConversionHelper.createConverterToRow(baseRecord.getSchema(), structType);
Row row = (Row) convertor.apply(baseRecord);
int fieldCount = structType.fieldNames().length;
Object[] values = new Object[fieldCount];
@@ -117,6 +122,10 @@ public class TestTimestampBasedKeyGenerator {
TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
assertEquals("2020-01-06 12", hk1.getPartitionPath());
baseRow = genericRecordToRow(baseRecord);
assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals("2020-01-06 12", keyGen.getPartitionPath(internalRow, baseRow.schema()));
// timezone is GMT+8:00, createTime is BigDecimal
baseRecord.put("createTime", new BigDecimal(1578283932000.00001));
@@ -169,6 +178,8 @@ public class TestTimestampBasedKeyGenerator {
// test w/ Row
baseRow = genericRecordToRow(baseRecord);
assertEquals("1970-01-01 08", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals("1970-01-01 08", keyGen.getPartitionPath(internalRow, baseRow.schema()));
// timestamp is DATE_STRING, timezone is GMT, createTime is null
baseRecord.put("createTime", null);
@@ -181,6 +192,8 @@ public class TestTimestampBasedKeyGenerator {
// test w/ Row
baseRow = genericRecordToRow(baseRecord);
assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(internalRow, baseRow.schema()));
}
@Test
@@ -197,6 +210,8 @@ public class TestTimestampBasedKeyGenerator {
// test w/ Row
baseRow = genericRecordToRow(baseRecord);
assertEquals("2024-10-04 12", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals("2024-10-04 12", keyGen.getPartitionPath(internalRow, baseRow.schema()));
// timezone is GMT, createTime is null
baseRecord.put("createTime", null);
@@ -208,6 +223,8 @@ public class TestTimestampBasedKeyGenerator {
// test w/ Row
baseRow = genericRecordToRow(baseRecord);
assertEquals("1970-01-02 12", keyGen.getPartitionPath(baseRow));
internalRow = KeyGeneratorTestUtilities.getInternalRow(baseRow);
assertEquals("1970-01-02 12", keyGen.getPartitionPath(internalRow, baseRow.schema()));
// timezone is GMT. number of days store integer in mysql
baseRecord.put("createTime", 18736);
@@ -216,6 +233,9 @@ public class TestTimestampBasedKeyGenerator {
HoodieKey scalarSecondsKey = keyGen.getKey(baseRecord);
assertEquals("2021-04-19", scalarSecondsKey.getPartitionPath());
// test w/ Row
baseRow = genericRecordToRow(baseRecord);
assertEquals("2021-04-19", keyGen.getPartitionPath(baseRow));
}
@Test

View File

@@ -107,6 +107,11 @@ public class HoodieClientTestUtils {
public static Dataset<Row> readCommit(String basePath, SQLContext sqlContext, HoodieTimeline commitTimeline,
String instantTime) {
return readCommit(basePath, sqlContext, commitTimeline, instantTime, true);
}
public static Dataset<Row> readCommit(String basePath, SQLContext sqlContext, HoodieTimeline commitTimeline,
String instantTime, boolean filterByCommitTime) {
HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
if (!commitTimeline.containsInstant(commitInstant)) {
throw new HoodieException("No commit exists at " + instantTime);
@@ -115,14 +120,21 @@ public class HoodieClientTestUtils {
HashMap<String, String> paths =
getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
LOG.info("Path :" + paths.values());
Dataset<Row> unFilteredRows = null;
if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.PARQUET)) {
return sqlContext.read().parquet(paths.values().toArray(new String[paths.size()]))
.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime));
unFilteredRows = sqlContext.read().parquet(paths.values().toArray(new String[paths.size()]));
} else if (HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP.defaultValue().equals(HoodieFileFormat.ORC)) {
return sqlContext.read().orc(paths.values().toArray(new String[paths.size()]))
.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime));
unFilteredRows = sqlContext.read().orc(paths.values().toArray(new String[paths.size()]));
}
if (unFilteredRows != null) {
if (filterByCommitTime) {
return unFilteredRows.filter(String.format("%s ='%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime));
} else {
return unFilteredRows;
}
} else {
return sqlContext.emptyDataFrame();
}
return sqlContext.emptyDataFrame();
} catch (Exception e) {
throw new HoodieException("Error reading commit " + instantTime, e);
}

View File

@@ -24,11 +24,24 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.package$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.stream.Collectors;
import scala.Function1;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
public class KeyGeneratorTestUtilities {
@@ -60,7 +73,7 @@ public class KeyGeneratorTestUtilities {
public static GenericRecord getRecord(GenericRecord nestedColRecord) {
GenericRecord record = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
record.put("timestamp", 4357686);
record.put("timestamp", 4357686L);
record.put("_row_key", "key1");
record.put("ts_ms", "2020-03-21");
record.put("pii_col", "pi");
@@ -82,4 +95,39 @@ public class KeyGeneratorTestUtilities {
}
return new GenericRowWithSchema(values, structType);
}
public static InternalRow getInternalRow(Row row) {
try {
return getInternalRow(row, getEncoder(row.schema()));
} catch (Exception e) {
throw new IllegalStateException("Exception thrown while converting Row to InternalRow", e);
}
}
private static ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
SimpleAnalyzer$.MODULE$);
}
public static InternalRow getInternalRow(Row row, ExpressionEncoder<Row> encoder) throws ClassNotFoundException, InvocationTargetException, IllegalAccessException, NoSuchMethodException {
return serializeRow(encoder, row);
}
private static InternalRow serializeRow(ExpressionEncoder encoder, Row row)
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, ClassNotFoundException {
// TODO remove reflection if Spark 2.x support is dropped
if (package$.MODULE$.SPARK_VERSION().startsWith("2.")) {
Method spark2method = encoder.getClass().getMethod("toRow", Object.class);
return (InternalRow) spark2method.invoke(encoder, row);
} else {
Class<?> serializerClass = Class.forName("org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer");
Object serializer = encoder.getClass().getMethod("createSerializer").invoke(encoder);
Method aboveSpark2method = serializerClass.getMethod("apply", Object.class);
return (InternalRow) aboveSpark2method.invoke(serializer, row);
}
}
}

View File

@@ -59,12 +59,17 @@ import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
*/
public class SparkDatasetTestUtils {
public static final String RECORD_KEY_FIELD_NAME = "record_key";
public static final String PARTITION_PATH_FIELD_NAME = "partition_path";
public static final StructType STRUCT_TYPE = new StructType(new StructField[] {
new StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.FILENAME_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(RECORD_KEY_FIELD_NAME, DataTypes.StringType, false, Metadata.empty()),
new StructField(PARTITION_PATH_FIELD_NAME, DataTypes.StringType, false, Metadata.empty()),
new StructField("randomInt", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("randomLong", DataTypes.LongType, false, Metadata.empty())});
@@ -74,6 +79,8 @@ public class SparkDatasetTestUtils {
new StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(HoodieRecord.FILENAME_METADATA_FIELD, DataTypes.StringType, false, Metadata.empty()),
new StructField(RECORD_KEY_FIELD_NAME, DataTypes.StringType, false, Metadata.empty()),
new StructField(PARTITION_PATH_FIELD_NAME, DataTypes.StringType, false, Metadata.empty()),
new StructField("randomInt", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("randomStr", DataTypes.StringType, false, Metadata.empty())});
@@ -117,7 +124,7 @@ public class SparkDatasetTestUtils {
*/
public static Row getRandomValue(String partitionPath, boolean isError) {
// order commit time, seq no, record key, partition path, file name
Object[] values = new Object[7];
Object[] values = new Object[9];
values[0] = ""; //commit time
if (!isError) {
values[1] = ""; // commit seq no
@@ -127,11 +134,13 @@ public class SparkDatasetTestUtils {
values[2] = UUID.randomUUID().toString();
values[3] = partitionPath;
values[4] = ""; // filename
values[5] = RANDOM.nextInt();
values[5] = UUID.randomUUID().toString();
values[6] = partitionPath;
values[7] = RANDOM.nextInt();
if (!isError) {
values[6] = RANDOM.nextLong();
values[8] = RANDOM.nextLong();
} else {
values[6] = UUID.randomUUID().toString();
values[8] = UUID.randomUUID().toString();
}
return new GenericRow(values);
}
@@ -154,14 +163,16 @@ public class SparkDatasetTestUtils {
public static InternalRow getInternalRowWithError(String partitionPath) {
// order commit time, seq no, record key, partition path, file name
String recordKey = UUID.randomUUID().toString();
Object[] values = new Object[7];
Object[] values = new Object[9];
values[0] = "";
values[1] = "";
values[2] = recordKey;
values[3] = partitionPath;
values[4] = "";
values[5] = RANDOM.nextInt();
values[6] = RANDOM.nextBoolean();
values[5] = recordKey;
values[6] = partitionPath;
values[7] = RANDOM.nextInt();
values[8] = RANDOM.nextBoolean();
return new GenericInternalRow(values);
}

View File

@@ -141,6 +141,12 @@ public class HoodieTableConfig extends HoodieConfig implements Serializable {
.noDefaultValue()
.withDocumentation("Base path of the dataset that needs to be bootstrapped as a Hudi table");
public static final ConfigProperty<String> HOODIE_POPULATE_META_FIELDS = ConfigProperty
.key("hoodie.populate.meta.fields")
.defaultValue("true")
.withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated "
+ "and incremental queries will not be functional. This is only meant to be used for append only/immutable data for batch processing");
public static final String NO_OP_BOOTSTRAP_INDEX_CLASS = NoOpBootstrapIndex.class.getName();
public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
@@ -313,6 +319,13 @@ public class HoodieTableConfig extends HoodieConfig implements Serializable {
return getStringOrDefault(HOODIE_ARCHIVELOG_FOLDER_PROP);
}
/**
* @returns true is meta fields need to be populated. else returns false.
*/
public boolean populateMetaFields() {
return Boolean.parseBoolean(getStringOrDefault(HOODIE_POPULATE_META_FIELDS));
}
public Map<String, String> propsMap() {
return props.entrySet().stream()
.collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.fs.NoOpConsistencyGuard;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -312,6 +313,25 @@ public class HoodieTableMetaClient implements Serializable {
return archivedTimeline;
}
/**
* Validate table properties.
* @param properties Properties from writeConfig.
* @param operationType operation type to be executed.
*/
public void validateTableProperties(Properties properties, WriteOperationType operationType) {
// disabling meta fields are allowed only for bulk_insert operation
if (!Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()))
&& operationType != WriteOperationType.BULK_INSERT) {
throw new HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key() + " can only be disabled for " + WriteOperationType.BULK_INSERT
+ " operation");
}
// once meta fields are disabled, it cant be re-enabled for a given table.
if (!getTableConfig().populateMetaFields()
&& Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()))) {
throw new HoodieException(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key() + " already disabled for the table. Can't be re-enabled back");
}
}
/**
* Helper method to initialize a given path as a hoodie table with configs passed in as Properties.
*
@@ -602,6 +622,7 @@ public class HoodieTableMetaClient implements Serializable {
private String partitionColumns;
private String bootstrapIndexClass;
private String bootstrapBasePath;
private Boolean populateMetaFields;
private PropertyBuilder() {
@@ -675,6 +696,11 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
public PropertyBuilder setPopulateMetaFields(boolean populateMetaFields) {
this.populateMetaFields = populateMetaFields;
return this;
}
public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
return setTableType(metaClient.getTableType())
.setTableName(metaClient.getTableConfig().getTableName())
@@ -725,6 +751,9 @@ public class HoodieTableMetaClient implements Serializable {
if (hoodieConfig.contains(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA)) {
setTableCreateSchema(hoodieConfig.getString(HoodieTableConfig.HOODIE_TABLE_CREATE_SCHEMA));
}
if (hoodieConfig.contains(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS)) {
setPopulateMetaFields(hoodieConfig.getBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS));
}
return this;
}
@@ -778,6 +807,9 @@ public class HoodieTableMetaClient implements Serializable {
if (null != recordKeyFields) {
tableConfig.setValue(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS, recordKeyFields);
}
if (null != populateMetaFields) {
tableConfig.setValue(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS, Boolean.toString(populateMetaFields));
}
return tableConfig.getProps();
}

View File

@@ -18,15 +18,25 @@
package org.apache.hudi.internal;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieRowCreateHandle;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.row.HoodieRowCreateHandleWithoutMetaFields;
import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import java.io.IOException;
@@ -34,6 +44,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
/**
@@ -52,15 +63,20 @@ public class BulkInsertDataInternalWriterHelper {
private final StructType structType;
private final Boolean arePartitionRecordsSorted;
private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
private HoodieRowCreateHandle handle;
private String lastKnownPartitionPath = null;
private String fileIdPrefix;
private int numFilesWritten = 0;
private Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
private final boolean populateMetaFields;
private Option<BuiltinKeyGenerator> keyGeneratorOpt = null;
private boolean simpleKeyGen = false;
private int simplePartitionFieldIndex = -1;
private DataType simplePartitionFieldDataType;
public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, boolean arePartitionRecordsSorted) {
String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType,
boolean populateMetaFields, boolean arePartitionRecordsSorted) {
this.hoodieTable = hoodieTable;
this.writeConfig = writeConfig;
this.instantTime = instantTime;
@@ -68,14 +84,57 @@ public class BulkInsertDataInternalWriterHelper {
this.taskId = taskId;
this.taskEpochId = taskEpochId;
this.structType = structType;
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.fileIdPrefix = UUID.randomUUID().toString();
if (!populateMetaFields) {
this.keyGeneratorOpt = getKeyGenerator(writeConfig.getProps());
if (keyGeneratorOpt.isPresent() && keyGeneratorOpt.get() instanceof SimpleKeyGenerator) {
simpleKeyGen = true;
simplePartitionFieldIndex = (Integer) structType.getFieldIndex((keyGeneratorOpt.get()).getPartitionPathFields().get(0)).get();
simplePartitionFieldDataType = structType.fields()[simplePartitionFieldIndex].dataType();
}
}
}
/**
* Instantiate {@link BuiltinKeyGenerator}.
*
* @param properties properties map.
* @return the key generator thus instantiated.
*/
private Option<BuiltinKeyGenerator> getKeyGenerator(Properties properties) {
TypedProperties typedProperties = new TypedProperties();
typedProperties.putAll(properties);
if (properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key()).equals(NonpartitionedKeyGenerator.class.getName())) {
return Option.empty(); // Do not instantiate NonPartitionKeyGen
} else {
try {
return Option.of((BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(typedProperties));
} catch (ClassCastException cce) {
throw new HoodieIOException("Only those key generators implementing BuiltInKeyGenerator interface is supported with virtual keys");
} catch (IOException e) {
throw new HoodieIOException("Key generator instantiation failed ", e);
}
}
}
public void write(InternalRow record) throws IOException {
try {
String partitionPath = record.getUTF8String(
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
String partitionPath = null;
if (populateMetaFields) { // usual path where meta fields are pre populated in prep step.
partitionPath = record.getUTF8String(
HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
} else { // if meta columns are disabled.
if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
partitionPath = "";
} else if (simpleKeyGen) { // SimpleKeyGen
partitionPath = (record.get(simplePartitionFieldIndex, simplePartitionFieldDataType)).toString();
} else {
// only BuiltIn key generators are supported if meta fields are disabled.
partitionPath = keyGeneratorOpt.get().getPartitionPath(record, structType);
}
}
if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) {
LOG.info("Creating new file for partition path " + partitionPath);
@@ -103,20 +162,24 @@ public class BulkInsertDataInternalWriterHelper {
if (arePartitionRecordsSorted) {
close();
}
handles.put(partitionPath, new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType));
HoodieRowCreateHandle rowCreateHandle = populateMetaFields ? new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType) : new HoodieRowCreateHandleWithoutMetaFields(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType);
handles.put(partitionPath, rowCreateHandle);
} else if (!handles.get(partitionPath).canWrite()) {
// even if there is a handle to the partition path, it could have reached its max size threshold. So, we close the handle here and
// create a new one.
writeStatusList.add(handles.remove(partitionPath).close());
handles.put(partitionPath, new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType));
HoodieRowCreateHandle rowCreateHandle = populateMetaFields ? new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType) : new HoodieRowCreateHandleWithoutMetaFields(hoodieTable, writeConfig, partitionPath, getNextFileId(),
instantTime, taskPartitionId, taskId, taskEpochId, structType);
handles.put(partitionPath, rowCreateHandle);
}
return handles.get(partitionPath);
}
public void close() throws IOException {
for (HoodieRowCreateHandle rowCreateHandle: handles.values()) {
for (HoodieRowCreateHandle rowCreateHandle : handles.values()) {
writeStatusList.add(rowCreateHandle.close());
}
handles.clear();

View File

@@ -66,6 +66,7 @@ public class DataSourceInternalWriterHelper {
writeClient.startCommitWithTime(instantTime);
this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
this.metaClient.validateTableProperties(writeConfig.getProps(), WriteOperationType.BULK_INSERT);
this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
}

View File

@@ -18,12 +18,17 @@
package org.apache.hudi.internal;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.testutils.SparkDatasetTestUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -33,8 +38,10 @@ import org.junit.jupiter.api.BeforeEach;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -62,6 +69,17 @@ public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarn
cleanupResources();
}
protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields) {
Properties properties = new Properties();
if (!populateMetaFields) {
properties.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY().key(), SimpleKeyGenerator.class.getName());
properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY().key(), SparkDatasetTestUtils.RECORD_KEY_FIELD_NAME);
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY().key(), SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME);
properties.setProperty(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), "false");
}
return getConfigBuilder(basePath).withProperties(properties).build();
}
protected void assertWriteStatuses(List<HoodieInternalWriteStatus> writeStatuses, int batches, int size,
Option<List<String>> fileAbsPaths, Option<List<String>> fileNames) {
assertWriteStatuses(writeStatuses, batches, size, false, fileAbsPaths, fileNames);
@@ -126,21 +144,27 @@ public class HoodieBulkInsertInternalWriterTestBase extends HoodieClientTestHarn
}
}
protected void assertOutput(Dataset<Row> expectedRows, Dataset<Row> actualRows, String instantTime, Option<List<String>> fileNames) {
// verify 3 meta fields that are filled in within create handle
actualRows.collectAsList().forEach(entry -> {
assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime);
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)));
if (fileNames.isPresent()) {
assertTrue(fileNames.get().contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS
.get(HoodieRecord.FILENAME_METADATA_FIELD))));
}
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
});
protected void assertOutput(Dataset<Row> expectedRows, Dataset<Row> actualRows, String instantTime, Option<List<String>> fileNames,
boolean populateMetaColumns) {
if (populateMetaColumns) {
// verify 3 meta fields that are filled in within create handle
actualRows.collectAsList().forEach(entry -> {
assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime);
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)));
if (fileNames.isPresent()) {
assertTrue(fileNames.get().contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS
.get(HoodieRecord.FILENAME_METADATA_FIELD))));
}
assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
});
// after trimming 2 of the meta fields, rest of the fields should match
Dataset<Row> trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
Dataset<Row> trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
assertEquals(0, trimmedActual.except(trimmedExpected).count());
// after trimming 2 of the meta fields, rest of the fields should match
Dataset<Row> trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
Dataset<Row> trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD);
assertEquals(0, trimmedActual.except(trimmedExpected).count());
} else { // operation = BULK_INSERT_APPEND_ONLY
// all meta columns are untouched
assertEquals(0, expectedRows.except(actualRows).count());
}
}
}

View File

@@ -18,12 +18,6 @@
package org.apache.hudi;
import static org.apache.spark.sql.functions.callUDF;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ReflectionUtils;
@@ -41,8 +35,17 @@ import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.collection.JavaConverters;
import static org.apache.spark.sql.functions.callUDF;
/**
* Helper class to assist in preparing {@link Dataset<Row>}s for bulk insert with datasource implementation.
*/
@@ -112,4 +115,40 @@ public class HoodieDatasetBulkInsertHelper {
return bulkInsertPartitionerRows.repartitionRecords(colOrderedDataset, config.getBulkInsertShuffleParallelism());
}
/**
* Add empty meta fields and reorder such that meta fields are at the beginning.
*
* @param rows
* @return
*/
public static Dataset<Row> prepareHoodieDatasetForBulkInsertWithoutMetaFields(Dataset<Row> rows) {
// add empty meta cols.
Dataset<Row> rowsWithMetaCols = rows
.withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType))
.withColumn(HoodieRecord.FILENAME_METADATA_FIELD,
functions.lit("").cast(DataTypes.StringType));
List<Column> originalFields =
Arrays.stream(rowsWithMetaCols.schema().fields()).filter(field -> !field.name().contains("_hoodie_")).map(f -> new Column(f.name())).collect(Collectors.toList());
List<Column> metaFields =
Arrays.stream(rowsWithMetaCols.schema().fields()).filter(field -> field.name().contains("_hoodie_")).map(f -> new Column(f.name())).collect(Collectors.toList());
// reorder such that all meta columns are at the beginning followed by original columns
List<Column> allCols = new ArrayList<>();
allCols.addAll(metaFields);
allCols.addAll(originalFields);
return rowsWithMetaCols.select(
JavaConverters.collectionAsScalaIterableConverter(allCols).asScala().toSeq());
}
}

View File

@@ -32,7 +32,7 @@ import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory
import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.index.SparkHoodieIndex
import org.apache.hudi.internal.DataSourceInternalWriterHelper
@@ -128,6 +128,7 @@ object HoodieSparkSqlWriter {
.setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_OPT_KEY))
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
.setPartitionColumns(partitionColumns)
.setPopulateMetaFields(parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(), HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean)
.initTable(sparkContext.hadoopConfiguration, path.get)
tableConfig = tableMetaClient.getTableConfig
}
@@ -139,7 +140,8 @@ object HoodieSparkSqlWriter {
if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER_OPT_KEY) &&
operation == WriteOperationType.BULK_INSERT) {
val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
basePath, path, instantTime)
basePath, path, instantTime, parameters.getOrElse(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()).toBoolean)
return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
}
// scalastyle:on
@@ -330,7 +332,8 @@ object HoodieSparkSqlWriter {
tblName: String,
basePath: Path,
path: Option[String],
instantTime: String): (Boolean, common.util.Option[String]) = {
instantTime: String,
populateMetaFields: Boolean): (Boolean, common.util.Option[String]) = {
val sparkContext = sqlContext.sparkContext
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
@@ -345,22 +348,36 @@ object HoodieSparkSqlWriter {
}
val params = parameters.updated(HoodieWriteConfig.AVRO_SCHEMA.key, schema.toString)
val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path.get, tblName, mapAsJavaMap(params))
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if (userDefinedBulkInsertPartitionerOpt.isPresent) {
userDefinedBulkInsertPartitionerOpt.get
}
else {
BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode)
val bulkInsertPartitionerRows : BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
if (userDefinedBulkInsertPartitionerOpt.isPresent) {
userDefinedBulkInsertPartitionerOpt.get
}
else {
BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode)
}
} else {
// Sort modes are not yet supported when meta fields are disabled
new NonSortPartitionerWithRows()
}
val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted();
parameters.updated(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, arePartitionRecordsSorted.toString)
val isGlobalIndex = SparkHoodieIndex.isGlobalIndex(writeConfig)
val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
bulkInsertPartitionerRows, isGlobalIndex)
val isGlobalIndex = if (populateMetaFields) {
SparkHoodieIndex.isGlobalIndex(writeConfig)
} else {
false
}
val hoodieDF = if (populateMetaFields) {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
bulkInsertPartitionerRows, isGlobalIndex)
} else {
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
}
if (SPARK_VERSION.startsWith("2.")) {
hoodieDF.write.format("org.apache.hudi.internal")
.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.options(params)
.mode(SaveMode.Append)
.save()
} else if (SPARK_VERSION.startsWith("3.")) {
hoodieDF.write.format("org.apache.hudi.spark3.internal")

View File

@@ -113,6 +113,33 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
assertTrue(dataset.except(trimmedOutput).count() == 0);
}
@Test
public void testBulkInsertHelperNoMetaFields() {
List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(dataset);
StructType resultSchema = result.schema();
assertEquals(result.count(), 10);
assertEquals(resultSchema.fieldNames().length, structType.fieldNames().length + HoodieRecord.HOODIE_META_COLUMNS.size());
for (Map.Entry<String, Integer> entry : HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.entrySet()) {
assertTrue(resultSchema.fieldIndex(entry.getKey()) == entry.getValue());
}
result.toJavaRDD().foreach(entry -> {
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(""));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(""));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)).equals(""));
});
Dataset<Row> trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
assertTrue(dataset.except(trimmedOutput).count() == 0);
}
@ParameterizedTest
@MethodSource("providePreCombineArgs")
public void testBulkInsertPreCombine(boolean enablePreCombine) {

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord, HoodieRecordPayload}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieException
@@ -119,61 +120,13 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
}
List(BulkInsertSortMode.GLOBAL_SORT.name(), BulkInsertSortMode.NONE.name(), BulkInsertSortMode.PARTITION_SORT.name())
List(BulkInsertSortMode.GLOBAL_SORT, BulkInsertSortMode.NONE, BulkInsertSortMode.PARTITION_SORT)
.foreach(sortMode => {
test("test_bulk_insert_for_" + sortMode) {
initSparkContext("test_bulk_insert_datasource")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
try {
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.OPERATION_OPT_KEY.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY.key -> "true",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key -> "partition",
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val inserts = DataSourceTestUtils.generateRandomRows(1000)
// add some updates so that preCombine kicks in
val toUpdateDataset = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, 40), structType)
val updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset)
val records = inserts.union(updates)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
// collect all parition paths to issue read of parquet files
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
// Check the entire dataset has all records still
val fullPartitionPaths = new Array[String](3)
for (i <- 0 until fullPartitionPaths.length) {
fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
}
// fetch all records from parquet files generated from write to hudi
val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
val resultRows = actualDf.collectAsList()
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
assert(df.except(trimmedDf).count() == 0)
testBulkInsertWithSortMode(sortMode, path)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
@@ -181,6 +134,118 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
})
List(true, false)
.foreach(populateMetaFields => {
test("test_bulk_insert_for_populate_meta_fields_" + populateMetaFields) {
initSparkContext("test_bulk_insert_datasource_populate_meta_fields")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path_populate_meta_fields")
try {
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, path, populateMetaFields)
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
})
def testBulkInsertWithSortMode(sortMode: BulkInsertSortMode, path: java.nio.file.Path, populateMetaFields : Boolean = true) : Unit = {
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.OPERATION_OPT_KEY.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY.key -> "true",
HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key() -> String.valueOf(populateMetaFields),
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key -> "partition",
HoodieWriteConfig.BULKINSERT_SORT_MODE.key() -> sortMode.name(),
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val inserts = DataSourceTestUtils.generateRandomRows(1000)
// add some updates so that preCombine kicks in
val toUpdateDataset = sqlContext.createDataFrame(DataSourceTestUtils.getUniqueRows(inserts, 40), structType)
val updates = DataSourceTestUtils.updateRowsWithHigherTs(toUpdateDataset)
val records = inserts.union(updates)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
// collect all parition paths to issue read of parquet files
val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
// Check the entire dataset has all records still
val fullPartitionPaths = new Array[String](3)
for (i <- 0 until fullPartitionPaths.length) {
fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
}
// fetch all records from parquet files generated from write to hudi
val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
if (!populateMetaFields) {
assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(0)).filter(entry => !(entry.mkString(",").equals(""))).count())
assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(1)).filter(entry => !(entry.mkString(",").equals(""))).count())
assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(2)).filter(entry => !(entry.mkString(",").equals(""))).count())
assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(3)).filter(entry => !(entry.mkString(",").equals(""))).count())
assertEquals(0, actualDf.select(HoodieRecord.HOODIE_META_COLUMNS.get(4)).filter(entry => !(entry.mkString(",").equals(""))).count())
}
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
.drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
assert(df.except(trimmedDf).count() == 0)
}
test("test disable and enable meta fields") {
initSparkContext("test_disable_enable_meta_fields")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
try {
testBulkInsertWithSortMode(BulkInsertSortMode.NONE, path, false)
// enabling meta fields back should throw exception
val hoodieFooTableName = "hoodie_foo_tbl"
//create a new table
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.OPERATION_OPT_KEY.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY.key -> "true",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key -> "partition",
HoodieWriteConfig.BULKINSERT_SORT_MODE.key() -> BulkInsertSortMode.NONE.name(),
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key -> "org.apache.hudi.keygen.SimpleKeyGenerator")
val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val inserts = DataSourceTestUtils.generateRandomRows(1000)
val df = spark.createDataFrame(sc.parallelize(inserts), structType)
try {
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
fail("Should have thrown exception")
} catch {
case e: HoodieException => assertTrue(e.getMessage.contains("hoodie.populate.meta.fields already disabled for the table. Can't be re-enabled back"))
}
} finally {
spark.stop()
FileUtils.deleteDirectory(path.toFile)
}
}
test("test drop duplicates row writing for bulk_insert") {
initSparkContext("test_append_mode")
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.internal;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -61,12 +62,14 @@ public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get();
String path = options.get("path").get();
String tblName = options.get(HoodieWriteConfig.TABLE_NAME.key()).get();
boolean populateMetaFields = options.getBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
Boolean.parseBoolean(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()));
// 1st arg to createHooodieConfig is not really reuqired to be set. but passing it anyways.
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA.key()).get(), path, tblName, options.asMap());
boolean arePartitionRecordsSorted = HoodieInternalConfig.getBulkInsertIsPartitionRecordsSorted(
options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).isPresent()
? options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).get() : null);
return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(),
getConfiguration(), options, arePartitionRecordsSorted));
getConfiguration(), options, populateMetaFields, arePartitionRecordsSorted));
}
}

View File

@@ -36,10 +36,10 @@ public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRo
private final BulkInsertDataInternalWriterHelper bulkInsertWriterHelper;
public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId,
StructType structType, boolean arePartitionRecordsSorted) {
String instantTime, int taskPartitionId, long taskId, long taskEpochId,
StructType structType, boolean populateMetaFields, boolean arePartitionRecordsSorted) {
this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType, arePartitionRecordsSorted);
writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted);
}
@Override

View File

@@ -36,19 +36,22 @@ public class HoodieBulkInsertDataInternalWriterFactory implements DataWriterFact
private final HoodieWriteConfig writeConfig;
private final StructType structType;
private final boolean arePartitionRecordsSorted;
private final boolean populateMetaFields;
public HoodieBulkInsertDataInternalWriterFactory(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, StructType structType, boolean arePartitionRecordsSorted) {
String instantTime, StructType structType, boolean populateMetaFields,
boolean arePartitionRecordsSorted) {
this.hoodieTable = hoodieTable;
this.writeConfig = writeConfig;
this.instantTime = instantTime;
this.structType = structType;
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
}
@Override
public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
return new HoodieBulkInsertDataInternalWriter(hoodieTable, writeConfig, instantTime, partitionId, taskId, epochId,
structType, arePartitionRecordsSorted);
structType, populateMetaFields, arePartitionRecordsSorted);
}
}

View File

@@ -49,15 +49,17 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
private final HoodieWriteConfig writeConfig;
private final StructType structType;
private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
private final boolean populateMetaFields;
private final Boolean arePartitionRecordsSorted;
private Map<String, String> extraMetadataMap = new HashMap<>();
public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession sparkSession, Configuration configuration, DataSourceOptions dataSourceOptions,
boolean arePartitionRecordsSorted) {
boolean populateMetaFields, boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
@@ -69,7 +71,7 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
dataSourceInternalWriterHelper.createInflightCommit();
if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
writeConfig, instantTime, structType, arePartitionRecordsSorted);
writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
} else {
throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
}

View File

@@ -38,7 +38,6 @@ import java.util.stream.Stream;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
@@ -51,6 +50,16 @@ public class TestHoodieBulkInsertDataInternalWriter extends
HoodieBulkInsertInternalWriterTestBase {
private static Stream<Arguments> configParams() {
Object[][] data = new Object[][] {
{true, true},
{true, false},
{false, true},
{false, false}
};
return Stream.of(data).map(Arguments::of);
}
private static Stream<Arguments> bulkInsertTypeParams() {
Object[][] data = new Object[][] {
{true},
{false}
@@ -60,16 +69,16 @@ public class TestHoodieBulkInsertDataInternalWriter extends
@ParameterizedTest
@MethodSource("configParams")
public void testDataInternalWriter(boolean sorted) throws Exception {
public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
// execute N rounds
for (int i = 0; i < 3; i++) {
String instantTime = "00" + i;
// init writer
HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE,
sorted);
HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
STRUCT_TYPE, populateMetaFields, sorted);
int size = 10 + RANDOM.nextInt(1000);
// write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
@@ -96,11 +105,10 @@ public class TestHoodieBulkInsertDataInternalWriter extends
// verify rows
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
assertOutput(totalInputRows, result, instantTime, fileNames);
assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields);
}
}
/**
* Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected
* to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk.
@@ -108,13 +116,13 @@ public class TestHoodieBulkInsertDataInternalWriter extends
@Test
public void testGlobalFailure() throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(true);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
String instantTime = "001";
HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE,
false);
HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
STRUCT_TYPE, true, false);
int size = 10 + RANDOM.nextInt(100);
int totalFailures = 5;
@@ -150,7 +158,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
// verify rows
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
assertOutput(inputRows, result, instantTime, fileNames);
assertOutput(inputRows, result, instantTime, fileNames, true);
}
private void writeRows(Dataset<Row> inputRows, HoodieBulkInsertDataInternalWriter writer)

View File

@@ -31,6 +31,9 @@ import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,10 +41,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,18 +56,28 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieDataSourceInternalWriter extends
HoodieBulkInsertInternalWriterTestBase {
@Test
public void testDataSourceWriter() throws Exception {
testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP);
private static Stream<Arguments> bulkInsertTypeParams() {
Object[][] data = new Object[][] {
{true},
{false}
};
return Stream.of(data).map(Arguments::of);
}
private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata) throws Exception {
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testDataSourceWriter(boolean populateMetaFields) throws Exception {
testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP, populateMetaFields);
}
private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata, boolean populateMetaFields)
throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
String instantTime = "001";
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(extraMetadata), false);
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(extraMetadata), populateMetaFields, false);
DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());
String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
@@ -95,7 +108,7 @@ public class TestHoodieDataSourceInternalWriter extends
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify output
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
// verify extra metadata
@@ -122,7 +135,7 @@ public class TestHoodieDataSourceInternalWriter extends
expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key());
expectedMetadata.remove("commit_extra_c");
testDataSourceWriterInternal(extraMeta, expectedMetadata);
testDataSourceWriterInternal(extraMeta, expectedMetadata, true);
}
@Test
@@ -134,13 +147,14 @@ public class TestHoodieDataSourceInternalWriter extends
extraMeta.put("keyB", "valB");
extraMeta.put("commit_extra_c", "valC");
// none of the keys has commit metadata key prefix.
testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP);
testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP, true);
}
@Test
public void testMultipleDataSourceWrites() throws Exception {
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testMultipleDataSourceWrites(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
int partitionCounter = 0;
// execute N rounds
@@ -148,7 +162,7 @@ public class TestHoodieDataSourceInternalWriter extends
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false);
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong());
@@ -172,18 +186,20 @@ public class TestHoodieDataSourceInternalWriter extends
dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime,
populateMetaFields);
// verify output
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
}
@Test
public void testLargeWrites() throws Exception {
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testLargeWrites(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
int partitionCounter = 0;
// execute N rounds
@@ -191,7 +207,7 @@ public class TestHoodieDataSourceInternalWriter extends
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false);
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong());
@@ -215,10 +231,11 @@ public class TestHoodieDataSourceInternalWriter extends
dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime,
populateMetaFields);
// verify output
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
}
@@ -229,15 +246,16 @@ public class TestHoodieDataSourceInternalWriter extends
* abort batch2
* verify only records from batch1 is available to read
*/
@Test
public void testAbort() throws Exception {
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testAbort(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
String instantTime0 = "00" + 0;
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false);
new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());
List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
@@ -269,13 +287,13 @@ public class TestHoodieDataSourceInternalWriter extends
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify rows
assertOutput(totalInputRows, result, instantTime0, Option.empty());
assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
// 2nd batch. abort in the end
String instantTime1 = "00" + 1;
dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false);
new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), populateMetaFields, false);
writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(1, RANDOM.nextLong(), RANDOM.nextLong());
for (int j = 0; j < batches; j++) {
@@ -293,7 +311,7 @@ public class TestHoodieDataSourceInternalWriter extends
result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify rows
// only rows from first batch should be present
assertOutput(totalInputRows, result, instantTime0, Option.empty());
assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
}
private void writeRows(Dataset<Row> inputRows, DataWriter<InternalRow> writer) throws Exception {

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.spark3.internal;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.BaseDefaultSource;
@@ -48,11 +49,13 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
String path = properties.get("path");
String tblName = properties.get(HoodieWriteConfig.TABLE_NAME.key());
boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.key(),
HoodieTableConfig.HOODIE_POPULATE_META_FIELDS.defaultValue()));
boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));
// 1st arg to createHooodieConfig is not really reuqired to be set. but passing it anyways.
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(properties.get(HoodieWriteConfig.AVRO_SCHEMA.key()), path, tblName, properties);
return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
getConfiguration(), properties, arePartitionRecordsSorted);
getConfiguration(), properties, populateMetaFields, arePartitionRecordsSorted);
}
}

View File

@@ -37,9 +37,10 @@ public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRo
private final BulkInsertDataInternalWriterHelper bulkInsertWriterHelper;
public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, StructType structType, boolean arePartitionRecordsSorted) {
String instantTime, int taskPartitionId, long taskId, StructType structType, boolean populateMetaFields,
boolean arePartitionRecordsSorted) {
this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
writeConfig, instantTime, taskPartitionId, taskId, 0, structType, arePartitionRecordsSorted);
writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
}
@Override
@@ -61,4 +62,4 @@ public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRo
public void close() throws IOException {
bulkInsertWriterHelper.close();
}
}
}

View File

@@ -35,20 +35,23 @@ public class HoodieBulkInsertDataInternalWriterFactory implements DataWriterFact
private final HoodieTable hoodieTable;
private final HoodieWriteConfig writeConfig;
private final StructType structType;
private final boolean populateMetaFields;
private final boolean arePartitionRecordsSorted;
public HoodieBulkInsertDataInternalWriterFactory(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, StructType structType, boolean arePartitionRecordsSorted) {
String instantTime, StructType structType, boolean populateMetaFields,
boolean arePartitionRecordsSorted) {
this.hoodieTable = hoodieTable;
this.writeConfig = writeConfig;
this.instantTime = instantTime;
this.structType = structType;
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
}
@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
return new HoodieBulkInsertDataInternalWriter(hoodieTable, writeConfig, instantTime, partitionId, taskId,
structType, arePartitionRecordsSorted);
structType, populateMetaFields, arePartitionRecordsSorted);
}
}

View File

@@ -27,10 +27,10 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
@@ -49,14 +49,16 @@ public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
private final HoodieWriteConfig writeConfig;
private final StructType structType;
private final boolean arePartitionRecordsSorted;
private final boolean populateMetaFields;
private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
private Map<String, String> extraMetadata = new HashMap<>();
public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties, boolean arePartitionRecordsSorted) {
SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties, boolean populateMetaFields, boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.extraMetadata = DataSourceUtils.getExtraMetadata(properties);
this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
@@ -68,7 +70,7 @@ public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
dataSourceInternalWriterHelper.createInflightCommit();
if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
writeConfig, instantTime, structType, arePartitionRecordsSorted);
writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
} else {
throw new IllegalArgumentException("Write Operation Type + " + dataSourceInternalWriterHelper.getWriteOperationType() + " not supported ");
}

View File

@@ -39,23 +39,26 @@ public class HoodieDataSourceInternalBatchWriteBuilder implements WriteBuilder {
private final StructType structType;
private final SparkSession jss;
private final Configuration hadoopConfiguration;
private final boolean arePartitionRecordsSorted;
private final Map<String, String> properties;
private final boolean populateMetaFields;
private final boolean arePartitionRecordsSorted;
public HoodieDataSourceInternalBatchWriteBuilder(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties, boolean arePartitionRecordsSorted) {
SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties, boolean populateMetaFields,
boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.jss = jss;
this.hadoopConfiguration = hadoopConfiguration;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.properties = properties;
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
}
@Override
public BatchWrite buildForBatch() {
return new HoodieDataSourceInternalBatchWrite(instantTime, writeConfig, structType, jss,
hadoopConfiguration, properties, arePartitionRecordsSorted);
hadoopConfiguration, properties, populateMetaFields, arePartitionRecordsSorted);
}
}

View File

@@ -18,9 +18,9 @@
package org.apache.hudi.spark3.internal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -44,17 +44,19 @@ class HoodieDataSourceInternalTable implements SupportsWrite {
private final Configuration hadoopConfiguration;
private final boolean arePartitionRecordsSorted;
private final Map<String, String> properties;
private final boolean populateMetaFields;
public HoodieDataSourceInternalTable(String instantTime, HoodieWriteConfig config,
StructType schema, SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties,
boolean arePartitionRecordsSorted) {
boolean populateMetaFields, boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = config;
this.structType = schema;
this.jss = jss;
this.hadoopConfiguration = hadoopConfiguration;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.properties = properties;
this.populateMetaFields = populateMetaFields;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
}
@Override
@@ -69,7 +71,8 @@ class HoodieDataSourceInternalTable implements SupportsWrite {
@Override
public Set<TableCapability> capabilities() {
return new HashSet<TableCapability>() {{
return new HashSet<TableCapability>() {
{
add(TableCapability.BATCH_WRITE);
add(TableCapability.TRUNCATE);
}
@@ -79,6 +82,6 @@ class HoodieDataSourceInternalTable implements SupportsWrite {
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo logicalWriteInfo) {
return new HoodieDataSourceInternalBatchWriteBuilder(instantTime, writeConfig, structType, jss,
hadoopConfiguration, properties, arePartitionRecordsSorted);
hadoopConfiguration, properties, populateMetaFields, arePartitionRecordsSorted);
}
}

View File

@@ -39,7 +39,6 @@ import java.util.stream.Stream;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
@@ -52,6 +51,16 @@ public class TestHoodieBulkInsertDataInternalWriter extends
HoodieBulkInsertInternalWriterTestBase {
private static Stream<Arguments> configParams() {
Object[][] data = new Object[][] {
{true, true},
{true, false},
{false, true},
{false, false}
};
return Stream.of(data).map(Arguments::of);
}
private static Stream<Arguments> bulkInsertTypeParams() {
Object[][] data = new Object[][] {
{true},
{false}
@@ -61,16 +70,16 @@ public class TestHoodieBulkInsertDataInternalWriter extends
@ParameterizedTest
@MethodSource("configParams")
public void testDataInternalWriter(boolean sorted) throws Exception {
public void testDataInternalWriter(boolean sorted, boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
// execute N rounds
for (int i = 0; i < 5; i++) {
String instantTime = "00" + i;
// init writer
HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE,
sorted);
HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000),
RANDOM.nextLong(), STRUCT_TYPE, populateMetaFields, sorted);
int size = 10 + RANDOM.nextInt(1000);
// write N rows to partition1, N rows to partition2 and N rows to partition3 ... Each batch should create a new RowCreateHandle and a new file
@@ -97,7 +106,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
// verify rows
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
assertOutput(totalInputRows, result, instantTime, fileNames);
assertOutput(totalInputRows, result, instantTime, fileNames, populateMetaFields);
}
}
@@ -109,12 +118,13 @@ public class TestHoodieBulkInsertDataInternalWriter extends
@Test
public void testGlobalFailure() throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(true);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
String instantTime = "001";
HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), STRUCT_TYPE, false);
HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000),
RANDOM.nextLong(), STRUCT_TYPE, true, false);
int size = 10 + RANDOM.nextInt(100);
int totalFailures = 5;
@@ -150,7 +160,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
// verify rows
Dataset<Row> result = sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
assertOutput(inputRows, result, instantTime, fileNames);
assertOutput(inputRows, result, instantTime, fileNames, true);
}
private void writeRows(Dataset<Row> inputRows, HoodieBulkInsertDataInternalWriter writer)

View File

@@ -33,6 +33,9 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -40,10 +43,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -55,19 +58,28 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieDataSourceInternalBatchWrite extends
HoodieBulkInsertInternalWriterTestBase {
@Test
public void testDataSourceWriter() throws Exception {
testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP);
private static Stream<Arguments> bulkInsertTypeParams() {
Object[][] data = new Object[][] {
{true},
{false}
};
return Stream.of(data).map(Arguments::of);
}
private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata) throws Exception {
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testDataSourceWriter(boolean populateMetaFields) throws Exception {
testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP, populateMetaFields);
}
private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata, boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
String instantTime = "001";
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, extraMetadata, false);
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, extraMetadata, populateMetaFields, false);
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
@@ -99,7 +111,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify output
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
// verify extra metadata
@@ -125,7 +137,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key());
expectedMetadata.remove("commit_extra_c");
testDataSourceWriterInternal(extraMeta, expectedMetadata);
testDataSourceWriterInternal(extraMeta, expectedMetadata, true);
}
@Test
@@ -137,13 +149,14 @@ public class TestHoodieDataSourceInternalBatchWrite extends
extraMeta.put("keyB", "valB");
extraMeta.put("commit_extra_c", "valC");
// none of the keys has commit metadata key prefix.
testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP);
testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP, true);
}
@Test
public void testMultipleDataSourceWrites() throws Exception {
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testMultipleDataSourceWrites(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
int partitionCounter = 0;
@@ -152,7 +165,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false);
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, populateMetaFields, false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
@@ -176,18 +189,19 @@ public class TestHoodieDataSourceInternalBatchWrite extends
dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime, populateMetaFields);
// verify output
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
}
@Test
public void testLargeWrites() throws Exception {
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testLargeWrites(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
int partitionCounter = 0;
@@ -196,7 +210,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false);
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, populateMetaFields, false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
@@ -220,10 +234,11 @@ public class TestHoodieDataSourceInternalBatchWrite extends
dataSourceInternalBatchWrite.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime);
Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime,
populateMetaFields);
// verify output
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertOutput(totalInputRows, result, instantTime, Option.empty(), populateMetaFields);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
}
}
@@ -234,15 +249,16 @@ public class TestHoodieDataSourceInternalBatchWrite extends
* abort batch2
* verify only records from batch1 is available to read
*/
@Test
public void testAbort() throws Exception {
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testAbort(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
String instantTime0 = "00" + 0;
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false);
new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, populateMetaFields, false);
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
@@ -274,13 +290,13 @@ public class TestHoodieDataSourceInternalBatchWrite extends
metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify rows
assertOutput(totalInputRows, result, instantTime0, Option.empty());
assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
// 2nd batch. abort in the end
String instantTime1 = "00" + 1;
dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, false);
new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, Collections.EMPTY_MAP, populateMetaFields, false);
writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong());
for (int j = 0; j < batches; j++) {
@@ -298,7 +314,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify rows
// only rows from first batch should be present
assertOutput(totalInputRows, result, instantTime0, Option.empty());
assertOutput(totalInputRows, result, instantTime0, Option.empty(), populateMetaFields);
}
private void writeRows(Dataset<Row> inputRows, DataWriter<InternalRow> writer) throws Exception {