
[HUDI-2102] Support hilbert curve for hudi (#3952)

Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
xiarixiaoyao
2021-11-27 15:20:19 +08:00
committed by GitHub
parent 2c7656c35f
commit 780a2ac5b2
18 changed files with 1015 additions and 30 deletions
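For readers unfamiliar with the feature, a minimal sketch of how the new strategy is switched on through the Spark datasource (abbreviated: record key/partition/precombine options are omitted, and writeDf, basePath and the sort columns are placeholders taken from the test added later in this commit):

import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}

writeDf.write.format("hudi")
  .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
  // run clustering inline right after the write so layout optimization kicks in
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "1")
  // enable layout optimization and pick the curve; the other accepted value is "z-order"
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, "hilbert")
  .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
  .mode("append")
  .save(basePath)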

NOTICE

@@ -159,3 +159,9 @@ its NOTICE file:
This product includes software developed at
StreamSets (http://www.streamsets.com/).
--------------------------------------------------------------------------------
This product includes code from hilbert-curve project
* Copyright https://github.com/davidmoten/hilbert-curve
* Licensed under the Apache-2.0 License


@@ -64,6 +64,13 @@
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Hilbert Curve -->
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>hilbert-curve</artifactId>
<version>0.2.2</version>
</dependency>
<!-- Dropwizard Metrics -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>


@@ -542,4 +542,32 @@ public class HoodieClusteringConfig extends HoodieConfig {
}
}
}
/**
* Strategy types for layout optimization of Hudi data.
*/
public enum BuildLayoutOptimizationStrategy {
ZORDER("z-order"),
HILBERT("hilbert");
private final String value;
BuildLayoutOptimizationStrategy(String value) {
this.value = value;
}
public String toCustomString() {
return value;
}
public static BuildLayoutOptimizationStrategy fromValue(String value) {
switch (value.toLowerCase(Locale.ROOT)) {
case "z-order":
return ZORDER;
case "hilbert":
return HILBERT;
default:
throw new HoodieException("Invalid value of layout optimization strategy: " + value);
}
}
}
}


@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.optimize;
import org.davidmoten.hilbert.HilbertCurve;
import java.math.BigInteger;
/**
* Utils for Hilbert Curve.
*/
public class HilbertCurveUtils {
public static byte[] indexBytes(HilbertCurve hilbertCurve, long[] points, int paddingNum) {
BigInteger index = hilbertCurve.index(points);
return paddingToNByte(index.toByteArray(), paddingNum);
}
public static byte[] paddingToNByte(byte[] a, int paddingNum) {
if (a.length == paddingNum) {
return a;
}
if (a.length > paddingNum) {
byte[] result = new byte[paddingNum];
System.arraycopy(a, 0, result, 0, paddingNum);
return result;
}
int paddingSize = paddingNum - a.length;
byte[] result = new byte[paddingNum];
for (int i = 0; i < paddingSize; i++) {
result[i] = 0;
}
System.arraycopy(a, 0, result, paddingSize, a.length);
return result;
}
}
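For orientation, a small sketch of how these utilities turn a multi-dimensional point into a fixed-width sort key (the point and the 8-byte padding width are arbitrary example values; index({1, 2}) = 13 matches the unit test below):

import org.apache.hudi.optimize.HilbertCurveUtils
import org.davidmoten.hilbert.HilbertCurve

// 2-dimensional curve with 5 bits per dimension
val curve = HilbertCurve.bits(5).dimensions(2)
// map the point (1, 2) onto the curve and left-pad the index to 8 bytes so keys compare byte-wise
val key: Array[Byte] = HilbertCurveUtils.indexBytes(curve, Array(1L, 2L), 8)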


@@ -176,9 +176,14 @@ public class ZOrderingUtil {
public static Long convertStringToLong(String a) {
byte[] bytes = utf8To8Byte(a);
return convertBytesToLong(bytes);
}
public static long convertBytesToLong(byte[] bytes) {
byte[] paddedBytes = paddingTo8Byte(bytes);
long temp = 0L;
for (int i = 7; i >= 0; i--) {
- temp = temp | (((long) bytes[i] & 0xff) << (7 - i) * 8);
+ temp = temp | (((long) paddedBytes[i] & 0xff) << (7 - i) * 8);
}
return temp;
}


@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.optimize;
import org.davidmoten.hilbert.HilbertCurve;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestHilbertCurveUtils {
private static final HilbertCurve INSTANCE = HilbertCurve.bits(5).dimensions(2);
@Test
public void testIndex() {
long[] t = {1, 2};
assertEquals(13, INSTANCE.index(t).intValue());
long[] t1 = {0, 16};
assertEquals(256, INSTANCE.index(t1).intValue());
}
}


@@ -126,4 +126,29 @@ public class TestZOrderingUtil {
this.originValue = originValue;
}
}
@Test
public void testConvertBytesToLong() {
long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE};
for (int i = 0; i < tests.length; i++) {
assertEquals(ZOrderingUtil.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]);
}
}
@Test
public void testConvertBytesToLongWithPadding() {
byte[] bytes = new byte[2];
bytes[0] = 2;
bytes[1] = 127;
assertEquals(ZOrderingUtil.convertBytesToLong(bytes), 2 * 256 + 127);
}
private byte[] convertLongToBytes(long num) {
byte[] byteNum = new byte[8];
for (int i = 0; i < 8; i++) {
int offset = 64 - (i + 1) * 8;
byteNum[i] = (byte) ((num >> offset) & 0xff);
}
return byteNum;
}
}


@@ -33,7 +33,7 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
- import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
+ import org.apache.spark.OrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -79,10 +79,12 @@ public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPa
switch (config.getLayoutOptimizationCurveBuildMethod()) {
case DIRECT:
- zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups);
+ zDataFrame = OrderingIndexHelper
+ .createOptimizedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
break;
case SAMPLE:
- zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups);
+ zDataFrame = OrderingIndexHelper
+ .createOptimizeDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
break;
default:
throw new HoodieException("Not a valid build curve method for doWriteOperation: ");


@@ -18,17 +18,19 @@
package org.apache.hudi.index.zorder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.io.api.Binary;
@@ -62,10 +64,10 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import scala.collection.JavaConversions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -77,6 +79,8 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import scala.collection.JavaConversions;
import static org.apache.hudi.util.DataTypeUtils.areCompatible;
public class ZOrderingIndexHelper {
@@ -189,7 +193,8 @@ public class ZOrderingIndexHelper {
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum) {
- return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum);
+ return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum,
+ HoodieClusteringConfig.BuildLayoutOptimizationStrategy.ZORDER.toCustomString());
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, String zCols, int fileNum) {
@@ -584,7 +589,7 @@ public class ZOrderingIndexHelper {
* @VisibleForTesting
*/
@Nonnull
- static String createIndexMergeSql(
+ public static String createIndexMergeSql(
@Nonnull String originalIndexTable,
@Nonnull String newIndexTable,
@Nonnull List<String> columns


@@ -18,8 +18,6 @@
package org.apache.hudi.table;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -49,6 +47,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
@@ -76,12 +75,15 @@ import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;


@@ -0,0 +1,430 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.hudi.optimize.HilbertCurveUtils;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import org.davidmoten.hilbert.HilbertCurve;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.collection.JavaConversions;
public class OrderingIndexHelper {
private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
/**
* Create an optimized DataFrame directly.
* Only supports primitive-type columns: long, int, short, double, float, string, timestamp, decimal, date, byte.
* This method is more efficient than createOptimizeDataFrameBySample.
*
* @param df the DataFrame to be laid out along the curve.
* @param sortCols ordering columns for the curve
* @param fileNum spark partition num
* @param sortMode layout optimization strategy
* @return a dataFrame ordered by the curve.
*/
public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) {
Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
int fieldNum = df.schema().fields().length;
List<String> checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
if (sortCols.size() != checkCols.size()) {
return df;
}
// only one column to sort by, no need to build a space-filling curve
if (sortCols.size() == 1) {
return df.repartitionByRange(fileNum, org.apache.spark.sql.functions.col(sortCols.get(0)));
}
Map<Integer, StructField> fieldMap = sortCols
.stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
// do optimize
JavaRDD<Row> sortedRDD = null;
switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) {
case ZORDER:
sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
break;
case HILBERT:
sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
break;
default:
throw new IllegalArgumentException(String.format("only z-order/hilbert layout optimization is supported, but found: %s", sortMode));
}
// create new StructType
List<StructField> newFields = new ArrayList<>();
newFields.addAll(Arrays.asList(df.schema().fields()));
newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty()));
// create new DataFrame
return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index");
}
private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.map(row -> {
List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
} else if (dataType instanceof DoubleType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
} else if (dataType instanceof FloatType) {
return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
} else if (dataType instanceof StringType) {
return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
} else if (dataType instanceof DateType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
} else if (dataType instanceof TimestampType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
} else if (dataType instanceof ByteType) {
return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
} else if (dataType instanceof ShortType) {
return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
} else if (dataType instanceof DecimalType) {
return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
} else if (dataType instanceof BinaryType) {
return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[][] zBytes = new byte[zBytesList.size()][];
for (int i = 0; i < zBytesList.size(); i++) {
zBytes[i] = zBytesList.get(i);
}
List<Object> zValues = new ArrayList<>();
zValues.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
zValues.add(ZOrderingUtil.interleaving(zBytes, 8));
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zValues));
}).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
}
private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.mapPartitions(rows -> {
HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
return new Iterator<Row>() {
@Override
public boolean hasNext() {
return rows.hasNext();
}
@Override
public Row next() {
Row row = rows.next();
List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
} else if (dataType instanceof DoubleType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index);
} else if (dataType instanceof FloatType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index));
} else if (dataType instanceof StringType) {
return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertStringToLong(row.getString(index));
} else if (dataType instanceof DateType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime();
} else if (dataType instanceof TimestampType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
} else if (dataType instanceof ByteType) {
return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong(new byte[] {row.getByte(index)});
} else if (dataType instanceof ShortType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index);
} else if (dataType instanceof DecimalType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue();
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return value ? Long.MAX_VALUE : 0;
} else if (dataType instanceof BinaryType) {
return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong((byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[] hilbertValue = HilbertCurveUtils.indexBytes(
hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
List<Object> values = new ArrayList<>();
values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
values.add(hilbertValue);
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
}
};
}).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
}
public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, String sortCols, int fileNum, String sortMode) {
if (sortCols == null || sortCols.isEmpty() || fileNum <= 0) {
return df;
}
return createOptimizedDataFrameByMapValue(df,
Arrays.stream(sortCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode);
}
public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum, String sortMode) {
return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, sortMode);
}
public static Dataset<Row> createOptimizeDataFrameBySample(Dataset<Row> df, String zCols, int fileNum, String sortMode) {
if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
return df;
}
return createOptimizeDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode);
}
/**
* Parse min/max statistics stored in parquet footers for z-sort cols.
* Collecting statistics from TimestampType columns is not supported, since parquet files do not store statistics for TimestampType.
* TODO: adapt for RFC-27.
*
* @param df a spark DataFrame backed by the parquet files to be read.
* @param cols z-sort cols
* @return a dataFrame holding all statistics info.
*/
public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
Map<String, DataType> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType()));
List<String> scanFiles = Arrays.asList(df.inputFiles());
SparkContext sc = df.sparkSession().sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
int numParallelism = (scanFiles.size() / 3 + 1);
List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
try {
jsc.setJobDescription("Listing parquet column statistics");
colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
Configuration conf = serializableConfiguration.value();
ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
List<Collection<HoodieColumnRangeMetadata<Comparable>>> results = new ArrayList<>();
while (paths.hasNext()) {
String path = paths.next();
results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols));
}
return results.stream().flatMap(f -> f.stream()).iterator();
}).collect();
} finally {
jsc.setJobDescription(previousJobDescription);
}
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
JavaRDD<Row> allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> {
int colSize = f.size();
if (colSize == 0) {
return null;
} else {
List<Object> rows = new ArrayList<>();
rows.add(f.get(0).getFilePath());
cols.stream().forEach(col -> {
HoodieColumnRangeMetadata<Comparable> currentColRangeMetaData =
f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null);
DataType colType = columnsMap.get(col);
if (currentColRangeMetaData == null || colType == null) {
throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col));
}
if (colType instanceof IntegerType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof DoubleType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof StringType) {
rows.add(currentColRangeMetaData.getMinValue().toString());
rows.add(currentColRangeMetaData.getMaxValue().toString());
} else if (colType instanceof DecimalType) {
rows.add(new BigDecimal(currentColRangeMetaData.getMinValue().toString()));
rows.add(new BigDecimal(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof DateType) {
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValue().toString()));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof LongType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof ShortType) {
rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString()));
rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString()));
} else if (colType instanceof FloatType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof BinaryType) {
rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
} else if (colType instanceof BooleanType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof ByteType) {
rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString()));
rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString()));
} else {
throw new HoodieException(String.format("Unsupported type: %s", colType));
}
rows.add(currentColRangeMetaData.getNumNulls());
});
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows));
}
}).filter(f -> f != null);
List<StructField> allMetaDataSchema = new ArrayList<>();
allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty()));
cols.forEach(col -> {
allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty()));
allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty()));
allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty()));
});
return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema));
}
public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) {
List<String> rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList());
return getMinMaxValue(df, rawCols);
}
/**
* Update statistics info.
* This method updates the old index table via a full outer join,
* and saves the updated table into a new index table named after commitTime.
* Old index tables are cleaned up as well.
*
* @param df a spark DataFrame backed by the parquet files to be read.
* @param cols z-sort cols.
* @param indexPath index store path.
* @param commitTime current operation commitTime.
* @param validateCommits all valid commits for the current table.
*/
public static void saveStatisticsInfo(Dataset<Row> df, String cols, String indexPath, String commitTime, List<String> validateCommits) {
Path savePath = new Path(indexPath, commitTime);
SparkSession spark = df.sparkSession();
FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration());
Dataset<Row> statisticsDF = OrderingIndexHelper.getMinMaxValue(df, cols);
// try to find the latest valid index table under the index path
try {
// If there's currently no index, create one
if (!fs.exists(new Path(indexPath))) {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
return;
}
// Otherwise, clean up all indexes but the most recent one
List<String> allIndexTables = Arrays
.stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
Option<Dataset> latestIndexData = Option.empty();
if (!candidateIndexTables.isEmpty()) {
latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
// clean old index table, keep at most 1 index table.
candidateIndexTables.remove(candidateIndexTables.size() - 1);
candidateIndexTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
}
// clean residual tables:
// retried clustering operations at the same instant time are also considered;
// the residual files produced by retries are cleaned up before saving statistics
// into the index table named after commitTime
residualTables.forEach(f -> {
try {
fs.delete(new Path(indexPath, f));
} catch (IOException ie) {
throw new HoodieException(ie);
}
});
if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) {
// update the statistics info
String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
latestIndexData.get().registerTempTable(originalTable);
statisticsDF.registerTempTable(updateTable);
// update table by full out join
List columns = Arrays.asList(statisticsDF.schema().fieldNames());
spark.sql(ZOrderingIndexHelper.createIndexMergeSql(originalTable, updateTable, columns)).repartition(1).write().save(savePath.toString());
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
}
} catch (IOException e) {
throw new HoodieException(e);
}
}
}
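A brief usage sketch for the helper above, mirroring the benchmark added later in this commit (df, spark, the parquet path, the column list and the target file number 200 are placeholders; the last argument is the strategy name):

import org.apache.spark.OrderingIndexHelper

// direct (map-value) ordering: encode each row's sort columns and sort by the resulting curve key
val byMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert")
// sample-based ordering: supports all column types, but needs more resources than the map-value variant
val bySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order")
// after writing the sorted frame out as parquet, per-file min/max statistics (plus null counts) can be collected
val stats = OrderingIndexHelper.getMinMaxValue(spark.read.parquet("/tmp/sorted_table"), "c1_int, c2_int")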


@@ -19,15 +19,16 @@
package org.apache.spark.sql.hudi.execution
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.optimize.{HilbertCurveUtils, ZOrderingUtil}
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.hudi.optimize.ZOrderingUtil
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.util.MutablePair
import org.apache.spark.util.random.SamplingUtils
import org.davidmoten.hilbert.HilbertCurve
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -35,10 +36,10 @@ import scala.reflect.{ClassTag, classTag}
import scala.util.hashing.byteswap32
class RangeSample[K: ClassTag, V](
zEncodeNum: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascend: Boolean = true,
val samplePointsPerPartitionHint: Int = 20) extends Serializable {
// We allow zEncodeNum = 0, which happens when sorting an empty RDD under the default settings.
require(zEncodeNum >= 0, s"Number of zEncodeNum cannot be negative but found $zEncodeNum.")
@@ -335,16 +336,21 @@ object RangeSampleSort {
}
/**
- * create z-order DataFrame by sample
- * first, sample origin data to get z-cols bounds, then create z-order DataFrame
+ * create optimized DataFrame by sample
+ * first, sample origin data to get order-cols bounds, then apply the sort to produce the DataFrame
* support all type data.
- * this method need more resource and cost more time than createZIndexedDataFrameByMapValue
+ * this method needs more resources and costs more time than createOptimizedDataFrameByMapValue
*/
- def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int): DataFrame = {
+ def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int, sortMode: String): DataFrame = {
val spark = df.sparkSession
val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap
val fieldNum = df.schema.fields.length
val checkCols = zCols.filter(col => columnsMap(col) != null)
val useHilbert = sortMode match {
case "hilbert" => true
case "z-order" => false
case other => throw new IllegalArgumentException(s"only z-order and hilbert layout optimization are supported, but found: ${other}")
}
if (zCols.isEmpty || checkCols.isEmpty) {
df
@@ -366,7 +372,7 @@ object RangeSampleSort {
}.filter(_._1 != -1)
// Complex type found, use createZIndexedDataFrameByRange
if (zFields.length != zCols.length) {
- return sortDataFrameBySampleSupportAllTypes(df, zCols, fieldNum)
+ return sortDataFrameBySampleSupportAllTypes(df, zCols, fileNum)
}
val rawRdd = df.rdd
@@ -441,6 +447,7 @@ object RangeSampleSort {
val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor)
val indexRdd = rawRdd.mapPartitions { iter =>
val hilbertCurve = if (useHilbert) Some(HilbertCurve.bits(32).dimensions(zFields.length)) else None
val expandBoundsWithFactor = boundBroadCast.value
val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max
val longDecisionBound = new RawDecisionBound(Ordering[Long])
@@ -507,17 +514,21 @@ object RangeSampleSort {
case _ =>
-1
}
- }.filter(v => v != -1).map(ZOrderingUtil.intTo8Byte(_)).toArray
- val zValues = ZOrderingUtil.interleaving(values, 8)
- Row.fromSeq(row.toSeq ++ Seq(zValues))
+ }.filter(v => v != -1)
+ val mapValues = if (hilbertCurve.isDefined) {
+ HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32)
+ } else {
+ ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8)
+ }
+ Row.fromSeq(row.toSeq ++ Seq(mapValues))
}
}.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)), numPartitions = fileNum)
val newDF = df.sparkSession.createDataFrame(indexRdd, StructType(
df.schema.fields ++ Seq(
- StructField(s"zindex",
+ StructField(s"index",
BinaryType, false))
))
- newDF.drop("zindex")
+ newDF.drop("index")
}
}
}


@@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.functional
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.OrderingIndexHelper
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.Arguments.arguments
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import java.sql.{Date, Timestamp}
import scala.collection.JavaConversions._
import scala.util.Random
@Tag("functional")
class TestTableLayoutOptimization extends HoodieClientTestBase {
var spark: SparkSession = _
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
@BeforeEach override def setUp() {
initPath()
initSparkContexts()
spark = sqlContext.sparkSession
initTestDataGenerator()
initFileSystem()
}
@AfterEach override def tearDown() = {
cleanupSparkContexts()
cleanupTestDataGenerator()
cleanupFileSystem()
}
@ParameterizedTest
@MethodSource(Array("testLayOutParameter"))
def testOptimizeWithClustering(tableType: String, optimizeMode: String): Unit = {
val targetRecordsCount = 10000
// Bulk Insert Operation
val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList
val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
writeDf.write.format("org.apache.hudi")
.options(commonOpts)
.option("hoodie.compact.inline", "false")
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
// option for clustering
.option("hoodie.parquet.small.file.limit", "0")
.option("hoodie.clustering.inline", "true")
.option("hoodie.clustering.inline.max.commits", "1")
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824")
.option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
.option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
.option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
.option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), optimizeMode)
.option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
.mode(SaveMode.Overwrite)
.save(basePath)
val readDf =
spark.read
.format("hudi")
.load(basePath)
val readDfSkip =
spark.read
.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.format("hudi")
.load(basePath)
assertEquals(targetRecordsCount, readDf.count())
assertEquals(targetRecordsCount, readDfSkip.count())
readDf.createOrReplaceTempView("hudi_snapshot_raw")
readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping")
def select(tableName: String) =
spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51")
assertRowsMatch(
select("hudi_snapshot_raw"),
select("hudi_snapshot_skipping")
)
}
def assertRowsMatch(one: DataFrame, other: DataFrame) = {
val rows = one.count()
assert(rows == other.count() && one.intersect(other).count() == rows)
}
@Test
def testCollectMinMaxStatistics(): Unit = {
val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax")
val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat")
val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val complexDataFrame = createComplexDataFrame(spark)
complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
val df = spark.read.load(testPath.toString)
try {
// test z-order/hilbert sort for all primitive types
// should not throw an exception.
OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1)
OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1)
OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1)
OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1)
try {
// TimestampType is not supported, so c4 is excluded here; collecting statistics for c4 would throw an exception
val colDf = OrderingIndexHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8")
colDf.cache()
assertEquals(colDf.count(), 3)
assertEquals(colDf.take(1)(0).length, 22)
colDf.unpersist()
// try to save statistics
OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1"))
// save again
OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2"))
// test old index table clean
OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3"))
assertEquals(!fs.exists(new Path(statisticPath, "2")), true)
assertEquals(fs.exists(new Path(statisticPath, "3")), true)
// test to save different index, new index on ("c1,c6,c7,c8") should be successfully saved.
OrderingIndexHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4"))
assertEquals(fs.exists(new Path(statisticPath, "5")), true)
} finally {
if (fs.exists(testPath)) fs.delete(testPath)
if (fs.exists(statisticPath)) fs.delete(statisticPath)
}
}
}
// test collecting min-max statistics for DateType under multi-threaded reads.
// parquet may give wrong statistics for DateType when footers are read from multiple threads.
@Test
def testMultiThreadParquetFooterReadForDateType(): Unit = {
// create parquet file with DateType
val rdd = spark.sparkContext.parallelize(0 to 100, 1)
.map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")))
val df = spark.createDataFrame(rdd, new StructType().add("id", DateType))
val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType")
val conf = spark.sparkContext.hadoopConfiguration
val cols = new java.util.ArrayList[String]
cols.add("id")
try {
df.repartition(3).write.mode("overwrite").save(testPath.toString)
val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x)
val realResult = new Array[(String, String)](3)
inputFiles.zipWithIndex.foreach { case (f, index) =>
val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
realResult(index) = (res.getMinValue.toString, res.getMaxValue.toString)
}
// multi-threaded read of the same footers
val resUseLock = new Array[(String, String)](3)
inputFiles.zipWithIndex.par.foreach { case (f, index) =>
val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
resUseLock(index) = (res.getMinValue.toString, res.getMaxValue.toString)
}
// We can't guarantee that the problem always shows up under multithreading,
// so to keep the test stable we only verify resUseLock against the single-threaded result.
// check resUseLock: the following assertions should pass
realResult.zip(resUseLock).foreach { case (realValue, testValue) =>
assert(realValue == testValue, s" expect realValue: ${realValue} but find ${testValue}")
}
} finally {
if (fs.exists(testPath)) fs.delete(testPath)
}
}
def createComplexDataFrame(spark: SparkSession): DataFrame = {
val schema = new StructType()
.add("c1", IntegerType)
.add("c2", StringType)
.add("c3", DecimalType(9,3))
.add("c4", TimestampType)
.add("c5", ShortType)
.add("c6", DateType)
.add("c7", BinaryType)
.add("c8", ByteType)
val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
val c1 = Integer.valueOf(item)
val c2 = s" ${item}sdc"
val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}")
val c4 = new Timestamp(System.currentTimeMillis())
val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}")
val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
val c7 = Array(item).map(_.toByte)
val c8 = java.lang.Byte.valueOf("9")
RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8)
}
spark.createDataFrame(rdd, schema)
}
}
object TestTableLayoutOptimization {
def testLayOutParameter(): java.util.stream.Stream[Arguments] = {
java.util.stream.Stream.of(
arguments("COPY_ON_WRITE", "hilbert"),
arguments("COPY_ON_WRITE", "z-order"),
arguments("MERGE_ON_READ", "hilbert"),
arguments("MERGE_ON_READ", "z-order")
)
}
}


@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.benchmark
import org.apache.hadoop.fs.Path
import org.apache.spark.OrderingIndexHelper
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hudi.TestHoodieSqlBase
import scala.util.Random
object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase {
def getSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit = {
val minMax = OrderingIndexHelper
.getMinMaxValue(spark.sql(s"select * from ${tableName}"), s"${co1}, ${co2}")
.collect().map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5)))
var c = 0
for (elem <- minMax) {
if ((elem._1 <= value1 && elem._2 >= value1) || (elem._3 <= value2 && elem._4 >= value2)) {
c = c + 1
}
}
val p = c / minMax.size.toDouble
println(s"for table ${tableName} with query filter: ${co1} = ${value1} or ${co2} = ${value2} we can achieve skipping percent ${1.0 - p}")
}
/*
for table table_z_sort_byMap with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.8
for table table_z_sort_bySample with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.77
for table table_hilbert_sort_byMap with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.855
for table table_hilbert_sort_bySample with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.83
*/
def runNormalTableSkippingBenchMark(): Unit = {
withTempDir { f =>
withTempTable("table_z_sort_byMap", "table_z_sort_bySample", "table_hilbert_sort_byMap", "table_hilbert_sort_bySample") {
prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000)
// choose median value as filter condition.
// the median value of c1_int is 500000
// the median value of c2_int is 500000
getSkippingPercent("table_z_sort_byMap", "c1_int", "c2_int", 500000, 500000)
getSkippingPercent("table_z_sort_bySample", "c1_int", "c2_int", 500000, 500000)
getSkippingPercent("table_hilbert_sort_byMap", "c1_int", "c2_int", 500000, 500000)
getSkippingPercent("table_hilbert_sort_bySample", "c1_int", "c2_int", 500000, 500000)
}
}
}
/*
for table table_z_sort_byMap_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.0
for table table_z_sort_bySample_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.78
for table table_hilbert_sort_byMap_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.05500000000000005
for table table_hilbert_sort_bySample_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.84
*/
def runSkewTableSkippingBenchMark(): Unit = {
withTempDir { f =>
withTempTable("table_z_sort_byMap_skew", "table_z_sort_bySample_skew", "table_hilbert_sort_byMap_skew", "table_hilbert_sort_bySample_skew") {
// prepare skewed table.
prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000, 10000, 1000000, true)
// choose median value as filter condition.
// the median value of c1_int is 5000
// the median value of c2_int is 500000
getSkippingPercent("table_z_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000)
getSkippingPercent("table_z_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000)
getSkippingPercent("table_hilbert_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000)
getSkippingPercent("table_hilbert_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000)
}
}
}
def main(args: Array[String]): Unit = {
runNormalTableSkippingBenchMark()
runSkewTableSkippingBenchMark()
}
def withTempTable(tableNames: String*)(f: => Unit): Unit = {
try f finally tableNames.foreach(spark.catalog.dropTempView)
}
def prepareInterTypeTable(tablePath: Path, numRows: Int, col1Range: Int = 1000000, col2Range: Int = 1000000, skewed: Boolean = false): Unit = {
import spark.implicits._
val df = spark.range(numRows).map(_ => (Random.nextInt(col1Range), Random.nextInt(col2Range))).toDF("c1_int", "c2_int")
val dfOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order")
val dfOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order")
val dfHilbertOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert")
val dfHilbertOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert")
saveAsTable(dfOptimizeByMap, tablePath, if (skewed) "z_sort_byMap_skew" else "z_sort_byMap")
saveAsTable(dfOptimizeBySample, tablePath, if (skewed) "z_sort_bySample_skew" else "z_sort_bySample")
saveAsTable(dfHilbertOptimizeByMap, tablePath, if (skewed) "hilbert_sort_byMap_skew" else "hilbert_sort_byMap")
saveAsTable(dfHilbertOptimizeBySample, tablePath, if (skewed) "hilbert_sort_bySample_skew" else "hilbert_sort_bySample")
}
def saveAsTable(df: DataFrame, savePath: Path, suffix: String): Unit = {
df.write.mode("overwrite").save(new Path(savePath, suffix).toString)
spark.read.parquet(new Path(savePath, suffix).toString).createOrReplaceTempView("table_" + suffix)
}
}


@@ -107,6 +107,8 @@
<include>com.fasterxml.jackson.core:jackson-databind</include>
<include>com.fasterxml.jackson.core:jackson-core</include>
<include>com.github.davidmoten:guava-mini</include>
<include>com.github.davidmoten:hilbert-curve</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
<include>io.dropwizard.metrics:metrics-core</include>


@@ -89,6 +89,8 @@
<include>org.apache.flink:flink-core</include>
<include>org.apache.flink:flink-hadoop-compatibility_${scala.binary.version}</include>
<include>com.github.davidmoten:guava-mini</include>
<include>com.github.davidmoten:hilbert-curve</include>
<include>com.yammer.metrics:metrics-core</include>
<include>com.beust:jcommander</include>
<include>io.javalin:javalin</include>


@@ -88,6 +88,8 @@
<include>org.antlr:stringtemplate</include>
<include>org.apache.parquet:parquet-avro</include>
<include>com.github.davidmoten:guava-mini</include>
<include>com.github.davidmoten:hilbert-curve</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
<include>io.dropwizard.metrics:metrics-core</include>


@@ -117,6 +117,8 @@
<include>com.amazonaws:aws-java-sdk-dynamodb</include>
<include>com.amazonaws:aws-java-sdk-core</include>
<include>com.github.davidmoten:guava-mini</include>
<include>com.github.davidmoten:hilbert-curve</include>
<include>com.twitter:bijection-avro_${scala.binary.version}</include>
<include>com.twitter:bijection-core_${scala.binary.version}</include>
<include>io.confluent:kafka-avro-serializer</include>