1
0

[HUDI-2101][RFC-28] support z-order for hudi (#3330)

* [HUDI-2101]support z-order for hudi

* Renaming some configs for consistency/simplicity.

* Minor code cleanups

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
xiarixiaoyao
2021-11-03 00:31:57 +08:00
committed by GitHub
parent f9bc3e03e5
commit d194643b49
22 changed files with 2140 additions and 10 deletions

View File

@@ -22,10 +22,12 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.exception.HoodieException;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;
/**
@@ -40,6 +42,9 @@ public class HoodieClusteringConfig extends HoodieConfig {
// Any strategy specific params can be saved with this prefix
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
// Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix
public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";
public static final ConfigProperty<String> DAYBASED_LOOKBACK_PARTITIONS = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.lookback.partitions")
.defaultValue("2")
@@ -137,6 +142,55 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.9.0")
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
// Typed ConfigProperty declarations: raw types defeat compile-time checks
// (Error Prone "rawtypes"); the builder infers the type from defaultValue().
public static final ConfigProperty<Boolean> LAYOUT_OPTIMIZE_ENABLE = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "enable")
.defaultValue(false)
.sinceVersion("0.10.0")
.withDocumentation("Enable use z-ordering/space-filling curves to optimize the layout of table to boost query performance. "
+ "This parameter takes precedence over clustering strategy set using " + EXECUTION_STRATEGY_CLASS_NAME.key());
public static final ConfigProperty<String> LAYOUT_OPTIMIZE_STRATEGY = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "strategy")
.defaultValue("z-order")
.sinceVersion("0.10.0")
.withDocumentation("Type of layout optimization to be applied, current only supports `z-order` and `hilbert` curves.");
/**
 * There exist two methods to build a z-curve.
 * One directly maps the sort columns to z-values to build the z-curve;
 * this method can be found in Amazon DynamoDB https://aws.amazon.com/cn/blogs/database/tag/z-order/
 * The other is the Boundary-based Interleaved Index method which we proposed; we simply call it the sample method.
 * Refer to RFC-28 for the specific algorithm flow.
 * The Boundary-based Interleaved Index method generalizes better, but builds more slowly than the direct method.
 */
public static final ConfigProperty<String> LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "curve.build.method")
.defaultValue("direct")
.sinceVersion("0.10.0")
.withDocumentation("Controls how data is sampled to build the space filling curves. two methods: `direct`,`sample`."
+ "The direct method is faster than the sampling, however sample method would produce a better data layout.");
/**
 * Sampling the table data is the first step of the Boundary-based Interleaved Index method.
 * A larger sample size yields a better optimization result at the cost of more memory.
 */
public static final ConfigProperty<String> LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "build.curve.sample.size")
.defaultValue("200000")
.sinceVersion("0.10.0")
.withDocumentation("when setting" + LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD.key() + " to `sample`, the amount of sampling to be done."
+ "Large sample size leads to better results, at the expense of more memory usage.");
/**
 * Z-order/space-filling curves work best together with data skipping:
 * with data skipping the query engine can prune whole files from a scan,
 * otherwise it can only do row-group level skipping inside files (parquet/orc).
 */
public static final ConfigProperty<Boolean> LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "data.skipping.enable")
.defaultValue(true)
.sinceVersion("0.10.0")
.withDocumentation("Enable data skipping by collecting statistics once layout optimization is complete.");
/**
* @deprecated Use {@link #PLAN_STRATEGY_CLASS_NAME} and its methods instead
*/
@@ -350,9 +404,58 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
/**
 * Enables/disables layout optimization (z-order/hilbert space-filling curves) for clustering.
 * NOTE(review): takes a boxed Boolean while withDataOptimizeDataSkippingEnable takes a
 * primitive boolean — a null argument here would NPE inside String.valueOf; confirm callers.
 */
public Builder withSpaceFillingCurveDataOptimizeEnable(Boolean enable) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_ENABLE, String.valueOf(enable));
return this;
}
/** Sets the layout optimization strategy, e.g. "z-order" or "hilbert". */
public Builder withDataOptimizeStrategy(String strategy) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_STRATEGY, strategy);
return this;
}
/** Sets how the curve is built: "direct" or "sample" (see LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD). */
public Builder withDataOptimizeBuildCurveStrategy(String method) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD, method);
return this;
}
/** Sets the sample size used when the curve build method is "sample". */
public Builder withDataOptimizeBuildCurveSampleNumber(int sampleNumber) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE, String.valueOf(sampleNumber));
return this;
}
/** Enables/disables collecting statistics for data skipping after layout optimization. */
public Builder withDataOptimizeDataSkippingEnable(boolean dataSkipping) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE, String.valueOf(dataSkipping));
return this;
}
public HoodieClusteringConfig build() {
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
return clusteringConfig;
}
}
/**
 * Strategy types for building z-ordering/space-filling curves.
 */
public enum BuildCurveStrategyType {
  DIRECT("direct"),
  SAMPLE("sample");

  private final String value;

  BuildCurveStrategyType(String value) {
    this.value = value;
  }

  /**
   * Resolves a strategy type from its case-insensitive config value.
   *
   * @param value config value, e.g. "direct" or "sample"
   * @return the matching strategy type
   * @throws HoodieException if the value matches no known strategy
   */
  public static BuildCurveStrategyType fromValue(String value) {
    switch (value.toLowerCase(Locale.ROOT)) {
      case "direct":
        return DIRECT;
      case "sample":
        return SAMPLE;
      default:
        // Include the offending value so misconfigurations are diagnosable from the error alone.
        throw new HoodieException("Invalid value of Type: " + value);
    }
  }
}
}

View File

@@ -1228,6 +1228,30 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS);
}
/**
 * Data layout optimize properties.
 */
/** Whether clustering should lay data out along a space-filling curve (z-order/hilbert). */
public boolean isLayoutOptimizationEnabled() {
return getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE);
}
/** The configured layout optimization strategy string, e.g. "z-order". */
public String getLayoutOptimizationStrategy() {
return getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY);
}
/** Parses the configured curve build method ("direct"/"sample") into its enum form. */
public HoodieClusteringConfig.BuildCurveStrategyType getLayoutOptimizationCurveBuildMethod() {
return HoodieClusteringConfig.BuildCurveStrategyType.fromValue(
getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD));
}
/** Sample size used when building the curve via the "sample" method. */
public int getLayoutOptimizationSampleSize() {
return getInt(HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE);
}
/** Whether statistics are collected after layout optimization to enable data skipping. */
public boolean isDataSkippingEnabled() {
return getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE);
}
/**
* index properties.
*/
@@ -1776,6 +1800,7 @@ public class HoodieWriteConfig extends HoodieConfig {
private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false;
private boolean isClusteringConfigSet = false;
private boolean isOptimizeConfigSet = false;
private boolean isMetricsConfigSet = false;
private boolean isBootstrapConfigSet = false;
private boolean isMemoryConfigSet = false;

View File

@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.optimize;
import java.nio.charset.Charset;
public class ZOrderingUtil {

  /**
   * Lexicographically compares two byte ranges, treating bytes as unsigned.
   * (Adapted from HBase's Bytes#compareTo.)
   *
   * @param buffer1 left operand
   * @param buffer2 right operand
   * @param offset1 where to start comparing in the left buffer
   * @param offset2 where to start comparing in the right buffer
   * @param length1 how much to compare from the left buffer
   * @param length2 how much to compare from the right buffer
   * @return 0 if equal, negative if left sorts first, positive otherwise
   */
  public static int compareTo(byte[] buffer1, int offset1, int length1,
      byte[] buffer2, int offset2, int length2) {
    // Short circuit: identical array and range.
    if (buffer1 == buffer2
        && offset1 == offset2
        && length1 == length2) {
      return 0;
    }
    int end1 = offset1 + length1;
    int end2 = offset2 + length2;
    for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
      // Mask to int so bytes compare as unsigned values.
      int a = (buffer1[i] & 0xff);
      int b = (buffer2[j] & 0xff);
      if (a != b) {
        return a - b;
      }
    }
    // Common prefix equal: the shorter range sorts first.
    return length1 - length2;
  }

  /**
   * Left-pads {@code a} with zero bytes to exactly 8 bytes, or keeps the
   * first 8 bytes if longer. Left-padding is correct for big-endian numeric
   * encodings; for strings use {@link #utf8To8Byte(String)}, which pads on
   * the right instead.
   */
  public static byte[] paddingTo8Byte(byte[] a) {
    if (a.length == 8) {
      return a;
    }
    if (a.length > 8) {
      // Keep the 8 leading (most significant) bytes.
      byte[] result = new byte[8];
      System.arraycopy(a, 0, result, 0, 8);
      return result;
    }
    // new byte[8] is already zero-filled; copy the value into the tail.
    byte[] result = new byte[8];
    System.arraycopy(a, 0, result, 8 - a.length, a.length);
    return result;
  }

  /**
   * Interleaves the bits of the given byte arrays: bit 0 of element 0, bit 0
   * of element 1, ..., then bit 1 of element 0, and so on — producing the
   * z-value used for z-ordering.
   *
   * @param buffer per-column encoded values, each at least {@code size} bytes
   * @param size   number of bytes of each element to interleave
   * @return interleaved bytes of length {@code size * buffer.length}
   */
  public static byte[] interleaving(byte[][] buffer, int size) {
    int candidateSize = buffer.length;
    byte[] result = new byte[size * candidateSize];
    int resBitPos = 0;
    int totalBits = size * 8;
    for (int bitStep = 0; bitStep < totalBits; bitStep++) {
      // int division already floors; Math.floor was a no-op here.
      int currentBytePos = bitStep / 8;
      int currentBitPos = bitStep % 8;
      for (int i = 0; i < candidateSize; i++) {
        int tempResBytePos = resBitPos / 8;
        int tempResBitPos = resBitPos % 8;
        result[tempResBytePos] = updatePos(result[tempResBytePos], tempResBitPos, buffer[i][currentBytePos], currentBitPos);
        resBitPos++;
      }
    }
    return result;
  }

  /**
   * Returns {@code a} with its bit at position {@code apos} (0 = most
   * significant) set to the bit of {@code b} at position {@code bpos}.
   */
  public static byte updatePos(byte a, int apos, byte b, int bpos) {
    // Isolate b's bit as an int so the right shift below is logical: the old
    // arithmetic shift on a negative byte ((byte) (b & 0x80)) smeared 1-bits
    // into temp and incorrectly cleared a's bit when both bits were set.
    int bit = b & (1 << (7 - bpos));
    byte temp;
    if (apos < bpos) {
      temp = (byte) (bit << (bpos - apos));
    } else {
      // Covers apos == bpos too (shift by zero).
      temp = (byte) (bit >>> (apos - bpos));
    }
    byte atemp = (byte) (a & (1 << (7 - apos)));
    if ((byte) (atemp ^ temp) == 0) {
      // Bits already agree; nothing to flip.
      return a;
    }
    return (byte) (a ^ (1 << (7 - apos)));
  }

  /** Big-endian encoding of an int into 4 bytes. */
  public static byte[] toBytes(int val) {
    byte[] b = new byte[4];
    for (int i = 3; i > 0; i--) {
      b[i] = (byte) val;
      val >>>= 8;
    }
    b[0] = (byte) val;
    return b;
  }

  /** Big-endian encoding of a long into 8 bytes. */
  public static byte[] toBytes(long val) {
    long temp = val;
    byte[] b = new byte[8];
    for (int i = 7; i > 0; i--) {
      b[i] = (byte) temp;
      temp >>>= 8;
    }
    b[0] = (byte) temp;
    return b;
  }

  /** Big-endian encoding of the raw IEEE-754 bits of a double. */
  public static byte[] toBytes(final double d) {
    return toBytes(Double.doubleToRawLongBits(d));
  }

  /**
   * Maps an int to 8 bytes whose unsigned lexicographic order matches the
   * signed order of the input (sign bit flipped, left-padded with zeros).
   */
  public static byte[] intTo8Byte(int a) {
    // Flipping the sign bit maps signed order onto unsigned byte order.
    int temp = a ^ (1 << 31);
    return paddingTo8Byte(toBytes(temp));
  }

  /**
   * Maps a byte to 8 bytes whose unsigned lexicographic order matches the
   * signed order of the input.
   */
  public static byte[] byteTo8Byte(byte a) {
    // Flip the sign bit, consistent with intTo8Byte/longTo8Byte; without it
    // negative bytes (0x80..0xFF unsigned) sorted after positive ones.
    return paddingTo8Byte(new byte[] {(byte) (a ^ 0x80)});
  }

  /**
   * Maps a long to 8 bytes whose unsigned lexicographic order matches the
   * signed order of the input (sign bit flipped).
   */
  public static byte[] longTo8Byte(long a) {
    long temp = a ^ (1L << 63);
    return toBytes(temp);
  }

  /**
   * Maps a double to 8 bytes whose unsigned lexicographic order matches the
   * IEEE-754 total order: non-negative bit patterns get the sign bit flipped,
   * negative patterns get all bits flipped. For normal values this matches
   * the previous value-comparison implementation; unlike it, this also orders
   * -0.0 before 0.0 and no longer misplaces 0.0 below negatives or leaves
   * NaN among them.
   */
  public static byte[] doubleTo8Byte(double a) {
    long bits = Double.doubleToRawLongBits(a);
    // Sign bit set (negatives and -0.0): invert everything so larger
    // magnitudes sort first; otherwise just flip the sign bit.
    bits = bits < 0 ? ~bits : bits ^ Long.MIN_VALUE;
    return toBytes(bits);
  }

  /**
   * Maps a string to its first 8 UTF-8 bytes, right-padded with zeros, so
   * that unsigned lexicographic comparison of the result matches byte-wise
   * string ordering. The previous left-padding sorted short strings before
   * longer ones regardless of content (e.g. "z" before "aa").
   */
  public static byte[] utf8To8Byte(String a) {
    byte[] bytes = a.getBytes(Charset.forName("utf-8"));
    if (bytes.length == 8) {
      return bytes;
    }
    byte[] result = new byte[8];
    // Copy at most 8 leading bytes; the tail stays zero for short strings.
    System.arraycopy(bytes, 0, result, 0, Math.min(bytes.length, 8));
    return result;
  }

  /**
   * Packs the right-padded 8-byte UTF-8 prefix of a string into a big-endian
   * long (byte 0 is most significant).
   */
  public static Long convertStringToLong(String a) {
    byte[] bytes = utf8To8Byte(a);
    long temp = 0L;
    for (int i = 7; i >= 0; i--) {
      temp = temp | (((long) bytes[i] & 0xff) << (7 - i) * 8);
    }
    // NOTE(review): a leading byte >= 0x80 makes the result negative, so
    // signed comparison misorders non-ASCII prefixes — confirm callers only
    // rely on this for ASCII sort columns.
    return temp;
  }
}

View File

@@ -244,6 +244,16 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
*/
public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);
/**
 * Update statistics info for the current table.
 * TODO: adapt once RFC-27 (metadata-based indexing) is finished.
 *
 * @param context HoodieEngineContext
 * @param stats write statistics of the files touched by the action
 * @param instantTime instant time of the action that produced {@code stats}
 * @param isOptimizeOperation whether the current operation is a layout-optimize (OPTIMIZE) operation
 */
public abstract void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation);
public HoodieWriteConfig getConfig() {
return config;
}

View File

@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.optimize;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestZOrderingUtil {

  @Test
  public void testIntConvert() {
    // Signed ints including both extremes.
    int[] testInt = new int[] {-1, 1, -2, 10000, -100000, 2, Integer.MAX_VALUE, Integer.MIN_VALUE};
    List<OriginValueWrapper<Integer>> valueWrappers = new ArrayList<>();
    List<ConvertResultWrapper> convertResultWrappers = new ArrayList<>();
    for (int i = 0; i < testInt.length; i++) {
      valueWrappers.add(new OriginValueWrapper<>(i, testInt[i]));
      convertResultWrappers.add(new ConvertResultWrapper(i, ZOrderingUtil.intTo8Byte(testInt[i])));
    }
    assertSameOrder(valueWrappers, convertResultWrappers);
  }

  @Test
  public void testLongConvert() {
    // Signed longs including both extremes.
    long[] testLong = new long[] {-1L, 1L, -2L, 10000L, -100000L, 2L, Long.MAX_VALUE, Long.MIN_VALUE};
    List<OriginValueWrapper<Long>> valueWrappers = new ArrayList<>();
    List<ConvertResultWrapper> convertResultWrappers = new ArrayList<>();
    for (int i = 0; i < testLong.length; i++) {
      valueWrappers.add(new OriginValueWrapper<>(i, testLong[i]));
      convertResultWrappers.add(new ConvertResultWrapper(i, ZOrderingUtil.longTo8Byte(testLong[i])));
    }
    assertSameOrder(valueWrappers, convertResultWrappers);
  }

  @Test
  public void testDoubleConvert() {
    // Doubles including both extremes (MIN_VALUE is the smallest positive double).
    double[] testDouble = new double[] {-1.00d, 1.05d, -2.3d, 10000.002d, -100000.7d, 2.9d, Double.MAX_VALUE, Double.MIN_VALUE};
    List<OriginValueWrapper<Double>> valueWrappers = new ArrayList<>();
    List<ConvertResultWrapper> convertResultWrappers = new ArrayList<>();
    for (int i = 0; i < testDouble.length; i++) {
      valueWrappers.add(new OriginValueWrapper<>(i, testDouble[i]));
      convertResultWrappers.add(new ConvertResultWrapper(i, ZOrderingUtil.doubleTo8Byte(testDouble[i])));
    }
    assertSameOrder(valueWrappers, convertResultWrappers);
  }

  @Test
  public void testFloatConvert() {
    // Floats are widened to double before conversion.
    float[] testFloat = new float[] {-1.00f, 1.05f, -2.3f, 10000.002f, -100000.7f, 2.9f, Float.MAX_VALUE, Float.MIN_VALUE};
    List<OriginValueWrapper<Float>> valueWrappers = new ArrayList<>();
    List<ConvertResultWrapper> convertResultWrappers = new ArrayList<>();
    for (int i = 0; i < testFloat.length; i++) {
      valueWrappers.add(new OriginValueWrapper<>(i, testFloat[i]));
      convertResultWrappers.add(new ConvertResultWrapper(i, ZOrderingUtil.doubleTo8Byte((double) testFloat[i])));
    }
    assertSameOrder(valueWrappers, convertResultWrappers);
  }

  /**
   * Sorts the original values by natural order and the converted byte arrays
   * by unsigned lexicographic order, then verifies both sorts arrange the
   * elements identically, i.e. the conversion is order-preserving.
   */
  private static <V extends Comparable<V>> void assertSameOrder(
      List<OriginValueWrapper<V>> valueWrappers, List<ConvertResultWrapper> convertResultWrappers) {
    Collections.sort(valueWrappers, (o1, o2) -> o1.originValue.compareTo(o2.originValue));
    Collections.sort(convertResultWrappers,
        (o1, o2) -> ZOrderingUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length));
    for (int i = 0; i < valueWrappers.size(); i++) {
      assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index);
    }
  }

  /** Pairs an element's original position with its converted byte form. */
  private static class ConvertResultWrapper {
    final Integer index;
    final byte[] result;

    ConvertResultWrapper(Integer index, byte[] result) {
      this.index = index;
      this.result = result;
    }
  }

  /** Pairs an element's original position with its original value. */
  private static class OriginValueWrapper<V> {
    final Integer index;
    final V originValue;

    OriginValueWrapper(Integer index, V originValue) {
      this.index = index;
      this.originValue = originValue;
    }
  }
}

View File

@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
@@ -232,6 +233,11 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
}
@Override
public void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
// Statistics collection for layout optimization is Spark-only for now; Flink tables reject it.
throw new HoodieNotSupportedException("update statistics is not supported yet");
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> preppedRecords) {
throw new HoodieNotSupportedException("This method should not be invoked");

View File

@@ -32,6 +32,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
@@ -144,6 +145,11 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
context, config, this, instantTime, records).execute();
}
@Override
public void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
// Statistics collection for layout optimization is Spark-only for now; Java tables reject it.
throw new HoodieNotSupportedException("update statistics is not supported yet");
}
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context,
String instantTime,

View File

@@ -380,6 +380,10 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime));
finalizeWrite(table, clusteringCommitTime, writeStats);
try {
// try to save statistics info to hudi
if (config.isDataSkippingEnabled() && config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) {
table.updateStatistics(context, writeStats, clusteringCommitTime, true);
}
LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
table.getActiveTimeline().transitionReplaceInflightToComplete(
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
@@ -28,6 +29,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveOptimizationSortPartitioner;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
@@ -77,7 +79,11 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
* Create BulkInsertPartitioner based on strategy params.
*/
protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
if (getWriteConfig().isLayoutOptimizationEnabled()) {
// sort input records by z-order/hilbert
return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext) getEngineContext(),
getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema)));
} else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema)));
} else {

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.ZCurveOptimizeHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
/**
 * A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition.
 * Supports z-curve optimization; hilbert will come soon.
 * @param <T> HoodieRecordPayload type
 */
public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPayload>
    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

  private final HoodieSparkEngineContext sparkEngineContext;
  // Avro schema (with hoodie metadata fields) wrapped so it can ship to executors.
  private final SerializableSchema serializableSchema;
  private final HoodieWriteConfig config;

  public RDDSpatialCurveOptimizationSortPartitioner(HoodieSparkEngineContext sparkEngineContext, HoodieWriteConfig config, Schema schema) {
    this.sparkEngineContext = sparkEngineContext;
    this.config = config;
    this.serializableSchema = new SerializableSchema(schema);
  }

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
    String payloadClass = config.getPayloadClass();
    // Sort via the space-filling curve, then re-wrap the sorted generic records as HoodieRecords.
    JavaRDD<GenericRecord> preparedRecord = prepareGenericRecord(records, outputSparkPartitions, serializableSchema.get());
    return preparedRecord.map(record -> {
      String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
      String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
      HoodieKey hoodieKey = new HoodieKey(key, partition);
      HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(payloadClass,
          new Object[] {Option.of(record)}, Option.class);
      return new HoodieRecord(hoodieKey, avroPayload);
    });
  }

  /**
   * Converts the input records to a DataFrame, applies the configured curve
   * optimization (direct mapping or boundary-based sampling) over the
   * clustering sort columns, and converts the result back to generic records.
   */
  private JavaRDD<GenericRecord> prepareGenericRecord(JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final Schema schema) {
    SerializableSchema serializableSchema = new SerializableSchema(schema);
    JavaRDD<GenericRecord> genericRecordJavaRDD = inputRecords.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
    Dataset<Row> originDF = AvroConversionUtils.createDataFrame(genericRecordJavaRDD.rdd(), schema.toString(), sparkEngineContext.getSqlContext().sparkSession());
    Dataset<Row> zDataFrame;
    switch (config.getLayoutOptimizationCurveBuildMethod()) {
      case DIRECT:
        zDataFrame = ZCurveOptimizeHelper.createZIndexedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups);
        break;
      case SAMPLE:
        zDataFrame = ZCurveOptimizeHelper.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups);
        break;
      default:
        // Append the actual value: the original message ended with a dangling ": " and no value.
        throw new HoodieException("Not a valid build curve method for doWriteOperation: "
            + config.getLayoutOptimizationCurveBuildMethod());
    }
    return HoodieSparkUtils.createRdd(zDataFrame, schema.getName(),
        schema.getNamespace(), false, org.apache.hudi.common.util.Option.empty()).toJavaRDD();
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    // The curve sort leaves records within each output partition ordered.
    return true;
  }
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.table;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -34,6 +35,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -71,13 +73,16 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.ZCurveOptimizeHelper;
import org.apache.spark.api.java.JavaRDD;
import scala.collection.JavaConversions;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
@@ -152,6 +157,32 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
}
@Override
public void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation) {
// deal with z-order/hilbert statistic info
if (isOptimizeOperation) {
updateOptimizeOperationStatistics(context, stats, instantTime);
}
}
private void updateOptimizeOperationStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime) {
String cols = config.getClusteringSortColumns();
String basePath = metaClient.getBasePath();
String indexPath = metaClient.getZindexPath();
List<String> validateCommits = metaClient.getCommitsTimeline()
.filterCompletedInstants().getInstants().map(f -> f.getTimestamp()).collect(Collectors.toList());
List<String> touchFiles = stats.stream().map(s -> new Path(basePath, s.getPath()).toString()).collect(Collectors.toList());
if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) {
LOG.warn("save nothing to index table");
return;
}
HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)context;
ZCurveOptimizeHelper.saveStatisticsInfo(sparkEngineContext
.getSqlContext().sparkSession().read().load(JavaConversions.asScalaBuffer(touchFiles)),
cols, indexPath, instantTime, validateCommits);
LOG.info(String.format("save statistic info sucessfully at commitTime: %s", instantTime));
}
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");

View File

@@ -69,7 +69,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Abs
//transition bulk_insert state to inflight
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
executor.getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
// write new files
JavaRDD<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false);

View File

@@ -0,0 +1,355 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark;
import scala.collection.JavaConversions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.HoodieSparkUtils$;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.optimize.ZOrderingUtil;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Helper utilities for z-order ("z-curve") layout optimization:
 * builds z-order sorted DataFrames and collects/maintains per-parquet-file
 * column min/max statistics (the "z-index" table) used for data skipping.
 */
public class ZCurveOptimizeHelper {

  // Spark local property key; saved/restored so the statistics job description
  // does not leak into subsequent jobs on the same context.
  private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";

  /**
   * Create z-order DataFrame directly.
   * First, map all base type data to byte[8], then create z-order DataFrame.
   * Only supports base type data: long, int, short, double, float, string, timestamp, decimal, date, byte.
   * This method is more effective than createZIndexedDataFrameBySample.
   *
   * @param df a spark DataFrame holds parquet files to be read.
   * @param zCols z-sort cols
   * @param fileNum spark partition num
   * @return a dataFrame sorted by z-order.
   */
  public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, List<String> zCols, int fileNum) {
    Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
    int fieldNum = df.schema().fields().length;
    List<String> checkCols = zCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
    // If any requested z-col is not present in the schema, return the input unchanged.
    if (zCols.size() != checkCols.size()) {
      return df;
    }
    // only one col to sort, no need to use z-order
    if (zCols.size() == 1) {
      // NOTE(review): partition count here is fieldNum (schema width), not fileNum — confirm intended.
      return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(zCols.get(0)));
    }
    // Map each z-col's ordinal position in the schema to its StructField.
    Map<Integer, StructField> fieldMap = zCols
        .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
    // z-sort: encode every z-col value of each row into an 8-byte key, interleave
    // the keys into one z-value, then globally sort rows by that z-value.
    JavaRDD<Row> sortedRdd = df.toJavaRDD().map(row -> {
      List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
        int index = entry.getKey();
        StructField field = entry.getValue();
        DataType dataType = field.dataType();
        // Nulls are mapped to the type's max value (or empty string / zero byte),
        // so null rows sort to the end of each dimension.
        if (dataType instanceof LongType) {
          return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
        } else if (dataType instanceof DoubleType) {
          return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
        } else if (dataType instanceof IntegerType) {
          return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
        } else if (dataType instanceof FloatType) {
          return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
        } else if (dataType instanceof StringType) {
          return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
        } else if (dataType instanceof DateType) {
          return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
        } else if (dataType instanceof TimestampType) {
          return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
        } else if (dataType instanceof ByteType) {
          return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
        } else if (dataType instanceof ShortType) {
          return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
        } else if (dataType instanceof DecimalType) {
          // Decimal is reduced to its long value for encoding; fractional part is dropped.
          return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
        } else if (dataType instanceof BooleanType) {
          boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
          return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
        } else if (dataType instanceof BinaryType) {
          return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
        }
        // Unsupported type: contributes nothing to the z-value (filtered out below).
        return null;
      }).filter(f -> f != null).collect(Collectors.toList());
      byte[][] zBytes = new byte[zBytesList.size()][];
      for (int i = 0; i < zBytesList.size(); i++) {
        zBytes[i] = zBytesList.get(i);
      }
      // Append the interleaved z-value as an extra trailing column of the row.
      List<Object> zVaules = new ArrayList<>();
      zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
      zVaules.add(ZOrderingUtil.interleaving(zBytes, 8));
      return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
    }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum);
    // create new StructType: original fields plus the temporary binary zIndex column
    List<StructField> newFields = new ArrayList<>();
    newFields.addAll(Arrays.asList(df.schema().fields()));
    newFields.add(new StructField("zIndex", BinaryType$.MODULE$, true, Metadata.empty()));
    // create new DataFrame, dropping the helper zIndex column after the sort
    return df.sparkSession().createDataFrame(sortedRdd, StructType$.MODULE$.apply(newFields)).drop("zIndex");
  }

  /**
   * Comma-separated-column convenience overload of
   * {@link #createZIndexedDataFrameByMapValue(Dataset, List, int)}.
   * Returns the input unchanged when zCols is empty or fileNum is non-positive.
   */
  public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, String zCols, int fileNum) {
    if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
      return df;
    }
    return createZIndexedDataFrameByMapValue(df,
        Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum);
  }

  /**
   * Sample-based z-ordering; delegates to the Scala implementation in RangeSampleSort.
   * Supports all column types, at the cost of an extra sampling pass.
   */
  public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum) {
    return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum);
  }

  /**
   * Comma-separated-column convenience overload of
   * {@link #createZIndexedDataFrameBySample(Dataset, List, int)}.
   */
  public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, String zCols, int fileNum) {
    if (zCols == null || zCols.isEmpty() || fileNum <= 0) {
      return df;
    }
    return createZIndexedDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum);
  }

  /**
   * Parse min/max statistics stored in parquet footers for z-sort cols.
   * No support for collecting statistics from TimestampType, since the parquet file
   * does not collect statistics for TimestampType.
   * TODO: adapt for RFC-27.
   *
   * @param df a spark DataFrame holds parquet files to be read.
   * @param cols z-sort cols
   * @return a dataFrame holds all statistics info, one row per file:
   *         (file, &lt;col&gt;_minValue, &lt;col&gt;_maxValue, &lt;col&gt;_num_nulls, ...).
   */
  public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
    Map<String, DataType> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType()));
    List<String> scanFiles = Arrays.asList(df.inputFiles());
    SparkContext sc = df.sparkSession().sparkContext();
    JavaSparkContext jsc = new JavaSparkContext(sc);
    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
    // Roughly 3 files per task.
    int numParallelism = (scanFiles.size() / 3 + 1);
    List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos = new ArrayList<>();
    String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
    try {
      String description = "Listing parquet column statistics";
      jsc.setJobDescription(description);
      // Read the per-column range metadata from each parquet footer in parallel.
      colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
        Configuration conf = serializableConfiguration.value();
        ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
        List<Collection<HoodieColumnRangeMetadata<Comparable>>> results = new ArrayList<>();
        while (paths.hasNext()) {
          String path = paths.next();
          results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols));
        }
        return results.stream().flatMap(f -> f.stream()).iterator();
      }).collect();
    } finally {
      // Always restore whatever description the caller had set.
      jsc.setJobDescription(previousJobDescription);
    }
    // Group the flat metadata list back by source file, then build one stats row per file.
    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
    JavaRDD<Row> allMetaDataRDD = jsc.parallelize(fileToStatsListMap.values().stream().collect(Collectors.toList()), 1).map(f -> {
      int colSize = f.size();
      if (colSize == 0) {
        return null;
      } else {
        List<Object> rows = new ArrayList<>();
        rows.add(f.get(0).getFilePath());
        cols.stream().forEach(col -> {
          HoodieColumnRangeMetadata<Comparable> currentColRangeMetaData =
              f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null);
          DataType colType = columnsMap.get(col);
          if (currentColRangeMetaData == null || colType == null) {
            throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col));
          }
          // Convert the footer's raw min/max representation into the Spark-facing type
          // so the row matches the schema built below (<col>_minValue/<col>_maxValue).
          if (colType instanceof IntegerType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof DoubleType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof StringType) {
            // Strings are stored as parquet Binary in the footer.
            String minString = new String(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
            String maxString = new String(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
            rows.add(minString);
            rows.add(maxString);
          } else if (colType instanceof DecimalType) {
            // Decimals round-trip through the column's stringifier; precision beyond
            // double is lost here — NOTE(review): confirm acceptable for pruning.
            Double minDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMinValue().toString())));
            Double maxDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMaxValue().toString())));
            rows.add(BigDecimal.valueOf(minDecimal));
            rows.add(BigDecimal.valueOf(maxDecimal));
          } else if (colType instanceof DateType) {
            // Dates are stored as epoch-day ints; the stringifier renders yyyy-MM-dd.
            rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int)currentColRangeMetaData.getMinValue())));
            rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int)currentColRangeMetaData.getMaxValue())));
          } else if (colType instanceof LongType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof ShortType) {
            rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString()));
            rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString()));
          } else if (colType instanceof FloatType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof BinaryType) {
            rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
            rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
          } else if (colType instanceof BooleanType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof ByteType) {
            rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString()));
            rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString()));
          } else {
            throw new HoodieException(String.format("Not support type: %s", colType));
          }
          rows.add(currentColRangeMetaData.getNumNulls());
        });
        return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows));
      }
    }).filter(f -> f != null);
    // Schema: file name, then (min, max, num_nulls) triple per requested column.
    List<StructField> allMetaDataSchema = new ArrayList<>();
    allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty()));
    cols.forEach(col -> {
      allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty()));
      allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty()));
      allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty()));
    });
    return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema));
  }

  /**
   * Comma-separated-column convenience overload of {@link #getMinMaxValue(Dataset, List)}.
   */
  public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) {
    List<String> rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList());
    return getMinMaxValue(df, rawCols);
  }

  /**
   * Update statistics info.
   * This method will update the old index table by full outer join,
   * and save the updated table into a new index table named after commitTime.
   * Old index tables will also be cleaned (at most one previous table is kept
   * until this commit's table is written; stale/retried-cluster residue is removed).
   *
   * @param df a spark DataFrame holds parquet files to be read.
   * @param cols z-sort cols.
   * @param indexPath index store path.
   * @param commitTime current operation commitTime.
   * @param validateCommits all validate commits for current table.
   */
  public static void saveStatisticsInfo(Dataset<Row> df, String cols, String indexPath, String commitTime, List<String> validateCommits) {
    Path savePath = new Path(indexPath, commitTime);
    SparkSession spark = df.sparkSession();
    FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration());
    Dataset<Row> statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols);
    // try to find last validate index table from index path
    try {
      if (fs.exists(new Path(indexPath))) {
        // Index tables are directories under indexPath named by their commit time.
        List<String> allIndexTables = Arrays
            .stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList());
        List<String> candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList());
        List<String> residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList());
        Option<Dataset> latestIndexData = Option.empty();
        if (!candidateIndexTables.isEmpty()) {
          latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString()));
          // clean old index table, keep at most 1 index table.
          candidateIndexTables.remove(candidateIndexTables.size() - 1);
          candidateIndexTables.forEach(f -> {
            try {
              // NOTE(review): single-arg FileSystem.delete is deprecated and non-recursive
              // on some implementations — confirm directories are fully removed.
              fs.delete(new Path(indexPath, f));
            } catch (IOException ie) {
              throw new HoodieException(ie);
            }
          });
        }
        // clean residualTables
        // retried cluster operations at the same instant time is also considered,
        // the residual files produced by retried are cleaned up before save statistics
        // save statistics info to index table which named commitTime
        residualTables.forEach(f -> {
          try {
            fs.delete(new Path(indexPath, f));
          } catch (IOException ie) {
            throw new HoodieException(ie);
          }
        });
        if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) {
          // update the statistics info
          String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
          String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", "");
          latestIndexData.get().registerTempTable(originalTable);
          statisticsDF.registerTempTable(updateTable);
          // update table by full out join
          List columns = Arrays.asList(statisticsDF.schema().fieldNames());
          spark.sql(HoodieSparkUtils$
              .MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString());
        }
        // NOTE(review): when a previous index table exists but its schema differs from
        // the new statistics, nothing is written for this commit — confirm intended.
      } else {
        statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
      }
    } catch (IOException e) {
      throw new HoodieException(e);
    }
  }
}

View File

@@ -19,6 +19,7 @@
package org.apache.hudi
import java.util.Properties
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -35,6 +36,7 @@ import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -283,4 +285,43 @@ object HoodieSparkUtils extends SparkAdapterSupport {
s"${tableSchema.fieldNames.mkString(",")}")
AttributeReference(columnName, field.get.dataType, field.get.nullable)()
}
/**
* Create merge sql to merge leftTable and right table.
*
* @param leftTable table name.
* @param rightTable table name.
* @param cols merged cols.
* @return merge sql.
*/
def createMergeSql(leftTable: String, rightTable: String, cols: Seq[String]): String = {
var selectsql = ""
for (i <- (0 to cols.size-1)) {
selectsql = selectsql + s" if (${leftTable}.${cols(0)} is null, ${rightTable}.${cols(i)}, ${leftTable}.${cols(i)}) as ${cols(i)} ,"
}
"select " + selectsql.dropRight(1) + s" from ${leftTable} full join ${rightTable} on ${leftTable}.${cols(0)} = ${rightTable}.${cols(0)}"
}
  /**
   * Collect min/max statistics for candidate cols by scanning the data with
   * Spark aggregations (one group per input file). Supports all col types.
   *
   * @param df dataFrame holds read files.
   * @param cols candidate cols to collect statistics.
   * @return one row per file: (file, <col>_minValue, <col>_maxValue, <col>_num_nulls, ...).
   */
  def getMinMaxValueSpark(df: DataFrame, cols: Seq[String]): DataFrame = {
    val sqlContext = df.sparkSession.sqlContext
    import sqlContext.implicits._

    // Per column: min, max, and non-null count (count(col) skips nulls).
    val values = cols.flatMap(c => Seq( min(col(c)).as(c + "_minValue"), max(col(c)).as(c + "_maxValue"), count(c).as(c + "_noNullCount")))
    // Total row count per file, used to derive the null count below.
    val valueCounts = count("*").as("totalNum")
    val projectValues = Seq(col("file")) ++ cols.flatMap(c =>
      Seq(col(c + "_minValue"), col(c + "_maxValue"), expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls")))
    // Group by source file (input_file_name) and aggregate all stats in one pass.
    val result = df.select(input_file_name() as "file", col("*"))
      .groupBy($"file")
      .agg(valueCounts, values: _*).select(projectValues:_*)
    result
  }
}

View File

@@ -0,0 +1,526 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi.execution
import java.util
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.hudi.optimize.ZOrderingUtil
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.types._
import org.apache.spark.util.MutablePair
import org.apache.spark.util.random.SamplingUtils
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{ClassTag, classTag}
import scala.util.hashing.byteswap32
/**
 * Reservoir-samples an RDD's keys and derives (weighted) range bounds from the
 * sample, for use when building z-order partitions. Closely mirrors Spark's
 * internal RangePartitioner sampling logic.
 *
 * @param zEncodeNum number of ranges to encode (acts like a partition count)
 * @param rdd        the keyed input RDD to sample
 * @param ascend     sort direction flag (currently unused here — TODO confirm)
 * @param samplePointsPerPartitionHint hint for sample size per output range
 */
class RangeSample[K: ClassTag, V](
    zEncodeNum: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascend: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20) extends Serializable {

  // We allow zEncodeNum = 0, which happens when sorting an empty RDD under the default settings.
  require(zEncodeNum >= 0, s"Number of zEncodeNum cannot be negative but found $zEncodeNum.")
  require(samplePointsPerPartitionHint > 0,
    s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")

  /**
   * Sample the RDD's keys and return weighted candidates; each weight is
   * 1 / (sampling probability), i.e. how many rows the sampled key represents.
   * Returns an empty buffer for trivial inputs (zEncodeNum <= 1 or empty RDD).
   */
  def getRangeBounds(): ArrayBuffer[(K, Float)] = {
    if (zEncodeNum <= 1) {
      ArrayBuffer.empty[(K, Float)]
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // Cast to double to avoid overflowing ints or longs
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * zEncodeNum, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        ArrayBuffer.empty[(K, Float)]
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        candidates
      }
    }
  }

  /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds (at most partitions - 1 distinct keys)
   */
  def determineBound[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int, ordering: Ordering[K]): Array[K] = {
    val ordered = candidates.sortBy(_._1)(ordering)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    // Target cumulative weight between consecutive bounds.
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }

  /**
   * Like determineBound, but computes one bound array per ordering/attribute pair,
   * projecting each selected UnsafeRow down to just that attribute's column.
   * Used when each z-col needs its own independent set of range bounds.
   */
  def determineRowBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int, orderings: Seq[Ordering[K]],
      attributes: Seq[Attribute]): Array[Array[UnsafeRow]] = {
    orderings.zipWithIndex.map { case (ordering, index) =>
      val ordered = candidates.sortBy(_._1)(ordering)
      val numCandidates = ordered.size
      val sumWeights = ordered.map(_._2.toDouble).sum
      val step = sumWeights / partitions
      var cumWeight = 0.0
      var target = step
      val bounds = ArrayBuffer.empty[K]
      var i = 0
      var j = 0
      var previousBound = Option.empty[K]
      while ((i < numCandidates) && (j < partitions - 1)) {
        val (key, weight) = ordered(i)
        cumWeight += weight
        if (cumWeight >= target) {
          // Skip duplicate values.
          if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
            bounds += key
            target += step
            j += 1
            previousBound = Some(key)
          }
        }
        i += 1
      }
      // build project: keep only this ordering's attribute in each bound row.
      val project = UnsafeProjection.create(Seq(attributes(index)), attributes)
      bounds.map { bound =>
        val row = bound.asInstanceOf[UnsafeRow]
        // copy() because UnsafeProjection reuses its output buffer.
        project(row).copy()
      }.toArray
    }.toArray
  }

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K: ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      // Per-partition deterministic seed derived from the RDD id and partition index.
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }
}
/**
 * Maps a key to its range index within a sorted array of candidate bounds:
 * linear scan for small bound arrays, binary search for large ones.
 */
class RawDecisionBound[K : Ordering : ClassTag](ordering: Ordering[K]) extends Serializable {

  private var binarySearch: ((Array[K], K) => Int) = {
    // For primitive keys, we can use the natural ordering. Otherwise, use the Ordering comparator.
    classTag[K] match {
      case ClassTag.Float =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
      case ClassTag.Double =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double])
      case ClassTag.Byte =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte])
      case ClassTag.Char =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char])
      case ClassTag.Short =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short])
      case ClassTag.Int =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int])
      case ClassTag.Long =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
      case _ =>
        val comparator = ordering.asInstanceOf[java.util.Comparator[Any]]
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator)
    }
  }

  /**
   * Return the index of the first bound that is >= key (0-based); keys greater
   * than every bound map to candidateBounds.length.
   *
   * @param key the key to locate (cast to K)
   * @param candidateBounds sorted ascending bound array
   */
  def getBound(key: Any, candidateBounds: Array[K]): Int = {
    val k = key.asInstanceOf[K]
    var bound = 0
    if (candidateBounds.length <= 128) {
      // Few bounds: a linear scan beats binary search in practice.
      while(bound < candidateBounds.length && ordering.gt(k, candidateBounds(bound))) {
        bound += 1
      }
    } else {
      bound = binarySearch(candidateBounds, k)
      if (bound < 0 ) {
        // Negative result encodes (-(insertion point) - 1); recover the insertion point.
        bound = -bound - 1
      }
      if (bound > candidateBounds.length) {
        bound = candidateBounds.length
      }
    }
    bound
  }
}
/**
 * Sort key wrapping an interleaved z-value byte array; ordered by unsigned
 * lexicographic byte comparison via ZOrderingUtil.
 */
case class ZorderingBinarySort(b: Array[Byte]) extends Ordered[ZorderingBinarySort] with Serializable {
  override def compare(that: ZorderingBinarySort): Int = {
    // Z-values are fixed-width, so comparing over this value's length covers both arrays.
    val length = b.length
    ZOrderingUtil.compareTo(b, 0, length, that.b, 0, length)
  }
}
object RangeSampleSort {
  /**
   * create z-order DataFrame by sample
   * support all col types
   *
   * Works on Spark's internal UnsafeRow representation: samples the z-cols to
   * derive per-column range bounds, maps each row's z-col values to their rank
   * within those bounds, interleaves the ranks into a z-value, and sorts by it.
   */
  def sortDataFrameBySampleSupportAllTypes(df: DataFrame, zCols: Seq[String], fileNum: Int): DataFrame = {
    val spark = df.sparkSession
    val internalRdd = df.queryExecution.toRdd
    val schema = df.schema
    val outputAttributes = df.queryExecution.analyzed.output
    val sortingExpressions = outputAttributes.filter(p => zCols.contains(p.name))
    // Any z-col missing from the plan output: return the input unchanged.
    if (sortingExpressions.length == 0 || sortingExpressions.length != zCols.size) {
      df
    } else {
      // Number of range bounds to build, taken from the session conf (with config default).
      val zOrderBounds = df.sparkSession.sessionState.conf.getConfString(
        HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.key,
        HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.defaultValue.toString).toInt
      val sampleRdd = internalRdd.mapPartitionsInternal { iter =>
        val projection = UnsafeProjection.create(sortingExpressions, outputAttributes)
        val mutablePair = new MutablePair[InternalRow, Null]()
        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
        // partition bounds. To get accurate samples, we need to copy the mutable keys.
        iter.map(row => mutablePair.update(projection(row).copy(), null))
      }
      // One bound ordering per z-col, each reading column i of the projected key row.
      val orderings = sortingExpressions.map(SortOrder(_, Ascending)).zipWithIndex.map { case (ord, i) =>
        ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
      }
      val lazyGeneratedOrderings = orderings.map(ord => new LazilyGeneratedOrdering(Seq(ord)))
      val sample = new RangeSample(zOrderBounds, sampleRdd)
      val rangeBounds = sample.getRangeBounds()
      implicit val ordering1 = lazyGeneratedOrderings(0)
      val sampleBounds = sample.determineRowBounds(rangeBounds, math.min(zOrderBounds, rangeBounds.length), lazyGeneratedOrderings, sortingExpressions)
      // Orderings over single-column projected rows (column 0) for bound lookup.
      val origin_orderings = sortingExpressions.map(SortOrder(_, Ascending)).map { ord =>
        ord.copy(child = BoundReference(0, ord.dataType, ord.nullable))
      }
      val origin_lazyGeneratedOrderings = origin_orderings.map(ord => new LazilyGeneratedOrdering(Seq(ord)))
      // expand bounds.
      // maybe it's better to use the value of "spark.zorder.bounds.number" as maxLength,
      // however this will lead to extra time costs when all zorder cols distinct count values are less then "spark.zorder.bounds.number"
      val maxLength = sampleBounds.map(_.length).max
      // Columns with fewer bounds than maxLength get a fill factor so their ranks
      // are spread over a comparable range (with a random offset added per row).
      val expandSampleBoundsWithFactor = sampleBounds.map { bound =>
        val fillFactor = maxLength / bound.size.toDouble
        (bound, fillFactor)
      }
      val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor)
      val indexRdd = internalRdd.mapPartitionsInternal { iter =>
        val boundsWithFactor = boundBroadCast.value
        import java.util.concurrent.ThreadLocalRandom
        val threadLocalRandom = ThreadLocalRandom.current
        val maxBoundNum = boundsWithFactor.map(_._1.length).max
        val origin_Projections = sortingExpressions.map { se =>
          UnsafeProjection.create(Seq(se), outputAttributes)
        }
        iter.map { unsafeRow =>
          // Rank of each z-col value among its bounds; nulls rank past the last bound.
          val interleaveValues = origin_Projections.zip(origin_lazyGeneratedOrderings).zipWithIndex.map { case ((rowProject, lazyOrdering), index) =>
            val row = rowProject(unsafeRow)
            val decisionBound = new RawDecisionBound(lazyOrdering)
            if (row.isNullAt(0)) {
              maxBoundNum + 1
            } else {
              val (bound, factor) = boundsWithFactor(index)
              if (factor > 1) {
                // Scale the rank by the fill factor and jitter within the gap.
                val currentRank = decisionBound.getBound(row, bound.asInstanceOf[Array[InternalRow]])
                currentRank*factor.toInt + threadLocalRandom.nextInt(factor.toInt)
              } else {
                decisionBound.getBound(row, bound.asInstanceOf[Array[InternalRow]])
              }
            }
          }.toArray.map(ZOrderingUtil.intTo8Byte(_))
          // Interleave the per-column rank bytes into the row's z-value sort key.
          val zValues = ZOrderingUtil.interleaving(interleaveValues, 8)
          val mutablePair = new MutablePair[InternalRow, Array[Byte]]()
          mutablePair.update(unsafeRow, zValues)
        }
      }.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1)
      spark.internalCreateDataFrame(indexRdd, schema)
    }
  }
/**
* create z-order DataFrame by sample
* first, sample origin data to get z-cols bounds, then create z-order DataFrame
* support all type data.
* this method need more resource and cost more time than createZIndexedDataFrameByMapValue
*/
def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int): DataFrame = {
  val spark = df.sparkSession
  val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap
  val fieldNum = df.schema.fields.length
  // FIX: scala Map.apply throws NoSuchElementException for a missing key and never
  // returns null, so the original `columnsMap(col) != null` check would crash on an
  // unknown column instead of filtering it out; use `contains` instead.
  val checkCols = zCols.filter(col => columnsMap.contains(col))
  if (zCols.isEmpty || checkCols.isEmpty) {
    df
  } else {
    // Resolve each z-column to (ordinal in the schema, field); missing columns and
    // unsupported types map to (-1, null) and are dropped below.
    val zFields = zCols.map { col =>
      columnsMap.get(col) match {
        case Some(newCol) =>
          newCol.dataType match {
            case LongType | DoubleType | FloatType | StringType | IntegerType | DateType | TimestampType | ShortType | ByteType =>
              (df.schema.fields.indexOf(newCol), newCol)
            case d: DecimalType =>
              (df.schema.fields.indexOf(newCol), newCol)
            case _ =>
              (-1, null)
          }
        case None =>
          (-1, null)
      }
    }.filter(_._1 != -1)
    // Complex type found, fall back to the implementation that supports all types.
    if (zFields.length != zCols.length) {
      // FIX: forward the target output file count; the original passed fieldNum
      // (the number of schema fields) by mistake.
      return sortDataFrameBySampleSupportAllTypes(df, zCols, fileNum)
    }
    val rawRdd = df.rdd
    // Project every row onto the comparable representation of its z-columns.
    // Nulls map to Long.MaxValue (or "" for strings) so they land in the last range.
    val sampleRdd = rawRdd.map { row =>
      val values = zFields.map { case (index, field) =>
        field.dataType match {
          case LongType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getLong(index)
          case DoubleType =>
            if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getDouble(index))
          case IntegerType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getInt(index).toLong
          case FloatType =>
            if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble)
          case StringType =>
            if (row.isNullAt(index)) "" else row.getString(index)
          case DateType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getDate(index).getTime
          case TimestampType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getTimestamp(index).getTime
          case ByteType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getByte(index).toLong
          case ShortType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getShort(index).toLong
          case d: DecimalType =>
            if (row.isNullAt(index)) Long.MaxValue else row.getDecimal(index).longValue()
          case _ =>
            null
        }
      }.filter(v => v != null).toArray
      (values, null)
    }
    val zOrderBounds = df.sparkSession.sessionState.conf.getConfString(
      HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.key,
      HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.defaultValue.toString).toInt
    val sample = new RangeSample(zOrderBounds, sampleRdd)
    val rangeBounds = sample.getRangeBounds()
    // Determine per-column range bounds from the sample.
    val sampleBounds = {
      val candidateColNumber = rangeBounds.head._1.length
      (0 to candidateColNumber - 1).map { i =>
        val colRangeBound = rangeBounds.map(x => (x._1(i), x._2))
        if (colRangeBound.head._1.isInstanceOf[String]) {
          sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(String, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[String])
        } else {
          sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(Long, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[Long])
        }
      }
    }
    // expand bounds.
    // maybe it's better to use the value of "spark.zorder.bounds.number" as maxLength,
    // however this will lead to extra time costs when the distinct counts of all z-order cols are less than "spark.zorder.bounds.number"
    val maxLength = sampleBounds.map(_.length).max
    val expandSampleBoundsWithFactor = sampleBounds.map { bound =>
      val fillFactor = maxLength / bound.size
      val newBound = new Array[Double](bound.length * fillFactor)
      if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) {
        val longBound = bound.asInstanceOf[Array[Long]]
        for (i <- 0 to bound.length - 1) {
          for (j <- 0 to fillFactor - 1) {
            // sample factor should not be too large, so it's ok to use 1 / fillFactor as slice
            newBound(j + i*(fillFactor)) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble)
          }
        }
        (newBound, fillFactor)
      } else {
        (bound, 0)
      }
    }
    val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor)
    val indexRdd = rawRdd.mapPartitions { iter =>
      val expandBoundsWithFactor = boundBroadCast.value
      val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max
      val longDecisionBound = new RawDecisionBound(Ordering[Long])
      val doubleDecisionBound = new RawDecisionBound(Ordering[Double])
      val stringDecisionBound = new RawDecisionBound(Ordering[String])
      import java.util.concurrent.ThreadLocalRandom
      val threadLocalRandom = ThreadLocalRandom.current
      // Maps one column value to its rank among the sampled bounds. Nulls rank past
      // the last bound so they group together; expanded bounds get random jitter
      // inside their slice to spread skewed values across ranks.
      def getRank(rawIndex: Int, value: Long, isNull: Boolean): Int = {
        val (expandBound, factor) = expandBoundsWithFactor(rawIndex)
        if (isNull) {
          expandBound.length + 1
        } else {
          if (factor > 1) {
            doubleDecisionBound.getBound(value + (threadLocalRandom.nextInt(factor) + 1)*(1 / factor.toDouble), expandBound.asInstanceOf[Array[Double]])
          } else {
            longDecisionBound.getBound(value, expandBound.asInstanceOf[Array[Long]])
          }
        }
      }
      iter.map { row =>
        val values = zFields.zipWithIndex.map { case ((index, field), rawIndex) =>
          field.dataType match {
            case LongType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getLong(index), isNull)
            case DoubleType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getDouble(index)), isNull)
            case IntegerType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getInt(index).toLong, isNull)
            case FloatType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble), isNull)
            case StringType =>
              // Strings have no long representation, so rank directly against the
              // string bounds and apply the expansion factor by hand.
              val factor = maxBoundNum.toDouble / expandBoundsWithFactor(rawIndex)._1.length
              if (row.isNullAt(index)) {
                maxBoundNum + 1
              } else {
                val currentRank = stringDecisionBound.getBound(row.getString(index), expandBoundsWithFactor(rawIndex)._1.asInstanceOf[Array[String]])
                if (factor > 1) {
                  (currentRank*factor).toInt + threadLocalRandom.nextInt(factor.toInt)
                } else {
                  currentRank
                }
              }
            case DateType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getDate(index).getTime, isNull)
            case TimestampType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getTimestamp(index).getTime, isNull)
            case ByteType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getByte(index).toLong, isNull)
            case ShortType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getShort(index).toLong, isNull)
            case d: DecimalType =>
              val isNull = row.isNullAt(index)
              getRank(rawIndex, if (isNull) 0 else row.getDecimal(index).longValue(), isNull)
            case _ =>
              -1
          }
        }.filter(v => v != -1).map(ZOrderingUtil.intTo8Byte(_)).toArray
        val zValues = ZOrderingUtil.interleaving(values, 8)
        Row.fromSeq(row.toSeq ++ Seq(zValues))
      }
    }.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)), numPartitions = fileNum)
    // Append the interleaved z-value column, sort globally by it, then drop it.
    val newDF = df.sparkSession.createDataFrame(indexRdd, StructType(
      df.schema.fields ++ Seq(
        StructField(s"zindex",
          BinaryType, false))
    ))
    newDF.drop("zindex")
  }
}
}

View File

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.model;
import org.apache.parquet.schema.PrimitiveStringifier;
import java.util.Objects;
/**
* Hoodie Range metadata.
*/
/**
 * Hoodie range metadata: per-column min/max/null-count statistics for one file,
 * as extracted from parquet footers and stored in the z-index table.
 *
 * @param <T> concrete statistics value type (e.g. Long, Binary).
 */
public class HoodieColumnRangeMetadata<T> {
  private final String filePath;
  private final String columnName;
  private final T minValue;
  private final T maxValue;
  private final long numNulls;
  // Used to render min/max values as strings when persisting the index.
  private final PrimitiveStringifier stringifier;

  public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) {
    this.filePath = filePath;
    this.columnName = columnName;
    this.minValue = minValue;
    this.maxValue = maxValue;
    this.numNulls = numNulls;
    this.stringifier = stringifier;
  }

  public String getFilePath() {
    return this.filePath;
  }

  public String getColumnName() {
    return this.columnName;
  }

  public T getMinValue() {
    return this.minValue;
  }

  public T getMaxValue() {
    return this.maxValue;
  }

  public PrimitiveStringifier getStringifier() {
    return stringifier;
  }

  public long getNumNulls() {
    return numNulls;
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final HoodieColumnRangeMetadata<?> that = (HoodieColumnRangeMetadata<?>) o;
    return Objects.equals(getFilePath(), that.getFilePath())
        && Objects.equals(getColumnName(), that.getColumnName())
        && Objects.equals(getMinValue(), that.getMinValue())
        && Objects.equals(getMaxValue(), that.getMaxValue())
        && Objects.equals(getNumNulls(), that.getNumNulls());
  }

  @Override
  public int hashCode() {
    // NOTE(review): filePath is deliberately left out here while equals() includes
    // it; this is still contract-valid (equal objects hash equally) but worth
    // confirming it is intentional.
    return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls());
  }

  @Override
  public String toString() {
    // FIX: the original concatenation was missing the separator between filePath
    // and columnName, producing e.g. "filePath ='a'columnName='b'".
    return "HoodieColumnRangeMetadata{"
        + "filePath='" + filePath + '\''
        + ", columnName='" + columnName + '\''
        + ", minValue=" + minValue
        + ", maxValue=" + maxValue
        + ", numNulls=" + numNulls + '}';
  }
}

View File

@@ -79,6 +79,7 @@ public class HoodieTableMetaClient implements Serializable {
public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".aux";
public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap";
public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat";
public static final String ZINDEX_NAME = ".zindex";
public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH
+ Path.SEPARATOR + ".partitions";
public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR
@@ -176,6 +177,13 @@ public class HoodieTableMetaClient implements Serializable {
return metaPath;
}
/**
 * Returns the location of the z-index folder (".zindex" under the meta folder),
 * where z-order/space-filling-curve statistics tables are stored.
 *
 * @return z-index path
 */
public String getZindexPath() {
  return new Path(metaPath, ZINDEX_NAME).toString();
}
/**
* @return Temp Folder path
*/

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.exception.HoodieIOException;
@@ -41,12 +42,14 @@ import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Utility functions involving with parquet.
@@ -277,4 +280,59 @@ public class ParquetUtils extends BaseFileUtils {
return candidateKeys.contains(recordKey);
}
}
/**
 * Parse min/max statistics stored in parquet footers for all columns.
 *
 * @param conf            hadoop configuration used to open the file.
 * @param parquetFilePath path of the parquet file to read footers from.
 * @param cols            dot-string column names to collect statistics for;
 *                        columns absent from the file are silently skipped.
 * @return one file-level {@link HoodieColumnRangeMetadata} per requested column
 *         found in the footer (per-block stats are merged via getColumnRangeInFile).
 */
public Collection<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(Configuration conf, Path parquetFilePath, List<String> cols) {
  ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
  // collect stats from all parquet blocks
  // NOTE(review): getStatistics() may be empty/absent for some writers — confirm
  // footers always carry statistics for the indexed columns.
  Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
    return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData ->
        new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(),
            columnChunkMetaData.getStatistics().genericGetMin(),
            columnChunkMetaData.getStatistics().genericGetMax(),
            columnChunkMetaData.getStatistics().getNumNulls(),
            columnChunkMetaData.getPrimitiveType().stringifier()));
  }).collect(Collectors.groupingBy(e -> e.getColumnName()));
  // we only intend to keep file level statistics.
  return new ArrayList<>(columnToStatsListMap.values().stream()
      .map(blocks -> getColumnRangeInFile(blocks))
      .collect(Collectors.toList()));
}
/**
 * Collapses the per-block ranges of one column into a single file-level range.
 * A lone block is returned untouched; multiple blocks are folded pairwise with
 * combineRanges to get min(block mins) and max(block maxs).
 */
private HoodieColumnRangeMetadata<Comparable> getColumnRangeInFile(final List<HoodieColumnRangeMetadata<Comparable>> blockRanges) {
  // Stream#reduce hands back the sole element unchanged when the list has
  // exactly one entry, so both cases collapse into a single fold.
  return blockRanges.stream()
      .reduce(this::combineRanges)
      .get();
}
/**
 * Merges two block-level ranges of the same column into one: the smaller min,
 * the larger max and the summed null count. A null min/max means "no statistics"
 * and yields to the other side's value.
 */
private HoodieColumnRangeMetadata<Comparable> combineRanges(HoodieColumnRangeMetadata<Comparable> range1,
                                                            HoodieColumnRangeMetadata<Comparable> range2) {
  final Comparable minValue;
  if (range1.getMinValue() == null) {
    minValue = range2.getMinValue();
  } else if (range2.getMinValue() == null) {
    minValue = range1.getMinValue();
  } else {
    // strict "<" keeps range2's value on ties, matching the original fold
    minValue = range1.getMinValue().compareTo(range2.getMinValue()) < 0 ? range1.getMinValue() : range2.getMinValue();
  }
  final Comparable maxValue;
  if (range1.getMaxValue() == null) {
    maxValue = range2.getMaxValue();
  } else if (range2.getMaxValue() == null) {
    maxValue = range1.getMaxValue();
  } else {
    maxValue = range1.getMaxValue().compareTo(range2.getMaxValue()) < 0 ? range2.getMaxValue() : range1.getMaxValue();
  }
  return new HoodieColumnRangeMetadata<>(range1.getFilePath(),
      range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), range1.getStringifier());
}
}

View File

@@ -110,6 +110,12 @@ object DataSourceReadOptions {
.withDocumentation("The query instant for time travel. Without specified this option," +
" we query the latest snapshot.")
val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
  .key("hoodie.enable.data.skipping")
  // FIX: disabled by default — data skipping is only useful after a z-order
  // optimize has been run on the table, and HoodieFileIndex.enableDataSkipping
  // already falls back to "false" when this key is absent; defaulting to true
  // here contradicted that fallback.
  .defaultValue(false)
  .sinceVersion("0.10.0")
  .withDocumentation("enable data skipping to boost query after doing z-order optimize for current table")
/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()

View File

@@ -28,19 +28,20 @@ import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTab
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
@@ -84,6 +85,12 @@ case class HoodieFileIndex(
private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
.map(HoodieSqlUtils.formatQueryInstant)
/**
 * All completed commit timestamps from the commit timeline.
 */
lazy val completedCommits = metaClient.getCommitsTimeline
.filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp)
/**
* Get the schema of the table.
*/
@@ -147,6 +154,48 @@ case class HoodieFileIndex(
override def rootPaths: Seq[Path] = queryPath :: Nil
// Whether data skipping is enabled for this query: per-query options win,
// then the session configuration, then "false".
// NOTE(review): the hard-coded "false" fallback disagrees with the declared
// defaultValue of DataSourceReadOptions.ENABLE_DATA_SKIPPING — confirm which
// default is intended.
def enableDataSkipping(): Boolean = {
  options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
    spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
}
// Uses the latest z-index table (if one exists under .hoodie/.zindex for a
// completed commit) to compute the set of file names that can be SKIPPED for
// the given data filters. Returns the empty set when no index is usable, which
// callers treat as "no pruning".
private def filterFilesByDataSkippingIndex(dataFilters: Seq[Expression]): Set[String] = {
  var allFiles: Set[String] = Set.empty
  var candidateFiles: Set[String] = Set.empty
  val indexPath = metaClient.getZindexPath
  val fs = metaClient.getFs
  if (fs.exists(new Path(indexPath)) && dataFilters.nonEmpty) {
    // try to load latest index table from index path
    // Index subfolders are named by commit timestamp; keep only those whose
    // commit completed, and pick the lexicographically last (latest) one.
    val candidateIndexTables = fs.listStatus(new Path(indexPath)).filter(_.isDirectory)
      .map(_.getPath.getName).filter(f => completedCommits.contains(f)).sortBy(x => x)
    if (candidateIndexTables.nonEmpty) {
      val dataFrameOpt = try {
        Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString))
      } catch {
        // NOTE(review): a broad Throwable catch — any read failure silently
        // disables data skipping for this query.
        case _: Throwable =>
          logError("missing index skip data-skipping")
          None
      }
      if (dataFrameOpt.isDefined) {
        val indexSchema = dataFrameOpt.get.schema
        val indexFiles = DataSkippingUtils.getIndexFiles(spark.sparkContext.hadoopConfiguration, new Path(indexPath, candidateIndexTables.last).toString)
        // Rewrite each query filter against the min/max/num_nulls columns of the
        // index table and AND them together.
        val indexFilter = dataFilters.map(DataSkippingUtils.createZindexFilter(_, indexSchema)).reduce(And)
        logInfo(s"index filter condition: $indexFilter")
        dataFrameOpt.get.persist()
        // NOTE(review): "4" is a magic threshold — for small indexes the local
        // concurrent parquet reader is faster than a Spark job; confirm the cutoff.
        if (indexFiles.size <= 4) {
          allFiles = DataSkippingUtils.readParquetFile(spark, indexFiles)
        } else {
          allFiles = dataFrameOpt.get.select("file").collect().map(_.getString(0)).toSet
        }
        candidateFiles = dataFrameOpt.get.filter(new Column(indexFilter)).select("file").collect().map(_.getString(0)).toSet
        // NOTE(review): unpersist is skipped if anything above throws.
        dataFrameOpt.get.unpersist()
      }
    }
  }
  // Files known to the index but not matching any filter can be skipped.
  allFiles -- candidateFiles
}
/**
* Invoked by Spark to fetch list of latest base files per partition.
*
@@ -156,12 +205,29 @@ case class HoodieFileIndex(
*/
override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
// try to load filterFiles from index
val filterFiles: Set[String] = if (enableDataSkipping()) {
filterFilesByDataSkippingIndex(dataFilters)
} else {
Set.empty
}
if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
Seq(PartitionDirectory(InternalRow.empty, allFiles))
val candidateFiles = if (!filterFiles.isEmpty) {
allFiles.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName))
} else {
allFiles
}
logInfo(s"Total files : ${allFiles.size}," +
s" candidate files after data skipping: ${candidateFiles.size} " +
s" skipping percent ${if (allFiles.length != 0) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
} else {
// Prune the partition path by the partition filters
val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
prunedPartitions.map { partition =>
var totalFileSize = 0
var candidateFileSize = 0
val result = prunedPartitions.map { partition =>
val baseFileStatuses = cachedAllInputFileSlices(partition).map(fileSlice => {
if (fileSlice.getBaseFile.isPresent) {
fileSlice.getBaseFile.get().getFileStatus
@@ -169,9 +235,19 @@ case class HoodieFileIndex(
null
}
}).filterNot(_ == null)
PartitionDirectory(partition.values, baseFileStatuses)
val candidateFiles = if (!filterFiles.isEmpty) {
baseFileStatuses.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName))
} else {
baseFileStatuses
}
totalFileSize += baseFileStatuses.size
candidateFileSize += candidateFiles.size
PartitionDirectory(partition.values, candidateFiles)
}
logInfo(s"Total files: ${totalFileSize}," +
s" Candidate files after data skipping : ${candidateFileSize} " +
s"skipping percent ${if (allFiles.length != 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
result
}
}

View File

@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
/**
 * Helpers for z-index based data skipping: rewrites query filters against the
 * statistics columns of the index table and reads the index parquet files.
 */
object DataSkippingUtils {

  /**
   * create z_index filter and push those filters to index table to filter all candidate scan files.
   * @param condition origin filter from query.
   * @param indexSchema schema from index table.
   * @return filters for index table.
   */
  def createZindexFilter(condition: Expression, indexSchema: StructType): Expression = {

    // Builds a reference to the statistics column "<col><suffix>" of the index table.
    def buildExpressionInternal(colName: Seq[String], statisticValue: String): Expression = {
      val appendColName = UnresolvedAttribute(colName).name + statisticValue
      col(appendColName).expr
    }

    // Keeps the rewritten condition only when the index table actually carries
    // statistics for this column; otherwise degrades to TRUE (no pruning).
    def reWriteCondition(colName: Seq[String], conditionExpress: Expression): Expression = {
      val appendColName = UnresolvedAttribute(colName).name + "_minValue"
      if (indexSchema.exists(p => p.name == appendColName)) {
        conditionExpress
      } else {
        Literal.TrueLiteral
      }
    }

    val minValue = (colName: Seq[String]) => buildExpressionInternal(colName, "_minValue")
    val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName, "_maxValue")
    val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName, "_num_nulls")

    condition match {
      // query filter "colA = b" convert it to "colA_minValue <= b and colA_maxValue >= b" for index table
      case EqualTo(attribute: AttributeReference, value: Literal) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value)))
      // query filter "b = colA" convert it to "colA_minValue <= b and colA_maxValue >= b" for index table
      case EqualTo(value: Literal, attribute: AttributeReference) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value)))
      // query filter "colA = null" convert it to "colA_num_nulls = null" for index table
      case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
        val colName = getTargetColNameParts(equalNullSafe.left)
        reWriteCondition(colName, EqualTo(num_nulls(colName), equalNullSafe.right))
      // query filter "colA < b" convert it to "colA_minValue < b" for index table
      case LessThan(attribute: AttributeReference, value: Literal) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, LessThan(minValue(colName), value))
      // query filter "b < colA" convert it to "colA_maxValue > b" for index table
      case LessThan(value: Literal, attribute: AttributeReference) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
      // query filter "colA > b" convert it to "colA_maxValue > b" for index table
      case GreaterThan(attribute: AttributeReference, value: Literal) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
      // query filter "b > colA" convert it to "colA_minValue < b" for index table
      case GreaterThan(value: Literal, attribute: AttributeReference) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, LessThan(minValue(colName), value))
      // query filter "colA <= b" convert it to "colA_minValue <= b" for index table
      case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
      // query filter "b <= colA" convert it to "colA_maxValue >= b" for index table
      case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
      // query filter "colA >= b" convert it to "colA_maxValue >= b" for index table
      case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
        val colName = getTargetColNameParts(attribute)
        // FIX: wrap with reWriteCondition like every other case, so a column
        // without index statistics degrades to TRUE instead of producing an
        // unresolvable "<col>_maxValue" reference.
        reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right))
      // query filter "b >= colA" convert it to "colA_minValue <= b" for index table
      case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
      // query filter "colA is null" convert it to "colA_num_nulls > 0" for index table
      case IsNull(attribute: AttributeReference) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0)))
      // query filter "colA is not null" convert it to "colA_num_nulls = 0" for index table
      case IsNotNull(attribute: AttributeReference) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0)))
      // query filter "colA in (a,b)" convert it to " (colA_minValue <= a and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for index table
      case In(attribute: AttributeReference, list: Seq[Literal]) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, list.map { lit =>
          And(LessThanOrEqual(minValue(colName), lit), GreaterThanOrEqual(maxValue(colName), lit))
        }.reduce(Or))
      // query filter "colA like xxx" convert it to " (colA_minValue <= xxx and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with xxx) " for index table
      case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName), v), GreaterThanOrEqual(maxValue(colName), v)) ,
          Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName), v))))
      // query filter "colA not in (a, b)" convert it to " (not( colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and colA_maxValue = b)) " for index table
      case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, list.map { lit =>
          Not(And(EqualTo(minValue(colName), lit), EqualTo(maxValue(colName), lit)))
        }.reduce(And))
      // query filter "colA != b" convert it to "not ( colA_minValue = b and colA_maxValue = b )" for index table
      case Not(EqualTo(attribute: AttributeReference, value: Literal)) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value))))
      // query filter "b != colA" convert it to "not ( colA_minValue = b and colA_maxValue = b )" for index table
      case Not(EqualTo(value: Literal, attribute: AttributeReference)) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value))))
      // query filter "colA not like xxxx" convert it to "not ( colA_minValue startWith xxx and colA_maxValue startWith xxx)" for index table
      case Not(StartsWith(attribute, value @ Literal(_: UTF8String, _))) =>
        val colName = getTargetColNameParts(attribute)
        reWriteCondition(colName, Not(And(StartsWith(minValue(colName), value), StartsWith(maxValue(colName), value))))
      case or: Or =>
        val resLeft = createZindexFilter(or.left, indexSchema)
        val resRight = createZindexFilter(or.right, indexSchema)
        Or(resLeft, resRight)
      case and: And =>
        val resLeft = createZindexFilter(and.left, indexSchema)
        val resRight = createZindexFilter(and.right, indexSchema)
        And(resLeft, resRight)
      // Anything we cannot rewrite must not prune files: fall back to TRUE.
      case expr: Expression =>
        Literal.TrueLiteral
    }
  }

  /**
   * Extracts name from a resolved expression referring to a nested or non-nested column.
   */
  def getTargetColNameParts(resolvedTargetCol: Expression): Seq[String] = {
    resolvedTargetCol match {
      case attr: Attribute => Seq(attr.name)
      case Alias(c, _) => getTargetColNameParts(c)
      case GetStructField(c, _, Some(name)) => getTargetColNameParts(c) :+ name
      case ex: ExtractValue =>
        throw new AnalysisException(s"convert reference to name failed, Updating nested fields is only supported for StructType: ${ex}.")
      case other =>
        throw new AnalysisException(s"convert reference to name failed, Found unsupported expression ${other}")
    }
  }

  /**
   * Lists the parquet data files of the index table at the given path.
   */
  def getIndexFiles(conf: Configuration, indexPath: String): Seq[FileStatus] = {
    val basePath = new Path(indexPath)
    // FIX: keep only the ".parquet" data files; the original filterNot excluded
    // exactly those files and would have handed non-parquet entries to the
    // parquet reader in readParquetFile.
    basePath.getFileSystem(conf)
      .listStatus(basePath).filter(f => f.getPath.getName.endsWith(".parquet"))
  }

  /**
   * read parquet files concurrently on the driver.
   * this method is much faster than launching a spark job for small indexes.
   */
  def readParquetFile(spark: SparkSession, indexFiles: Seq[FileStatus], filters: Seq[Filter] = Nil, schemaOpts: Option[StructType] = None): Set[String] = {
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val partitionedFiles = indexFiles.map(f => PartitionedFile(InternalRow.empty, f.getPath.toString, 0, f.getLen))
    val requiredSchema = new StructType().add("file", StringType, true)
    val schema = schemaOpts.getOrElse(requiredSchema)
    val parquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(spark
      , schema , StructType(Nil), requiredSchema, filters, Map.empty, hadoopConf)
    val results = new Array[Iterator[String]](partitionedFiles.size)
    // Read every index file in parallel; each slot of `results` is written by
    // exactly one task, so the shared array is safe.
    partitionedFiles.zipWithIndex.par.foreach { case (pf, index) =>
      val fileIterator = parquetReader(pf).asInstanceOf[Iterator[Any]]
      val rows = fileIterator.flatMap(_ match {
        case r: InternalRow => Seq(r)
        case b: ColumnarBatch => b.rowIterator().asScala
      }).map(r => r.getString(0))
      results(index) = rows
    }
    results.flatMap(f => f).toSet
  }
}

View File

@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.functional
import java.sql.{Date, Timestamp}
import org.apache.hadoop.fs.Path
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.ZCurveOptimizeHelper
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.collection.JavaConversions._
import scala.util.Random
/**
 * Functional tests for z-order/space-filling-curve layout optimization
 * ([[HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE]]) and for min/max column
 * statistics collection via [[ZCurveOptimizeHelper]].
 */
class TestOptimizeTable extends HoodieClientTestBase {
  var spark: SparkSession = null

  // Common write options shared by all tests: low parallelism for test speed,
  // plus the key/partition/precombine fields matching the test data generator.
  val commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    "hoodie.bulkinsert.shuffle.parallelism" -> "4",
    DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
  )

  @BeforeEach override def setUp() {
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initTestDataGenerator()
    initFileSystem()
  }

  @AfterEach override def tearDown() = {
    cleanupSparkContexts()
    cleanupTestDataGenerator()
    cleanupFileSystem()
  }

  /**
   * Bulk-inserts 1000 records with inline clustering configured to run on every
   * commit and layout optimization (z-ordering on begin_lat/begin_lon) enabled,
   * then verifies no records are lost — with and without data skipping on read.
   */
  @ParameterizedTest
  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
  def testOptimizewithClustering(tableType: String): Unit = {
    // Bulk Insert Operation
    val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
    inputDF1.write.format("org.apache.hudi")
      .options(commonOpts)
      .option("hoodie.compact.inline", "false")
      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
      // options for clustering: force every file to be a clustering candidate
      // and trigger clustering after each commit
      .option("hoodie.parquet.small.file.limit", "0")
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "1")
      .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
      // NOTE(review): the original set this key twice ("1073741824", then 64MB);
      // only the last value takes effect, so the dead duplicate was removed.
      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
      .mode(SaveMode.Overwrite)
      .save(basePath)
    // All 1000 records must survive clustering; data skipping must not drop rows.
    assertEquals(1000, spark.read.format("hudi").load(basePath).count())
    assertEquals(1000,
      spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true").format("hudi").load(basePath).count())
  }

  /**
   * Verifies min/max statistics collection over a schema of mixed types
   * (TimestampType column c4 is deliberately excluded — not supported), and that
   * saveStatisticsInfo cleans up superseded index-table versions.
   */
  @Test
  def testCollectMinMaxStatistics(): Unit = {
    val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax")
    val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat")
    val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    try {
      val complexDataFrame = createComplexDataFrame(spark)
      complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
      val df = spark.read.load(testPath.toString)
      // do not support TimeStampType, so if we collect statistics for c4, should throw exception
      val colDf = ZCurveOptimizeHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8")
      colDf.cache()
      // one stats row per data file (3 partitions), 22 columns:
      // file name + (min, max, num_nulls) per each of the 7 requested columns
      assertEquals(colDf.count(), 3)
      assertEquals(colDf.take(1)(0).length, 22)
      colDf.unpersist()
      // try to save statistics
      ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1"))
      // save again
      ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2"))
      // test old index table clean: once "2" is no longer in the valid commit
      // list, its stats directory must be removed while "3" is retained
      ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3"))
      assertEquals(!fs.exists(new Path(statisticPath, "2")), true)
      assertEquals(fs.exists(new Path(statisticPath, "3")), true)
    } finally {
      // Both paths are directories written by Spark — use the non-deprecated
      // recursive delete instead of the deprecated single-arg FileSystem#delete.
      if (fs.exists(testPath)) fs.delete(testPath, true)
      if (fs.exists(statisticPath)) fs.delete(statisticPath, true)
    }
  }

  /**
   * Builds a 1001-row DataFrame covering int, string, decimal, timestamp,
   * short, date, binary and byte columns, to exercise stats collection
   * across a wide range of Spark SQL types.
   */
  def createComplexDataFrame(spark: SparkSession): DataFrame = {
    val schema = new StructType()
      .add("c1", IntegerType)
      .add("c2", StringType)
      .add("c3", DecimalType(9, 3))
      .add("c4", TimestampType)
      .add("c5", ShortType)
      .add("c6", DateType)
      .add("c7", BinaryType)
      .add("c8", ByteType)
    val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
      val c1 = Integer.valueOf(item)
      val c2 = s" ${item}sdc"
      val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}")
      val c4 = new Timestamp(System.currentTimeMillis())
      val c5 = java.lang.Short.valueOf(s"${(item + 16) / 10}")
      val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
      val c7 = Array(item).map(_.toByte)
      val c8 = java.lang.Byte.valueOf("9")
      RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8)
    }
    spark.createDataFrame(rdd, schema)
  }
}