[MINOR] Fix assigning to configuration more times (#1291)
This commit is contained in:
@@ -116,7 +116,7 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
|
|||||||
|
|
||||||
private HoodieHBaseIndexConfig getConfigWithResourceAllocator(Option<String> resourceAllocatorClass) {
|
private HoodieHBaseIndexConfig getConfigWithResourceAllocator(Option<String> resourceAllocatorClass) {
|
||||||
HoodieHBaseIndexConfig.Builder builder = new HoodieHBaseIndexConfig.Builder()
|
HoodieHBaseIndexConfig.Builder builder = new HoodieHBaseIndexConfig.Builder()
|
||||||
.hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
|
.hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
|
||||||
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName).hbaseIndexGetBatchSize(100);
|
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName).hbaseIndexGetBatchSize(100);
|
||||||
if (resourceAllocatorClass.isPresent()) {
|
if (resourceAllocatorClass.isPresent()) {
|
||||||
builder.withQPSResourceAllocatorType(resourceAllocatorClass.get());
|
builder.withQPSResourceAllocatorType(resourceAllocatorClass.get());
|
||||||
|
|||||||
@@ -418,7 +418,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
|
|||||||
.forTable("test-trip-table")
|
.forTable("test-trip-table")
|
||||||
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
|
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.HBASE)
|
||||||
.withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder()
|
.withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder()
|
||||||
.hbaseZkPort(Integer.valueOf(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
|
.hbaseZkPort(Integer.parseInt(hbaseConfig.get("hbase.zookeeper.property.clientPort")))
|
||||||
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
|
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(tableName)
|
||||||
.hbaseIndexGetBatchSize(100).build())
|
.hbaseIndexGetBatchSize(100).build())
|
||||||
.build());
|
.build());
|
||||||
|
|||||||
@@ -188,15 +188,11 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
|
|||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Configuration addRequiredProjectionFields(Configuration configuration) {
|
private static void addRequiredProjectionFields(Configuration configuration) {
|
||||||
// Need this to do merge records in HoodieRealtimeRecordReader
|
// Need this to do merge records in HoodieRealtimeRecordReader
|
||||||
configuration =
|
addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
|
||||||
addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HOODIE_RECORD_KEY_COL_POS);
|
addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
|
||||||
configuration =
|
addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
|
||||||
addProjectionField(configuration, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HOODIE_COMMIT_TIME_COL_POS);
|
|
||||||
configuration =
|
|
||||||
addProjectionField(configuration, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HOODIE_PARTITION_PATH_COL_POS);
|
|
||||||
return configuration;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -205,7 +201,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
|
|||||||
* e.g. ",2,0,3" and will cause an error. Actually this method is a temporary solution because the real bug is from
|
* e.g. ",2,0,3" and will cause an error. Actually this method is a temporary solution because the real bug is from
|
||||||
* Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
|
* Hive. Hive has fixed this bug after 3.0.0, but the version before that would still face this problem. (HIVE-22438)
|
||||||
*/
|
*/
|
||||||
private static Configuration cleanProjectionColumnIds(Configuration conf) {
|
private static void cleanProjectionColumnIds(Configuration conf) {
|
||||||
String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
|
String columnIds = conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
|
||||||
if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
|
if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
|
||||||
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
|
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
|
||||||
@@ -213,23 +209,22 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
|
|||||||
LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
|
LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return conf;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf job,
|
public RecordReader<NullWritable, ArrayWritable> getRecordReader(final InputSplit split, final JobConf jobConf,
|
||||||
final Reporter reporter) throws IOException {
|
final Reporter reporter) throws IOException {
|
||||||
// Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
|
// Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
|
||||||
// same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, are at the
|
// same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, are at the
|
||||||
// risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
|
// risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
|
||||||
// latency incurred here due to the synchronization since get record reader is called once per split before the
|
// latency incurred here due to the synchronization since get record reader is called once per split before the
|
||||||
// actual heavy lifting of reading the parquet files happens.
|
// actual heavy lifting of reading the parquet files happens.
|
||||||
if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
|
if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
|
||||||
synchronized (job) {
|
synchronized (jobConf) {
|
||||||
LOG.info(
|
LOG.info(
|
||||||
"Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
|
"Before adding Hoodie columns, Projections :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
|
||||||
+ ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
|
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
|
||||||
if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
|
if (jobConf.get(HOODIE_READ_COLUMNS_PROP) == null) {
|
||||||
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
|
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
|
||||||
// In this case, the projection fields get removed. Looking at HiveInputFormat implementation, in some cases
|
// In this case, the projection fields get removed. Looking at HiveInputFormat implementation, in some cases
|
||||||
// hoodie additional projection columns are reset after calling setConf and only natural projections
|
// hoodie additional projection columns are reset after calling setConf and only natural projections
|
||||||
@@ -237,21 +232,23 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
|
|||||||
// E.g., _hoodie_record_key would be missing and merge step would throw exceptions.
|
// E.g., _hoodie_record_key would be missing and merge step would throw exceptions.
|
||||||
// To fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
|
// To fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
|
||||||
// time.
|
// time.
|
||||||
this.conf = cleanProjectionColumnIds(job);
|
cleanProjectionColumnIds(jobConf);
|
||||||
this.conf = addRequiredProjectionFields(job);
|
addRequiredProjectionFields(jobConf);
|
||||||
|
|
||||||
|
this.conf = jobConf;
|
||||||
this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
|
this.conf.set(HOODIE_READ_COLUMNS_PROP, "true");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
|
LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
|
||||||
+ ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
|
+ ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
|
||||||
// sanity check
|
// sanity check
|
||||||
Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
|
Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
|
||||||
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split);
|
"HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split);
|
||||||
|
|
||||||
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, job,
|
return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf,
|
||||||
super.getRecordReader(split, job, reporter));
|
super.getRecordReader(split, jobConf, reporter));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user