[HUDI-595] code cleanup, refactoring code out of PR# 1159 (#1302)
This commit is contained in:
@@ -205,9 +205,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||
}
|
||||
}
|
||||
List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
|
||||
HTable hTable = null;
|
||||
try {
|
||||
hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
|
||||
try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName))) {
|
||||
List<Get> statements = new ArrayList<>();
|
||||
List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
|
||||
// Do the tagging.
|
||||
@@ -250,15 +248,6 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIndexException("Failed to Tag indexed locations because of exception with HBase Client", e);
|
||||
} finally {
|
||||
if (hTable != null) {
|
||||
try {
|
||||
hTable.close();
|
||||
} catch (IOException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return taggedRecords.iterator();
|
||||
};
|
||||
@@ -444,16 +433,14 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
|
||||
*/
|
||||
public int getBatchSize(int numRegionServersForTable, int maxQpsPerRegionServer, int numTasksDuringPut,
|
||||
int maxExecutors, int sleepTimeMs, float qpsFraction) {
|
||||
int numRSAlive = numRegionServersForTable;
|
||||
int maxReqPerSec = (int) (qpsFraction * numRSAlive * maxQpsPerRegionServer);
|
||||
int numTasks = numTasksDuringPut;
|
||||
int maxParallelPuts = Math.max(1, Math.min(numTasks, maxExecutors));
|
||||
int maxReqPerSec = (int) (qpsFraction * numRegionServersForTable * maxQpsPerRegionServer);
|
||||
int maxParallelPuts = Math.max(1, Math.min(numTasksDuringPut, maxExecutors));
|
||||
int maxReqsSentPerTaskPerSec = MILLI_SECONDS_IN_A_SECOND / sleepTimeMs;
|
||||
int multiPutBatchSize = Math.max(1, maxReqPerSec / (maxParallelPuts * maxReqsSentPerTaskPerSec));
|
||||
LOG.info("HbaseIndexThrottling: qpsFraction :" + qpsFraction);
|
||||
LOG.info("HbaseIndexThrottling: numRSAlive :" + numRSAlive);
|
||||
LOG.info("HbaseIndexThrottling: numRSAlive :" + numRegionServersForTable);
|
||||
LOG.info("HbaseIndexThrottling: maxReqPerSec :" + maxReqPerSec);
|
||||
LOG.info("HbaseIndexThrottling: numTasks :" + numTasks);
|
||||
LOG.info("HbaseIndexThrottling: numTasks :" + numTasksDuringPut);
|
||||
LOG.info("HbaseIndexThrottling: maxExecutors :" + maxExecutors);
|
||||
LOG.info("HbaseIndexThrottling: maxParallelPuts :" + maxParallelPuts);
|
||||
LOG.info("HbaseIndexThrottling: maxReqsSentPerTaskPerSec :" + maxReqsSentPerTaskPerSec);
|
||||
|
||||
@@ -147,9 +147,9 @@ public class HoodieCommitArchiveLog {
|
||||
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
|
||||
.getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants();
|
||||
Stream<HoodieInstant> instants = cleanAndRollbackTimeline.getInstants()
|
||||
.collect(Collectors.groupingBy(s -> s.getAction())).entrySet().stream().map(i -> {
|
||||
if (i.getValue().size() > maxCommitsToKeep) {
|
||||
return i.getValue().subList(0, i.getValue().size() - minCommitsToKeep);
|
||||
.collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream().map(hoodieInstants -> {
|
||||
if (hoodieInstants.size() > maxCommitsToKeep) {
|
||||
return hoodieInstants.subList(0, hoodieInstants.size() - minCommitsToKeep);
|
||||
} else {
|
||||
return new ArrayList<HoodieInstant>();
|
||||
}
|
||||
|
||||
@@ -62,10 +62,10 @@ public abstract class CompactionStrategy implements Serializable {
|
||||
public Map<String, Double> captureMetrics(HoodieWriteConfig writeConfig, Option<HoodieBaseFile> dataFile,
|
||||
String partitionPath, List<HoodieLogFile> logFiles) {
|
||||
Map<String, Double> metrics = Maps.newHashMap();
|
||||
Long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
|
||||
long defaultMaxParquetFileSize = writeConfig.getParquetMaxFileSize();
|
||||
// Total size of all the log files
|
||||
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(size -> size >= 0)
|
||||
.reduce((size1, size2) -> size1 + size2).orElse(0L);
|
||||
.reduce(Long::sum).orElse(0L);
|
||||
// Total read will be the base file + all the log files
|
||||
Long totalIORead =
|
||||
FSUtils.getSizeInMB((dataFile.isPresent() ? dataFile.get().getFileSize() : 0L) + totalLogFileSize);
|
||||
@@ -73,11 +73,11 @@ public abstract class CompactionStrategy implements Serializable {
|
||||
Long totalIOWrite =
|
||||
FSUtils.getSizeInMB(dataFile.isPresent() ? dataFile.get().getFileSize() : defaultMaxParquetFileSize);
|
||||
// Total IO will the the IO for read + write
|
||||
Long totalIO = totalIORead + totalIOWrite;
|
||||
long totalIO = totalIORead + totalIOWrite;
|
||||
// Save these metrics and we will use during the filter
|
||||
metrics.put(TOTAL_IO_READ_MB, totalIORead.doubleValue());
|
||||
metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite.doubleValue());
|
||||
metrics.put(TOTAL_IO_MB, totalIO.doubleValue());
|
||||
metrics.put(TOTAL_IO_MB, (double) totalIO);
|
||||
metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize.doubleValue());
|
||||
metrics.put(TOTAL_LOG_FILES, (double) logFiles.size());
|
||||
return metrics;
|
||||
|
||||
@@ -49,17 +49,14 @@ public class Metrics {
|
||||
}
|
||||
// reporter.start();
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
reporter.report();
|
||||
Closeables.close(reporter.getReporter(), true);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
try {
|
||||
reporter.report();
|
||||
Closeables.close(reporter.getReporter(), true);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
public static Metrics getInstance() {
|
||||
|
||||
@@ -68,6 +68,7 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@@ -106,7 +107,7 @@ public class TestCleaner extends TestHoodieClientBase {
|
||||
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
|
||||
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
|
||||
|
||||
/**
|
||||
/*
|
||||
* do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
|
||||
* in insert(), if the implementation diverges.)
|
||||
*/
|
||||
@@ -606,8 +607,8 @@ public class TestCleaner extends TestHoodieClientBase {
|
||||
String filePath2 = metaClient.getBasePath() + "/" + partition1 + "/" + fileName2;
|
||||
|
||||
List<String> deletePathPatterns1 = Arrays.asList(filePath1, filePath2);
|
||||
List<String> successDeleteFiles1 = Arrays.asList(filePath1);
|
||||
List<String> failedDeleteFiles1 = Arrays.asList(filePath2);
|
||||
List<String> successDeleteFiles1 = Collections.singletonList(filePath1);
|
||||
List<String> failedDeleteFiles1 = Collections.singletonList(filePath2);
|
||||
|
||||
// create partition1 clean stat.
|
||||
HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
|
||||
@@ -630,7 +631,8 @@ public class TestCleaner extends TestHoodieClientBase {
|
||||
|
||||
// map with relative path.
|
||||
Map<String, Tuple3> newExpected = new HashMap<>();
|
||||
newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Arrays.asList(fileName1), Arrays.asList(fileName2)));
|
||||
newExpected.put(partition1, new Tuple3<>(Arrays.asList(fileName1, fileName2), Collections.singletonList(fileName1),
|
||||
Collections.singletonList(fileName2)));
|
||||
newExpected.put(partition2, new Tuple3<>(deletePathPatterns2, successDeleteFiles2, failedDeleteFiles2));
|
||||
|
||||
HoodieCleanMetadata metadata =
|
||||
@@ -1079,19 +1081,18 @@ public class TestCleaner extends TestHoodieClientBase {
|
||||
});
|
||||
|
||||
// Test for progress (Did we clean some files ?)
|
||||
long numFilesUnderCompactionDeleted = hoodieCleanStats.stream().flatMap(cleanStat -> {
|
||||
return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns())
|
||||
.map(fileIdWithCommitTime -> {
|
||||
if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
|
||||
Assert.assertTrue("Deleted instant time must be less than pending compaction",
|
||||
HoodieTimeline.compareTimestamps(
|
||||
fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
|
||||
fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}).filter(x -> x).count();
|
||||
long numFilesUnderCompactionDeleted = hoodieCleanStats.stream()
|
||||
.flatMap(cleanStat -> convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns())
|
||||
.map(fileIdWithCommitTime -> {
|
||||
if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
|
||||
Assert.assertTrue("Deleted instant time must be less than pending compaction",
|
||||
HoodieTimeline.compareTimestamps(
|
||||
fileIdToLatestInstantBeforeCompaction.get(fileIdWithCommitTime.getKey()),
|
||||
fileIdWithCommitTime.getValue(), HoodieTimeline.GREATER));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
})).filter(x -> x).count();
|
||||
long numDeleted =
|
||||
hoodieCleanStats.stream().mapToLong(cleanStat -> cleanStat.getDeletePathPatterns().size()).sum();
|
||||
// Tighter check for regression
|
||||
@@ -1123,7 +1124,7 @@ public class TestCleaner extends TestHoodieClientBase {
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
private int getTotalTempFiles() throws IOException {
|
||||
RemoteIterator itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
|
||||
RemoteIterator<?> itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
|
||||
int count = 0;
|
||||
while (itr.hasNext()) {
|
||||
count++;
|
||||
|
||||
@@ -210,13 +210,10 @@ public class HoodieTestDataGenerator {
|
||||
Path commitFile =
|
||||
new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
|
||||
FileSystem fs = FSUtils.getFs(basePath, configuration);
|
||||
FSDataOutputStream os = fs.create(commitFile, true);
|
||||
HoodieCompactionPlan workload = new HoodieCompactionPlan();
|
||||
try {
|
||||
try (FSDataOutputStream os = fs.create(commitFile, true)) {
|
||||
HoodieCompactionPlan workload = new HoodieCompactionPlan();
|
||||
// Write empty commit metadata
|
||||
os.writeBytes(new String(AvroUtils.serializeCompactionPlan(workload).get(), StandardCharsets.UTF_8));
|
||||
} finally {
|
||||
os.close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -225,13 +222,10 @@ public class HoodieTestDataGenerator {
|
||||
Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
|
||||
+ HoodieTimeline.makeSavePointFileName(commitTime));
|
||||
FileSystem fs = FSUtils.getFs(basePath, configuration);
|
||||
FSDataOutputStream os = fs.create(commitFile, true);
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
try {
|
||||
try (FSDataOutputStream os = fs.create(commitFile, true)) {
|
||||
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
|
||||
// Write empty commit metadata
|
||||
os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
} finally {
|
||||
os.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -42,6 +42,7 @@ import org.junit.Test;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@@ -246,13 +247,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
||||
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
|
||||
|
||||
String filename0 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Arrays.asList(record1), schema, null, false);
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2016/04/01", Collections.singletonList(record1),
|
||||
schema, null, false);
|
||||
String filename1 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(), schema, null, false);
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Lists.newArrayList(),
|
||||
schema, null, false);
|
||||
String filename2 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record2), schema, null, false);
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record2),
|
||||
schema, null, false);
|
||||
String filename3 =
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false);
|
||||
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Collections.singletonList(record4),
|
||||
schema, null, false);
|
||||
|
||||
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
|
||||
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||
@@ -265,21 +270,29 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
|
||||
JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
|
||||
|
||||
for (HoodieRecord record : taggedRecordRDD.collect()) {
|
||||
if (record.getRecordKey().equals("000")) {
|
||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0)));
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
|
||||
} else if (record.getRecordKey().equals("001")) {
|
||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2)));
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
|
||||
} else if (record.getRecordKey().equals("002")) {
|
||||
assertTrue(!record.isCurrentLocationKnown());
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
|
||||
} else if (record.getRecordKey().equals("003")) {
|
||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
|
||||
} else if (record.getRecordKey().equals("004")) {
|
||||
assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3)));
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
|
||||
switch (record.getRecordKey()) {
|
||||
case "000":
|
||||
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename0));
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData());
|
||||
break;
|
||||
case "001":
|
||||
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2));
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData());
|
||||
break;
|
||||
case "002":
|
||||
assertFalse(record.isCurrentLocationKnown());
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData());
|
||||
break;
|
||||
case "003":
|
||||
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData());
|
||||
break;
|
||||
case "004":
|
||||
assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
|
||||
assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData());
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown Key: " + record.getRecordKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user