[MINOR] Fix typos (#4567)
This commit is contained in:
@@ -49,8 +49,8 @@ public class HoodieBaseBloomIndexCheckFunction
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
|
public Iterator<List<KeyLookupResult>> apply(Iterator<Pair<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
|
||||||
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
|
return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
|
||||||
}
|
}
|
||||||
|
|
||||||
class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<KeyLookupResult>> {
|
class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, HoodieKey>, List<KeyLookupResult>> {
|
||||||
|
|||||||
@@ -138,7 +138,7 @@ public class SparkValidatorUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get reads from paritions modified including any inflight commits.
|
* Get reads from partitions modified including any inflight commits.
|
||||||
* Note that this only works for COW tables
|
* Note that this only works for COW tables
|
||||||
*/
|
*/
|
||||||
public static Dataset<Row> getRecordsFromPendingCommits(SQLContext sqlContext,
|
public static Dataset<Row> getRecordsFromPendingCommits(SQLContext sqlContext,
|
||||||
|
|||||||
@@ -53,8 +53,8 @@ public class HoodieBloomIndexCheckFunction
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<List<KeyLookupResult>> call(Integer partition,
|
public Iterator<List<KeyLookupResult>> call(Integer partition,
|
||||||
Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
|
Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
|
||||||
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
|
return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
|
||||||
}
|
}
|
||||||
|
|
||||||
class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
|
class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
|
||||||
|
|||||||
@@ -98,7 +98,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
|
|||||||
lazy val tableTypeName: String = tableType.name()
|
lazy val tableTypeName: String = tableType.name()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recored Field List(Primary Key List)
|
* Record Field List(Primary Key List)
|
||||||
*/
|
*/
|
||||||
lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields.orElse(Array.empty)
|
lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields.orElse(Array.empty)
|
||||||
|
|
||||||
@@ -108,7 +108,7 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
|
|||||||
lazy val preCombineKey: Option[String] = Option(tableConfig.getPreCombineField)
|
lazy val preCombineKey: Option[String] = Option(tableConfig.getPreCombineField)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Paritition Fields
|
* Partition Fields
|
||||||
*/
|
*/
|
||||||
lazy val partitionFields: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
|
lazy val partitionFields: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
|
||||||
|
|
||||||
|
|||||||
@@ -186,14 +186,14 @@ public class TestHoodieDatasetBulkInsertHelper extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int metadataRecordKeyIndex = resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
|
int metadataRecordKeyIndex = resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD);
|
||||||
int metadataParitionPathIndex = resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
|
int metadataPartitionPathIndex = resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
|
||||||
int metadataCommitTimeIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
|
int metadataCommitTimeIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
|
||||||
int metadataCommitSeqNoIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);
|
int metadataCommitSeqNoIndex = resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);
|
||||||
int metadataFilenameIndex = resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD);
|
int metadataFilenameIndex = resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD);
|
||||||
|
|
||||||
result.toJavaRDD().foreach(entry -> {
|
result.toJavaRDD().foreach(entry -> {
|
||||||
assertTrue(entry.get(metadataRecordKeyIndex).equals(entry.getAs("_row_key")));
|
assertTrue(entry.get(metadataRecordKeyIndex).equals(entry.getAs("_row_key")));
|
||||||
assertTrue(entry.get(metadataParitionPathIndex).equals(entry.getAs("partition")));
|
assertTrue(entry.get(metadataPartitionPathIndex).equals(entry.getAs("partition")));
|
||||||
assertTrue(entry.get(metadataCommitSeqNoIndex).equals(""));
|
assertTrue(entry.get(metadataCommitSeqNoIndex).equals(""));
|
||||||
assertTrue(entry.get(metadataCommitTimeIndex).equals(""));
|
assertTrue(entry.get(metadataCommitTimeIndex).equals(""));
|
||||||
assertTrue(entry.get(metadataFilenameIndex).equals(""));
|
assertTrue(entry.get(metadataFilenameIndex).equals(""));
|
||||||
|
|||||||
@@ -245,12 +245,12 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
spark.sql(String.format("select count(*) from tmpTable")).show()
|
spark.sql(String.format("select count(*) from tmpTable")).show()
|
||||||
|
|
||||||
// step4: Query the rows count from hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
|
// step4: Query the rows count from hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
|
||||||
val recordCountForParititon1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
|
val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
|
||||||
assertEquals("6", recordCountForParititon1(0).get(0).toString)
|
assertEquals("6", recordCountForPartition1(0).get(0).toString)
|
||||||
|
|
||||||
// step5: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH
|
// step5: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH
|
||||||
val recordCountForParititon2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
|
val recordCountForPartition2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
|
||||||
assertEquals("7", recordCountForParititon2(0).get(0).toString)
|
assertEquals("7", recordCountForPartition2(0).get(0).toString)
|
||||||
|
|
||||||
// step6: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH using spark.collect and then filter mode
|
// step6: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH using spark.collect and then filter mode
|
||||||
val recordsForPartitionColumn = spark.sql(String.format("select partition from tmpTable")).collect()
|
val recordsForPartitionColumn = spark.sql(String.format("select partition from tmpTable")).collect()
|
||||||
@@ -292,12 +292,12 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
spark.sql(String.format("select count(*) from tmpTable")).show()
|
spark.sql(String.format("select count(*) from tmpTable")).show()
|
||||||
|
|
||||||
// step3: Query the rows count from hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
|
// step3: Query the rows count from hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
|
||||||
val recordCountForParititon1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
|
val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
|
||||||
assertEquals("0", recordCountForParititon1(0).get(0).toString)
|
assertEquals("0", recordCountForPartition1(0).get(0).toString)
|
||||||
|
|
||||||
// step4: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH
|
// step4: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH
|
||||||
val recordCountForParititon2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
|
val recordCountForPartition2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
|
||||||
assertEquals("7", recordCountForParititon2(0).get(0).toString)
|
assertEquals("7", recordCountForPartition2(0).get(0).toString)
|
||||||
|
|
||||||
// step5: Query the rows count from hoodie table
|
// step5: Query the rows count from hoodie table
|
||||||
val recordCount = spark.sql(String.format("select count(*) from tmpTable")).collect()
|
val recordCount = spark.sql(String.format("select count(*) from tmpTable")).collect()
|
||||||
@@ -417,7 +417,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def testSparkPartitonByWithCustomKeyGenerator(): Unit = {
|
@Test def testSparkPartitionByWithCustomKeyGenerator(): Unit = {
|
||||||
// Without fieldType, the default is SIMPLE
|
// Without fieldType, the default is SIMPLE
|
||||||
var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
|
var writer = getDataFrameWriter(classOf[CustomKeyGenerator].getName)
|
||||||
writer.partitionBy("current_ts")
|
writer.partitionBy("current_ts")
|
||||||
@@ -465,7 +465,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def testSparkPartitonByWithSimpleKeyGenerator() {
|
@Test def testSparkPartitionByWithSimpleKeyGenerator() {
|
||||||
// Use the `driver` field as the partition key
|
// Use the `driver` field as the partition key
|
||||||
var writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName)
|
var writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName)
|
||||||
writer.partitionBy("driver")
|
writer.partitionBy("driver")
|
||||||
@@ -484,7 +484,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("default")).count() == 0)
|
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("default")).count() == 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def testSparkPartitonByWithComplexKeyGenerator() {
|
@Test def testSparkPartitionByWithComplexKeyGenerator() {
|
||||||
// Use the `driver` field as the partition key
|
// Use the `driver` field as the partition key
|
||||||
var writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName)
|
var writer = getDataFrameWriter(classOf[ComplexKeyGenerator].getName)
|
||||||
writer.partitionBy("driver")
|
writer.partitionBy("driver")
|
||||||
@@ -503,7 +503,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= concat(col("driver"), lit("/"), col("rider"))).count() == 0)
|
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= concat(col("driver"), lit("/"), col("rider"))).count() == 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def testSparkPartitonByWithTimestampBasedKeyGenerator() {
|
@Test def testSparkPartitionByWithTimestampBasedKeyGenerator() {
|
||||||
val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName)
|
val writer = getDataFrameWriter(classOf[TimestampBasedKeyGenerator].getName)
|
||||||
writer.partitionBy("current_ts")
|
writer.partitionBy("current_ts")
|
||||||
.option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
|
.option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
|
||||||
@@ -517,7 +517,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
|
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= udf_date_format(col("current_ts"))).count() == 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def testSparkPartitonByWithGlobalDeleteKeyGenerator() {
|
@Test def testSparkPartitionByWithGlobalDeleteKeyGenerator() {
|
||||||
val writer = getDataFrameWriter(classOf[GlobalDeleteKeyGenerator].getName)
|
val writer = getDataFrameWriter(classOf[GlobalDeleteKeyGenerator].getName)
|
||||||
writer.partitionBy("driver")
|
writer.partitionBy("driver")
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
@@ -528,7 +528,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
|
|||||||
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
|
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def testSparkPartitonByWithNonpartitionedKeyGenerator() {
|
@Test def testSparkPartitionByWithNonpartitionedKeyGenerator() {
|
||||||
// Empty string column
|
// Empty string column
|
||||||
var writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName)
|
var writer = getDataFrameWriter(classOf[NonpartitionedKeyGenerator].getName)
|
||||||
writer.partitionBy("")
|
writer.partitionBy("")
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ import java.util.Collections;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extractor for Hive Style Partitioned tables, when the parition folders are key value pairs.
|
* Extractor for Hive Style Partitioned tables, when the partition folders are key value pairs.
|
||||||
*
|
*
|
||||||
* <p>This implementation extracts the partition value of yyyy-mm-dd from the path of type datestr=yyyy-mm-dd.
|
* <p>This implementation extracts the partition value of yyyy-mm-dd from the path of type datestr=yyyy-mm-dd.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -131,9 +131,9 @@ public class DatePartitionPathSelector extends DFSPathSelector {
|
|||||||
long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
|
long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
|
||||||
HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
|
HoodieSparkEngineContext context = new HoodieSparkEngineContext(sparkContext);
|
||||||
SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
|
SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
|
||||||
List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP), currentDate);
|
List<String> prunedPartitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP), currentDate);
|
||||||
|
|
||||||
List<FileStatus> eligibleFiles = context.flatMap(prunedParitionPaths,
|
List<FileStatus> eligibleFiles = context.flatMap(prunedPartitionPaths,
|
||||||
path -> {
|
path -> {
|
||||||
FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
|
FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
|
||||||
return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream();
|
return listEligibleFiles(fs, new Path(path), lastCheckpointTime).stream();
|
||||||
|
|||||||
Reference in New Issue
Block a user