diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java index 3789bff37..3f7909639 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java @@ -41,11 +41,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; @@ -284,13 +286,10 @@ public class HBaseIndex extends HoodieIndex { hbaseConnection = getHBaseConnection(); } } - HTable hTable = null; - try { - hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName)); + try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) { while (statusIterator.hasNext()) { WriteStatus writeStatus = statusIterator.next(); - List puts = new ArrayList<>(); - List deletes = new ArrayList<>(); + List mutations = new ArrayList<>(); try { for (HoodieRecord rec : writeStatus.getWrittenRecords()) { if (!writeStatus.isErrored(rec.getKey())) { @@ -304,20 +303,20 @@ public class HBaseIndex extends HoodieIndex { put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime())); put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId())); put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath())); - puts.add(put); + mutations.add(put); } else { // Delete existing index for a deleted record Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey())); - deletes.add(delete); + mutations.add(delete); } } - if (puts.size() + deletes.size() < multiPutBatchSize) { + if (mutations.size() < multiPutBatchSize) { continue; } - doPutsAndDeletes(hTable, puts, deletes); + doMutations(mutator, mutations); } // process remaining puts and deletes, if any - doPutsAndDeletes(hTable, puts, deletes); + doMutations(mutator, mutations); } catch (Exception e) { Exception we = new Exception("Error updating index for " + writeStatus, e); LOG.error(we); @@ -327,32 +326,21 @@ public class HBaseIndex extends HoodieIndex { } } catch (IOException e) { throw new HoodieIndexException("Failed to Update Index locations because of exception with HBase Client", e); - } finally { - if (hTable != null) { - try { - hTable.close(); - } catch (IOException e) { - // Ignore - } - } } return writeStatusList.iterator(); }; } /** - * Helper method to facilitate performing puts and deletes in Hbase. + * Helper method to facilitate performing mutations (including puts and deletes) in Hbase. */ - private void doPutsAndDeletes(HTable hTable, List puts, List deletes) throws IOException { - if (puts.size() > 0) { - hTable.put(puts); + private void doMutations(BufferedMutator mutator, List mutations) throws IOException { + if (mutations.isEmpty()) { + return; } - if (deletes.size() > 0) { - hTable.delete(deletes); - } - hTable.flushCommits(); - puts.clear(); - deletes.clear(); + mutator.mutate(mutations); + mutator.flush(); + mutations.clear(); sleepForTime(SLEEP_TIME_MILLISECONDS); }