1
0

[MINOR] fix typos (#2116)

This commit is contained in:
dugenkui
2020-09-26 20:40:33 +08:00
committed by GitHub
parent 1dd6635fbb
commit ae68b2b355
9 changed files with 66 additions and 46 deletions

View File

@@ -179,7 +179,7 @@ public class BootstrapCommand implements CommandMarker {
final List<Comparable[]> rows = new ArrayList<>(); final List<Comparable[]> rows = new ArrayList<>();
for (BootstrapFileMapping mapping : mappingList) { for (BootstrapFileMapping mapping : mappingList) {
rows.add(new Comparable[] {mapping.getPartitionPath(), mapping.getFileId(), rows.add(new Comparable[] {mapping.getPartitionPath(), mapping.getFileId(),
mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBoostrapFileStatus().getPath().getUri()}); mapping.getBootstrapBasePath(), mapping.getBootstrapPartitionPath(), mapping.getBootstrapFileStatus().getPath().getUri()});
} }
return rows; return rows;
} }

View File

@@ -559,13 +559,13 @@ public class TestCleaner extends HoodieClientTestBase {
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
if (enableBootstrapSourceClean) { if (enableBootstrapSourceClean) {
HoodieFileStatus fstatus = HoodieFileStatus fstatus =
bootstrapMapping.get(p0).get(0).getBoostrapFileStatus(); bootstrapMapping.get(p0).get(0).getBootstrapFileStatus();
// This ensures full path is recorded in metadata. // This ensures full path is recorded in metadata.
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+ " but did not contain " + fstatus.getPath().getUri()); + " but did not contain " + fstatus.getPath().getUri());
assertFalse(Files.exists(Paths.get(bootstrapMapping.get( assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
p0).get(0).getBoostrapFileStatus().getPath().getUri()))); p0).get(0).getBootstrapFileStatus().getPath().getUri())));
} }
cleanStat = getCleanStat(hoodieCleanStatsTwo, p1); cleanStat = getCleanStat(hoodieCleanStatsTwo, p1);
String file2P0C1 = partitionAndFileId002.get(p0); String file2P0C1 = partitionAndFileId002.get(p0);
@@ -579,13 +579,13 @@ public class TestCleaner extends HoodieClientTestBase {
: cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file"); : cleanStat.getSuccessDeleteBootstrapBaseFiles().size()), "Must clean at least 1 file");
if (enableBootstrapSourceClean) { if (enableBootstrapSourceClean) {
HoodieFileStatus fstatus = HoodieFileStatus fstatus =
bootstrapMapping.get(p1).get(0).getBoostrapFileStatus(); bootstrapMapping.get(p1).get(0).getBootstrapFileStatus();
// This ensures full path is recorded in metadata. // This ensures full path is recorded in metadata.
assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()), assertTrue(cleanStat.getSuccessDeleteBootstrapBaseFiles().contains(fstatus.getPath().getUri()),
"Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles() "Successful delete files were " + cleanStat.getSuccessDeleteBootstrapBaseFiles()
+ " but did not contain " + fstatus.getPath().getUri()); + " but did not contain " + fstatus.getPath().getUri());
assertFalse(Files.exists(Paths.get(bootstrapMapping.get( assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
p1).get(0).getBoostrapFileStatus().getPath().getUri()))); p1).get(0).getBootstrapFileStatus().getPath().getUri())));
} }
// make next commit, with 2 updates to existing files, and 1 insert // make next commit, with 2 updates to existing files, and 1 insert
@@ -928,7 +928,7 @@ public class TestCleaner extends HoodieClientTestBase {
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
if (enableBootstrapSourceClean) { if (enableBootstrapSourceClean) {
assertFalse(Files.exists(Paths.get(bootstrapMapping.get( assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
p0).get(0).getBoostrapFileStatus().getPath().getUri()))); p0).get(0).getBootstrapFileStatus().getPath().getUri())));
} }
// No cleaning on partially written file, with no commit. // No cleaning on partially written file, with no commit.
@@ -968,7 +968,7 @@ public class TestCleaner extends HoodieClientTestBase {
for (Map.Entry<String, List<BootstrapFileMapping>> entry : bootstrapMapping.entrySet()) { for (Map.Entry<String, List<BootstrapFileMapping>> entry : bootstrapMapping.entrySet()) {
new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs(); new File(sourcePath.toString() + "/" + entry.getKey()).mkdirs();
assertTrue(new File(entry.getValue().get(0).getBoostrapFileStatus().getPath().getUri()).createNewFile()); assertTrue(new File(entry.getValue().get(0).getBootstrapFileStatus().getPath().getUri()).createNewFile());
} }
return bootstrapMapping; return bootstrapMapping;
} }

View File

@@ -391,7 +391,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
} }
/** /**
* Boostrap Index Writer to build bootstrap index. * Bootstrap Index Writer to build bootstrap index.
*/ */
public static class HFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter { public static class HFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter {
@@ -443,7 +443,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
bootstrapPartitionMetadata.setPartitionPath(partitionPath); bootstrapPartitionMetadata.setPartitionPath(partitionPath);
bootstrapPartitionMetadata.setFileIdToBootstrapFile( bootstrapPartitionMetadata.setFileIdToBootstrapFile(
bootstrapFileMappings.stream().map(m -> Pair.of(m.getFileId(), bootstrapFileMappings.stream().map(m -> Pair.of(m.getFileId(),
m.getBoostrapFileStatus())).collect(Collectors.toMap(Pair::getKey, Pair::getValue))); m.getBootstrapFileStatus())).collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
Option<byte[]> bytes = TimelineMetadataUtils.serializeAvroMetadata(bootstrapPartitionMetadata, HoodieBootstrapPartitionMetadata.class); Option<byte[]> bytes = TimelineMetadataUtils.serializeAvroMetadata(bootstrapPartitionMetadata, HoodieBootstrapPartitionMetadata.class);
if (bytes.isPresent()) { if (bytes.isPresent()) {
indexByPartitionWriter indexByPartitionWriter
@@ -459,14 +459,14 @@ public class HFileBootstrapIndex extends BootstrapIndex {
/** /**
* Write next source file to hudi file-id. Entries are expected to be appended in hudi file-group id * Write next source file to hudi file-id. Entries are expected to be appended in hudi file-group id
* order. * order.
* @param mapping boostrap source file mapping. * @param mapping bootstrap source file mapping.
*/ */
private void writeNextSourceFileMapping(BootstrapFileMapping mapping) { private void writeNextSourceFileMapping(BootstrapFileMapping mapping) {
try { try {
HoodieBootstrapFilePartitionInfo srcFilePartitionInfo = new HoodieBootstrapFilePartitionInfo(); HoodieBootstrapFilePartitionInfo srcFilePartitionInfo = new HoodieBootstrapFilePartitionInfo();
srcFilePartitionInfo.setPartitionPath(mapping.getPartitionPath()); srcFilePartitionInfo.setPartitionPath(mapping.getPartitionPath());
srcFilePartitionInfo.setBootstrapPartitionPath(mapping.getBootstrapPartitionPath()); srcFilePartitionInfo.setBootstrapPartitionPath(mapping.getBootstrapPartitionPath());
srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBoostrapFileStatus()); srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBootstrapFileStatus());
KeyValue kv = new KeyValue(getFileGroupKey(mapping.getFileGroupId()).getBytes(), new byte[0], new byte[0], KeyValue kv = new KeyValue(getFileGroupKey(mapping.getFileGroupId()).getBytes(), new byte[0], new byte[0],
HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put,
TimelineMetadataUtils.serializeAvroMetadata(srcFilePartitionInfo, TimelineMetadataUtils.serializeAvroMetadata(srcFilePartitionInfo,

View File

@@ -29,17 +29,17 @@ public class BootstrapFileMapping implements Serializable, Comparable<BootstrapF
private final String bootstrapBasePath; private final String bootstrapBasePath;
private final String bootstrapPartitionPath; private final String bootstrapPartitionPath;
private final HoodieFileStatus boostrapFileStatus; private final HoodieFileStatus bootstrapFileStatus;
private final String partitionPath; private final String partitionPath;
private final String fileId; private final String fileId;
public BootstrapFileMapping(String bootstrapBasePath, String bootstrapPartitionPath, String partitionPath, public BootstrapFileMapping(String bootstrapBasePath, String bootstrapPartitionPath, String partitionPath,
HoodieFileStatus boostrapFileStatus, String fileId) { HoodieFileStatus bootstrapFileStatus, String fileId) {
this.bootstrapBasePath = bootstrapBasePath; this.bootstrapBasePath = bootstrapBasePath;
this.bootstrapPartitionPath = bootstrapPartitionPath; this.bootstrapPartitionPath = bootstrapPartitionPath;
this.partitionPath = partitionPath; this.partitionPath = partitionPath;
this.boostrapFileStatus = boostrapFileStatus; this.bootstrapFileStatus = bootstrapFileStatus;
this.fileId = fileId; this.fileId = fileId;
} }
@@ -48,7 +48,7 @@ public class BootstrapFileMapping implements Serializable, Comparable<BootstrapF
return "BootstrapFileMapping{" return "BootstrapFileMapping{"
+ "bootstrapBasePath='" + bootstrapBasePath + '\'' + "bootstrapBasePath='" + bootstrapBasePath + '\''
+ ", bootstrapPartitionPath='" + bootstrapPartitionPath + '\'' + ", bootstrapPartitionPath='" + bootstrapPartitionPath + '\''
+ ", boostrapFileStatus=" + boostrapFileStatus + ", bootstrapFileStatus=" + bootstrapFileStatus
+ ", partitionPath='" + partitionPath + '\'' + ", partitionPath='" + partitionPath + '\''
+ ", fileId='" + fileId + '\'' + ", fileId='" + fileId + '\''
+ '}'; + '}';
@@ -66,13 +66,13 @@ public class BootstrapFileMapping implements Serializable, Comparable<BootstrapF
return Objects.equals(bootstrapBasePath, mapping.bootstrapBasePath) return Objects.equals(bootstrapBasePath, mapping.bootstrapBasePath)
&& Objects.equals(bootstrapPartitionPath, mapping.bootstrapPartitionPath) && Objects.equals(bootstrapPartitionPath, mapping.bootstrapPartitionPath)
&& Objects.equals(partitionPath, mapping.partitionPath) && Objects.equals(partitionPath, mapping.partitionPath)
&& Objects.equals(boostrapFileStatus, mapping.boostrapFileStatus) && Objects.equals(bootstrapFileStatus, mapping.bootstrapFileStatus)
&& Objects.equals(fileId, mapping.fileId); && Objects.equals(fileId, mapping.fileId);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(bootstrapBasePath, bootstrapPartitionPath, partitionPath, boostrapFileStatus, fileId); return Objects.hash(bootstrapBasePath, bootstrapPartitionPath, partitionPath, bootstrapFileStatus, fileId);
} }
public String getBootstrapBasePath() { public String getBootstrapBasePath() {
@@ -87,8 +87,8 @@ public class BootstrapFileMapping implements Serializable, Comparable<BootstrapF
return partitionPath; return partitionPath;
} }
public HoodieFileStatus getBoostrapFileStatus() { public HoodieFileStatus getBootstrapFileStatus() {
return boostrapFileStatus; return bootstrapFileStatus;
} }
public String getFileId() { public String getFileId() {

View File

@@ -126,12 +126,12 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
if (!isPartitionAvailableInStore(partition)) { if (!isPartitionAvailableInStore(partition)) {
if (bootstrapIndex.useIndex()) { if (bootstrapIndex.useIndex()) {
try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) { try (BootstrapIndex.IndexReader reader = bootstrapIndex.createReader()) {
LOG.info("Boostrap Index available for partition " + partition); LOG.info("Bootstrap Index available for partition " + partition);
List<BootstrapFileMapping> sourceFileMappings = List<BootstrapFileMapping> sourceFileMappings =
reader.getSourceFileMappingForPartition(partition); reader.getSourceFileMappingForPartition(partition);
addBootstrapBaseFileMapping(sourceFileMappings.stream() addBootstrapBaseFileMapping(sourceFileMappings.stream()
.map(s -> new BootstrapBaseFileMapping(new HoodieFileGroupId(s.getPartitionPath(), .map(s -> new BootstrapBaseFileMapping(new HoodieFileGroupId(s.getPartitionPath(),
s.getFileId()), s.getBoostrapFileStatus()))); s.getFileId()), s.getBootstrapFileStatus())));
} }
} }
storePartitionView(partition, value); storePartitionView(partition, value);

View File

@@ -51,43 +51,63 @@ import java.util.function.Function;
*/ */
public class BoundedInMemoryQueue<I, O> implements Iterable<O> { public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
// interval used for polling records in the queue. /** Interval used for polling records in the queue. **/
public static final int RECORD_POLL_INTERVAL_SEC = 1; public static final int RECORD_POLL_INTERVAL_SEC = 1;
// rate used for sampling records to determine avg record size in bytes.
/** Rate used for sampling records to determine avg record size in bytes. **/
public static final int RECORD_SAMPLING_RATE = 64; public static final int RECORD_SAMPLING_RATE = 64;
// maximum records that will be cached
/** Maximum records that will be cached **/
private static final int RECORD_CACHING_LIMIT = 128 * 1024; private static final int RECORD_CACHING_LIMIT = 128 * 1024;
private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueue.class); private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueue.class);
// It indicates number of records to cache. We will be using sampled record's average size to
// determine how many /**
// records we should cache and will change (increase/decrease) permits accordingly. * It indicates number of records to cache. We will be using sampled record's average size to
* determine how many records we should cache and will change (increase/decrease) permits accordingly.
*/
public final Semaphore rateLimiter = new Semaphore(1); public final Semaphore rateLimiter = new Semaphore(1);
// used for sampling records with "RECORD_SAMPLING_RATE" frequency.
/** Used for sampling records with "RECORD_SAMPLING_RATE" frequency. **/
public final AtomicLong samplingRecordCounter = new AtomicLong(-1); public final AtomicLong samplingRecordCounter = new AtomicLong(-1);
// internal queue for records.
/** Internal queue for records. **/
private final LinkedBlockingQueue<Option<O>> queue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<Option<O>> queue = new LinkedBlockingQueue<>();
// maximum amount of memory to be used for queueing records.
/** Maximum amount of memory to be used for queueing records. **/
private final long memoryLimit; private final long memoryLimit;
// it holds the root cause of the exception in case either queueing records (consuming from
// inputIterator) fails or /**
// thread reading records from queue fails. * it holds the root cause of the exception in case either queueing records
* (consuming from inputIterator) fails or thread reading records from queue fails.
*/
private final AtomicReference<Exception> hasFailed = new AtomicReference<>(null); private final AtomicReference<Exception> hasFailed = new AtomicReference<>(null);
// used for indicating that all the records from queue are read successfully.
/** Used for indicating that all the records from queue are read successfully. **/
private final AtomicBoolean isReadDone = new AtomicBoolean(false); private final AtomicBoolean isReadDone = new AtomicBoolean(false);
// used for indicating that all records have been enqueued
/** used for indicating that all records have been enqueued. **/
private final AtomicBoolean isWriteDone = new AtomicBoolean(false); private final AtomicBoolean isWriteDone = new AtomicBoolean(false);
// Function to transform the input payload to the expected output payload
/** Function to transform the input payload to the expected output payload. **/
private final Function<I, O> transformFunction; private final Function<I, O> transformFunction;
// Payload Size Estimator
/** Payload Size Estimator. **/
private final SizeEstimator<O> payloadSizeEstimator; private final SizeEstimator<O> payloadSizeEstimator;
// Singleton (w.r.t this instance) Iterator for this queue
/** Singleton (w.r.t this instance) Iterator for this queue. **/
private final QueueIterator iterator; private final QueueIterator iterator;
// indicates rate limit (number of records to cache). it is updated whenever there is a change
// in avg record size. /**
* indicates rate limit (number of records to cache). it is updated
* whenever there is a change in avg record size.
*/
public int currentRateLimit = 1; public int currentRateLimit = 1;
// indicates avg record size in bytes. It is updated whenever a new record is sampled.
/** Indicates avg record size in bytes. It is updated whenever a new record is sampled. **/
public long avgRecordSizeInBytes = 0; public long avgRecordSizeInBytes = 0;
// indicates number of samples collected so far.
/** Indicates number of samples collected so far. **/
private long numSamples = 0; private long numSamples = 0;
/** /**

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.common.util.queue; package org.apache.hudi.common.util.queue;
/** /**
* Producer for BoundedInMemoryQueue. Memory Bounded Buffer supports multiple producers single consumer pattern. * Producer for {@link BoundedInMemoryQueue}. Memory Bounded Buffer supports multiple producers single consumer pattern.
* *
* @param <I> Input type for buffer items produced * @param <I> Input type for buffer items produced
*/ */

View File

@@ -156,7 +156,7 @@ public class TestBootstrapIndex extends HoodieCommonTestHarness {
assertEquals(x.getFileId(), res.getFileId()); assertEquals(x.getFileId(), res.getFileId());
assertEquals(x.getPartitionPath(), res.getPartitionPath()); assertEquals(x.getPartitionPath(), res.getPartitionPath());
assertEquals(BOOTSTRAP_BASE_PATH, res.getBootstrapBasePath()); assertEquals(BOOTSTRAP_BASE_PATH, res.getBootstrapBasePath());
assertEquals(x.getBoostrapFileStatus(), res.getBoostrapFileStatus()); assertEquals(x.getBootstrapFileStatus(), res.getBootstrapFileStatus());
assertEquals(x.getBootstrapPartitionPath(), res.getBootstrapPartitionPath()); assertEquals(x.getBootstrapPartitionPath(), res.getBootstrapPartitionPath());
}); });
}); });

View File

@@ -309,7 +309,7 @@ public class TestBootstrap extends HoodieClientTestBase {
} }
@Test @Test
public void testFullBoostrapOnlyCOW() throws Exception { public void testFullBootstrapOnlyCOW() throws Exception {
testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE); testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE);
} }
@@ -319,7 +319,7 @@ public class TestBootstrap extends HoodieClientTestBase {
} }
@Test @Test
public void testMetaAndFullBoostrapCOW() throws Exception { public void testMetaAndFullBootstrapCOW() throws Exception {
testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE); testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE);
} }