1
0

Update deprecated HBase API

This commit is contained in:
Wenning Ding
2020-01-03 12:36:14 -08:00
committed by n3nash
parent 9884972a3a
commit aba83876e7

View File

@@ -41,11 +41,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
@@ -284,13 +286,10 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
hbaseConnection = getHBaseConnection(); hbaseConnection = getHBaseConnection();
} }
} }
HTable hTable = null; try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
try {
hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
while (statusIterator.hasNext()) { while (statusIterator.hasNext()) {
WriteStatus writeStatus = statusIterator.next(); WriteStatus writeStatus = statusIterator.next();
List<Put> puts = new ArrayList<>(); List<Mutation> mutations = new ArrayList<>();
List<Delete> deletes = new ArrayList<>();
try { try {
for (HoodieRecord rec : writeStatus.getWrittenRecords()) { for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
if (!writeStatus.isErrored(rec.getKey())) { if (!writeStatus.isErrored(rec.getKey())) {
@@ -304,20 +303,20 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime())); put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId())); put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath())); put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
puts.add(put); mutations.add(put);
} else { } else {
// Delete existing index for a deleted record // Delete existing index for a deleted record
Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey())); Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
deletes.add(delete); mutations.add(delete);
} }
} }
if (puts.size() + deletes.size() < multiPutBatchSize) { if (mutations.size() < multiPutBatchSize) {
continue; continue;
} }
doPutsAndDeletes(hTable, puts, deletes); doMutations(mutator, mutations);
} }
// process remaining puts and deletes, if any // process remaining puts and deletes, if any
doPutsAndDeletes(hTable, puts, deletes); doMutations(mutator, mutations);
} catch (Exception e) { } catch (Exception e) {
Exception we = new Exception("Error updating index for " + writeStatus, e); Exception we = new Exception("Error updating index for " + writeStatus, e);
LOG.error(we); LOG.error(we);
@@ -327,32 +326,21 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
} }
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIndexException("Failed to Update Index locations because of exception with HBase Client", e); throw new HoodieIndexException("Failed to Update Index locations because of exception with HBase Client", e);
} finally {
if (hTable != null) {
try {
hTable.close();
} catch (IOException e) {
// Ignore
}
}
} }
return writeStatusList.iterator(); return writeStatusList.iterator();
}; };
} }
/** /**
* Helper method to facilitate performing puts and deletes in Hbase. * Helper method to facilitate performing mutations (including puts and deletes) in Hbase.
*/ */
private void doPutsAndDeletes(HTable hTable, List<Put> puts, List<Delete> deletes) throws IOException { private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
if (puts.size() > 0) { if (mutations.isEmpty()) {
hTable.put(puts); return;
} }
if (deletes.size() > 0) { mutator.mutate(mutations);
hTable.delete(deletes); mutator.flush();
} mutations.clear();
hTable.flushCommits();
puts.clear();
deletes.clear();
sleepForTime(SLEEP_TIME_MILLISECONDS); sleepForTime(SLEEP_TIME_MILLISECONDS);
} }