diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java index 5d7ae1e46..7b2f93ec8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.parquet.io.api.Binary; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -62,6 +61,7 @@ import scala.collection.JavaConversions; import javax.annotation.Nonnull; import java.io.IOException; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -422,9 +422,8 @@ public class ColumnStatsIndexHelper { ); } else if (colType instanceof StringType) { return Pair.of( - new String(((Binary) colMetadata.getMinValue()).getBytes()), - new String(((Binary) colMetadata.getMaxValue()).getBytes()) - ); + colMetadata.getMinValue().toString(), + colMetadata.getMaxValue().toString()); } else if (colType instanceof DecimalType) { return Pair.of( new BigDecimal(colMetadata.getMinValue().toString()), @@ -447,8 +446,8 @@ public class ColumnStatsIndexHelper { new Float(colMetadata.getMaxValue().toString())); } else if (colType instanceof BinaryType) { return Pair.of( - ((Binary) colMetadata.getMinValue()).getBytes(), - ((Binary) colMetadata.getMaxValue()).getBytes()); + ((ByteBuffer) colMetadata.getMinValue()).array(), + ((ByteBuffer) colMetadata.getMaxValue()).array()); } else if (colType instanceof BooleanType) { return Pair.of( Boolean.valueOf(colMetadata.getMinValue().toString()), diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java index ca977ae53..31dcd8765 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java @@ -18,8 +18,6 @@ package org.apache.hudi.common.model; -import org.apache.parquet.schema.PrimitiveStringifier; - import java.util.Objects; /** @@ -31,15 +29,13 @@ public class HoodieColumnRangeMetadata { private final T minValue; private final T maxValue; private final long numNulls; - private final PrimitiveStringifier stringifier; - public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) { + public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls) { this.filePath = filePath; this.columnName = columnName; this.minValue = minValue; this.maxValue = maxValue; this.numNulls = numNulls; - this.stringifier = stringifier; } public String getFilePath() { @@ -58,10 +54,6 @@ public class HoodieColumnRangeMetadata { return this.maxValue; } - public PrimitiveStringifier getStringifier() { - return stringifier; - } - public long getNumNulls() { return numNulls; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index 136206150..66884fe80 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -37,6 +37,7 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.DecimalMetadata; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; @@ -45,6 +46,7 @@ import org.apache.parquet.schema.PrimitiveType; import javax.annotation.Nonnull; import java.io.IOException; import java.math.BigDecimal; +import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -308,9 +310,8 @@ public class ParquetUtils extends BaseFileUtils { convertToNativeJavaType( columnChunkMetaData.getPrimitiveType(), columnChunkMetaData.getStatistics().genericGetMax()), - columnChunkMetaData.getStatistics().getNumNulls(), - columnChunkMetaData.getPrimitiveType().stringifier())) - ).collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName)); + columnChunkMetaData.getStatistics().getNumNulls()))) + .collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName)); // Combine those into file-level statistics // NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer @@ -360,24 +361,56 @@ public class ParquetUtils extends BaseFileUtils { return new HoodieColumnRangeMetadata( one.getFilePath(), - one.getColumnName(), minValue, maxValue, one.getNumNulls() + another.getNumNulls(), one.getStringifier()); + one.getColumnName(), minValue, maxValue, one.getNumNulls() + another.getNumNulls()); } private static Comparable convertToNativeJavaType(PrimitiveType primitiveType, Comparable val) { if (primitiveType.getOriginalType() == OriginalType.DECIMAL) { - DecimalMetadata decimalMetadata = primitiveType.getDecimalMetadata(); - return BigDecimal.valueOf((Integer) val, decimalMetadata.getScale()); + return extractDecimal(val, primitiveType.getDecimalMetadata()); } else if (primitiveType.getOriginalType() == OriginalType.DATE) { // NOTE: This is a workaround to address race-condition in using // {@code SimpleDataFormat} concurrently (w/in {@code DateStringifier}) // TODO cleanup after Parquet upgrade to 1.12 synchronized (primitiveType.stringifier()) { + // Date logical type is implemented as a signed INT32 + // REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md return java.sql.Date.valueOf( primitiveType.stringifier().stringify((Integer) val) ); } + } else if (primitiveType.getOriginalType() == OriginalType.UTF8) { + // NOTE: UTF8 type designates a byte array that should be interpreted as a + // UTF-8 encoded character string + // REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md + return ((Binary) val).toStringUsingUTF8(); + } else if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) { + // NOTE: `getBytes` access makes a copy of the underlying byte buffer + return ((Binary) val).toByteBuffer(); } return val; } + + @Nonnull + private static BigDecimal extractDecimal(Object val, DecimalMetadata decimalMetadata) { + // In Parquet, Decimal could be represented as either of + // 1. INT32 (for 1 <= precision <= 9) + // 2. INT64 (for 1 <= precision <= 18) + // 3. FIXED_LEN_BYTE_ARRAY (precision is limited by the array size. Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits) + // 4. BINARY (precision is not limited) + // REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#DECIMAL + int scale = decimalMetadata.getScale(); + if (val == null) { + return null; + } else if (val instanceof Integer) { + return BigDecimal.valueOf((Integer) val, scale); + } else if (val instanceof Long) { + return BigDecimal.valueOf((Long) val, scale); + } else if (val instanceof Binary) { + // NOTE: Unscaled number is stored in BE format (most significant byte is 0th) + return new BigDecimal(new BigInteger(((Binary)val).getBytesUnsafe()), scale); + } else { + throw new UnsupportedOperationException(String.format("Unsupported value type (%s)", val.getClass().getName())); + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 000000000..59b3ff043 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":770,"c2":" 770sdc","c3":335.770,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-01-15","c7":"Ag==","c8":9} +{"c1":768,"c2":" 768sdc","c3":64.768,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-10-13","c7":"AA==","c8":9} +{"c1":431,"c2":" 431sdc","c3":153.431,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-03-12","c7":"rw==","c8":9} +{"c1":427,"c2":" 427sdc","c3":246.427,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-10-08","c7":"qw==","c8":9} +{"c1":328,"c2":" 328sdc","c3":977.328,"c4":"2021-11-18T23:34:44.181-08:00","c5":34,"c6":"2020-10-21","c7":"SA==","c8":9} +{"c1":320,"c2":" 320sdc","c3":230.320,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-02-13","c7":"QA==","c8":9} +{"c1":317,"c2":" 317sdc","c3":580.317,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-10-10","c7":"PQ==","c8":9} +{"c1":308,"c2":" 308sdc","c3":375.308,"c4":"2021-11-18T23:34:44.180-08:00","c5":32,"c6":"2020-01-01","c7":"NA==","c8":9} +{"c1":304,"c2":" 304sdc","c3":904.304,"c4":"2021-11-18T23:34:44.179-08:00","c5":32,"c6":"2020-08-25","c7":"MA==","c8":9} +{"c1":300,"c2":" 300sdc","c3":398.300,"c4":"2021-11-18T23:34:44.179-08:00","c5":31,"c6":"2020-04-21","c7":"LA==","c8":9} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 000000000..c5a11067c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":719,"c2":" 719sdc","c3":707.719,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-05-20","c7":"zw==","c8":9} +{"c1":715,"c2":" 715sdc","c3":777.715,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-01-16","c7":"yw==","c8":9} +{"c1":579,"c2":" 579sdc","c3":958.579,"c4":"2021-11-18T23:34:44.193-08:00","c5":59,"c6":"2020-08-20","c7":"Qw==","c8":9} +{"c1":568,"c2":" 568sdc","c3":667.568,"c4":"2021-11-18T23:34:44.193-08:00","c5":58,"c6":"2020-08-09","c7":"OA==","c8":9} +{"c1":367,"c2":" 367sdc","c3":791.367,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-05-04","c7":"bw==","c8":9} +{"c1":364,"c2":" 364sdc","c3":264.364,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-02-01","c7":"bA==","c8":9} +{"c1":250,"c2":" 250sdc","c3":624.250,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-09-27","c7":"+g==","c8":9} +{"c1":249,"c2":" 249sdc","c3":579.249,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-08-26","c7":"+Q==","c8":9} +{"c1":246,"c2":" 246sdc","c3":413.246,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-05-23","c7":"9g==","c8":9} +{"c1":125,"c2":" 125sdc","c3":153.125,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-05-14","c7":"fQ==","c8":9} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 000000000..585eb3132 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":486,"c2":" 486sdc","c3":278.486,"c4":"2021-11-18T23:34:44.189-08:00","c5":50,"c6":"2020-03-11","c7":"5g==","c8":9} +{"c1":483,"c2":" 483sdc","c3":162.483,"c4":"2021-11-18T23:34:44.189-08:00","c5":49,"c6":"2020-11-08","c7":"4w==","c8":9} +{"c1":224,"c2":" 224sdc","c3":294.224,"c4":"2021-11-18T23:34:44.175-08:00","c5":24,"c6":"2020-05-01","c7":"4A==","c8":9} +{"c1":118,"c2":" 118sdc","c3":204.118,"c4":"2021-11-18T23:34:44.168-08:00","c5":13,"c6":"2020-09-07","c7":"dg==","c8":9} +{"c1":111,"c2":" 111sdc","c3":82.111,"c4":"2021-11-18T23:34:44.168-08:00","c5":12,"c6":"2020-02-28","c7":"bw==","c8":9} +{"c1":79,"c2":" 79sdc","c3":198.790,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-03-24","c7":"Tw==","c8":9} +{"c1":77,"c2":" 77sdc","c3":619.770,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-01-22","c7":"TQ==","c8":9} +{"c1":76,"c2":" 76sdc","c3":315.760,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-11-21","c7":"TA==","c8":9} +{"c1":60,"c2":" 60sdc","c3":326.600,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-06-05","c7":"PA==","c8":9} +{"c1":59,"c2":" 59sdc","c3":771.590,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-05-04","c7":"Ow==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 000000000..2e37e6a18 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":272,"c2":" 272sdc","c3":979.272,"c4":"2021-11-18T23:34:44.178-08:00","c5":28,"c6":"2020-09-21","c7":"EA==","c8":9} +{"c1":258,"c2":" 258sdc","c3":627.258,"c4":"2021-11-18T23:34:44.177-08:00","c5":27,"c6":"2020-06-07","c7":"Ag==","c8":9} +{"c1":240,"c2":" 240sdc","c3":880.240,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-10-17","c7":"8A==","c8":9} +{"c1":236,"c2":" 236sdc","c3":576.236,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-06-13","c7":"7A==","c8":9} +{"c1":137,"c2":" 137sdc","c3":597.137,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-06-26","c7":"iQ==","c8":9} +{"c1":134,"c2":" 134sdc","c3":802.134,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-03-23","c7":"hg==","c8":9} +{"c1":131,"c2":" 131sdc","c3":959.131,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-11-20","c7":"gw==","c8":9} +{"c1":129,"c2":" 129sdc","c3":430.129,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-09-18","c7":"gQ==","c8":9} +{"c1":24,"c2":" 24sdc","c3":867.240,"c4":"2021-11-18T23:34:44.161-08:00","c5":4,"c6":"2020-03-25","c7":"GA==","c8":9} +{"c1":8,"c2":" 8sdc","c3":977.800,"c4":"2021-11-18T23:34:44.159-08:00","c5":2,"c6":"2020-09-09","c7":"CA==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json new file mode 100644 index 000000000..43d89698c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json @@ -0,0 +1,10 @@ +{"c1":323,"c2":" 323sdc","c3":738.323,"c4":"2021-11-19T20:40:55.522-08:00","c5":33,"c6":"2020-05-16","c7":"Qw==","c8":9} +{"c1":326,"c2":" 326sdc","c3":481.326,"c4":"2021-11-19T20:40:55.522-08:00","c5":34,"c6":"2020-08-19","c7":"Rg==","c8":9} +{"c1":555,"c2":" 555sdc","c3":791.555,"c4":"2021-11-19T20:40:55.535-08:00","c5":57,"c6":"2020-06-24","c7":"Kw==","c8":9} +{"c1":556,"c2":" 556sdc","c3":100.556,"c4":"2021-11-19T20:40:55.535-08:00","c5":57,"c6":"2020-07-25","c7":"LA==","c8":9} +{"c1":562,"c2":" 562sdc","c3":100.562,"c4":"2021-11-19T20:40:55.535-08:00","c5":57,"c6":"2020-02-03","c7":"Mg==","c8":9} +{"c1":619,"c2":" 619sdc","c3":284.619,"c4":"2021-11-19T20:40:55.537-08:00","c5":63,"c6":"2020-04-04","c7":"aw==","c8":9} +{"c1":624,"c2":" 624sdc","c3":783.624,"c4":"2021-11-19T20:40:55.537-08:00","c5":64,"c6":"2020-09-09","c7":"cA==","c8":9} +{"c1":633,"c2":" 633sdc","c3":706.633,"c4":"2021-11-19T20:40:55.538-08:00","c5":64,"c6":"2020-07-18","c7":"eQ==","c8":9} +{"c1":638,"c2":" 638sdc","c3":811.638,"c4":"2021-11-19T20:40:55.538-08:00","c5":65,"c6":"2020-01-23","c7":"fg==","c8":9} +{"c1":639,"c2":" 639sdc","c3":299.639,"c4":"2021-11-19T20:40:55.538-08:00","c5":65,"c6":"2020-02-24","c7":"fw==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json new file mode 100644 index 000000000..7537986a1 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json @@ -0,0 +1,10 @@ +{"c1":74,"c2":" 74sdc","c3":38.740,"c4":"2021-11-19T20:40:55.507-08:00","c5":9,"c6":"2020-09-19","c7":"Sg==","c8":9} +{"c1":181,"c2":" 181sdc","c3":754.181,"c4":"2021-11-19T20:40:55.514-08:00","c5":19,"c6":"2020-06-14","c7":"tQ==","c8":9} +{"c1":212,"c2":" 212sdc","c3":633.212,"c4":"2021-11-19T20:40:55.516-08:00","c5":22,"c6":"2020-04-17","c7":"1A==","c8":9} +{"c1":213,"c2":" 213sdc","c3":980.213,"c4":"2021-11-19T20:40:55.516-08:00","c5":22,"c6":"2020-05-18","c7":"1Q==","c8":9} +{"c1":428,"c2":" 428sdc","c3":550.428,"c4":"2021-11-19T20:40:55.528-08:00","c5":44,"c6":"2020-11-09","c7":"rA==","c8":9} +{"c1":429,"c2":" 429sdc","c3":799.429,"c4":"2021-11-19T20:40:55.528-08:00","c5":44,"c6":"2020-01-10","c7":"rQ==","c8":9} +{"c1":430,"c2":" 430sdc","c3":76.430,"c4":"2021-11-19T20:40:55.528-08:00","c5":44,"c6":"2020-02-11","c7":"rg==","c8":9} +{"c1":539,"c2":" 539sdc","c3":866.539,"c4":"2021-11-19T20:40:55.534-08:00","c5":55,"c6":"2020-01-08","c7":"Gw==","c8":9} +{"c1":552,"c2":" 552sdc","c3":382.552,"c4":"2021-11-19T20:40:55.535-08:00","c5":56,"c6":"2020-03-21","c7":"KA==","c8":9} +{"c1":559,"c2":" 559sdc","c3":699.559,"c4":"2021-11-19T20:40:55.535-08:00","c5":57,"c6":"2020-10-28","c7":"Lw==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json new file mode 100644 index 000000000..7f171d3b7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json @@ -0,0 +1,10 @@ +{"c1":355,"c2":" 355sdc","c3":994.355,"c4":"2021-11-19T20:40:55.524-08:00","c5":37,"c6":"2020-04-20","c7":"Yw==","c8":9} +{"c1":358,"c2":" 358sdc","c3":975.358,"c4":"2021-11-19T20:40:55.524-08:00","c5":37,"c6":"2020-07-23","c7":"Zg==","c8":9} +{"c1":769,"c2":" 769sdc","c3":919.769,"c4":"2021-11-19T20:40:55.543-08:00","c5":78,"c6":"2020-11-14","c7":"AQ==","c8":9} +{"c1":882,"c2":" 882sdc","c3":374.882,"c4":"2021-11-19T20:40:55.547-08:00","c5":89,"c6":"2020-03-15","c7":"cg==","c8":9} +{"c1":892,"c2":" 892sdc","c3":787.892,"c4":"2021-11-19T20:40:55.547-08:00","c5":90,"c6":"2020-02-25","c7":"fA==","c8":9} +{"c1":917,"c2":" 917sdc","c3":912.917,"c4":"2021-11-19T20:40:55.548-08:00","c5":93,"c6":"2020-05-22","c7":"lQ==","c8":9} +{"c1":932,"c2":" 932sdc","c3":990.932,"c4":"2021-11-19T20:40:55.549-08:00","c5":94,"c6":"2020-09-09","c7":"pA==","c8":9} +{"c1":933,"c2":" 933sdc","c3":510.933,"c4":"2021-11-19T20:40:55.549-08:00","c5":94,"c6":"2020-10-10","c7":"pQ==","c8":9} +{"c1":943,"c2":" 943sdc","c3":601.943,"c4":"2021-11-19T20:40:55.549-08:00","c5":95,"c6":"2020-09-20","c7":"rw==","c8":9} +{"c1":945,"c2":" 945sdc","c3":790.945,"c4":"2021-11-19T20:40:55.549-08:00","c5":96,"c6":"2020-11-22","c7":"sQ==","c8":9} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json new file mode 100644 index 000000000..48d91417b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json @@ -0,0 +1,10 @@ +{"c1":0,"c2":" 0sdc","c3":19.000,"c4":"2021-11-19T20:40:55.339-08:00","c5":1,"c6":"2020-01-01","c7":"AA==","c8":9} +{"c1":89,"c2":" 89sdc","c3":759.890,"c4":"2021-11-19T20:40:55.508-08:00","c5":10,"c6":"2020-02-06","c7":"WQ==","c8":9} +{"c1":199,"c2":" 199sdc","c3":315.199,"c4":"2021-11-19T20:40:55.515-08:00","c5":21,"c6":"2020-02-04","c7":"xw==","c8":9} +{"c1":200,"c2":" 200sdc","c3":618.200,"c4":"2021-11-19T20:40:55.515-08:00","c5":21,"c6":"2020-03-05","c7":"yA==","c8":9} +{"c1":309,"c2":" 309sdc","c3":642.309,"c4":"2021-11-19T20:40:55.521-08:00","c5":32,"c6":"2020-02-02","c7":"NQ==","c8":9} +{"c1":318,"c2":" 318sdc","c3":106.318,"c4":"2021-11-19T20:40:55.522-08:00","c5":33,"c6":"2020-11-11","c7":"Pg==","c8":9} +{"c1":329,"c2":" 329sdc","c3":200.329,"c4":"2021-11-19T20:40:55.522-08:00","c5":34,"c6":"2020-11-22","c7":"SQ==","c8":9} +{"c1":690,"c2":" 690sdc","c3":854.690,"c4":"2021-11-19T20:40:55.540-08:00","c5":70,"c6":"2020-09-19","c7":"sg==","c8":9} +{"c1":697,"c2":" 697sdc","c3":916.697,"c4":"2021-11-19T20:40:55.540-08:00","c5":71,"c6":"2020-05-26","c7":"uQ==","c8":9} +{"c1":959,"c2":" 959sdc","c3":480.959,"c4":"2021-11-19T20:40:55.550-08:00","c5":97,"c6":"2020-03-08","c7":"vw==","c8":9} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json index 5c876126a..c8fead0c1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json @@ -1,8 +1,8 @@ -{"c1_maxValue":1000,"c1_minValue":3,"c1_num_nulls":0,"c2_maxValue":" 993sdc","c2_minValue":" 1000sdc","c2_num_nulls":0,"c3_maxValue":999.348,"c3_minValue":5.102,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-27","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"} -{"c1_maxValue":1000,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 996sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":999.779,"c3_minValue":2.992,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/g==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} -{"c1_maxValue":998,"c1_minValue":2,"c1_num_nulls":0,"c2_maxValue":" 998sdc","c2_minValue":" 104sdc","c2_num_nulls":0,"c3_maxValue":997.905,"c3_minValue":0.876,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-02","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"} -{"c1_maxValue":997,"c1_minValue":3,"c1_num_nulls":0,"c2_maxValue":" 9sdc","c2_minValue":" 102sdc","c2_num_nulls":0,"c3_maxValue":990.531,"c3_minValue":2.336,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-27","c6_minValue":"2020-01-02","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} -{"c1_maxValue":994,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 9sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":997.496,"c3_minValue":7.742,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"} -{"c1_maxValue":999,"c1_minValue":1,"c1_num_nulls":0,"c2_maxValue":" 999sdc","c2_minValue":" 100sdc","c2_num_nulls":0,"c3_maxValue":980.676,"c3_minValue":0.120,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-03","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} -{"c1_maxValue":999,"c1_minValue":1,"c1_num_nulls":0,"c2_maxValue":" 99sdc","c2_minValue":" 10sdc","c2_num_nulls":0,"c3_maxValue":993.940,"c3_minValue":4.598,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-03","c6_num_nulls":0,"c7_maxValue":"/g==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-1c8226c2-f2a0-455d-aedd-c544003b0b3d-c000.snappy.parquet"} -{"c1_maxValue":998,"c1_minValue":6,"c1_num_nulls":0,"c2_maxValue":" 99sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":999.282,"c3_minValue":1.217,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} \ No newline at end of file +{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-0-c000.snappy.parquet"} +{"c1_maxValue":770,"c1_minValue":300,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 300sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_num_nulls":0,"c5_maxValue":78,"c5_minValue":31,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-1-c000.snappy.parquet"} +{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-0-c000.snappy.parquet"} +{"c1_maxValue":719,"c1_minValue":125,"c1_num_nulls":0,"c2_maxValue":" 719sdc","c2_minValue":" 125sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":153.125,"c3_num_nulls":0,"c5_maxValue":73,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-09-27","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"OA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-1-c000.snappy.parquet"} +{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-0-c000.snappy.parquet"} +{"c1_maxValue":486,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 79sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":771.590,"c3_minValue":82.111,"c3_num_nulls":0,"c5_maxValue":50,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-22","c6_num_nulls":0,"c7_maxValue":"5g==","c7_minValue":"Ow==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-1-c000.snappy.parquet"} +{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-0-c000.snappy.parquet"} +{"c1_maxValue":272,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":430.129,"c3_num_nulls":0,"c5_maxValue":28,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-03-23","c6_num_nulls":0,"c7_maxValue":"8A==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-1-c000.snappy.parquet"} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json index 45cb9aaf8..30fccb3f9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json @@ -1,4 +1,4 @@ -{"c1_maxValue":1000,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 996sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":999.779,"c3_minValue":2.992,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/g==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} -{"c1_maxValue":997,"c1_minValue":3,"c1_num_nulls":0,"c2_maxValue":" 9sdc","c2_minValue":" 102sdc","c2_num_nulls":0,"c3_maxValue":990.531,"c3_minValue":2.336,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-27","c6_minValue":"2020-01-02","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} -{"c1_maxValue":999,"c1_minValue":1,"c1_num_nulls":0,"c2_maxValue":" 999sdc","c2_minValue":" 100sdc","c2_num_nulls":0,"c3_maxValue":980.676,"c3_minValue":0.120,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-03","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} -{"c1_maxValue":998,"c1_minValue":6,"c1_num_nulls":0,"c2_maxValue":" 99sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":999.282,"c3_minValue":1.217,"c3_num_nulls":0,"c5_maxValue":101,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-28","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"/w==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-5034d84a-c4c8-4eba-85b5-a52f47e628a7-c000.snappy.parquet"} \ No newline at end of file +{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-0-c000.snappy.parquet"} +{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-0-c000.snappy.parquet"} +{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-0-c000.snappy.parquet"} +{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-0-c000.snappy.parquet"} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index e79067041..7b06bca87 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -18,16 +18,25 @@ package org.apache.hudi.functional -import org.apache.hadoop.fs.{LocatedFileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieColumnRangeMetadata +import org.apache.hudi.common.util.ParquetUtils import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.typedLit import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.junit.jupiter.api.Assertions.assertEquals +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, RowFactory, SaveMode, SparkSession, functions} +import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test} -import scala.collection.JavaConversions._ +import java.math.BigInteger +import java.nio.charset.StandardCharsets +import java.sql.{Date, Timestamp} +import scala.collection.JavaConverters._ +import scala.util.{Random, Success} class TestColumnStatsIndex extends HoodieClientTestBase { var spark: SparkSession = _ @@ -58,15 +67,17 @@ class TestColumnStatsIndex extends HoodieClientTestBase { } @Test - @Disabled - def testColumnStatsTableComposition(): Unit = { + def testZIndexTableComposition(): Unit = { + val targetParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString + val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString + + bootstrapParquetInputTableFromJSON(sourceJSONTablePath, targetParquetTablePath) + val inputDf = // NOTE: Schema here is provided for validation that the input date is in the appropriate format spark.read .schema(sourceTableSchema) - .parquet( - getClass.getClassLoader.getResource("index/zorder/input-table").toString - ) + .parquet(targetParquetTablePath) val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8") val zorderedColsSchemaFields = inputDf.schema.fields.filter(f => zorderedCols.contains(f.name)).toSeq @@ -75,22 +86,18 @@ class TestColumnStatsIndex extends HoodieClientTestBase { val newZIndexTableDf = ColumnStatsIndexHelper.buildColumnStatsTableFor( inputDf.sparkSession, - inputDf.inputFiles.toSeq, - zorderedColsSchemaFields + inputDf.inputFiles.toSeq.asJava, + zorderedColsSchemaFields.asJava ) val indexSchema = ColumnStatsIndexHelper.composeIndexSchema( - sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq + sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq.asJava ) // Collect Z-index stats manually (reading individual Parquet files) val manualZIndexTableDf = - buildColumnStatsTableManually( - getClass.getClassLoader.getResource("index/zorder/input-table").toString, - zorderedCols, - indexSchema - ) + buildColumnStatsTableManually(targetParquetTablePath, zorderedCols, indexSchema) // NOTE: Z-index is built against stats collected w/in Parquet footers, which will be // represented w/ corresponding Parquet schema (INT, INT64, INT96, etc). @@ -107,18 +114,23 @@ class TestColumnStatsIndex extends HoodieClientTestBase { .schema(indexSchema) .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString) - assertEquals(asJson(sort(expectedZIndexTableDf)), asJson(sort(newZIndexTableDf))) + assertEquals(asJson(sort(expectedZIndexTableDf)), asJson(sort(replace(newZIndexTableDf)))) } @Test - @Disabled - def testColumnStatsTableMerge(): Unit = { + def testZIndexTableMerge(): Unit = { val testZIndexPath = new Path(basePath, "zindex") + val firstParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString + val firstJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString + + // Bootstrap FIRST source Parquet table + bootstrapParquetInputTableFromJSON(firstJSONTablePath, firstParquetTablePath) + val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8") val indexSchema = ColumnStatsIndexHelper.composeIndexSchema( - sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq + sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq.asJava ) // @@ -126,19 +138,16 @@ class TestColumnStatsIndex extends HoodieClientTestBase { // val firstCommitInstance = "0" - val firstInputDf = - spark.read.parquet( - getClass.getClassLoader.getResource("index/zorder/input-table").toString - ) + val firstInputDf = spark.read.parquet(firstParquetTablePath) ColumnStatsIndexHelper.updateColumnStatsIndexFor( firstInputDf.sparkSession, sourceTableSchema, - firstInputDf.inputFiles.toSeq, - zorderedCols.toSeq, + firstInputDf.inputFiles.toSeq.asJava, + zorderedCols.asJava, testZIndexPath.toString, firstCommitInstance, - Seq() + Seq().asJava ) // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able @@ -152,15 +161,19 @@ class TestColumnStatsIndex extends HoodieClientTestBase { .schema(indexSchema) .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString) - assertEquals(asJson(sort(expectedInitialZIndexTableDf)), asJson(sort(initialZIndexTable))) + assertEquals(asJson(sort(expectedInitialZIndexTableDf)), asJson(sort(replace(initialZIndexTable)))) + + // Bootstrap SECOND source Parquet table + val secondParquetTablePath = tempDir.resolve("index/zorder/another-input-table").toAbsolutePath.toString + val secondJSONTablePath = getClass.getClassLoader.getResource("index/zorder/another-input-table-json").toString + + bootstrapParquetInputTableFromJSON(secondJSONTablePath, secondParquetTablePath) val secondCommitInstance = "1" val secondInputDf = spark.read .schema(sourceTableSchema) - .parquet( - getClass.getClassLoader.getResource("index/zorder/another-input-table").toString - ) + .parquet(secondParquetTablePath) // // Update Z-index table @@ -169,11 +182,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase { ColumnStatsIndexHelper.updateColumnStatsIndexFor( secondInputDf.sparkSession, sourceTableSchema, - secondInputDf.inputFiles.toSeq, - zorderedCols.toSeq, + secondInputDf.inputFiles.toSeq.asJava, + zorderedCols.asJava, testZIndexPath.toString, secondCommitInstance, - Seq(firstCommitInstance) + Seq(firstCommitInstance).asJava ) // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able @@ -187,56 +200,96 @@ class TestColumnStatsIndex extends HoodieClientTestBase { .schema(indexSchema) .json(getClass.getClassLoader.getResource("index/zorder/z-index-table-merged.json").toString) - assertEquals(asJson(sort(expectedMergedZIndexTableDf)), asJson(sort(mergedZIndexTable))) + assertEquals(asJson(sort(expectedMergedZIndexTableDf)), asJson(sort(replace(mergedZIndexTable)))) } @Test - @Disabled def testColumnStatsTablesGarbageCollection(): Unit = { - val testZIndexPath = new Path(System.getProperty("java.io.tmpdir"), "zindex") - val fs = testZIndexPath.getFileSystem(spark.sparkContext.hadoopConfiguration) + val targetParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString + val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString - val inputDf = - spark.read.parquet( - getClass.getClassLoader.getResource("index/zorder/input-table").toString - ) + bootstrapParquetInputTableFromJSON(sourceJSONTablePath, targetParquetTablePath) + + val inputDf = spark.read.parquet(targetParquetTablePath) + + val testColumnStatsIndexPath = new Path(tempDir.resolve("zindex").toAbsolutePath.toString) + val fs = testColumnStatsIndexPath.getFileSystem(spark.sparkContext.hadoopConfiguration) // Try to save statistics ColumnStatsIndexHelper.updateColumnStatsIndexFor( inputDf.sparkSession, sourceTableSchema, - inputDf.inputFiles.toSeq, - Seq("c1","c2","c3","c5","c6","c7","c8"), - testZIndexPath.toString, + inputDf.inputFiles.toSeq.asJava, + Seq("c1","c2","c3","c5","c6","c7","c8").asJava, + testColumnStatsIndexPath.toString, "2", - Seq("0", "1") + Seq("0", "1").asJava ) // Save again ColumnStatsIndexHelper.updateColumnStatsIndexFor( inputDf.sparkSession, sourceTableSchema, - inputDf.inputFiles.toSeq, - Seq("c1","c2","c3","c5","c6","c7","c8"), - testZIndexPath.toString, + inputDf.inputFiles.toSeq.asJava, + Seq("c1","c2","c3","c5","c6","c7","c8").asJava, + testColumnStatsIndexPath.toString, "3", - Seq("0", "1", "2") + Seq("0", "1", "2").asJava ) // Test old index table being cleaned up ColumnStatsIndexHelper.updateColumnStatsIndexFor( inputDf.sparkSession, sourceTableSchema, - inputDf.inputFiles.toSeq, - Seq("c1","c2","c3","c5","c6","c7","c8"), - testZIndexPath.toString, + inputDf.inputFiles.toSeq.asJava, + Seq("c1","c2","c3","c5","c6","c7","c8").asJava, + testColumnStatsIndexPath.toString, "4", - Seq("0", "1", "3") + Seq("0", "1", "3").asJava ) - assertEquals(!fs.exists(new Path(testZIndexPath, "2")), true) - assertEquals(!fs.exists(new Path(testZIndexPath, "3")), true) - assertEquals(fs.exists(new Path(testZIndexPath, "4")), true) + assertEquals(!fs.exists(new Path(testColumnStatsIndexPath, "2")), true) + assertEquals(!fs.exists(new Path(testColumnStatsIndexPath, "3")), true) + assertEquals(fs.exists(new Path(testColumnStatsIndexPath, "4")), true) + } + + @Test + def testParquetMetadataRangeExtraction(): Unit = { + val df = generateRandomDataFrame(spark) + + val pathStr = tempDir.resolve("min-max").toAbsolutePath.toString + + df.write.format("parquet") + .mode(SaveMode.Overwrite) + .save(pathStr) + + val utils = new ParquetUtils + + val conf = new Configuration() + val path = new Path(pathStr) + val fs = path.getFileSystem(conf) + + val parquetFilePath = fs.listStatus(path).filter(fs => fs.getPath.getName.endsWith(".parquet")).toSeq.head.getPath + + val ranges = utils.readRangeFromParquetMetadata(conf, parquetFilePath, + Seq("c1", "c2", "c3a", "c3b", "c3c", "c4", "c5", "c6", "c7", "c8").asJava) + + ranges.asScala.foreach(r => { + // NOTE: Unfortunately Parquet can't compute statistics for Timestamp column, hence we + // skip it in our assertions + if (r.getColumnName.equals("c4")) { + // scalastyle:off return + return + // scalastyle:on return + } + + val min = r.getMinValue + val max = r.getMaxValue + + assertNotNull(min) + assertNotNull(max) + assertTrue(r.getMinValue.asInstanceOf[Comparable[Object]].compareTo(r.getMaxValue.asInstanceOf[Object]) <= 0) + }) } private def buildColumnStatsTableManually(tablePath: String, zorderedCols: Seq[String], indexSchema: StructType) = { @@ -268,11 +321,85 @@ class TestColumnStatsIndex extends HoodieClientTestBase { df.selectExpr(exprs: _*) .collect() - }), + }).asJava, indexSchema ) } + def bootstrapParquetInputTableFromJSON(sourceJSONTablePath: String, targetParquetTablePath: String): Unit = { + val jsonInputDF = + // NOTE: Schema here is provided for validation that the input date is in the appropriate format + spark.read + .schema(sourceTableSchema) + .json(sourceJSONTablePath) + + jsonInputDF + .sort("c1") + .repartition(4, new Column("c1")) + .write + .format("parquet") + .mode("overwrite") + .save(targetParquetTablePath) + + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) + // Have to cleanup additional artefacts of Spark write + fs.delete(new Path(targetParquetTablePath, "_SUCCESS"), false) + } + + def replace(ds: Dataset[Row]): DataFrame = { + val uuidRegexp = "[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}" + + val uuids = + ds.selectExpr(s"regexp_extract(file, '(${uuidRegexp})')") + .distinct() + .collect() + .map(_.getString(0)) + .sorted + + val uuidToIdx: UserDefinedFunction = functions.udf((fileName: String) => { + val (uuid, idx) = uuids.zipWithIndex.find { case (uuid, _) => fileName.contains(uuid) }.get + fileName.replace(uuid, idx.toString) + }) + + ds.withColumn("file", uuidToIdx(ds("file"))) + } + + private def generateRandomDataFrame(spark: SparkSession): DataFrame = { + val sourceTableSchema = + new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + // NOTE: We're testing different values for precision of the decimal to make sure + // we execute paths bearing different underlying representations in Parquet + // REF: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#DECIMAL + .add("c3a", DecimalType(9,3)) + .add("c3b", DecimalType(10,3)) + .add("c3c", DecimalType(20,3)) + .add("c4", TimestampType) + .add("c5", ShortType) + .add("c6", DateType) + .add("c7", BinaryType) + .add("c8", ByteType) + + val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item => + val c1 = Integer.valueOf(item) + val c2 = Random.nextString(10) + val c3a = java.math.BigDecimal.valueOf(Random.nextInt() % (1 << 24), 3) + val c3b = java.math.BigDecimal.valueOf(Random.nextLong() % (1L << 32), 3) + // NOTE: We cap it at 2^64 to make sure we're not exceeding target decimal's range + val c3c = new java.math.BigDecimal(new BigInteger(64, new java.util.Random()), 3) + val c4 = new Timestamp(System.currentTimeMillis()) + val c5 = java.lang.Short.valueOf(s"${(item + 16) / 10}") + val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}") + val c7 = Array(item).map(_.toByte) + val c8 = java.lang.Byte.valueOf("9") + + RowFactory.create(c1, c2, c3a, c3b, c3c, c4, c5, c6, c7, c8) + } + + spark.createDataFrame(rdd, sourceTableSchema) + } + private def asJson(df: DataFrame) = df.toJSON .select("value") @@ -281,7 +408,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase { .map(_.getString(0)) .mkString("\n") - private def sort(df: DataFrame): DataFrame = { // Since upon parsing JSON, Spark re-order columns in lexicographical order // of their names, we have to shuffle new Z-index table columns order to match diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala index 98495c627..818addaf8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala @@ -32,9 +32,7 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments.arguments import org.junit.jupiter.params.provider.{Arguments, MethodSource} -import java.sql.{Date, Timestamp} import scala.collection.JavaConversions._ -import scala.util.Random @Tag("functional") class TestLayoutOptimization extends HoodieClientTestBase { @@ -151,22 +149,6 @@ class TestLayoutOptimization extends HoodieClientTestBase { val rows = one.count() assert(rows == other.count() && one.intersect(other).count() == rows) } - - def createComplexDataFrame(spark: SparkSession): DataFrame = { - val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item => - val c1 = Integer.valueOf(item) - val c2 = s" ${item}sdc" - val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}") - val c4 = new Timestamp(System.currentTimeMillis()) - val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}") - val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}") - val c7 = Array(item).map(_.toByte) - val c8 = java.lang.Byte.valueOf("9") - - RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8) - } - spark.createDataFrame(rdd, sourceTableSchema) - } } object TestLayoutOptimization {