1
0

[HUDI-2950] Addressing performance traps in Bulk Insert/Layout Optimization (#4234)

* Cleaned up Z-curve/Hilbert ordering seqs:
  - Streamlined flow
  - Removed unnecessary operations (double-mapping, boxing, etc)
Updated `CollectionUtils::combine` to avoid `ArrayList` resizing

* Tidying up

* Reducing small objects churn due to Scala/Java conversions by re-using `RowFactory`, passing `Object[]`

* Fixing name resolution (disambiguation overloads)

* `lint`

* Replaced `OverwriteAvroPayloadRecord` w/ `RewriteAvroPayload` to avoid unnecessary Avro ser/de loop

* Added `PathCachingFileName` to avoid fetching substrings every time file-name is fetched;
Inject `PathCachingFileName` into `HoodieWrapperFileSystem.convertPathWithScheme`

* Drastically reducing size of the `ArrayDeque` allocated by `ObjectSizeCalculator`

* XXX

* Missing license

* Fixed refs (after rebase)

* Fixing compilation failure in Scala 2.11

* `PathCachingFileName` > `FileNameCachingPath`

* Tidying up
This commit is contained in:
Alexey Kudinkin
2022-01-10 18:23:22 -08:00
committed by GitHub
parent c8df9b09d7
commit f1e3762a94
7 changed files with 182 additions and 95 deletions

View File

@@ -27,8 +27,7 @@ import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sort.SpaceCurveSortingHelper;
@@ -62,16 +61,12 @@ public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPa
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
String payloadClass = config.getPayloadClass();
// do sort
JavaRDD<GenericRecord> preparedRecord = prepareGenericRecord(records, outputSparkPartitions, serializableSchema.get());
return preparedRecord.map(record -> {
String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
HoodieKey hoodieKey = new HoodieKey(key, partition);
HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(payloadClass,
new Object[] {Option.of(record)}, Option.class);
HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, new RewriteAvroPayload(record));
return hoodieRecord;
});
}

View File

@@ -18,19 +18,19 @@
package org.apache.hudi.sort;
import org.apache.hudi.common.util.BinaryUtil;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.optimize.HilbertCurveUtils;
import org.apache.hudi.common.util.BinaryUtil;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hudi.execution.ByteArraySorting;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
@@ -51,8 +51,9 @@ import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.davidmoten.hilbert.HilbertCurve;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;
import java.util.ArrayList;
import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -126,7 +127,7 @@ public class SpaceCurveSortingHelper {
sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
break;
default:
throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", layoutOptStrategy));
throw new UnsupportedOperationException(String.format("Not supported layout-optimization strategy (%s)", layoutOptStrategy));
}
// Compose new {@code StructType} for ordered RDDs
@@ -148,51 +149,24 @@ public class SpaceCurveSortingHelper {
private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.map(row -> {
List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
} else if (dataType instanceof DoubleType) {
return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
} else if (dataType instanceof FloatType) {
return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
} else if (dataType instanceof StringType) {
return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
} else if (dataType instanceof DateType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
} else if (dataType instanceof TimestampType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
} else if (dataType instanceof ByteType) {
return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
} else if (dataType instanceof ShortType) {
return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
} else if (dataType instanceof DecimalType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return BinaryUtil.intTo8Byte(value ? 1 : 0);
} else if (dataType instanceof BinaryType) {
return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[][] zBytes = new byte[zBytesList.size()][];
for (int i = 0; i < zBytesList.size(); i++) {
zBytes[i] = zBytesList.get(i);
}
List<Object> zVaules = new ArrayList<>();
zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
zVaules.add(BinaryUtil.interleaving(zBytes, 8));
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
byte[][] zBytes = fieldMap.entrySet().stream()
.map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
return mapColumnValueTo8Bytes(row, index, field.dataType());
})
.toArray(byte[][]::new);
// Interleave received bytes to produce Z-curve ordinal
byte[] zOrdinalBytes = BinaryUtil.interleaving(zBytes, 8);
return appendToRow(row, zOrdinalBytes);
})
.sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
.sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
// NOTE: Here {@code mapPartitions} is used to make sure Hilbert curve instance is initialized
// only once per partition
return originRDD.mapPartitions(rows -> {
HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
return new Iterator<Row>() {
@@ -205,48 +179,91 @@ public class SpaceCurveSortingHelper {
@Override
public Row next() {
Row row = rows.next();
List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
} else if (dataType instanceof DoubleType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index);
} else if (dataType instanceof FloatType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index));
} else if (dataType instanceof StringType) {
return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index));
} else if (dataType instanceof DateType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime();
} else if (dataType instanceof TimestampType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
} else if (dataType instanceof ByteType) {
return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)});
} else if (dataType instanceof ShortType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index);
} else if (dataType instanceof DecimalType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue();
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return value ? Long.MAX_VALUE : 0;
} else if (dataType instanceof BinaryType) {
return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
long[] longs = fieldMap.entrySet().stream()
.mapToLong(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
return mapColumnValueToLong(row, index, field.dataType());
})
.toArray();
byte[] hilbertValue = HilbertCurveUtils.indexBytes(
hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
List<Object> values = new ArrayList<>();
values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
values.add(hilbertValue);
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
// Map N-dimensional coordinates into position on the Hilbert curve
byte[] hilbertCurvePosBytes = HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);
return appendToRow(row, hilbertCurvePosBytes);
}
};
}).sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
})
.sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
/**
 * Creates a new {@link Row} holding every value of the provided {@code row} (in positional
 * order) followed by {@code value} as the last column.
 *
 * @param row   source row, left untouched
 * @param value value to be placed right after the last column of {@code row}
 * @return new row with one more column than {@code row}
 */
private static Row appendToRow(Row row, Object value) {
  // NOTE: Values are copied through the public {@code Row} API instead of casting the result
  //       of {@code Row#toSeq} to {@code WrappedArray} and unwrapping its backing array:
  //       the concrete {@code Seq} returned by {@code toSeq} is not part of {@code Row}'s
  //       contract, so that cast fails with {@code ClassCastException} whenever the sequence
  //       is not array-backed. Target array is allocated exactly once, at its final size.
  int length = row.length();
  Object[] values = new Object[length + 1];
  for (int i = 0; i < length; i++) {
    values[i] = row.get(i);
  }
  values[length] = value;
  return RowFactory.create(values);
}
/**
 * Converts the value held by {@code row} at position {@code index} into bytes, picking the
 * {@link BinaryUtil} conversion that corresponds to the column's {@code dataType}.
 *
 * <p>Null cells are replaced with a per-type substitute before the conversion: the type's
 * {@code MAX_VALUE} for numeric, date, timestamp and decimal columns, an empty string for
 * strings, {@code false} for booleans and a single zero byte for binary columns.
 *
 * @param row      row to read the value from
 * @param index    position of the column within the row
 * @param dataType Spark data-type of the column
 * @return bytes produced by the matching {@link BinaryUtil} conversion
 * @throws UnsupportedOperationException if {@code dataType} is not one of the handled types
 */
@Nonnull
private static byte[] mapColumnValueTo8Bytes(Row row, int index, DataType dataType) {
  if (dataType instanceof LongType) {
    long longValue = row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
    return BinaryUtil.longTo8Byte(longValue);
  }
  if (dataType instanceof DoubleType) {
    double doubleValue = row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index);
    return BinaryUtil.doubleTo8Byte(doubleValue);
  }
  if (dataType instanceof IntegerType) {
    int intValue = row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index);
    return BinaryUtil.intTo8Byte(intValue);
  }
  if (dataType instanceof FloatType) {
    // Floats are handed over to the double conversion
    float floatValue = row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index);
    return BinaryUtil.doubleTo8Byte(floatValue);
  }
  if (dataType instanceof StringType) {
    String stringValue = row.isNullAt(index) ? "" : row.getString(index);
    return BinaryUtil.utf8To8Byte(stringValue);
  }
  if (dataType instanceof DateType) {
    long dateMillis = row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime();
    return BinaryUtil.longTo8Byte(dateMillis);
  }
  if (dataType instanceof TimestampType) {
    long timestampMillis = row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
    return BinaryUtil.longTo8Byte(timestampMillis);
  }
  if (dataType instanceof ByteType) {
    byte byteValue = row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index);
    return BinaryUtil.byteTo8Byte(byteValue);
  }
  if (dataType instanceof ShortType) {
    // Shorts are handed over to the int conversion
    short shortValue = row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index);
    return BinaryUtil.intTo8Byte(shortValue);
  }
  if (dataType instanceof DecimalType) {
    // Only the integral part of the decimal (as returned by {@code longValue}) is encoded
    long decimalAsLong = row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue();
    return BinaryUtil.longTo8Byte(decimalAsLong);
  }
  if (dataType instanceof BooleanType) {
    boolean flag = !row.isNullAt(index) && row.getBoolean(index);
    return BinaryUtil.intTo8Byte(flag ? 1 : 0);
  }
  if (dataType instanceof BinaryType) {
    byte[] rawBytes = row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index);
    return BinaryUtil.paddingTo8Byte(rawBytes);
  }

  throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName()));
}
/**
 * Converts the value held by {@code row} at position {@code index} into a {@code long},
 * based on the column's {@code dataType}.
 *
 * <p>Null cells map to {@link Long#MAX_VALUE} for every handled type except booleans, where
 * a null is treated as {@code false} and therefore maps to {@code 0}.
 *
 * @param row      row to read the value from
 * @param index    position of the column within the row
 * @param dataType Spark data-type of the column
 * @return {@code long} derived from the column value
 * @throws UnsupportedOperationException if {@code dataType} is not one of the handled types
 */
private static long mapColumnValueToLong(Row row, int index, DataType dataType) {
  if (dataType instanceof LongType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    return row.getLong(index);
  }
  if (dataType instanceof DoubleType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    return Double.doubleToLongBits(row.getDouble(index));
  }
  if (dataType instanceof IntegerType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    return (long) row.getInt(index);
  }
  if (dataType instanceof FloatType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    // Floats are widened to double before taking the raw bits
    return Double.doubleToLongBits((double) row.getFloat(index));
  }
  if (dataType instanceof StringType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    return BinaryUtil.convertStringToLong(row.getString(index));
  }
  if (dataType instanceof DateType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    return row.getDate(index).getTime();
  }
  if (dataType instanceof TimestampType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    return row.getTimestamp(index).getTime();
  }
  if (dataType instanceof ByteType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    return BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)});
  }
  if (dataType instanceof ShortType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    return (long) row.getShort(index);
  }
  if (dataType instanceof DecimalType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    // Only the integral part of the decimal (as returned by {@code longValue}) is used
    return row.getDecimal(index).longValue();
  }
  if (dataType instanceof BooleanType) {
    boolean flag = !row.isNullAt(index) && row.getBoolean(index);
    return flag ? Long.MAX_VALUE : 0;
  }
  if (dataType instanceof BinaryType) {
    if (row.isNullAt(index)) {
      return Long.MAX_VALUE;
    }
    return BinaryUtil.convertBytesToLong((byte[]) row.get(index));
  }

  throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName()));
}
public static Dataset<Row> orderDataFrameBySamplingValues(

View File

@@ -596,6 +596,7 @@ public class HoodieAvroUtils {
if (columns.length == 1) {
return HoodieAvroUtils.getNestedFieldVal(genericRecord, columns[0], true, consistentLogicalTimestampEnabled);
} else {
// TODO this is inefficient, instead we can simply return array of Comparable
StringBuilder sb = new StringBuilder();
for (String col : columns) {
sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true, consistentLogicalTimestampEnabled));

View File

@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hudi.hadoop.FileNameCachingPath;
import java.io.IOException;
import java.net.URI;
@@ -141,7 +142,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
try {
newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(), oldURI.getPath(),
oldURI.getQuery(), oldURI.getFragment());
return new Path(newURI);
return new FileNameCachingPath(newURI);
} catch (URISyntaxException e) {
// TODO - Better Exception handling
throw new RuntimeException(e);

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.common.util;
import org.apache.hudi.common.util.collection.Pair;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,11 +38,35 @@ public class CollectionUtils {
public static final Properties EMPTY_PROPERTIES = new Properties();
/**
 * Concatenates two arrays into a newly allocated one: elements of {@code one} come first,
 * immediately followed by elements of {@code another}. Neither input is modified.
 *
 * @param one     array providing the leading elements; its runtime component type is used
 *                for the resulting array
 * @param another array providing the trailing elements
 * @return new array of length {@code one.length + another.length}
 */
@SuppressWarnings("unchecked")
public static <T> T[] combine(T[] one, T[] another) {
  final int headLength = one.length;
  final int tailLength = another.length;
  // Allocate the target with the runtime component type of the first array
  final Class<?> componentType = one.getClass().getComponentType();
  final T[] result = (T[]) Array.newInstance(componentType, headLength + tailLength);

  System.arraycopy(one, 0, result, 0, headLength);
  System.arraycopy(another, 0, result, headLength, tailLength);
  return result;
}
/**
 * Produces a new array containing every element of {@code array} followed by {@code elem}.
 * The provided array is not modified.
 *
 * @param array source array; its runtime component type is used for the resulting array
 * @param elem  element to be placed in the last slot of the resulting array
 * @return new array of length {@code array.length + 1}
 */
@SuppressWarnings("unchecked")
public static <T> T[] append(T[] array, T elem) {
  final int originalLength = array.length;
  // Allocate the target with the runtime component type of the source array
  final Class<?> componentType = array.getClass().getComponentType();
  final T[] extended = (T[]) Array.newInstance(componentType, originalLength + 1);

  System.arraycopy(array, 0, extended, 0, originalLength);
  extended[originalLength] = elem;
  return extended;
}
/**
 * Combines provided {@link List}s into one: elements of {@code one} come first, followed by
 * elements of {@code another}. Neither input list is modified.
 *
 * @param one     list providing the leading elements
 * @param another list providing the trailing elements
 * @return new mutable list holding the elements of both inputs
 */
public static <E> List<E> combine(List<E> one, List<E> another) {
  // Pre-size the target so that neither {@code addAll} forces the backing array to grow
  List<E> combined = new ArrayList<>(one.size() + another.size());
  combined.addAll(one);
  combined.addAll(another);
  return combined;
}

View File

@@ -90,7 +90,7 @@ public class ObjectSizeCalculator {
private final Map<Class<?>, ClassSizeInfo> classSizeInfos = new IdentityHashMap<>();
private final Set<Object> alreadyVisited = Collections.newSetFromMap(new IdentityHashMap<>());
private final Deque<Object> pending = new ArrayDeque<>(16 * 1024);
private final Deque<Object> pending = new ArrayDeque<>(64);
private long size;
/**

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hadoop;
import org.apache.hadoop.fs.Path;
import java.net.URI;
/**
 * {@link Path} extension that remembers the result of {@link Path#getName()} after the first
 * call, so that subsequent calls return the cached value instead of delegating to the parent
 * class again.
 *
 * NOTE: This class is thread-safe
 */
public class FileNameCachingPath extends Path {

  // Lazily populated by {@code getName}; stays {@code null} until the first invocation.
  //
  // NOTE: volatile keyword is redundant here and put mostly for reader notice, since all
  //       reads/writes to references are always atomic (including 64-bit JVMs)
  //       https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7
  private volatile String fileName;

  public FileNameCachingPath(URI aUri) {
    super(aUri);
  }

  @Override
  public String getName() {
    // Cached value might be (re)computed and overwritten by concurrent callers, which is
    // okay since {@code Path} is immutable
    String cachedName = fileName;
    if (cachedName == null) {
      cachedName = super.getName();
      fileName = cachedName;
    }
    return cachedName;
  }
}