diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java index 896a2aaab..ca7dfa3e7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java @@ -27,8 +27,7 @@ import org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.model.RewriteAvroPayload; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.sort.SpaceCurveSortingHelper; @@ -62,16 +61,12 @@ public class RDDSpatialCurveOptimizationSortPartitioner> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { - String payloadClass = config.getPayloadClass(); - // do sort JavaRDD preparedRecord = prepareGenericRecord(records, outputSparkPartitions, serializableSchema.get()); return preparedRecord.map(record -> { String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(); HoodieKey hoodieKey = new HoodieKey(key, partition); - HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(payloadClass, - new Object[] {Option.of(record)}, Option.class); - HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload); + HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, new RewriteAvroPayload(record)); return hoodieRecord; }); } 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java index 8ebc032a1..496168e84 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java @@ -18,19 +18,19 @@ package org.apache.hudi.sort; +import org.apache.hudi.common.util.BinaryUtil; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.optimize.HilbertCurveUtils; -import org.apache.hudi.common.util.BinaryUtil; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.Row$; -import org.apache.spark.sql.hudi.execution.RangeSampleSort$; +import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.hudi.execution.ByteArraySorting; +import org.apache.spark.sql.hudi.execution.RangeSampleSort$; import org.apache.spark.sql.types.BinaryType; import org.apache.spark.sql.types.BinaryType$; import org.apache.spark.sql.types.BooleanType; @@ -51,8 +51,9 @@ import org.apache.spark.sql.types.StructType$; import org.apache.spark.sql.types.TimestampType; import org.davidmoten.hilbert.HilbertCurve; import scala.collection.JavaConversions; +import scala.collection.mutable.WrappedArray; -import java.util.ArrayList; +import javax.annotation.Nonnull; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -126,7 +127,7 @@ public class SpaceCurveSortingHelper { sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount); break; default: - throw new IllegalArgumentException(String.format("new only support z-order/hilbert 
optimize but find: %s", layoutOptStrategy)); + throw new UnsupportedOperationException(String.format("Not supported layout-optimization strategy (%s)", layoutOptStrategy)); } // Compose new {@code StructType} for ordered RDDs @@ -148,51 +149,24 @@ public class SpaceCurveSortingHelper { private static JavaRDD createZCurveSortedRDD(JavaRDD originRDD, Map fieldMap, int fieldNum, int fileNum) { return originRDD.map(row -> { - List zBytesList = fieldMap.entrySet().stream().map(entry -> { - int index = entry.getKey(); - StructField field = entry.getValue(); - DataType dataType = field.dataType(); - if (dataType instanceof LongType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); - } else if (dataType instanceof DoubleType) { - return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); - } else if (dataType instanceof IntegerType) { - return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); - } else if (dataType instanceof FloatType) { - return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); - } else if (dataType instanceof StringType) { - return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); - } else if (dataType instanceof DateType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); - } else if (dataType instanceof TimestampType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); - } else if (dataType instanceof ByteType) { - return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); - } else if (dataType instanceof ShortType) { - return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); - } else if (dataType instanceof DecimalType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? 
Long.MAX_VALUE : row.getDecimal(index).longValue()); - } else if (dataType instanceof BooleanType) { - boolean value = row.isNullAt(index) ? false : row.getBoolean(index); - return BinaryUtil.intTo8Byte(value ? 1 : 0); - } else if (dataType instanceof BinaryType) { - return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); - } - return null; - }).filter(f -> f != null).collect(Collectors.toList()); - byte[][] zBytes = new byte[zBytesList.size()][]; - for (int i = 0; i < zBytesList.size(); i++) { - zBytes[i] = zBytesList.get(i); - } - List zVaules = new ArrayList<>(); - zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); - zVaules.add(BinaryUtil.interleaving(zBytes, 8)); - return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules)); + byte[][] zBytes = fieldMap.entrySet().stream() + .map(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + return mapColumnValueTo8Bytes(row, index, field.dataType()); + }) + .toArray(byte[][]::new); + + // Interleave received bytes to produce Z-curve ordinal + byte[] zOrdinalBytes = BinaryUtil.interleaving(zBytes, 8); + return appendToRow(row, zOrdinalBytes); }) - .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum); + .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum); } private static JavaRDD createHilbertSortedRDD(JavaRDD originRDD, Map fieldMap, int fieldNum, int fileNum) { + // NOTE: Here {@code mapPartitions} is used to make sure Hilbert curve instance is initialized + // only once per partition return originRDD.mapPartitions(rows -> { HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size()); return new Iterator() { @@ -205,48 +179,91 @@ public class SpaceCurveSortingHelper { @Override public Row next() { Row row = rows.next(); - List longList = fieldMap.entrySet().stream().map(entry -> { - int index = entry.getKey(); - 
StructField field = entry.getValue(); - DataType dataType = field.dataType(); - if (dataType instanceof LongType) { - return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index); - } else if (dataType instanceof DoubleType) { - return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index)); - } else if (dataType instanceof IntegerType) { - return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index); - } else if (dataType instanceof FloatType) { - return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index)); - } else if (dataType instanceof StringType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index)); - } else if (dataType instanceof DateType) { - return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime(); - } else if (dataType instanceof TimestampType) { - return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime(); - } else if (dataType instanceof ByteType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)}); - } else if (dataType instanceof ShortType) { - return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index); - } else if (dataType instanceof DecimalType) { - return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue(); - } else if (dataType instanceof BooleanType) { - boolean value = row.isNullAt(index) ? false : row.getBoolean(index); - return value ? Long.MAX_VALUE : 0; - } else if (dataType instanceof BinaryType) { - return row.isNullAt(index) ? 
Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index)); - } - return null; - }).filter(f -> f != null).collect(Collectors.toList()); + long[] longs = fieldMap.entrySet().stream() + .mapToLong(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + return mapColumnValueToLong(row, index, field.dataType()); + }) + .toArray(); - byte[] hilbertValue = HilbertCurveUtils.indexBytes( - hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63); - List values = new ArrayList<>(); - values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); - values.add(hilbertValue); - return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values)); + // Map N-dimensional coordinates into position on the Hilbert curve + byte[] hilbertCurvePosBytes = HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63); + return appendToRow(row, hilbertCurvePosBytes); } }; - }).sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum); + }) + .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum); + } + + private static Row appendToRow(Row row, Object value) { + // NOTE: This is an ugly hack to avoid array re-allocation -- + // Spark's {@code Row#toSeq} returns array of Objects + Object[] currentValues = (Object[]) ((WrappedArray) row.toSeq()).array(); + return RowFactory.create(CollectionUtils.append(currentValues, value)); + } + + @Nonnull + private static byte[] mapColumnValueTo8Bytes(Row row, int index, DataType dataType) { + if (dataType instanceof LongType) { + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); + } else if (dataType instanceof DoubleType) { + return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return BinaryUtil.intTo8Byte(row.isNullAt(index) ? 
Integer.MAX_VALUE : row.getInt(index)); + } else if (dataType instanceof FloatType) { + return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); + } else if (dataType instanceof StringType) { + return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); + } else if (dataType instanceof DateType) { + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); + } else if (dataType instanceof TimestampType) { + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); + } else if (dataType instanceof ByteType) { + return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); + } else if (dataType instanceof ShortType) { + return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); + } else if (dataType instanceof DecimalType) { + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? false : row.getBoolean(index); + return BinaryUtil.intTo8Byte(value ? 1 : 0); + } else if (dataType instanceof BinaryType) { + return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); + } + + throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName())); + } + + private static long mapColumnValueToLong(Row row, int index, DataType dataType) { + if (dataType instanceof LongType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index); + } else if (dataType instanceof DoubleType) { + return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return row.isNullAt(index) ? 
Long.MAX_VALUE : (long) row.getInt(index); + } else if (dataType instanceof FloatType) { + return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index)); + } else if (dataType instanceof StringType) { + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index)); + } else if (dataType instanceof DateType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime(); + } else if (dataType instanceof TimestampType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime(); + } else if (dataType instanceof ByteType) { + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)}); + } else if (dataType instanceof ShortType) { + return row.isNullAt(index) ? Long.MAX_VALUE : (long) row.getShort(index); + } else if (dataType instanceof DecimalType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue(); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? false : row.getBoolean(index); + return value ? Long.MAX_VALUE : 0; + } else if (dataType instanceof BinaryType) { + return row.isNullAt(index) ? 
Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index)); + } + + throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName())); } public static Dataset orderDataFrameBySamplingValues( diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 7dfeb582e..9fabc647d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -596,6 +596,7 @@ public class HoodieAvroUtils { if (columns.length == 1) { return HoodieAvroUtils.getNestedFieldVal(genericRecord, columns[0], true, consistentLogicalTimestampEnabled); } else { + // TODO this is inefficient, instead we can simply return array of Comparable StringBuilder sb = new StringBuilder(); for (String col : columns) { sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true, consistentLogicalTimestampEnabled)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java index 1faaad533..8521fd820 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java @@ -48,6 +48,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; +import org.apache.hudi.hadoop.FileNameCachingPath; import java.io.IOException; import java.net.URI; @@ -141,7 +142,7 @@ public class HoodieWrapperFileSystem extends FileSystem { try { newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(), oldURI.getPath(), oldURI.getQuery(), oldURI.getFragment()); - return new Path(newURI); + return 
new FileNameCachingPath(newURI); } catch (URISyntaxException e) { // TODO - Better Exception handling throw new RuntimeException(e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java index cbcdbc404..6a4efca29 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.util; import org.apache.hudi.common.util.collection.Pair; +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -37,11 +38,35 @@ public class CollectionUtils { public static final Properties EMPTY_PROPERTIES = new Properties(); + /** + * Combines provided arrays into one + */ + @SuppressWarnings("unchecked") + public static T[] combine(T[] one, T[] another) { + T[] combined = (T[]) Array.newInstance(one.getClass().getComponentType(), one.length + another.length); + System.arraycopy(one, 0, combined, 0, one.length); + System.arraycopy(another, 0, combined, one.length, another.length); + return combined; + } + + /** + * Combines provided array and an element into a new array + */ + @SuppressWarnings("unchecked") + public static T[] append(T[] array, T elem) { + T[] combined = (T[]) Array.newInstance(array.getClass().getComponentType(), array.length + 1); + System.arraycopy(array, 0, combined, 0, array.length); + combined[array.length] = elem; + return combined; + } + + /** * Combines provided {@link List}s into one */ public static List combine(List one, List another) { - ArrayList combined = new ArrayList<>(one); + ArrayList combined = new ArrayList<>(one.size() + another.size()); + combined.addAll(one); combined.addAll(another); return combined; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java index f3944152f..7e625e8eb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java @@ -90,7 +90,7 @@ public class ObjectSizeCalculator { private final Map, ClassSizeInfo> classSizeInfos = new IdentityHashMap<>(); private final Set alreadyVisited = Collections.newSetFromMap(new IdentityHashMap<>()); - private final Deque pending = new ArrayDeque<>(16 * 1024); + private final Deque pending = new ArrayDeque<>(64); private long size; /** diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java new file mode 100644 index 000000000..873f7f98f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.Path; + +import java.net.URI; + +/** + * NOTE: This class is thread-safe + */ +public class FileNameCachingPath extends Path { + + // NOTE: Reads/writes of references are always atomic (including on 64-bit JVMs), while the + // volatile keyword additionally makes the cached value visible to other threads once written + // https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7 + private volatile String fileName; + + public FileNameCachingPath(URI aUri) { + super(aUri); + } + + @Override + public String getName() { + // This value could be overwritten concurrently and that's okay, since + // {@code Path} is immutable + if (fileName == null) { + fileName = super.getName(); + } + return fileName; + } +}