fix some spell errorin Hudi
This commit is contained in:
@@ -38,7 +38,7 @@ import org.apache.spark.api.java.function.Function2;
|
|||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Function performing actual checking of RDD parition containing (fileId, hoodieKeys) against the
|
* Function performing actual checking of RDD partition containing (fileId, hoodieKeys) against the
|
||||||
* actual files
|
* actual files
|
||||||
*/
|
*/
|
||||||
public class HoodieBloomIndexCheckFunction implements
|
public class HoodieBloomIndexCheckFunction implements
|
||||||
@@ -82,9 +82,9 @@ public class HoodieBloomIndexCheckFunction implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Iterator<List<IndexLookupResult>> call(Integer partition,
|
public Iterator<List<IndexLookupResult>> call(Integer partition,
|
||||||
Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> fileParitionRecordKeyTripletItr)
|
Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> filePartitionRecordKeyTripletItr)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
|
return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
|
||||||
}
|
}
|
||||||
|
|
||||||
class LazyKeyCheckIterator extends
|
class LazyKeyCheckIterator extends
|
||||||
@@ -96,15 +96,15 @@ public class HoodieBloomIndexCheckFunction implements
|
|||||||
|
|
||||||
private String currentFile;
|
private String currentFile;
|
||||||
|
|
||||||
private String currentParitionPath;
|
private String currentPartitionPath;
|
||||||
|
|
||||||
LazyKeyCheckIterator(
|
LazyKeyCheckIterator(
|
||||||
Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> fileParitionRecordKeyTripletItr) {
|
Iterator<Tuple2<String, Tuple2<String, HoodieKey>>> filePartitionRecordKeyTripletItr) {
|
||||||
super(fileParitionRecordKeyTripletItr);
|
super(filePartitionRecordKeyTripletItr);
|
||||||
currentFile = null;
|
currentFile = null;
|
||||||
candidateRecordKeys = new ArrayList<>();
|
candidateRecordKeys = new ArrayList<>();
|
||||||
bloomFilter = null;
|
bloomFilter = null;
|
||||||
currentParitionPath = null;
|
currentPartitionPath = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -118,7 +118,7 @@ public class HoodieBloomIndexCheckFunction implements
|
|||||||
.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath);
|
.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), filePath);
|
||||||
candidateRecordKeys = new ArrayList<>();
|
candidateRecordKeys = new ArrayList<>();
|
||||||
currentFile = fileName;
|
currentFile = fileName;
|
||||||
currentParitionPath = partitionPath;
|
currentPartitionPath = partitionPath;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieIndexException("Error checking candidate keys against file.", e);
|
throw new HoodieIndexException("Error checking candidate keys against file.", e);
|
||||||
}
|
}
|
||||||
@@ -154,7 +154,7 @@ public class HoodieBloomIndexCheckFunction implements
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// do the actual checking of file & break out
|
// do the actual checking of file & break out
|
||||||
Path filePath = new Path(basePath + "/" + currentParitionPath + "/" + currentFile);
|
Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile);
|
||||||
logger.info(
|
logger.info(
|
||||||
"#1 After bloom filter, the candidate row keys is reduced to " + candidateRecordKeys
|
"#1 After bloom filter, the candidate row keys is reduced to " + candidateRecordKeys
|
||||||
.size() + " for " + filePath);
|
.size() + " for " + filePath);
|
||||||
@@ -178,7 +178,7 @@ public class HoodieBloomIndexCheckFunction implements
|
|||||||
|
|
||||||
// handle case, where we ran out of input, close pending work, update return val
|
// handle case, where we ran out of input, close pending work, update return val
|
||||||
if (!inputItr.hasNext()) {
|
if (!inputItr.hasNext()) {
|
||||||
Path filePath = new Path(basePath + "/" + currentParitionPath + "/" + currentFile);
|
Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + currentFile);
|
||||||
logger.info(
|
logger.info(
|
||||||
"#2 After bloom filter, the candidate row keys is reduced to " + candidateRecordKeys
|
"#2 After bloom filter, the candidate row keys is reduced to " + candidateRecordKeys
|
||||||
.size() + " for " + filePath);
|
.size() + " for " + filePath);
|
||||||
|
|||||||
@@ -143,10 +143,10 @@ public class TestHoodieCompactionStrategy {
|
|||||||
|
|
||||||
private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
|
private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
|
||||||
Map<Long, List<Long>> sizesMap) {
|
Map<Long, List<Long>> sizesMap) {
|
||||||
Map<Long, String> keyToParitionMap = sizesMap.entrySet().stream().map(e ->
|
Map<Long, String> keyToPartitionMap = sizesMap.entrySet().stream().map(e ->
|
||||||
Pair.of(e.getKey(), partitionPaths[new Random().nextInt(partitionPaths.length - 1)]))
|
Pair.of(e.getKey(), partitionPaths[new Random().nextInt(partitionPaths.length - 1)]))
|
||||||
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
|
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
|
||||||
return createCompactionOperations(config, sizesMap, keyToParitionMap);
|
return createCompactionOperations(config, sizesMap, keyToPartitionMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
|
private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ import com.google.common.base.Objects;
|
|||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Location of a HoodieRecord within the parition it belongs to. Ultimately, this points to an
|
* Location of a HoodieRecord within the partition it belongs to. Ultimately, this points to an
|
||||||
* actual file on disk
|
* actual file on disk
|
||||||
*/
|
*/
|
||||||
public class HoodieRecordLocation implements Serializable {
|
public class HoodieRecordLocation implements Serializable {
|
||||||
|
|||||||
@@ -410,12 +410,12 @@ public class SchemaUtil {
|
|||||||
.append(getPartitionKeyType(hiveSchema, partitionKey)).toString());
|
.append(getPartitionKeyType(hiveSchema, partitionKey)).toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
String paritionsStr = partitionFields.stream().collect(Collectors.joining(","));
|
String partitionsStr = partitionFields.stream().collect(Collectors.joining(","));
|
||||||
StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
|
StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS ");
|
||||||
sb = sb.append(config.databaseName).append(".").append(config.tableName);
|
sb = sb.append(config.databaseName).append(".").append(config.tableName);
|
||||||
sb = sb.append("( ").append(columns).append(")");
|
sb = sb.append("( ").append(columns).append(")");
|
||||||
if (!config.partitionFields.isEmpty()) {
|
if (!config.partitionFields.isEmpty()) {
|
||||||
sb = sb.append(" PARTITIONED BY (").append(paritionsStr).append(")");
|
sb = sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
|
||||||
}
|
}
|
||||||
sb = sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
|
sb = sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
|
||||||
sb = sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
|
sb = sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
|
||||||
|
|||||||
@@ -38,10 +38,10 @@ class DataSourceDefaultsTest extends AssertionsForJUnit {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private def getKeyConfig(recordKeyFieldName: String, paritionPathField: String): TypedProperties = {
|
private def getKeyConfig(recordKeyFieldName: String, partitionPathField: String): TypedProperties = {
|
||||||
val props = new TypedProperties()
|
val props = new TypedProperties()
|
||||||
props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName)
|
props.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, recordKeyFieldName)
|
||||||
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, paritionPathField)
|
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionPathField)
|
||||||
props
|
props
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user