1
0

[HUDI-355] Refactor hudi-common based on new comment and code style rules (#1049)

[HUDI-355] Refactor hudi-common based on new comment and code style rules
This commit is contained in:
vinoyang
2019-12-04 12:49:14 +08:00
committed by Balaji Varadarajan
parent 98ab33bb6e
commit 84602c8882
149 changed files with 637 additions and 380 deletions

View File

@@ -37,7 +37,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* Converts Json record to Avro Generic Record * Converts Json record to Avro Generic Record.
*/ */
public class MercifulJsonConverter { public class MercifulJsonConverter {
@@ -61,21 +61,21 @@ public class MercifulJsonConverter {
} }
/** /**
* Uses a default objectMapper to deserialize a json string * Uses a default objectMapper to deserialize a json string.
*/ */
public MercifulJsonConverter() { public MercifulJsonConverter() {
this(new ObjectMapper()); this(new ObjectMapper());
} }
/** /**
* Allows a configured ObjectMapper to be passed for converting json records to avro record * Allows a configured ObjectMapper to be passed for converting json records to avro record.
*/ */
public MercifulJsonConverter(ObjectMapper mapper) { public MercifulJsonConverter(ObjectMapper mapper) {
this.mapper = mapper; this.mapper = mapper;
} }
/** /**
* Converts json to Avro generic record * Converts json to Avro generic record.
* *
* @param json Json record * @param json Json record
* @param schema Schema * @param schema Schema
@@ -133,7 +133,7 @@ public class MercifulJsonConverter {
} }
/** /**
* Base Class for converting json to avro fields * Base Class for converting json to avro fields.
*/ */
private abstract static class JsonToAvroFieldProcessor implements Serializable { private abstract static class JsonToAvroFieldProcessor implements Serializable {
@@ -311,7 +311,7 @@ public class MercifulJsonConverter {
} }
/** /**
* Exception Class for any schema conversion issue * Exception Class for any schema conversion issue.
*/ */
public static class HoodieJsonToAvroConversionException extends HoodieException { public static class HoodieJsonToAvroConversionException extends HoodieException {

View File

@@ -26,7 +26,7 @@ import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
* Collects stats about a single partition clean operation * Collects stats about a single partition clean operation.
*/ */
public class HoodieCleanStat implements Serializable { public class HoodieCleanStat implements Serializable {
@@ -80,6 +80,9 @@ public class HoodieCleanStat implements Serializable {
return new Builder(); return new Builder();
} }
/**
* A builder used to build {@link HoodieCleanStat}.
*/
public static class Builder { public static class Builder {
private HoodieCleaningPolicy policy; private HoodieCleaningPolicy policy;

View File

@@ -36,6 +36,9 @@ import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
/**
* Hoodie json payload.
*/
public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload> { public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload> {
private byte[] jsonDataCompressed; private byte[] jsonDataCompressed;

View File

@@ -26,7 +26,7 @@ import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* Collects stats about a single partition clean operation * Collects stats about a single partition clean operation.
*/ */
public class HoodieRollbackStat implements Serializable { public class HoodieRollbackStat implements Serializable {
@@ -66,6 +66,9 @@ public class HoodieRollbackStat implements Serializable {
return new Builder(); return new Builder();
} }
/**
* A builder used to build {@link HoodieRollbackStat}.
*/
public static class Builder { public static class Builder {
private List<String> successDeleteFiles; private List<String> successDeleteFiles;

View File

@@ -25,6 +25,9 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;
/**
* A wrapped configuration which can be serialized.
*/
public class SerializableConfiguration implements Serializable { public class SerializableConfiguration implements Serializable {
private transient Configuration configuration; private transient Configuration configuration;

View File

@@ -18,6 +18,9 @@
package org.apache.hudi.common.model; package org.apache.hudi.common.model;
/**
* The supported action types.
*/
public enum ActionType { public enum ActionType {
commit, savepoint, compaction, clean, rollback commit, savepoint, compaction, clean, rollback
} }

View File

@@ -34,8 +34,7 @@ import java.util.stream.Collectors;
/** /**
* Encapsulates all the needed information about a compaction and make a decision whether this compaction is effective * Encapsulates all the needed information about a compaction and make a decision whether this compaction is effective
* or not * or not.
*
*/ */
public class CompactionOperation implements Serializable { public class CompactionOperation implements Serializable {
@@ -118,7 +117,7 @@ public class CompactionOperation implements Serializable {
} }
/** /**
* Convert Avro generated Compaction operation to POJO for Spark RDD operation * Convert Avro generated Compaction operation to POJO for Spark RDD operation.
* *
* @param operation Hoodie Compaction Operation * @param operation Hoodie Compaction Operation
* @return * @return

View File

@@ -27,22 +27,22 @@ import java.util.stream.Stream;
/** /**
* Within a file group, a slice is a combination of data file written at a commit time and list of log files, containing * Within a file group, a slice is a combination of data file written at a commit time and list of log files, containing
* changes to the data file from that commit time * changes to the data file from that commit time.
*/ */
public class FileSlice implements Serializable { public class FileSlice implements Serializable {
/** /**
* File Group Id of the Slice * File Group Id of the Slice.
*/ */
private HoodieFileGroupId fileGroupId; private HoodieFileGroupId fileGroupId;
/** /**
* Point in the timeline, at which the slice was created * Point in the timeline, at which the slice was created.
*/ */
private String baseInstantTime; private String baseInstantTime;
/** /**
* data file, with the compacted data, for this slice * data file, with the compacted data, for this slice.
*/ */
private HoodieDataFile dataFile; private HoodieDataFile dataFile;

View File

@@ -21,6 +21,9 @@ package org.apache.hudi.common.model;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
/**
* The hoodie archived log file.
*/
public class HoodieArchivedLogFile extends HoodieLogFile { public class HoodieArchivedLogFile extends HoodieLogFile {
public static final String ARCHIVE_EXTENSION = ".archive"; public static final String ARCHIVE_EXTENSION = ".archive";

View File

@@ -18,6 +18,9 @@
package org.apache.hudi.common.model; package org.apache.hudi.common.model;
/**
* Hoodie cleaning policies.
*/
public enum HoodieCleaningPolicy { public enum HoodieCleaningPolicy {
KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_COMMITS KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_COMMITS
} }

View File

@@ -26,6 +26,9 @@ import org.apache.hadoop.fs.Path;
import java.io.Serializable; import java.io.Serializable;
import java.util.Objects; import java.util.Objects;
/**
* Hoodie data file.
*/
public class HoodieDataFile implements Serializable { public class HoodieDataFile implements Serializable {
private transient FileStatus fileStatus; private transient FileStatus fileStatus;

View File

@@ -18,6 +18,9 @@
package org.apache.hudi.common.model; package org.apache.hudi.common.model;
/**
* Hoodie file format.
*/
public enum HoodieFileFormat { public enum HoodieFileFormat {
PARQUET(".parquet"), HOODIE_LOG(".log"); PARQUET(".parquet"), HOODIE_LOG(".log");

View File

@@ -30,7 +30,7 @@ import java.util.TreeMap;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* A set of data/base files + set of log files, that make up an unit for all operations * A set of data/base files + set of log files, that make up an unit for all operations.
*/ */
public class HoodieFileGroup implements Serializable { public class HoodieFileGroup implements Serializable {
@@ -39,7 +39,7 @@ public class HoodieFileGroup implements Serializable {
} }
/** /**
* file group id * file group id.
*/ */
private final HoodieFileGroupId fileGroupId; private final HoodieFileGroupId fileGroupId;
@@ -49,12 +49,12 @@ public class HoodieFileGroup implements Serializable {
private final TreeMap<String, FileSlice> fileSlices; private final TreeMap<String, FileSlice> fileSlices;
/** /**
* Timeline, based on which all getter work * Timeline, based on which all getter work.
*/ */
private final HoodieTimeline timeline; private final HoodieTimeline timeline;
/** /**
* The last completed instant, that acts as a high watermark for all getters * The last completed instant, that acts as a high watermark for all getters.
*/ */
private final Option<HoodieInstant> lastInstant; private final Option<HoodieInstant> lastInstant;
@@ -71,7 +71,7 @@ public class HoodieFileGroup implements Serializable {
/** /**
* Potentially add a new file-slice by adding base-instant time A file-slice without any data-file and log-files can * Potentially add a new file-slice by adding base-instant time A file-slice without any data-file and log-files can
* exist (if a compaction just got requested) * exist (if a compaction just got requested).
*/ */
public void addNewFileSliceAtInstant(String baseInstantTime) { public void addNewFileSliceAtInstant(String baseInstantTime) {
if (!fileSlices.containsKey(baseInstantTime)) { if (!fileSlices.containsKey(baseInstantTime)) {
@@ -80,7 +80,7 @@ public class HoodieFileGroup implements Serializable {
} }
/** /**
* Add a new datafile into the file group * Add a new datafile into the file group.
*/ */
public void addDataFile(HoodieDataFile dataFile) { public void addDataFile(HoodieDataFile dataFile) {
if (!fileSlices.containsKey(dataFile.getCommitTime())) { if (!fileSlices.containsKey(dataFile.getCommitTime())) {
@@ -90,7 +90,7 @@ public class HoodieFileGroup implements Serializable {
} }
/** /**
* Add a new log file into the group * Add a new log file into the group.
*/ */
public void addLogFile(HoodieLogFile logFile) { public void addLogFile(HoodieLogFile logFile) {
if (!fileSlices.containsKey(logFile.getBaseCommitTime())) { if (!fileSlices.containsKey(logFile.getBaseCommitTime())) {
@@ -109,7 +109,7 @@ public class HoodieFileGroup implements Serializable {
/** /**
* A FileSlice is considered committed, if one of the following is true - There is a committed data file - There are * A FileSlice is considered committed, if one of the following is true - There is a committed data file - There are
* some log files, that are based off a commit or delta commit * some log files, that are based off a commit or delta commit.
*/ */
private boolean isFileSliceCommitted(FileSlice slice) { private boolean isFileSliceCommitted(FileSlice slice) {
String maxCommitTime = lastInstant.get().getTimestamp(); String maxCommitTime = lastInstant.get().getTimestamp();
@@ -119,14 +119,14 @@ public class HoodieFileGroup implements Serializable {
} }
/** /**
* Get all the the file slices including in-flight ones as seen in underlying file-system * Get all the the file slices including in-flight ones as seen in underlying file-system.
*/ */
public Stream<FileSlice> getAllFileSlicesIncludingInflight() { public Stream<FileSlice> getAllFileSlicesIncludingInflight() {
return fileSlices.entrySet().stream().map(Map.Entry::getValue); return fileSlices.entrySet().stream().map(Map.Entry::getValue);
} }
/** /**
* Get latest file slices including in-flight ones * Get latest file slices including in-flight ones.
*/ */
public Option<FileSlice> getLatestFileSlicesIncludingInflight() { public Option<FileSlice> getLatestFileSlicesIncludingInflight() {
return Option.fromJavaOptional(getAllFileSlicesIncludingInflight().findFirst()); return Option.fromJavaOptional(getAllFileSlicesIncludingInflight().findFirst());
@@ -143,7 +143,7 @@ public class HoodieFileGroup implements Serializable {
} }
/** /**
* Gets the latest slice - this can contain either * Gets the latest slice - this can contain either.
* <p> * <p>
* - just the log files without data file - (or) data file with 0 or more log files * - just the log files without data file - (or) data file with 0 or more log files
*/ */
@@ -153,14 +153,14 @@ public class HoodieFileGroup implements Serializable {
} }
/** /**
* Gets the latest data file * Gets the latest data file.
*/ */
public Option<HoodieDataFile> getLatestDataFile() { public Option<HoodieDataFile> getLatestDataFile() {
return Option.fromJavaOptional(getAllDataFiles().findFirst()); return Option.fromJavaOptional(getAllDataFiles().findFirst());
} }
/** /**
* Obtain the latest file slice, upto a commitTime i.e <= maxCommitTime * Obtain the latest file slice, upto a commitTime i.e <= maxCommitTime.
*/ */
public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxCommitTime) { public Option<FileSlice> getLatestFileSliceBeforeOrOn(String maxCommitTime) {
return Option.fromJavaOptional(getAllFileSlices().filter(slice -> HoodieTimeline return Option.fromJavaOptional(getAllFileSlices().filter(slice -> HoodieTimeline
@@ -168,7 +168,7 @@ public class HoodieFileGroup implements Serializable {
} }
/** /**
* Obtain the latest file slice, upto a commitTime i.e < maxInstantTime * Obtain the latest file slice, upto a commitTime i.e < maxInstantTime.
* *
* @param maxInstantTime Max Instant Time * @param maxInstantTime Max Instant Time
* @return * @return
@@ -185,7 +185,7 @@ public class HoodieFileGroup implements Serializable {
} }
/** /**
* Stream of committed data files, sorted reverse commit time * Stream of committed data files, sorted reverse commit time.
*/ */
public Stream<HoodieDataFile> getAllDataFiles() { public Stream<HoodieDataFile> getAllDataFiles() {
return getAllFileSlices().filter(slice -> slice.getDataFile().isPresent()).map(slice -> slice.getDataFile().get()); return getAllFileSlices().filter(slice -> slice.getDataFile().isPresent()).map(slice -> slice.getDataFile().get());

View File

@@ -22,7 +22,7 @@ import java.io.Serializable;
import java.util.Objects; import java.util.Objects;
/** /**
* Unique ID to identify a file-group in a data-set * Unique ID to identify a file-group in a data-set.
*/ */
public class HoodieFileGroupId implements Serializable { public class HoodieFileGroupId implements Serializable {

View File

@@ -26,7 +26,8 @@ import java.io.Serializable;
* HoodieKey consists of * HoodieKey consists of
* <p> * <p>
* - recordKey : a recordKey that acts as primary key for a record - partitionPath : path to the partition that contains * - recordKey : a recordKey that acts as primary key for a record - partitionPath : path to the partition that contains
* the record * the record.
* - partitionPath : the partition path of a record.
*/ */
public class HoodieKey implements Serializable { public class HoodieKey implements Serializable {

View File

@@ -125,7 +125,7 @@ public class HoodieLogFile implements Serializable {
} }
/** /**
* Comparator to order log-files * Comparator to order log-files.
*/ */
public static class LogFileComparator implements Comparator<HoodieLogFile>, Serializable { public static class LogFileComparator implements Comparator<HoodieLogFile>, Serializable {

View File

@@ -31,7 +31,7 @@ import java.io.IOException;
import java.util.Properties; import java.util.Properties;
/** /**
* The metadata that goes into the meta file in each partition * The metadata that goes into the meta file in each partition.
*/ */
public class HoodiePartitionMetadata { public class HoodiePartitionMetadata {
@@ -40,12 +40,12 @@ public class HoodiePartitionMetadata {
public static final String COMMIT_TIME_KEY = "commitTime"; public static final String COMMIT_TIME_KEY = "commitTime";
/** /**
* Contents of the metadata * Contents of the metadata.
*/ */
private final Properties props; private final Properties props;
/** /**
* Path to the partition, about which we have the metadata * Path to the partition, about which we have the metadata.
*/ */
private final Path partitionPath; private final Path partitionPath;
@@ -54,7 +54,7 @@ public class HoodiePartitionMetadata {
private static Logger log = LogManager.getLogger(HoodiePartitionMetadata.class); private static Logger log = LogManager.getLogger(HoodiePartitionMetadata.class);
/** /**
* Construct metadata from existing partition * Construct metadata from existing partition.
*/ */
public HoodiePartitionMetadata(FileSystem fs, Path partitionPath) { public HoodiePartitionMetadata(FileSystem fs, Path partitionPath) {
this.fs = fs; this.fs = fs;
@@ -118,7 +118,7 @@ public class HoodiePartitionMetadata {
} }
/** /**
* Read out the metadata for this partition * Read out the metadata for this partition.
*/ */
public void readFromFS() throws IOException { public void readFromFS() throws IOException {
FSDataInputStream is = null; FSDataInputStream is = null;

View File

@@ -27,7 +27,7 @@ import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
* A Single Record managed by Hoodie TODO - Make this generic * A Single Record managed by Hoodie.
*/ */
public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable { public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable {
@@ -42,12 +42,12 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
.add(RECORD_KEY_METADATA_FIELD).add(PARTITION_PATH_METADATA_FIELD).add(FILENAME_METADATA_FIELD).build(); .add(RECORD_KEY_METADATA_FIELD).add(PARTITION_PATH_METADATA_FIELD).add(FILENAME_METADATA_FIELD).build();
/** /**
* Identifies the record across the table * Identifies the record across the table.
*/ */
private HoodieKey key; private HoodieKey key;
/** /**
* Actual payload of the record * Actual payload of the record.
*/ */
private T data; private T data;
@@ -57,7 +57,7 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
private HoodieRecordLocation currentLocation; private HoodieRecordLocation currentLocation;
/** /**
* New location of record on storage, after written * New location of record on storage, after written.
*/ */
private HoodieRecordLocation newLocation; private HoodieRecordLocation newLocation;

View File

@@ -29,13 +29,13 @@ import java.util.Map;
/** /**
* Every Hoodie dataset has an implementation of the <code>HoodieRecordPayload</code> This abstracts out callbacks which * Every Hoodie dataset has an implementation of the <code>HoodieRecordPayload</code> This abstracts out callbacks which
* depend on record specific logic * depend on record specific logic.
*/ */
public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Serializable { public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Serializable {
/** /**
* When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to
* insert/upsert (if combining turned on in HoodieClientConfig) * insert/upsert (if combining turned on in HoodieClientConfig).
*/ */
T preCombine(T another); T preCombine(T another);

View File

@@ -24,6 +24,9 @@ import javax.annotation.Nullable;
import java.io.Serializable; import java.io.Serializable;
/**
* A model class defines hoodie rolling stat.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class HoodieRollingStat implements Serializable { public class HoodieRollingStat implements Serializable {

View File

@@ -27,7 +27,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
* This class holds statistics about files belonging to a dataset * This class holds statistics about files belonging to a dataset.
*/ */
public class HoodieRollingStatMetadata implements Serializable { public class HoodieRollingStatMetadata implements Serializable {

View File

@@ -35,12 +35,12 @@ public class HoodieWriteStat implements Serializable {
public static final String NULL_COMMIT = "null"; public static final String NULL_COMMIT = "null";
/** /**
* Id of the file being written * Id of the file being written.
*/ */
private String fileId; private String fileId;
/** /**
* Relative path to the file from the base path * Relative path to the file from the base path.
*/ */
private String path; private String path;
@@ -66,12 +66,12 @@ public class HoodieWriteStat implements Serializable {
private long numUpdateWrites; private long numUpdateWrites;
/** /**
* Total number of insert records or converted to updates (for small file handling) * Total number of insert records or converted to updates (for small file handling).
*/ */
private long numInserts; private long numInserts;
/** /**
* Total size of file written * Total size of file written.
*/ */
private long totalWriteBytes; private long totalWriteBytes;
@@ -91,54 +91,54 @@ public class HoodieWriteStat implements Serializable {
*/ */
/** /**
* Partition Path associated with this writeStat * Partition Path associated with this writeStat.
*/ */
@Nullable @Nullable
private String partitionPath; private String partitionPath;
/** /**
* Total number of log records that were compacted by a compaction operation * Total number of log records that were compacted by a compaction operation.
*/ */
@Nullable @Nullable
private long totalLogRecords; private long totalLogRecords;
/** /**
* Total number of log files compacted for a file slice with this base fileid * Total number of log files compacted for a file slice with this base fileid.
*/ */
@Nullable @Nullable
private long totalLogFilesCompacted; private long totalLogFilesCompacted;
/** /**
* Total size of all log files for a file slice with this base fileid * Total size of all log files for a file slice with this base fileid.
*/ */
@Nullable @Nullable
private long totalLogSizeCompacted; private long totalLogSizeCompacted;
/** /**
* Total number of records updated by a compaction operation * Total number of records updated by a compaction operation.
*/ */
@Nullable @Nullable
private long totalUpdatedRecordsCompacted; private long totalUpdatedRecordsCompacted;
/** /**
* Total number of log blocks seen in a compaction operation * Total number of log blocks seen in a compaction operation.
*/ */
@Nullable @Nullable
private long totalLogBlocks; private long totalLogBlocks;
/** /**
* Total number of corrupt blocks seen in a compaction operation * Total number of corrupt blocks seen in a compaction operation.
*/ */
@Nullable @Nullable
private long totalCorruptLogBlock; private long totalCorruptLogBlock;
/** /**
* Total number of rollback blocks seen in a compaction operation * Total number of rollback blocks seen in a compaction operation.
*/ */
private long totalRollbackBlocks; private long totalRollbackBlocks;
/** /**
* File Size as of close * File Size as of close.
*/ */
private long fileSizeInBytes; private long fileSizeInBytes;
@@ -353,21 +353,25 @@ public class HoodieWriteStat implements Serializable {
return result; return result;
} }
/**
* The runtime stats for writing operation.
*/
public static class RuntimeStats implements Serializable { public static class RuntimeStats implements Serializable {
/** /**
* Total time taken to read and merge logblocks in a log file * Total time taken to read and merge logblocks in a log file.
*/ */
@Nullable @Nullable
private long totalScanTime; private long totalScanTime;
/** /**
* Total time taken by a Hoodie Merge for an existing file * Total time taken by a Hoodie Merge for an existing file.
*/ */
@Nullable @Nullable
private long totalUpsertTime; private long totalUpsertTime;
/** /**
* Total time taken by a Hoodie Insert to a file * Total time taken by a Hoodie Insert to a file.
*/ */
@Nullable @Nullable
private long totalCreateTime; private long totalCreateTime;

View File

@@ -23,7 +23,7 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Keeps track of how many bytes were read from a DataInputStream * Keeps track of how many bytes were read from a DataInputStream.
*/ */
public class SizeAwareDataInputStream { public class SizeAwareDataInputStream {

View File

@@ -83,14 +83,14 @@ public class HoodieTableConfig implements Serializable {
} }
/** /**
* For serailizing and de-serializing * For serailizing and de-serializing.
* *
* @deprecated * @deprecated
*/ */
public HoodieTableConfig() {} public HoodieTableConfig() {}
/** /**
* Initialize the hoodie meta directory and any necessary files inside the meta (including the hoodie.properties) * Initialize the hoodie meta directory and any necessary files inside the meta (including the hoodie.properties).
*/ */
public static void createHoodieProperties(FileSystem fs, Path metadataFolder, Properties properties) public static void createHoodieProperties(FileSystem fs, Path metadataFolder, Properties properties)
throws IOException { throws IOException {
@@ -117,7 +117,7 @@ public class HoodieTableConfig implements Serializable {
} }
/** /**
* Read the table type from the table properties and if not found, return the default * Read the table type from the table properties and if not found, return the default.
*/ */
public HoodieTableType getTableType() { public HoodieTableType getTableType() {
if (props.containsKey(HOODIE_TABLE_TYPE_PROP_NAME)) { if (props.containsKey(HOODIE_TABLE_TYPE_PROP_NAME)) {
@@ -127,7 +127,7 @@ public class HoodieTableConfig implements Serializable {
} }
/** /**
* Read the payload class for HoodieRecords from the table properties * Read the payload class for HoodieRecords from the table properties.
*/ */
public String getPayloadClass() { public String getPayloadClass() {
// There could be datasets written with payload class from com.uber.hoodie. Need to transparently // There could be datasets written with payload class from com.uber.hoodie. Need to transparently
@@ -137,14 +137,14 @@ public class HoodieTableConfig implements Serializable {
} }
/** /**
* Read the table name * Read the table name.
*/ */
public String getTableName() { public String getTableName() {
return props.getProperty(HOODIE_TABLE_NAME_PROP_NAME); return props.getProperty(HOODIE_TABLE_NAME_PROP_NAME);
} }
/** /**
* Get the Read Optimized Storage Format * Get the Read Optimized Storage Format.
* *
* @return HoodieFileFormat for the Read Optimized Storage format * @return HoodieFileFormat for the Read Optimized Storage format
*/ */
@@ -156,7 +156,7 @@ public class HoodieTableConfig implements Serializable {
} }
/** /**
* Get the Read Optimized Storage Format * Get the Read Optimized Storage Format.
* *
* @return HoodieFileFormat for the Read Optimized Storage format * @return HoodieFileFormat for the Read Optimized Storage format
*/ */
@@ -168,7 +168,7 @@ public class HoodieTableConfig implements Serializable {
} }
/** /**
* Get the relative path of archive log folder under metafolder, for this dataset * Get the relative path of archive log folder under metafolder, for this dataset.
*/ */
public String getArchivelogFolder() { public String getArchivelogFolder() {
return props.getProperty(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, DEFAULT_ARCHIVELOG_FOLDER); return props.getProperty(HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, DEFAULT_ARCHIVELOG_FOLDER);

View File

@@ -112,7 +112,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* For serailizing and de-serializing * For serailizing and de-serializing.
* *
* @deprecated * @deprecated
*/ */
@@ -166,7 +166,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Returns Marker folder path * Returns Marker folder path.
* *
* @param instantTs Instant Timestamp * @param instantTs Instant Timestamp
* @return * @return
@@ -202,7 +202,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Get the FS implementation for this table * Get the FS implementation for this table.
*/ */
public HoodieWrapperFileSystem getFs() { public HoodieWrapperFileSystem getFs() {
if (fs == null) { if (fs == null) {
@@ -218,7 +218,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Return raw file-system * Return raw file-system.
* *
* @return * @return
*/ */
@@ -231,7 +231,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Get the active instants as a timeline * Get the active instants as a timeline.
* *
* @return Active instants timeline * @return Active instants timeline
*/ */
@@ -243,7 +243,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Reload ActiveTimeline and cache * Reload ActiveTimeline and cache.
* *
* @return Active instants timeline * @return Active instants timeline
*/ */
@@ -258,7 +258,7 @@ public class HoodieTableMetaClient implements Serializable {
/** /**
* Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read. * Get the archived commits as a timeline. This is costly operation, as all data from the archived files are read.
* This should not be used, unless for historical debugging purposes * This should not be used, unless for historical debugging purposes.
* *
* @return Active commit timeline * @return Active commit timeline
*/ */
@@ -270,7 +270,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Helper method to initialize a dataset, with given basePath, tableType, name, archiveFolder * Helper method to initialize a dataset, with given basePath, tableType, name, archiveFolder.
*/ */
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, String tableType, public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, String tableType,
String tableName, String archiveLogFolder) throws IOException { String tableName, String archiveLogFolder) throws IOException {
@@ -283,7 +283,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Helper method to initialize a given path, as a given storage type and table name * Helper method to initialize a given path, as a given storage type and table name.
*/ */
public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
HoodieTableType tableType, String tableName, String payloadClassName) throws IOException { HoodieTableType tableType, String tableName, String payloadClassName) throws IOException {
@@ -297,7 +297,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Helper method to initialize a given path as a hoodie dataset with configs passed in as as Properties * Helper method to initialize a given path as a hoodie dataset with configs passed in as as Properties.
* *
* @return Instance of HoodieTableMetaClient * @return Instance of HoodieTableMetaClient
*/ */
@@ -350,7 +350,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Get the commit timeline visible for this table * Get the commit timeline visible for this table.
*/ */
public HoodieTimeline getCommitsTimeline() { public HoodieTimeline getCommitsTimeline() {
switch (this.getTableType()) { switch (this.getTableType()) {
@@ -384,7 +384,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Get the compacted commit timeline visible for this table * Get the compacted commit timeline visible for this table.
*/ */
public HoodieTimeline getCommitTimeline() { public HoodieTimeline getCommitTimeline() {
switch (this.getTableType()) { switch (this.getTableType()) {
@@ -398,7 +398,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Gets the commit action type * Gets the commit action type.
*/ */
public String getCommitActionType() { public String getCommitActionType() {
switch (this.getTableType()) { switch (this.getTableType()) {
@@ -412,7 +412,7 @@ public class HoodieTableMetaClient implements Serializable {
} }
/** /**
* Helper method to scan all hoodie-instant metafiles and construct HoodieInstant objects * Helper method to scan all hoodie-instant metafiles and construct HoodieInstant objects.
* *
* @param fs FileSystem * @param fs FileSystem
* @param metaPath Meta Path where hoodie instants are present * @param metaPath Meta Path where hoodie instants are present

View File

@@ -76,28 +76,28 @@ public interface HoodieTimeline extends Serializable {
String INVALID_INSTANT_TS = "0"; String INVALID_INSTANT_TS = "0";
/** /**
* Filter this timeline to just include the in-flights * Filter this timeline to just include the in-flights.
* *
* @return New instance of HoodieTimeline with just in-flights * @return New instance of HoodieTimeline with just in-flights
*/ */
HoodieTimeline filterInflights(); HoodieTimeline filterInflights();
/** /**
* Filter this timeline to include requested and in-flights * Filter this timeline to include requested and in-flights.
* *
* @return New instance of HoodieTimeline with just in-flights and requested instants * @return New instance of HoodieTimeline with just in-flights and requested instants
*/ */
HoodieTimeline filterInflightsAndRequested(); HoodieTimeline filterInflightsAndRequested();
/** /**
* Filter this timeline to just include the in-flights excluding compaction instants * Filter this timeline to just include the in-flights excluding compaction instants.
* *
* @return New instance of HoodieTimeline with just in-flights excluding compaction inflights * @return New instance of HoodieTimeline with just in-flights excluding compaction inflights
*/ */
HoodieTimeline filterInflightsExcludingCompaction(); HoodieTimeline filterInflightsExcludingCompaction();
/** /**
* Filter this timeline to just include the completed instants * Filter this timeline to just include the completed instants.
* *
* @return New instance of HoodieTimeline with just completed instants * @return New instance of HoodieTimeline with just completed instants
*/ */
@@ -114,36 +114,36 @@ public interface HoodieTimeline extends Serializable {
HoodieTimeline filterCompletedAndCompactionInstants(); HoodieTimeline filterCompletedAndCompactionInstants();
/** /**
* Timeline to just include commits (commit/deltacommit) and compaction actions * Timeline to just include commits (commit/deltacommit) and compaction actions.
* *
* @return * @return
*/ */
HoodieTimeline getCommitsAndCompactionTimeline(); HoodieTimeline getCommitsAndCompactionTimeline();
/** /**
* Filter this timeline to just include requested and inflight compaction instants * Filter this timeline to just include requested and inflight compaction instants.
* *
* @return * @return
*/ */
HoodieTimeline filterPendingCompactionTimeline(); HoodieTimeline filterPendingCompactionTimeline();
/** /**
* Create a new Timeline with instants after startTs and before or on endTs * Create a new Timeline with instants after startTs and before or on endTs.
*/ */
HoodieTimeline findInstantsInRange(String startTs, String endTs); HoodieTimeline findInstantsInRange(String startTs, String endTs);
/** /**
* Create a new Timeline with all the instants after startTs * Create a new Timeline with all the instants after startTs.
*/ */
HoodieTimeline findInstantsAfter(String commitTime, int numCommits); HoodieTimeline findInstantsAfter(String commitTime, int numCommits);
/** /**
* Custom Filter of Instants * Custom Filter of Instants.
*/ */
HoodieTimeline filter(Predicate<HoodieInstant> filter); HoodieTimeline filter(Predicate<HoodieInstant> filter);
/** /**
* If the timeline has any instants * If the timeline has any instants.
* *
* @return true if timeline is empty * @return true if timeline is empty
*/ */
@@ -171,7 +171,7 @@ public interface HoodieTimeline extends Serializable {
/** /**
* Get hash of timeline * Get hash of timeline.
* *
* @return * @return
*/ */
@@ -210,12 +210,12 @@ public interface HoodieTimeline extends Serializable {
boolean isBeforeTimelineStarts(String ts); boolean isBeforeTimelineStarts(String ts);
/** /**
* Read the completed instant details * Read the completed instant details.
*/ */
Option<byte[]> getInstantDetails(HoodieInstant instant); Option<byte[]> getInstantDetails(HoodieInstant instant);
/** /**
* Helper methods to compare instants * Helper methods to compare instants.
**/ **/
BiPredicate<String, String> EQUAL = (commit1, commit2) -> commit1.compareTo(commit2) == 0; BiPredicate<String, String> EQUAL = (commit1, commit2) -> commit1.compareTo(commit2) == 0;
BiPredicate<String, String> GREATER_OR_EQUAL = (commit1, commit2) -> commit1.compareTo(commit2) >= 0; BiPredicate<String, String> GREATER_OR_EQUAL = (commit1, commit2) -> commit1.compareTo(commit2) >= 0;

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.common.table; package org.apache.hudi.common.table;
/* /**
* A consolidated file-system view interface exposing both realtime and read-optimized views along with * A consolidated file-system view interface exposing both realtime and read-optimized views along with
* update operations. * update operations.
*/ */
@@ -28,19 +28,19 @@ public interface SyncableFileSystemView
/** /**
* Allow View to release resources and close * Allow View to release resources and close.
*/ */
void close(); void close();
/** /**
* Reset View so that they can be refreshed * Reset View so that they can be refreshed.
*/ */
void reset(); void reset();
/** /**
* Read the latest timeline and refresh the file-system view to match the current state of the file-system. The * Read the latest timeline and refresh the file-system view to match the current state of the file-system. The
* refresh can either be done incrementally (from reading file-slices in metadata files) or from scratch by reseting * refresh can either be done incrementally (from reading file-slices in metadata files) or from scratch by reseting
* view storage * view storage.
*/ */
void sync(); void sync();
} }

View File

@@ -42,28 +42,28 @@ public interface TableFileSystemView {
interface ReadOptimizedViewWithLatestSlice { interface ReadOptimizedViewWithLatestSlice {
/** /**
* Stream all the latest data files in the given partition * Stream all the latest data files in the given partition.
*/ */
Stream<HoodieDataFile> getLatestDataFiles(String partitionPath); Stream<HoodieDataFile> getLatestDataFiles(String partitionPath);
/** /**
* Get Latest data file for a partition and file-Id * Get Latest data file for a partition and file-Id.
*/ */
Option<HoodieDataFile> getLatestDataFile(String partitionPath, String fileId); Option<HoodieDataFile> getLatestDataFile(String partitionPath, String fileId);
/** /**
* Stream all the latest data files, in the file system view * Stream all the latest data files, in the file system view.
*/ */
Stream<HoodieDataFile> getLatestDataFiles(); Stream<HoodieDataFile> getLatestDataFiles();
/** /**
* Stream all the latest version data files in the given partition with precondition that commitTime(file) before * Stream all the latest version data files in the given partition with precondition that commitTime(file) before
* maxCommitTime * maxCommitTime.
*/ */
Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath, String maxCommitTime); Stream<HoodieDataFile> getLatestDataFilesBeforeOrOn(String partitionPath, String maxCommitTime);
/** /**
* Stream all the latest data files pass * Stream all the latest data files pass.
*/ */
Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn); Stream<HoodieDataFile> getLatestDataFilesInRange(List<String> commitsToReturn);
} }
@@ -73,12 +73,12 @@ public interface TableFileSystemView {
*/ */
interface ReadOptimizedView extends ReadOptimizedViewWithLatestSlice { interface ReadOptimizedView extends ReadOptimizedViewWithLatestSlice {
/** /**
* Stream all the data file versions grouped by FileId for a given partition * Stream all the data file versions grouped by FileId for a given partition.
*/ */
Stream<HoodieDataFile> getAllDataFiles(String partitionPath); Stream<HoodieDataFile> getAllDataFiles(String partitionPath);
/** /**
* Get the version of data file matching the instant time in the given partition * Get the version of data file matching the instant time in the given partition.
*/ */
Option<HoodieDataFile> getDataFileOn(String partitionPath, String instantTime, String fileId); Option<HoodieDataFile> getDataFileOn(String partitionPath, String instantTime, String fileId);
@@ -90,22 +90,22 @@ public interface TableFileSystemView {
interface RealtimeViewWithLatestSlice { interface RealtimeViewWithLatestSlice {
/** /**
* Stream all the latest file slices in the given partition * Stream all the latest file slices in the given partition.
*/ */
Stream<FileSlice> getLatestFileSlices(String partitionPath); Stream<FileSlice> getLatestFileSlices(String partitionPath);
/** /**
* Get Latest File Slice for a given fileId in a given partition * Get Latest File Slice for a given fileId in a given partition.
*/ */
Option<FileSlice> getLatestFileSlice(String partitionPath, String fileId); Option<FileSlice> getLatestFileSlice(String partitionPath, String fileId);
/** /**
* Stream all the latest uncompacted file slices in the given partition * Stream all the latest uncompacted file slices in the given partition.
*/ */
Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath); Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath);
/** /**
* Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
* *
* @param partitionPath Partition path * @param partitionPath Partition path
* @param maxCommitTime Max Instant Time * @param maxCommitTime Max Instant Time
@@ -125,7 +125,7 @@ public interface TableFileSystemView {
public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime); public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
/** /**
* Stream all the latest file slices, in the given range * Stream all the latest file slices, in the given range.
*/ */
Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn); Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn);
} }
@@ -143,24 +143,24 @@ public interface TableFileSystemView {
} }
/** /**
* Stream all the file groups for a given partition * Stream all the file groups for a given partition.
*/ */
Stream<HoodieFileGroup> getAllFileGroups(String partitionPath); Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
/** /**
* Return Pending Compaction Operations * Return Pending Compaction Operations.
* *
* @return Pair<Pair<InstantTime,CompactionOperation>> * @return Pair<Pair<InstantTime,CompactionOperation>>
*/ */
Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations(); Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
/** /**
* Last Known Instant on which the view is built * Last Known Instant on which the view is built.
*/ */
Option<HoodieInstant> getLastInstant(); Option<HoodieInstant> getLastInstant();
/** /**
* Timeline corresponding to the view * Timeline corresponding to the view.
*/ */
HoodieTimeline getTimeline(); HoodieTimeline getTimeline();
} }

View File

@@ -119,7 +119,7 @@ public abstract class AbstractHoodieLogRecordScanner {
} }
/** /**
* Scan Log files * Scan Log files.
*/ */
public void scan() { public void scan() {
HoodieLogFormatReader logFormatReaderWrapper = null; HoodieLogFormatReader logFormatReaderWrapper = null;
@@ -255,7 +255,7 @@ public abstract class AbstractHoodieLogRecordScanner {
} }
/** /**
* Checks if the current logblock belongs to a later instant * Checks if the current logblock belongs to a later instant.
*/ */
private boolean isNewInstantBlock(HoodieLogBlock logBlock) { private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK return currentInstantLogBlocks.size() > 0 && currentInstantLogBlocks.peek().getBlockType() != CORRUPT_BLOCK
@@ -279,14 +279,14 @@ public abstract class AbstractHoodieLogRecordScanner {
} }
/** /**
* Process next record * Process next record.
* *
* @param hoodieRecord Hoodie Record to process * @param hoodieRecord Hoodie Record to process
*/ */
protected abstract void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws Exception; protected abstract void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws Exception;
/** /**
* Process next deleted key * Process next deleted key.
* *
* @param key Deleted record key * @param key Deleted record key
*/ */
@@ -319,7 +319,7 @@ public abstract class AbstractHoodieLogRecordScanner {
} }
/** /**
* Return progress of scanning as a float between 0.0 to 1.0 * Return progress of scanning as a float between 0.0 to 1.0.
*/ */
public float getProgress() { public float getProgress() {
return progress; return progress;

View File

@@ -49,7 +49,7 @@ import java.util.Map;
/** /**
* Scans a log file and provides block level iterator on the log file Loads the entire block contents in memory Can emit * Scans a log file and provides block level iterator on the log file Loads the entire block contents in memory Can emit
* either a DataBlock, CommandBlock, DeleteBlock or CorruptBlock (if one is found) * either a DataBlock, CommandBlock, DeleteBlock or CorruptBlock (if one is found).
*/ */
class HoodieLogFileReader implements HoodieLogFormat.Reader { class HoodieLogFileReader implements HoodieLogFormat.Reader {
@@ -104,7 +104,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
} }
/** /**
* Close the inputstream if not closed when the JVM exits * Close the inputstream if not closed when the JVM exits.
*/ */
private void addShutDownHook() { private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -331,7 +331,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
} }
/** /**
* hasPrev is not idempotent * hasPrev is not idempotent.
*/ */
@Override @Override
public boolean hasPrev() { public boolean hasPrev() {

View File

@@ -56,7 +56,7 @@ public interface HoodieLogFormat {
String UNKNOWN_WRITE_TOKEN = "1-0-1"; String UNKNOWN_WRITE_TOKEN = "1-0-1";
/** /**
* Writer interface to allow appending block to this file format * Writer interface to allow appending block to this file format.
*/ */
interface Writer extends Closeable { interface Writer extends Closeable {
@@ -66,7 +66,7 @@ public interface HoodieLogFormat {
HoodieLogFile getLogFile(); HoodieLogFile getLogFile();
/** /**
* Append Block returns a new Writer if the log is rolled * Append Block returns a new Writer if the log is rolled.
*/ */
Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException; Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException;
@@ -74,7 +74,7 @@ public interface HoodieLogFormat {
} }
/** /**
* Reader interface which is an Iterator of HoodieLogBlock * Reader interface which is an Iterator of HoodieLogBlock.
*/ */
interface Reader extends Closeable, Iterator<HoodieLogBlock> { interface Reader extends Closeable, Iterator<HoodieLogBlock> {
@@ -84,14 +84,14 @@ public interface HoodieLogFormat {
HoodieLogFile getLogFile(); HoodieLogFile getLogFile();
/** /**
* Read log file in reverse order and check if prev block is present * Read log file in reverse order and check if prev block is present.
* *
* @return * @return
*/ */
public boolean hasPrev(); public boolean hasPrev();
/** /**
* Read log file in reverse order and return prev block if present * Read log file in reverse order and return prev block if present.
* *
* @return * @return
* @throws IOException * @throws IOException
@@ -100,7 +100,7 @@ public interface HoodieLogFormat {
} }
/** /**
* Builder class to construct the default log format writer * Builder class to construct the default log format writer.
*/ */
class WriterBuilder { class WriterBuilder {

View File

@@ -31,6 +31,9 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
/**
* Hoodie log format reader.
*/
public class HoodieLogFormatReader implements HoodieLogFormat.Reader { public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final List<HoodieLogFile> logFiles; private final List<HoodieLogFile> logFiles;

View File

@@ -40,7 +40,7 @@ import org.apache.log4j.Logger;
import java.io.IOException; import java.io.IOException;
/** /**
* HoodieLogFormatWriter can be used to append blocks to a log file Use HoodieLogFormat.WriterBuilder to construct * HoodieLogFormatWriter can be used to append blocks to a log file Use HoodieLogFormat.WriterBuilder to construct.
*/ */
public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {

View File

@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.FileSystem;
import java.util.List; import java.util.List;
/**
* A scanner used to scan hoodie unmerged log records.
*/
public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner { public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner {
private final LogRecordScannerCallback callback; private final LogRecordScannerCallback callback;
@@ -49,6 +52,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config"); throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config");
} }
/**
* A callback for log record scanner.
*/
@FunctionalInterface @FunctionalInterface
public static interface LogRecordScannerCallback { public static interface LogRecordScannerCallback {

View File

@@ -211,25 +211,27 @@ public class HoodieAvroDataBlock extends HoodieLogBlock {
deflate(); deflate();
} }
/********************************* DEPRECATED METHODS ***********************************/ //----------------------------------------------------------------------------------------
// DEPRECATED METHODS
//----------------------------------------------------------------------------------------
@Deprecated
@VisibleForTesting
/** /**
* This constructor is retained to provide backwards compatibility to HoodieArchivedLogs which were written using * This constructor is retained to provide backwards compatibility to HoodieArchivedLogs which were written using
* HoodieLogFormat V1 * HoodieLogFormat V1.
*/ */
@Deprecated
@VisibleForTesting
public HoodieAvroDataBlock(List<IndexedRecord> records, Schema schema) { public HoodieAvroDataBlock(List<IndexedRecord> records, Schema schema) {
super(new HashMap<>(), new HashMap<>(), Option.empty(), Option.empty(), null, false); super(new HashMap<>(), new HashMap<>(), Option.empty(), Option.empty(), null, false);
this.records = records; this.records = records;
this.schema = schema; this.schema = schema;
} }
@Deprecated
/** /**
* This method is retained to provide backwards compatibility to HoodieArchivedLogs which were written using * This method is retained to provide backwards compatibility to HoodieArchivedLogs which were written using
* HoodieLogFormat V1 * HoodieLogFormat V1.
*/ */
@Deprecated
public static HoodieLogBlock getBlock(byte[] content, Schema readerSchema) throws IOException { public static HoodieLogBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content))); SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));

View File

@@ -27,12 +27,15 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
* Command block issues a specific command to the scanner * Command block issues a specific command to the scanner.
*/ */
public class HoodieCommandBlock extends HoodieLogBlock { public class HoodieCommandBlock extends HoodieLogBlock {
private final HoodieCommandBlockTypeEnum type; private final HoodieCommandBlockTypeEnum type;
/**
* Hoodie command block type enum.
*/
public enum HoodieCommandBlockTypeEnum { public enum HoodieCommandBlockTypeEnum {
ROLLBACK_PREVIOUS_BLOCK ROLLBACK_PREVIOUS_BLOCK
} }

View File

@@ -28,7 +28,7 @@ import java.util.Map;
/** /**
* Corrupt block is emitted whenever the scanner finds the length of the block written at the beginning does not match * Corrupt block is emitted whenever the scanner finds the length of the block written at the beginning does not match
* (did not find a EOF or a sync marker after the length) * (did not find a EOF or a sync marker after the length).
*/ */
public class HoodieCorruptBlock extends HoodieLogBlock { public class HoodieCorruptBlock extends HoodieLogBlock {

View File

@@ -36,7 +36,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
* Delete block contains a list of keys to be deleted from scanning the blocks so far * Delete block contains a list of keys to be deleted from scanning the blocks so far.
*/ */
public class HoodieDeleteBlock extends HoodieLogBlock { public class HoodieDeleteBlock extends HoodieLogBlock {

View File

@@ -39,7 +39,7 @@ import java.util.Map;
/** /**
* Abstract class defining a block in HoodieLogFile * Abstract class defining a block in HoodieLogFile.
*/ */
public abstract class HoodieLogBlock { public abstract class HoodieLogBlock {
@@ -188,7 +188,7 @@ public abstract class HoodieLogBlock {
} }
/** /**
* Convert bytes to LogMetadata, follow the same order as {@link HoodieLogBlock#getLogMetadataBytes} * Convert bytes to LogMetadata, follow the same order as {@link HoodieLogBlock#getLogMetadataBytes}.
*/ */
public static Map<HeaderMetadataType, String> getLogMetadata(DataInputStream dis) throws IOException { public static Map<HeaderMetadataType, String> getLogMetadata(DataInputStream dis) throws IOException {
@@ -229,7 +229,7 @@ public abstract class HoodieLogBlock {
} }
/** /**
* When lazyReading of blocks is turned on, inflate the content of a log block from disk * When lazyReading of blocks is turned on, inflate the content of a log block from disk.
*/ */
protected void inflate() throws IOException { protected void inflate() throws IOException {
@@ -258,7 +258,7 @@ public abstract class HoodieLogBlock {
} }
/** /**
* Handles difference in seek behavior for GCS and non-GCS input stream * Handles difference in seek behavior for GCS and non-GCS input stream.
* *
* @param inputStream Input Stream * @param inputStream Input Stream
* @param pos Position to seek * @param pos Position to seek

View File

@@ -124,7 +124,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Get all instants (commits, delta commits) that produce new data, in the active timeline * * Get all instants (commits, delta commits) that produce new data, in the active timeline.
*/ */
public HoodieTimeline getCommitsTimeline() { public HoodieTimeline getCommitsTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)); return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
@@ -141,7 +141,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
/** /**
* Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions, in the active * Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions, in the active
* timeline * * timeline.
*/ */
public HoodieTimeline getAllCommitsTimeline() { public HoodieTimeline getAllCommitsTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION, return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION,
@@ -149,14 +149,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Get only pure commits (inflight and completed) in the active timeline * Get only pure commits (inflight and completed) in the active timeline.
*/ */
public HoodieTimeline getCommitTimeline() { public HoodieTimeline getCommitTimeline() {
return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION)); return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION));
} }
/** /**
* Get only the delta commits (inflight and completed) in the active timeline * Get only the delta commits (inflight and completed) in the active timeline.
*/ */
public HoodieTimeline getDeltaCommitTimeline() { public HoodieTimeline getDeltaCommitTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION), return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION),
@@ -164,7 +164,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Get a timeline of a specific set of actions. useful to create a merged timeline of multiple actions * Get a timeline of a specific set of actions. useful to create a merged timeline of multiple actions.
* *
* @param actions actions allowed in the timeline * @param actions actions allowed in the timeline
*/ */
@@ -174,7 +174,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Get only the cleaner action (inflight and completed) in the active timeline * Get only the cleaner action (inflight and completed) in the active timeline.
*/ */
public HoodieTimeline getCleanerTimeline() { public HoodieTimeline getCleanerTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION), return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION),
@@ -182,7 +182,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Get only the rollback action (inflight and completed) in the active timeline * Get only the rollback action (inflight and completed) in the active timeline.
*/ */
public HoodieTimeline getRollbackTimeline() { public HoodieTimeline getRollbackTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION), return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
@@ -190,7 +190,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Get only the save point action (inflight and completed) in the active timeline * Get only the save point action (inflight and completed) in the active timeline.
*/ */
public HoodieTimeline getSavePointTimeline() { public HoodieTimeline getSavePointTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION), return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION),
@@ -198,7 +198,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Get only the restore action (inflight and completed) in the active timeline * Get only the restore action (inflight and completed) in the active timeline.
*/ */
public HoodieTimeline getRestoreTimeline() { public HoodieTimeline getRestoreTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION), return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION),
@@ -269,9 +269,9 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return readDataFromPath(detailPath); return readDataFromPath(detailPath);
} }
/** //-----------------------------------------------------------------
* BEGIN - COMPACTION RELATED META-DATA MANAGEMENT // BEGIN - COMPACTION RELATED META-DATA MANAGEMENT.
**/ //-----------------------------------------------------------------
public Option<byte[]> getInstantAuxiliaryDetails(HoodieInstant instant) { public Option<byte[]> getInstantAuxiliaryDetails(HoodieInstant instant) {
Path detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); Path detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
@@ -279,7 +279,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Revert compaction State from inflight to requested * Revert compaction State from inflight to requested.
* *
* @param inflightInstant Inflight Instant * @param inflightInstant Inflight Instant
* @return requested instant * @return requested instant
@@ -295,7 +295,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Transition Compaction State from requested to inflight * Transition Compaction State from requested to inflight.
* *
* @param requestedInstant Requested instant * @param requestedInstant Requested instant
* @return inflight instant * @return inflight instant
@@ -310,7 +310,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Transition Compaction State from inflight to Committed * Transition Compaction State from inflight to Committed.
* *
* @param inflightInstant Inflight instant * @param inflightInstant Inflight instant
* @param data Extra Metadata * @param data Extra Metadata
@@ -329,12 +329,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
createFileInPath(fullPath, data); createFileInPath(fullPath, data);
} }
/** //-----------------------------------------------------------------
* END - COMPACTION RELATED META-DATA MANAGEMENT // END - COMPACTION RELATED META-DATA MANAGEMENT
**/ //-----------------------------------------------------------------
/** /**
* Transition Clean State from inflight to Committed * Transition Clean State from inflight to Committed.
* *
* @param inflightInstant Inflight instant * @param inflightInstant Inflight instant
* @param data Extra Metadata * @param data Extra Metadata
@@ -352,7 +352,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
} }
/** /**
* Transition Clean State from requested to inflight * Transition Clean State from requested to inflight.
* *
* @param requestedInstant requested instant * @param requestedInstant requested instant
* @return commit instant * @return commit instant

View File

@@ -75,7 +75,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
} }
/** /**
* For serailizing and de-serializing * For serializing and de-serializing.
* *
* @deprecated * @deprecated
*/ */

View File

@@ -35,7 +35,7 @@ import java.util.Objects;
public class HoodieInstant implements Serializable { public class HoodieInstant implements Serializable {
/** /**
* Instant State * Instant State.
*/ */
public enum State { public enum State {
// Requested State (valid state for Compaction) // Requested State (valid state for Compaction)
@@ -53,7 +53,7 @@ public class HoodieInstant implements Serializable {
private String timestamp; private String timestamp;
/** /**
* Load the instant from the meta FileStatus * Load the instant from the meta FileStatus.
*/ */
public HoodieInstant(FileStatus fileStatus) { public HoodieInstant(FileStatus fileStatus) {
// First read the instant timestamp. [==>20170101193025<==].commit // First read the instant timestamp. [==>20170101193025<==].commit
@@ -111,7 +111,7 @@ public class HoodieInstant implements Serializable {
} }
/** /**
* Get the filename for this instant * Get the filename for this instant.
*/ */
public String getFileName() { public String getFileName() {
if (HoodieTimeline.COMMIT_ACTION.equals(action)) { if (HoodieTimeline.COMMIT_ACTION.equals(action)) {

View File

@@ -30,6 +30,9 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/**
* The data transfer object of compaction.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class CompactionOpDTO { public class CompactionOpDTO {

View File

@@ -23,6 +23,9 @@ import org.apache.hudi.common.model.HoodieDataFile;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
/**
* The data transfer object of data file.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class DataFileDTO { public class DataFileDTO {

View File

@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import java.io.Serializable; import java.io.Serializable;
/** /**
* A serializable FS Permission * A serializable FS Permission.
*/ */
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class FSPermissionDTO implements Serializable { public class FSPermissionDTO implements Serializable {

View File

@@ -27,6 +27,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/**
* The data transfer object of file group.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class FileGroupDTO { public class FileGroupDTO {

View File

@@ -25,6 +25,9 @@ import org.apache.hadoop.fs.Path;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
/**
* The data transfer object of file path.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class FilePathDTO { public class FilePathDTO {

View File

@@ -26,6 +26,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/**
* The data transfer object of file slice.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class FileSliceDTO { public class FileSliceDTO {

View File

@@ -26,6 +26,9 @@ import org.apache.hadoop.fs.FileStatus;
import java.io.IOException; import java.io.IOException;
/**
* The data transfer object of file status.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class FileStatusDTO { public class FileStatusDTO {

View File

@@ -23,6 +23,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
/**
* The data transfer object of instant.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class InstantDTO { public class InstantDTO {

View File

@@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
/**
* The data transfer object of log file.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class LogFileDTO { public class LogFileDTO {

View File

@@ -28,6 +28,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/**
* The data transfer object of timeline.
*/
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
public class TimelineDTO { public class TimelineDTO {

View File

@@ -99,7 +99,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Refresh commits timeline * Refresh commits timeline.
* *
* @param visibleActiveTimeline Visible Active Timeline * @param visibleActiveTimeline Visible Active Timeline
*/ */
@@ -129,7 +129,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Build FileGroups from passed in file-status * Build FileGroups from passed in file-status.
*/ */
protected List<HoodieFileGroup> buildFileGroups(FileStatus[] statuses, HoodieTimeline timeline, protected List<HoodieFileGroup> buildFileGroups(FileStatus[] statuses, HoodieTimeline timeline,
boolean addPendingCompactionFileSlice) { boolean addPendingCompactionFileSlice) {
@@ -180,7 +180,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Clears the partition Map and reset view states * Clears the partition Map and reset view states.
*/ */
public final void reset() { public final void reset() {
try { try {
@@ -197,12 +197,12 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Allows all view metadata in file system view storage to be reset by subclasses * Allows all view metadata in file system view storage to be reset by subclasses.
*/ */
protected abstract void resetViewState(); protected abstract void resetViewState();
/** /**
* Allows lazily loading the partitions if needed * Allows lazily loading the partitions if needed.
* *
* @param partition partition to be loaded if not present * @param partition partition to be loaded if not present
*/ */
@@ -244,7 +244,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Helper to convert file-status to data-files * Helper to convert file-status to data-files.
* *
* @param statuses List of File-Status * @param statuses List of File-Status
*/ */
@@ -255,7 +255,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Helper to convert file-status to log-files * Helper to convert file-status to log-files.
* *
* @param statuses List of FIle-Status * @param statuses List of FIle-Status
*/ */
@@ -267,7 +267,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
/** /**
* With async compaction, it is possible to see partial/complete data-files due to inflight-compactions, Ignore those * With async compaction, it is possible to see partial/complete data-files due to inflight-compactions, Ignore those
* data-files * data-files.
* *
* @param dataFile Data File * @param dataFile Data File
*/ */
@@ -282,7 +282,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
/** /**
* Returns true if the file-group is under pending-compaction and the file-slice' baseInstant matches compaction * Returns true if the file-group is under pending-compaction and the file-slice' baseInstant matches compaction
* Instant * Instant.
* *
* @param fileSlice File Slice * @param fileSlice File Slice
*/ */
@@ -296,7 +296,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
/** /**
* With async compaction, it is possible to see partial/complete data-files due to inflight-compactions, Ignore those * With async compaction, it is possible to see partial/complete data-files due to inflight-compactions, Ignore those
* data-files * data-files.
* *
* @param fileSlice File Slice * @param fileSlice File Slice
*/ */
@@ -378,7 +378,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Get Latest data file for a partition and file-Id * Get Latest data file for a partition and file-Id.
*/ */
public final Option<HoodieDataFile> getLatestDataFile(String partitionStr, String fileId) { public final Option<HoodieDataFile> getLatestDataFile(String partitionStr, String fileId) {
try { try {
@@ -432,7 +432,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Get Latest File Slice for a given fileId in a given partition * Get Latest File Slice for a given fileId in a given partition.
*/ */
public final Option<FileSlice> getLatestFileSlice(String partitionStr, String fileId) { public final Option<FileSlice> getLatestFileSlice(String partitionStr, String fileId) {
try { try {
@@ -552,7 +552,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
// Fetch APIs to be implemented by concrete sub-classes // Fetch APIs to be implemented by concrete sub-classes
/** /**
* Check if there is an outstanding compaction scheduled for this file * Check if there is an outstanding compaction scheduled for this file.
* *
* @param fgId File-Group Id * @param fgId File-Group Id
* @return true if there is a pending compaction, false otherwise * @return true if there is a pending compaction, false otherwise
@@ -560,28 +560,28 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
protected abstract boolean isPendingCompactionScheduledForFileId(HoodieFileGroupId fgId); protected abstract boolean isPendingCompactionScheduledForFileId(HoodieFileGroupId fgId);
/** /**
* resets the pending compaction operation and overwrite with the new list * resets the pending compaction operation and overwrite with the new list.
* *
* @param operations Pending Compaction Operations * @param operations Pending Compaction Operations
*/ */
abstract void resetPendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations); abstract void resetPendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations);
/** /**
* Add pending compaction operations to store * Add pending compaction operations to store.
* *
* @param operations Pending compaction operations to be added * @param operations Pending compaction operations to be added
*/ */
abstract void addPendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations); abstract void addPendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations);
/** /**
* Remove pending compaction operations from store * Remove pending compaction operations from store.
* *
* @param operations Pending compaction operations to be removed * @param operations Pending compaction operations to be removed
*/ */
abstract void removePendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations); abstract void removePendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations);
/** /**
* Return pending compaction operation for a file-group * Return pending compaction operation for a file-group.
* *
* @param fileGroupId File-Group Id * @param fileGroupId File-Group Id
*/ */
@@ -589,19 +589,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
HoodieFileGroupId fileGroupId); HoodieFileGroupId fileGroupId);
/** /**
* Fetch all pending compaction operations * Fetch all pending compaction operations.
*/ */
abstract Stream<Pair<String, CompactionOperation>> fetchPendingCompactionOperations(); abstract Stream<Pair<String, CompactionOperation>> fetchPendingCompactionOperations();
/** /**
* Checks if partition is pre-loaded and available in store * Checks if partition is pre-loaded and available in store.
* *
* @param partitionPath Partition Path * @param partitionPath Partition Path
*/ */
abstract boolean isPartitionAvailableInStore(String partitionPath); abstract boolean isPartitionAvailableInStore(String partitionPath);
/** /**
* Add a complete partition view to store * Add a complete partition view to store.
* *
* @param partitionPath Partition Path * @param partitionPath Partition Path
* @param fileGroups File Groups for the partition path * @param fileGroups File Groups for the partition path
@@ -609,7 +609,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
abstract void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups); abstract void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups);
/** /**
* Fetch all file-groups stored for a partition-path * Fetch all file-groups stored for a partition-path.
* *
* @param partitionPath Partition path for which the file-groups needs to be retrieved. * @param partitionPath Partition path for which the file-groups needs to be retrieved.
* @return file-group stream * @return file-group stream
@@ -617,19 +617,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups(String partitionPath); abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups(String partitionPath);
/** /**
* Fetch all Stored file-groups across all partitions loaded * Fetch all Stored file-groups across all partitions loaded.
* *
* @return file-group stream * @return file-group stream
*/ */
abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups(); abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups();
/** /**
* Check if the view is already closed * Check if the view is already closed.
*/ */
abstract boolean isClosed(); abstract boolean isClosed();
/** /**
* Default implementation for fetching latest file-slice in commit range * Default implementation for fetching latest file-slice in commit range.
* *
* @param commitsToReturn Commits * @param commitsToReturn Commits
*/ */
@@ -639,7 +639,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Default implementation for fetching all file-slices for a partition-path * Default implementation for fetching all file-slices for a partition-path.
* *
* @param partitionPath Partition path * @param partitionPath Partition path
* @return file-slice stream * @return file-slice stream
@@ -650,7 +650,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Default implementation for fetching latest data-files for the partition-path * Default implementation for fetching latest data-files for the partition-path.
*/ */
Stream<HoodieDataFile> fetchLatestDataFiles(final String partitionPath) { Stream<HoodieDataFile> fetchLatestDataFiles(final String partitionPath) {
return fetchAllStoredFileGroups(partitionPath).map(this::getLatestDataFile).filter(Option::isPresent) return fetchAllStoredFileGroups(partitionPath).map(this::getLatestDataFile).filter(Option::isPresent)
@@ -663,14 +663,14 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Default implementation for fetching latest data-files across all partitions * Default implementation for fetching latest data-files across all partitions.
*/ */
Stream<HoodieDataFile> fetchLatestDataFiles() { Stream<HoodieDataFile> fetchLatestDataFiles() {
return fetchAllStoredFileGroups().map(this::getLatestDataFile).filter(Option::isPresent).map(Option::get); return fetchAllStoredFileGroups().map(this::getLatestDataFile).filter(Option::isPresent).map(Option::get);
} }
/** /**
* Default implementation for fetching all data-files for a partition * Default implementation for fetching all data-files for a partition.
* *
* @param partitionPath partition-path * @param partitionPath partition-path
*/ */
@@ -680,7 +680,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Default implementation for fetching file-group * Default implementation for fetching file-group.
*/ */
Option<HoodieFileGroup> fetchHoodieFileGroup(String partitionPath, String fileId) { Option<HoodieFileGroup> fetchHoodieFileGroup(String partitionPath, String fileId) {
return Option.fromJavaOptional(fetchAllStoredFileGroups(partitionPath) return Option.fromJavaOptional(fetchAllStoredFileGroups(partitionPath)
@@ -688,7 +688,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Default implementation for fetching latest file-slices for a partition path * Default implementation for fetching latest file-slices for a partition path.
*/ */
Stream<FileSlice> fetchLatestFileSlices(String partitionPath) { Stream<FileSlice> fetchLatestFileSlices(String partitionPath) {
return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getLatestFileSlice).filter(Option::isPresent) return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getLatestFileSlice).filter(Option::isPresent)
@@ -696,7 +696,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Default implementation for fetching latest file-slices for a partition path as of instant * Default implementation for fetching latest file-slices for a partition path as of instant.
* *
* @param partitionPath Partition Path * @param partitionPath Partition Path
* @param maxCommitTime Instant Time * @param maxCommitTime Instant Time
@@ -727,7 +727,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
/** /**
* If the file-slice is because of pending compaction instant, this method merges the file-slice with the one before * If the file-slice is because of pending compaction instant, this method merges the file-slice with the one before
* the compaction instant time * the compaction instant time.
* *
* @param fileGroup File Group for which the file slice belongs to * @param fileGroup File Group for which the file slice belongs to
* @param fileSlice File Slice which needs to be merged * @param fileSlice File Slice which needs to be merged
@@ -749,7 +749,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Default implementation for fetching latest data-file * Default implementation for fetching latest data-file.
* *
* @param partitionPath Partition path * @param partitionPath Partition path
* @param fileId File Id * @param fileId File Id
@@ -761,7 +761,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Default implementation for fetching file-slice * Default implementation for fetching file-slice.
* *
* @param partitionPath Partition path * @param partitionPath Partition path
* @param fileId File Id * @param fileId File Id
@@ -810,7 +810,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
} }
/** /**
* Return Only Commits and Compaction timeline for building file-groups * Return Only Commits and Compaction timeline for building file-groups.
* *
* @return * @return
*/ */

View File

@@ -82,7 +82,7 @@ public class FileSystemViewManager {
} }
/** /**
* Main API to get the file-system view for the base-path * Main API to get the file-system view for the base-path.
* *
* @param basePath * @param basePath
* @return * @return
@@ -92,7 +92,7 @@ public class FileSystemViewManager {
} }
/** /**
* Closes all views opened * Closes all views opened.
*/ */
public void close() { public void close() {
this.globalViewMap.values().stream().forEach(v -> v.close()); this.globalViewMap.values().stream().forEach(v -> v.close());
@@ -102,7 +102,7 @@ public class FileSystemViewManager {
// FACTORY METHODS FOR CREATING FILE-SYSTEM VIEWS // FACTORY METHODS FOR CREATING FILE-SYSTEM VIEWS
/** /**
* Create RocksDB based file System view for a dataset * Create RocksDB based file System view for a dataset.
* *
* @param conf Hadoop Configuration * @param conf Hadoop Configuration
* @param viewConf View Storage Configuration * @param viewConf View Storage Configuration
@@ -117,7 +117,7 @@ public class FileSystemViewManager {
} }
/** /**
* Create a spillable Map based file System view for a dataset * Create a spillable Map based file System view for a dataset.
* *
* @param conf Hadoop Configuration * @param conf Hadoop Configuration
* @param viewConf View Storage Configuration * @param viewConf View Storage Configuration
@@ -133,7 +133,7 @@ public class FileSystemViewManager {
} }
/** /**
* Create an in-memory file System view for a dataset * Create an in-memory file System view for a dataset.
* *
* @param conf Hadoop Configuration * @param conf Hadoop Configuration
* @param viewConf View Storage Configuration * @param viewConf View Storage Configuration
@@ -149,7 +149,7 @@ public class FileSystemViewManager {
} }
/** /**
* Create a remote file System view for a dataset * Create a remote file System view for a dataset.
* *
* @param conf Hadoop Configuration * @param conf Hadoop Configuration
* @param viewConf View Storage Configuration * @param viewConf View Storage Configuration
@@ -165,7 +165,7 @@ public class FileSystemViewManager {
} }
/** /**
* Main Factory method for building file-system views * Main Factory method for building file-system views.
* *
* @param conf Hadoop Configuration * @param conf Hadoop Configuration
* @param config View Storage Configuration * @param config View Storage Configuration

View File

@@ -28,7 +28,7 @@ import java.io.IOException;
import java.util.Properties; import java.util.Properties;
/** /**
* File System View Storage Configurations * File System View Storage Configurations.
*/ */
public class FileSystemViewStorageConfig extends DefaultHoodieConfig { public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
@@ -105,6 +105,9 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
return props.getProperty(ROCKSDB_BASE_PATH_PROP); return props.getProperty(ROCKSDB_BASE_PATH_PROP);
} }
/**
* The builder used to build {@link FileSystemViewStorageConfig}.
*/
public static class Builder { public static class Builder {
private final Properties props = new Properties(); private final Properties props = new Properties();

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.common.table.view; package org.apache.hudi.common.table.view;
/** /**
* Storage Type used to store/retrieve File system view of a table * Storage Type used to store/retrieve File system view of a table.
*/ */
public enum FileSystemViewStorageType { public enum FileSystemViewStorageType {
// In-memory storage of file-system view // In-memory storage of file-system view

View File

@@ -54,12 +54,12 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
protected Map<String, List<HoodieFileGroup>> partitionToFileGroupsMap; protected Map<String, List<HoodieFileGroup>> partitionToFileGroupsMap;
/** /**
* PartitionPath + File-Id to pending compaction instant time * PartitionPath + File-Id to pending compaction instant time.
*/ */
protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction; protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction;
/** /**
* Flag to determine if closed * Flag to determine if closed.
*/ */
private boolean closed = false; private boolean closed = false;
@@ -68,14 +68,14 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
} }
/** /**
* Create a file system view, as of the given timeline * Create a file system view, as of the given timeline.
*/ */
public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) {
this(metaClient, visibleActiveTimeline, false); this(metaClient, visibleActiveTimeline, false);
} }
/** /**
* Create a file system view, as of the given timeline * Create a file system view, as of the given timeline.
*/ */
public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
boolean enableIncrementalTimelineSync) { boolean enableIncrementalTimelineSync) {

View File

@@ -137,7 +137,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
} }
/** /**
* Remove Pending compaction instant * Remove Pending compaction instant.
* *
* @param timeline New Hoodie Timeline * @param timeline New Hoodie Timeline
* @param instant Compaction Instant to be removed * @param instant Compaction Instant to be removed
@@ -151,7 +151,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
} }
/** /**
* Add newly found compaction instant * Add newly found compaction instant.
* *
* @param timeline Hoodie Timeline * @param timeline Hoodie Timeline
* @param instant Compaction Instant * @param instant Compaction Instant
@@ -182,7 +182,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
} }
/** /**
* Add newly found commit/delta-commit instant * Add newly found commit/delta-commit instant.
* *
* @param timeline Hoodie Timeline * @param timeline Hoodie Timeline
* @param instant Instant * @param instant Instant
@@ -211,7 +211,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
} }
/** /**
* Add newly found restore instant * Add newly found restore instant.
* *
* @param timeline Hoodie Timeline * @param timeline Hoodie Timeline
* @param instant Restore Instant * @param instant Restore Instant
@@ -235,7 +235,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
} }
/** /**
* Add newly found rollback instant * Add newly found rollback instant.
* *
* @param timeline Hoodie Timeline * @param timeline Hoodie Timeline
* @param instant Rollback Instant * @param instant Rollback Instant
@@ -252,7 +252,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
} }
/** /**
* Add newly found clean instant * Add newly found clean instant.
* *
* @param timeline Timeline * @param timeline Timeline
* @param instant Clean instant * @param instant Clean instant
@@ -291,7 +291,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
} }
/** /**
* Apply mode whether to add or remove the delta view * Apply mode whether to add or remove the delta view.
*/ */
enum DeltaApplyMode { enum DeltaApplyMode {
ADD, REMOVE ADD, REMOVE

View File

@@ -54,7 +54,7 @@ import java.util.Map;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* A proxy for table file-system view which translates local View API calls to REST calls to remote timeline service * A proxy for table file-system view which translates local View API calls to REST calls to remote timeline service.
*/ */
public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, Serializable { public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, Serializable {

View File

@@ -38,7 +38,7 @@ import java.util.Map;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* Table FileSystemView implementation where view is stored in spillable disk using fixed memory * Table FileSystemView implementation where view is stored in spillable disk using fixed memory.
*/ */
public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {

View File

@@ -46,6 +46,9 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/**
* A utility class for avro.
*/
public class AvroUtils { public class AvroUtils {
private static final Integer DEFAULT_VERSION = 1; private static final Integer DEFAULT_VERSION = 1;

View File

@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* Helper class to generate compaction plan from FileGroup/FileSlice abstraction * Helper class to generate compaction plan from FileGroup/FileSlice abstraction.
*/ */
public class CompactionUtils { public class CompactionUtils {
@@ -55,7 +55,7 @@ public class CompactionUtils {
public static final Integer LATEST_COMPACTION_METADATA_VERSION = COMPACTION_METADATA_VERSION_2; public static final Integer LATEST_COMPACTION_METADATA_VERSION = COMPACTION_METADATA_VERSION_2;
/** /**
* Generate compaction operation from file-slice * Generate compaction operation from file-slice.
* *
* @param partitionPath Partition path * @param partitionPath Partition path
* @param fileSlice File Slice * @param fileSlice File Slice
@@ -80,7 +80,7 @@ public class CompactionUtils {
} }
/** /**
* Generate compaction plan from file-slices * Generate compaction plan from file-slices.
* *
* @param partitionFileSlicePairs list of partition file-slice pairs * @param partitionFileSlicePairs list of partition file-slice pairs
* @param extraMetadata Extra Metadata * @param extraMetadata Extra Metadata
@@ -100,7 +100,7 @@ public class CompactionUtils {
} }
/** /**
* Build Avro generated Compaction operation payload from compaction operation POJO for serialization * Build Avro generated Compaction operation payload from compaction operation POJO for serialization.
*/ */
public static HoodieCompactionOperation buildHoodieCompactionOperation(CompactionOperation op) { public static HoodieCompactionOperation buildHoodieCompactionOperation(CompactionOperation op) {
return HoodieCompactionOperation.newBuilder().setFileId(op.getFileId()).setBaseInstantTime(op.getBaseInstantTime()) return HoodieCompactionOperation.newBuilder().setFileId(op.getFileId()).setBaseInstantTime(op.getBaseInstantTime())
@@ -110,7 +110,7 @@ public class CompactionUtils {
} }
/** /**
* Build Compaction operation payload from Avro version for using in Spark executors * Build Compaction operation payload from Avro version for using in Spark executors.
* *
* @param hc HoodieCompactionOperation * @param hc HoodieCompactionOperation
*/ */
@@ -119,7 +119,7 @@ public class CompactionUtils {
} }
/** /**
* Get all pending compaction plans along with their instants * Get all pending compaction plans along with their instants.
* *
* @param metaClient Hoodie Meta Client * @param metaClient Hoodie Meta Client
*/ */
@@ -145,7 +145,7 @@ public class CompactionUtils {
} }
/** /**
* Get all PartitionPath + file-ids with pending Compaction operations and their target compaction instant time * Get all PartitionPath + file-ids with pending Compaction operations and their target compaction instant time.
* *
* @param metaClient Hoodie Table Meta Client * @param metaClient Hoodie Table Meta Client
*/ */
@@ -192,7 +192,7 @@ public class CompactionUtils {
} }
/** /**
* Return all pending compaction instant times * Return all pending compaction instant times.
* *
* @return * @return
*/ */

View File

@@ -25,19 +25,19 @@ import java.util.List;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
/** /**
* Ensures file create/delete operation is visible * Ensures file create/delete operation is visible.
*/ */
public interface ConsistencyGuard { public interface ConsistencyGuard {
/** /**
* File Visibility * File Visibility.
*/ */
enum FileVisibility { enum FileVisibility {
APPEAR, DISAPPEAR, APPEAR, DISAPPEAR,
} }
/** /**
* Wait for file to be listable based on configurable timeout * Wait for file to be listable based on configurable timeout.
* *
* @param filePath * @param filePath
* @throws IOException when having trouble listing the path * @throws IOException when having trouble listing the path
@@ -46,7 +46,7 @@ public interface ConsistencyGuard {
void waitTillFileAppears(Path filePath) throws IOException, TimeoutException; void waitTillFileAppears(Path filePath) throws IOException, TimeoutException;
/** /**
* Wait for file to be listable based on configurable timeout * Wait for file to be listable based on configurable timeout.
* *
* @param filePath * @param filePath
* @throws IOException when having trouble listing the path * @throws IOException when having trouble listing the path
@@ -55,17 +55,17 @@ public interface ConsistencyGuard {
void waitTillFileDisappears(Path filePath) throws IOException, TimeoutException; void waitTillFileDisappears(Path filePath) throws IOException, TimeoutException;
/** /**
* Wait till all passed files belonging to a directory shows up in the listing * Wait till all passed files belonging to a directory shows up in the listing.
*/ */
void waitTillAllFilesAppear(String dirPath, List<String> files) throws IOException, TimeoutException; void waitTillAllFilesAppear(String dirPath, List<String> files) throws IOException, TimeoutException;
/** /**
* Wait till all passed files belonging to a directory disappears from listing * Wait till all passed files belonging to a directory disappears from listing.
*/ */
void waitTillAllFilesDisappear(String dirPath, List<String> files) throws IOException, TimeoutException; void waitTillAllFilesDisappear(String dirPath, List<String> files) throws IOException, TimeoutException;
/** /**
* Wait Till target visibility is reached * Wait Till target visibility is reached.
* *
* @param dirPath Directory Path * @param dirPath Directory Path
* @param files Files * @param files Files

View File

@@ -25,6 +25,9 @@ import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.util.Properties; import java.util.Properties;
/**
* The consistency guard relevant config options.
*/
public class ConsistencyGuardConfig extends DefaultHoodieConfig { public class ConsistencyGuardConfig extends DefaultHoodieConfig {
private static final String CONSISTENCY_CHECK_ENABLED_PROP = "hoodie.consistency.check.enabled"; private static final String CONSISTENCY_CHECK_ENABLED_PROP = "hoodie.consistency.check.enabled";
@@ -67,6 +70,9 @@ public class ConsistencyGuardConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP)); return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
} }
/**
* The builder used to build consistency configurations.
*/
public static class Builder { public static class Builder {
private final Properties props = new Properties(); private final Properties props = new Properties();

View File

@@ -82,7 +82,7 @@ public class DFSPropertiesConfiguration {
} }
/** /**
* Add properties from input stream * Add properties from input stream.
* *
* @param reader Buffered Reader * @param reader Buffered Reader
* @throws IOException * @throws IOException

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.common.util; package org.apache.hudi.common.util;
/** /**
* Default implementation of size-estimator that uses Twitter's ObjectSizeCalculator * Default implementation of size-estimator that uses Twitter's ObjectSizeCalculator.
* *
* @param <T> * @param <T>
*/ */

View File

@@ -56,7 +56,7 @@ import java.util.regex.Pattern;
import java.util.stream.Stream; import java.util.stream.Stream;
/** /**
* Utility functions related to accessing the file storage * Utility functions related to accessing the file storage.
*/ */
public class FSUtils { public class FSUtils {
@@ -105,7 +105,7 @@ public class FSUtils {
} }
/** /**
* A write token uniquely identifies an attempt at one of the IOHandle operations (Merge/Create/Append) * A write token uniquely identifies an attempt at one of the IOHandle operations (Merge/Create/Append).
*/ */
public static String makeWriteToken(int taskPartitionId, int stageId, long taskAttemptId) { public static String makeWriteToken(int taskPartitionId, int stageId, long taskAttemptId) {
return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId); return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
@@ -168,7 +168,7 @@ public class FSUtils {
} }
/** /**
* Given a base partition and a partition path, return relative path of partition path to the base path * Given a base partition and a partition path, return relative path of partition path to the base path.
*/ */
public static String getRelativePartitionPath(Path basePath, Path partitionPath) { public static String getRelativePartitionPath(Path basePath, Path partitionPath) {
basePath = Path.getPathWithoutSchemeAndAuthority(basePath); basePath = Path.getPathWithoutSchemeAndAuthority(basePath);
@@ -183,7 +183,7 @@ public class FSUtils {
/** /**
* Obtain all the partition paths, that are present in this table, denoted by presence of * Obtain all the partition paths, that are present in this table, denoted by presence of
* {@link HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE} * {@link HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE}.
*/ */
public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException { public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException {
final Path basePath = new Path(basePathStr); final Path basePath = new Path(basePathStr);
@@ -284,7 +284,7 @@ public class FSUtils {
} }
/** /**
* Get the file extension from the log file * Get the file extension from the log file.
*/ */
public static String getFileExtensionFromLog(Path logPath) { public static String getFileExtensionFromLog(Path logPath) {
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
@@ -329,7 +329,7 @@ public class FSUtils {
} }
/** /**
* Get TaskId used in log-path * Get TaskPartitionId used in log-path.
*/ */
public static Integer getTaskPartitionIdFromLogPath(Path path) { public static Integer getTaskPartitionIdFromLogPath(Path path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
@@ -341,7 +341,7 @@ public class FSUtils {
} }
/** /**
* Get Write-Token used in log-path * Get Write-Token used in log-path.
*/ */
public static String getWriteTokenFromLogPath(Path path) { public static String getWriteTokenFromLogPath(Path path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
@@ -352,7 +352,7 @@ public class FSUtils {
} }
/** /**
* Get StageId used in log-path * Get StageId used in log-path.
*/ */
public static Integer getStageIdFromLogPath(Path path) { public static Integer getStageIdFromLogPath(Path path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
@@ -364,7 +364,7 @@ public class FSUtils {
} }
/** /**
* Get Task Attempt Id used in log-path * Get Task Attempt Id used in log-path.
*/ */
public static Integer getTaskAttemptIdFromLogPath(Path path) { public static Integer getTaskAttemptIdFromLogPath(Path path) {
Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
@@ -403,14 +403,14 @@ public class FSUtils {
} }
/** /**
* Get the latest log file written from the list of log files passed in * Get the latest log file written from the list of log files passed in.
*/ */
public static Option<HoodieLogFile> getLatestLogFile(Stream<HoodieLogFile> logFiles) { public static Option<HoodieLogFile> getLatestLogFile(Stream<HoodieLogFile> logFiles) {
return Option.fromJavaOptional(logFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).findFirst()); return Option.fromJavaOptional(logFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).findFirst());
} }
/** /**
* Get all the log files for the passed in FileId in the partition path * Get all the log files for the passed in FileId in the partition path.
*/ */
public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId, public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId,
final String logFileExtension, final String baseCommitTime) throws IOException { final String logFileExtension, final String baseCommitTime) throws IOException {
@@ -421,7 +421,7 @@ public class FSUtils {
} }
/** /**
* Get the latest log version for the fileId in the partition path * Get the latest log version for the fileId in the partition path.
*/ */
public static Option<Pair<Integer, String>> getLatestLogVersion(FileSystem fs, Path partitionPath, public static Option<Pair<Integer, String>> getLatestLogVersion(FileSystem fs, Path partitionPath,
final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
@@ -435,7 +435,7 @@ public class FSUtils {
} }
/** /**
* computes the next log version for the specified fileId in the partition path * computes the next log version for the specified fileId in the partition path.
*/ */
public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId, public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId,
final String logFileExtension, final String baseCommitTime) throws IOException { final String logFileExtension, final String baseCommitTime) throws IOException {

View File

@@ -35,7 +35,7 @@ import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* A consistency checker that fails if it is unable to meet the required condition within a specified timeout * A consistency checker that fails if it is unable to meet the required condition within a specified timeout.
*/ */
public class FailSafeConsistencyGuard implements ConsistencyGuard { public class FailSafeConsistencyGuard implements ConsistencyGuard {
@@ -71,7 +71,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
} }
/** /**
* Helper function to wait for all files belonging to single directory to appear * Helper function to wait for all files belonging to single directory to appear.
* *
* @param dirPath Dir Path * @param dirPath Dir Path
* @param files Files to appear/disappear * @param files Files to appear/disappear
@@ -111,7 +111,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
} }
/** /**
* Helper to check of file visibility * Helper to check of file visibility.
* *
* @param filePath File Path * @param filePath File Path
* @param visibility Visibility * @param visibility Visibility
@@ -140,7 +140,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
} }
/** /**
* Helper function to wait till file either appears/disappears * Helper function to wait till file either appears/disappears.
* *
* @param filePath File Path * @param filePath File Path
* @param visibility * @param visibility
@@ -167,7 +167,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
} }
/** /**
* Retries the predicate for condfigurable number of times till we the predicate returns success * Retries the predicate for condfigurable number of times till we the predicate returns success.
* *
* @param predicate Predicate Function * @param predicate Predicate Function
* @param timedOutMessage Timed-Out message for logging * @param timedOutMessage Timed-Out message for logging

View File

@@ -31,7 +31,7 @@ import java.nio.file.Path;
import java.util.Comparator; import java.util.Comparator;
/** /**
* Bunch of utility methods for working with files and byte streams * Bunch of utility methods for working with files and byte streams.
*/ */
public class FileIOUtils { public class FileIOUtils {

View File

@@ -20,20 +20,35 @@ package org.apache.hudi.common.util;
import java.io.Serializable; import java.io.Serializable;
/**
* An interface contains a set of functions.
*/
public interface Functions { public interface Functions {
/**
* A function which has not any parameter.
*/
public interface Function0<R> extends Serializable { public interface Function0<R> extends Serializable {
R apply(); R apply();
} }
/**
* A function which contains only one parameter.
*/
public interface Function1<T1, R> extends Serializable { public interface Function1<T1, R> extends Serializable {
R apply(T1 val1); R apply(T1 val1);
} }
/**
* A function which contains two parameters.
*/
public interface Function2<T1, T2, R> extends Serializable { public interface Function2<T1, T2, R> extends Serializable {
R apply(T1 val1, T2 val2); R apply(T1 val1, T2 val2);
} }
/**
* A function which contains three parameters.
*/
public interface Function3<T1, T2, T3, R> extends Serializable { public interface Function3<T1, T2, T3, R> extends Serializable {
R apply(T1 val1, T2 val2, T3 val3); R apply(T1 val1, T2 val2, T3 val3);
} }

View File

@@ -64,7 +64,7 @@ public class HoodieAvroUtils {
private static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema(); private static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema();
/** /**
* Convert a given avro record to bytes * Convert a given avro record to bytes.
*/ */
public static byte[] avroToBytes(GenericRecord record) throws IOException { public static byte[] avroToBytes(GenericRecord record) throws IOException {
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema()); GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
@@ -78,7 +78,7 @@ public class HoodieAvroUtils {
} }
/** /**
* Convert serialized bytes back into avro record * Convert serialized bytes back into avro record.
*/ */
public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOException { public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOException {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, reuseDecoder.get()); BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, reuseDecoder.get());
@@ -96,7 +96,7 @@ public class HoodieAvroUtils {
} }
/** /**
* Adds the Hoodie metadata fields to the given schema * Adds the Hoodie metadata fields to the given schema.
*/ */
public static Schema addMetadataFields(Schema schema) { public static Schema addMetadataFields(Schema schema) {
List<Schema.Field> parentFields = new ArrayList<>(); List<Schema.Field> parentFields = new ArrayList<>();
@@ -187,7 +187,7 @@ public class HoodieAvroUtils {
/** /**
* Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old
* schema * schema.
*/ */
public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) { public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
return rewrite(record, record.getSchema(), newSchema); return rewrite(record, record.getSchema(), newSchema);
@@ -195,7 +195,7 @@ public class HoodieAvroUtils {
/** /**
* Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new
* schema * schema.
*/ */
public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) { public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
return rewrite(record, newSchema, newSchema); return rewrite(record, newSchema, newSchema);

View File

@@ -27,7 +27,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
/** /**
* Size Estimator for Hoodie record payload * Size Estimator for Hoodie record payload.
* *
* @param <T> * @param <T>
*/ */

View File

@@ -38,7 +38,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* Utils class for performing various log file reading operations * Utils class for performing various log file reading operations.
*/ */
public class LogReaderUtils { public class LogReaderUtils {

View File

@@ -23,6 +23,9 @@ import org.apache.hudi.exception.HoodieException;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
/**
* A utility class for network.
*/
public class NetworkUtils { public class NetworkUtils {
public static synchronized String getHostname() { public static synchronized String getHostname() {

View File

@@ -18,6 +18,9 @@
package org.apache.hudi.common.util; package org.apache.hudi.common.util;
/**
* A utility class for numeric.
*/
public class NumericUtils { public class NumericUtils {
public static String humanReadableByteCount(double bytes) { public static String humanReadableByteCount(double bytes) {

View File

@@ -39,14 +39,14 @@ public final class Option<T> implements Serializable {
private final T val; private final T val;
/** /**
* Convert to java Optional * Convert to java Optional.
*/ */
public Optional<T> toJavaOptional() { public Optional<T> toJavaOptional() {
return Optional.ofNullable(val); return Optional.ofNullable(val);
} }
/** /**
* Convert from java.util.Optional * Convert from java.util.Optional.
* *
* @param v java.util.Optional object * @param v java.util.Optional object
* @param <T> type of the value stored in java.util.Optional object * @param <T> type of the value stored in java.util.Optional object

View File

@@ -31,6 +31,9 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.stream.Stream; import java.util.stream.Stream;
/**
* A utility class for reflection.
*/
public class ReflectionUtils { public class ReflectionUtils {
private static Map<String, Class<?>> clazzCache = new HashMap<>(); private static Map<String, Class<?>> clazzCache = new HashMap<>();
@@ -56,7 +59,7 @@ public class ReflectionUtils {
} }
/** /**
* Instantiate a given class with a generic record payload * Instantiate a given class with a generic record payload.
*/ */
public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass, Object[] payloadArgs, public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass, Object[] payloadArgs,
Class<?>... constructorArgTypes) { Class<?>... constructorArgTypes) {
@@ -87,7 +90,7 @@ public class ReflectionUtils {
} }
/** /**
* Return stream of top level class names in the same class path as passed-in class * Return stream of top level class names in the same class path as passed-in class.
* *
* @param clazz * @param clazz
*/ */

View File

@@ -82,7 +82,7 @@ public class RocksDBDAO {
} }
/** /**
* Initialized Rocks DB instance * Initialized Rocks DB instance.
*/ */
private void init() throws HoodieException { private void init() throws HoodieException {
try { try {
@@ -127,7 +127,7 @@ public class RocksDBDAO {
} }
/** /**
* Helper to load managed column family descriptors * Helper to load managed column family descriptors.
*/ */
private List<ColumnFamilyDescriptor> loadManagedColumnFamilies(DBOptions dbOptions) throws RocksDBException { private List<ColumnFamilyDescriptor> loadManagedColumnFamilies(DBOptions dbOptions) throws RocksDBException {
final List<ColumnFamilyDescriptor> managedColumnFamilies = new ArrayList<>(); final List<ColumnFamilyDescriptor> managedColumnFamilies = new ArrayList<>();
@@ -150,7 +150,7 @@ public class RocksDBDAO {
} }
/** /**
* Perform a batch write operation * Perform a batch write operation.
*/ */
public void writeBatch(BatchHandler handler) { public void writeBatch(BatchHandler handler) {
WriteBatch batch = new WriteBatch(); WriteBatch batch = new WriteBatch();
@@ -165,7 +165,7 @@ public class RocksDBDAO {
} }
/** /**
* Helper to add put operation in batch * Helper to add put operation in batch.
* *
* @param batch Batch Handle * @param batch Batch Handle
* @param columnFamilyName Column Family * @param columnFamilyName Column Family
@@ -183,7 +183,7 @@ public class RocksDBDAO {
} }
/** /**
* Helper to add put operation in batch * Helper to add put operation in batch.
* *
* @param batch Batch Handle * @param batch Batch Handle
* @param columnFamilyName Column Family * @param columnFamilyName Column Family
@@ -203,7 +203,7 @@ public class RocksDBDAO {
} }
/** /**
* Perform single PUT on a column-family * Perform single PUT on a column-family.
* *
* @param columnFamilyName Column family name * @param columnFamilyName Column family name
* @param key Key * @param key Key
@@ -220,7 +220,7 @@ public class RocksDBDAO {
} }
/** /**
* Perform single PUT on a column-family * Perform single PUT on a column-family.
* *
* @param columnFamilyName Column family name * @param columnFamilyName Column family name
* @param key Key * @param key Key
@@ -237,7 +237,7 @@ public class RocksDBDAO {
} }
/** /**
* Helper to add delete operation in batch * Helper to add delete operation in batch.
* *
* @param batch Batch Handle * @param batch Batch Handle
* @param columnFamilyName Column Family * @param columnFamilyName Column Family
@@ -252,7 +252,7 @@ public class RocksDBDAO {
} }
/** /**
* Helper to add delete operation in batch * Helper to add delete operation in batch.
* *
* @param batch Batch Handle * @param batch Batch Handle
* @param columnFamilyName Column Family * @param columnFamilyName Column Family
@@ -267,7 +267,7 @@ public class RocksDBDAO {
} }
/** /**
* Perform a single Delete operation * Perform a single Delete operation.
* *
* @param columnFamilyName Column Family name * @param columnFamilyName Column Family name
* @param key Key to be deleted * @param key Key to be deleted
@@ -281,7 +281,7 @@ public class RocksDBDAO {
} }
/** /**
* Perform a single Delete operation * Perform a single Delete operation.
* *
* @param columnFamilyName Column Family name * @param columnFamilyName Column Family name
* @param key Key to be deleted * @param key Key to be deleted
@@ -295,7 +295,7 @@ public class RocksDBDAO {
} }
/** /**
* Retrieve a value for a given key in a column family * Retrieve a value for a given key in a column family.
* *
* @param columnFamilyName Column Family Name * @param columnFamilyName Column Family Name
* @param key Key to be retrieved * @param key Key to be retrieved
@@ -312,7 +312,7 @@ public class RocksDBDAO {
} }
/** /**
* Retrieve a value for a given key in a column family * Retrieve a value for a given key in a column family.
* *
* @param columnFamilyName Column Family Name * @param columnFamilyName Column Family Name
* @param key Key to be retrieved * @param key Key to be retrieved
@@ -329,7 +329,7 @@ public class RocksDBDAO {
} }
/** /**
* Perform a prefix search and return stream of key-value pairs retrieved * Perform a prefix search and return stream of key-value pairs retrieved.
* *
* @param columnFamilyName Column Family Name * @param columnFamilyName Column Family Name
* @param prefix Prefix Key * @param prefix Prefix Key
@@ -358,7 +358,7 @@ public class RocksDBDAO {
} }
/** /**
* Perform a prefix delete and return stream of key-value pairs retrieved * Perform a prefix delete and return stream of key-value pairs retrieved.
* *
* @param columnFamilyName Column Family Name * @param columnFamilyName Column Family Name
* @param prefix Prefix Key * @param prefix Prefix Key
@@ -396,7 +396,7 @@ public class RocksDBDAO {
} }
/** /**
* Add a new column family to store * Add a new column family to store.
* *
* @param columnFamilyName Column family name * @param columnFamilyName Column family name
*/ */
@@ -416,7 +416,7 @@ public class RocksDBDAO {
} }
/** /**
* Note : Does not delete from underlying DB. Just closes the handle * Note : Does not delete from underlying DB. Just closes the handle.
* *
* @param columnFamilyName Column Family Name * @param columnFamilyName Column Family Name
*/ */
@@ -437,7 +437,7 @@ public class RocksDBDAO {
} }
/** /**
* Close the DAO object * Close the DAO object.
*/ */
public synchronized void close() { public synchronized void close() {
if (!closed) { if (!closed) {
@@ -462,7 +462,7 @@ public class RocksDBDAO {
} }
/** /**
* Functional interface for stacking operation to Write batch * Functional interface for stacking operation to Write batch.
*/ */
public interface BatchHandler { public interface BatchHandler {

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.common.util; package org.apache.hudi.common.util;
/** /**
* An interface to estimate the size of payload in memory * An interface to estimate the size of payload in memory.
* *
* @param <T> * @param <T>
*/ */

View File

@@ -31,10 +31,13 @@ import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.util.zip.CRC32; import java.util.zip.CRC32;
/**
* A utility class supports spillable map.
*/
public class SpillableMapUtils { public class SpillableMapUtils {
/** /**
* Using the schema and payload class, read and convert the bytes on disk to a HoodieRecord * Using the schema and payload class, read and convert the bytes on disk to a HoodieRecord.
*/ */
public static byte[] readBytesFromDisk(RandomAccessFile file, long valuePosition, int valueLength) public static byte[] readBytesFromDisk(RandomAccessFile file, long valuePosition, int valueLength)
throws IOException { throws IOException {
@@ -43,7 +46,8 @@ public class SpillableMapUtils {
} }
/** /**
* |crc|timestamp|sizeOfKey|SizeOfValue|key|value| * Reads the given file with specific pattern(|crc|timestamp|sizeOfKey|SizeOfValue|key|value|) then
* returns an instance of {@link FileEntry}.
*/ */
private static FileEntry readInternal(RandomAccessFile file, long valuePosition, int valueLength) throws IOException { private static FileEntry readInternal(RandomAccessFile file, long valuePosition, int valueLength) throws IOException {
file.seek(valuePosition); file.seek(valuePosition);
@@ -86,7 +90,7 @@ public class SpillableMapUtils {
} }
/** /**
* Generate a checksum for a given set of bytes * Generate a checksum for a given set of bytes.
*/ */
public static long generateChecksum(byte[] data) { public static long generateChecksum(byte[] data) {
CRC32 crc = new CRC32(); CRC32 crc = new CRC32();
@@ -96,14 +100,14 @@ public class SpillableMapUtils {
/** /**
* Compute a bytes representation of the payload by serializing the contents This is used to estimate the size of the * Compute a bytes representation of the payload by serializing the contents This is used to estimate the size of the
* payload (either in memory or when written to disk) * payload (either in memory or when written to disk).
*/ */
public static <R> long computePayloadSize(R value, SizeEstimator<R> valueSizeEstimator) throws IOException { public static <R> long computePayloadSize(R value, SizeEstimator<R> valueSizeEstimator) throws IOException {
return valueSizeEstimator.sizeEstimate(value); return valueSizeEstimator.sizeEstimate(value);
} }
/** /**
* Utility method to convert bytes to HoodieRecord using schema and payload class * Utility method to convert bytes to HoodieRecord using schema and payload class.
*/ */
public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz) { public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz) {
String recKey = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); String recKey = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -114,7 +118,7 @@ public class SpillableMapUtils {
} }
/** /**
* Utility method to convert bytes to HoodieRecord using schema and payload class * Utility method to convert bytes to HoodieRecord using schema and payload class.
*/ */
public static <R> R generateEmptyPayload(String recKey, String partitionPath, String payloadClazz) { public static <R> R generateEmptyPayload(String recKey, String partitionPath, String payloadClazz) {
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath), HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieRecord<>(new HoodieKey(recKey, partitionPath),

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.common.util; package org.apache.hudi.common.util;
/** /**
* Simple utility for operations on strings * Simple utility for operations on strings.
*/ */
public class StringUtils { public class StringUtils {

View File

@@ -31,6 +31,9 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/**
* A helper class used to diff timeline.
*/
public class TimelineDiffHelper { public class TimelineDiffHelper {
protected static Logger log = LogManager.getLogger(TimelineDiffHelper.class); protected static Logger log = LogManager.getLogger(TimelineDiffHelper.class);
@@ -97,6 +100,9 @@ public class TimelineDiffHelper {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }
/**
* A diff result of timeline.
*/
public static class TimelineDiffResult { public static class TimelineDiffResult {
private final List<HoodieInstant> newlySeenInstants; private final List<HoodieInstant> newlySeenInstants;

View File

@@ -25,7 +25,7 @@ import java.util.Properties;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* Type-aware extension of {@link java.util.Properties} * Type-aware extension of {@link java.util.Properties}.
*/ */
public class TypedProperties extends Properties implements Serializable { public class TypedProperties extends Properties implements Serializable {

View File

@@ -121,7 +121,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
/** /**
* Register shutdown hook to force flush contents of the data written to FileOutputStream from OS page cache * Register shutdown hook to force flush contents of the data written to FileOutputStream from OS page cache
* (typically 4 KB) to disk * (typically 4 KB) to disk.
*/ */
private void addShutDownHook() { private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -153,7 +153,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
} }
/** /**
* Custom iterator to iterate over values written to disk * Custom iterator to iterate over values written to disk.
*/ */
@Override @Override
public Iterator<R> iterator() { public Iterator<R> iterator() {
@@ -161,7 +161,7 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
} }
/** /**
* Number of bytes spilled to disk * Number of bytes spilled to disk.
*/ */
public long sizeOfFileOnDiskInBytes() { public long sizeOfFileOnDiskInBytes() {
return filePosition.get(); return filePosition.get();
@@ -272,6 +272,9 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
return entrySet; return entrySet;
} }
/**
* The file metadata that should be spilled to disk.
*/
public static final class FileEntry { public static final class FileEntry {
// Checksum of the value written to disk, compared during every readFromDisk to make sure no corruption // Checksum of the value written to disk, compared during every readFromDisk to make sure no corruption
@@ -321,6 +324,9 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
} }
} }
/**
* The value relevant metadata.
*/
public static final class ValueMetadata implements Comparable<ValueMetadata> { public static final class ValueMetadata implements Comparable<ValueMetadata> {
// FilePath to store the spilled data // FilePath to store the spilled data

View File

@@ -106,35 +106,35 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
} }
/** /**
* A custom iterator to wrap over iterating in-memory + disk spilled data * A custom iterator to wrap over iterating in-memory + disk spilled data.
*/ */
public Iterator<R> iterator() { public Iterator<R> iterator() {
return new IteratorWrapper<>(inMemoryMap.values().iterator(), getDiskBasedMap().iterator()); return new IteratorWrapper<>(inMemoryMap.values().iterator(), getDiskBasedMap().iterator());
} }
/** /**
* Number of entries in DiskBasedMap * Number of entries in DiskBasedMap.
*/ */
public int getDiskBasedMapNumEntries() { public int getDiskBasedMapNumEntries() {
return getDiskBasedMap().size(); return getDiskBasedMap().size();
} }
/** /**
* Number of bytes spilled to disk * Number of bytes spilled to disk.
*/ */
public long getSizeOfFileOnDiskInBytes() { public long getSizeOfFileOnDiskInBytes() {
return getDiskBasedMap().sizeOfFileOnDiskInBytes(); return getDiskBasedMap().sizeOfFileOnDiskInBytes();
} }
/** /**
* Number of entries in InMemoryMap * Number of entries in InMemoryMap.
*/ */
public int getInMemoryMapNumEntries() { public int getInMemoryMapNumEntries() {
return inMemoryMap.size(); return inMemoryMap.size();
} }
/** /**
* Approximate memory footprint of the in-memory map * Approximate memory footprint of the in-memory map.
*/ */
public long getCurrentInMemoryMapSize() { public long getCurrentInMemoryMapSize() {
return currentInMemoryMapSize; return currentInMemoryMapSize;
@@ -257,7 +257,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
/** /**
* Iterator that wraps iterating over all the values for this map 1) inMemoryIterator - Iterates over all the data * Iterator that wraps iterating over all the values for this map 1) inMemoryIterator - Iterates over all the data
* in-memory map 2) diskLazyFileIterator - Iterates over all the data spilled to disk * in-memory map 2) diskLazyFileIterator - Iterates over all the data spilled to disk.
*/ */
private class IteratorWrapper<R> implements Iterator<R> { private class IteratorWrapper<R> implements Iterator<R> {

View File

@@ -40,16 +40,17 @@ package org.apache.hudi.common.util.collection;
public final class ImmutablePair<L, R> extends Pair<L, R> { public final class ImmutablePair<L, R> extends Pair<L, R> {
/** /**
* Serialization version * Serialization version.
*/ */
private static final long serialVersionUID = 4954918890077093841L; private static final long serialVersionUID = 4954918890077093841L;
/** /**
* Left object * Left object.
*/ */
public final L left; public final L left;
/** /**
* Right object * Right object.
*/ */
public final R right; public final R right;

View File

@@ -41,20 +41,22 @@ package org.apache.hudi.common.util.collection;
public final class ImmutableTriple<L, M, R> extends Triple<L, M, R> { public final class ImmutableTriple<L, M, R> extends Triple<L, M, R> {
/** /**
* Serialization version * Serialization version.
*/ */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
/** /**
* Left object * Left object.
*/ */
public final L left; public final L left;
/** /**
* Middle object * Middle object.
*/ */
public final M middle; public final M middle;
/** /**
* Right object * Right object.
*/ */
public final R right; public final R right;

View File

@@ -44,7 +44,7 @@ import java.util.Map;
public abstract class Pair<L, R> implements Map.Entry<L, R>, Comparable<Pair<L, R>>, Serializable { public abstract class Pair<L, R> implements Map.Entry<L, R>, Comparable<Pair<L, R>>, Serializable {
/** /**
* Serialization version * Serialization version.
*/ */
private static final long serialVersionUID = 4954918890077093841L; private static final long serialVersionUID = 4954918890077093841L;

View File

@@ -27,6 +27,9 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
/**
* A map's implementation based on RocksDB.
*/
public final class RocksDBBasedMap<K extends Serializable, R extends Serializable> implements Map<K, R> { public final class RocksDBBasedMap<K extends Serializable, R extends Serializable> implements Map<K, R> {
private static final String COL_FAMILY_NAME = "map_handle"; private static final String COL_FAMILY_NAME = "map_handle";

View File

@@ -44,7 +44,7 @@ import java.io.Serializable;
public abstract class Triple<L, M, R> implements Comparable<Triple<L, M, R>>, Serializable { public abstract class Triple<L, M, R> implements Comparable<Triple<L, M, R>>, Serializable {
/** /**
* Serialization version * Serialization version.
*/ */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@@ -24,7 +24,7 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
* Wrapper for DataOutpuStream to keep track of number of bytes written * Wrapper for DataOutpuStream to keep track of number of bytes written.
*/ */
public class SizeAwareDataOutputStream { public class SizeAwareDataOutputStream {

View File

@@ -78,7 +78,7 @@ public class BoundedInMemoryExecutor<I, O, E> {
} }
/** /**
* Start all Producers * Start all Producers.
*/ */
public ExecutorCompletionService<Boolean> startProducers() { public ExecutorCompletionService<Boolean> startProducers() {
// Latch to control when and which producer thread will close the queue // Latch to control when and which producer thread will close the queue
@@ -110,7 +110,7 @@ public class BoundedInMemoryExecutor<I, O, E> {
} }
/** /**
* Start only consumer * Start only consumer.
*/ */
private Future<E> startConsumer() { private Future<E> startConsumer() {
return consumer.map(consumer -> { return consumer.map(consumer -> {
@@ -131,7 +131,7 @@ public class BoundedInMemoryExecutor<I, O, E> {
} }
/** /**
* Main API to run both production and consumption * Main API to run both production and consumption.
*/ */
public E execute() { public E execute() {
try { try {

View File

@@ -95,7 +95,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
private long numSamples = 0; private long numSamples = 0;
/** /**
* Construct BoundedInMemoryQueue with default SizeEstimator * Construct BoundedInMemoryQueue with default SizeEstimator.
* *
* @param memoryLimit MemoryLimit in bytes * @param memoryLimit MemoryLimit in bytes
* @param transformFunction Transformer Function to convert input payload type to stored payload type * @param transformFunction Transformer Function to convert input payload type to stored payload type
@@ -105,7 +105,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
} }
/** /**
* Construct BoundedInMemoryQueue with passed in size estimator * Construct BoundedInMemoryQueue with passed in size estimator.
* *
* @param memoryLimit MemoryLimit in bytes * @param memoryLimit MemoryLimit in bytes
* @param transformFunction Transformer Function to convert input payload type to stored payload type * @param transformFunction Transformer Function to convert input payload type to stored payload type
@@ -155,7 +155,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
} }
/** /**
* Inserts record into queue after applying transformation * Inserts record into queue after applying transformation.
* *
* @param t Item to be queueed * @param t Item to be queueed
*/ */
@@ -178,7 +178,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
} }
/** /**
* Checks if records are either available in the queue or expected to be written in future * Checks if records are either available in the queue or expected to be written in future.
*/ */
private boolean expectMoreRecords() { private boolean expectMoreRecords() {
return !isWriteDone.get() || (isWriteDone.get() && !queue.isEmpty()); return !isWriteDone.get() || (isWriteDone.get() && !queue.isEmpty());
@@ -220,7 +220,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
} }
/** /**
* Puts an empty entry to queue to denote termination * Puts an empty entry to queue to denote termination.
*/ */
public void close() throws InterruptedException { public void close() throws InterruptedException {
// done queueing records notifying queue-reader. // done queueing records notifying queue-reader.
@@ -234,7 +234,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
} }
/** /**
* API to allow producers and consumer to communicate termination due to failure * API to allow producers and consumer to communicate termination due to failure.
*/ */
public void markAsFailed(Exception e) { public void markAsFailed(Exception e) {
this.hasFailed.set(e); this.hasFailed.set(e);
@@ -249,7 +249,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
} }
/** /**
* Iterator for the memory bounded queue * Iterator for the memory bounded queue.
*/ */
private final class QueueIterator implements Iterator<O> { private final class QueueIterator implements Iterator<O> {

View File

@@ -21,12 +21,12 @@ package org.apache.hudi.common.util.queue;
import java.util.Iterator; import java.util.Iterator;
/** /**
* Consume entries from queue and execute callback function * Consume entries from queue and execute callback function.
*/ */
public abstract class BoundedInMemoryQueueConsumer<I, O> { public abstract class BoundedInMemoryQueueConsumer<I, O> {
/** /**
* API to de-queue entries to memory bounded queue * API to de-queue entries to memory bounded queue.
* *
* @param queue In Memory bounded queue * @param queue In Memory bounded queue
*/ */
@@ -44,17 +44,17 @@ public abstract class BoundedInMemoryQueueConsumer<I, O> {
} }
/** /**
* Consumer One record * Consumer One record.
*/ */
protected abstract void consumeOneRecord(I record); protected abstract void consumeOneRecord(I record);
/** /**
* Notifies implementation that we have exhausted consuming records from queue * Notifies implementation that we have exhausted consuming records from queue.
*/ */
protected abstract void finish(); protected abstract void finish();
/** /**
* Return result of consuming records so far * Return result of consuming records so far.
*/ */
protected abstract O getResult(); protected abstract O getResult();

View File

@@ -26,7 +26,7 @@ package org.apache.hudi.common.util.queue;
public interface BoundedInMemoryQueueProducer<I> { public interface BoundedInMemoryQueueProducer<I> {
/** /**
* API to enqueue entries to memory bounded queue * API to enqueue entries to memory bounded queue.
* *
* @param queue In Memory bounded queue * @param queue In Memory bounded queue
*/ */

View File

@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import java.util.Iterator; import java.util.Iterator;
/** /**
* Iterator based producer which pulls entry from iterator and produces items for the queue * Iterator based producer which pulls entry from iterator and produces items for the queue.
* *
* @param <I> Item type produced for the buffer. * @param <I> Item type produced for the buffer.
*/ */

View File

@@ -20,6 +20,11 @@ package org.apache.hudi.common.versioning;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
/**
* The basic abstraction of migrator.
*
* @param <T> Metadata Type.
*/
public abstract class AbstractMigratorBase<T> implements VersionMigrator<T> { public abstract class AbstractMigratorBase<T> implements VersionMigrator<T> {
protected final HoodieTableMetaClient metaClient; protected final HoodieTableMetaClient metaClient;

View File

@@ -28,7 +28,7 @@ import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* Migrates a specific metadata type stored in .hoodie folder to latest version * Migrates a specific metadata type stored in .hoodie folder to latest version.
* *
* @param <T> * @param <T>
*/ */
@@ -46,7 +46,7 @@ public class MetadataMigrator<T> {
} }
/** /**
* Upgrade Metadata version to its latest * Upgrade Metadata version to its latest.
* *
* @param metadata Metadata * @param metadata Metadata
* @param metadataVersion Current version of metadata * @param metadataVersion Current version of metadata
@@ -67,7 +67,7 @@ public class MetadataMigrator<T> {
} }
/** /**
* Migrate metadata to a specific version * Migrate metadata to a specific version.
* *
* @param metadata Hoodie Table Meta Client * @param metadata Hoodie Table Meta Client
* @param metadataVersion Metadata Version * @param metadataVersion Metadata Version

View File

@@ -21,21 +21,21 @@ package org.apache.hudi.common.versioning;
import java.io.Serializable; import java.io.Serializable;
/** /**
* Responsible for upgrading and downgrading metadata versions for a specific metadata * Responsible for upgrading and downgrading metadata versions for a specific metadata.
* *
* @param <T> Metadata Type * @param <T> Metadata Type
*/ */
public interface VersionMigrator<T> extends Serializable { public interface VersionMigrator<T> extends Serializable {
/** /**
* Version of Metadata that this class will handle * Version of Metadata that this class will handle.
* *
* @return * @return
*/ */
Integer getManagedVersion(); Integer getManagedVersion();
/** /**
* Upgrades metadata of type T from previous version to this version * Upgrades metadata of type T from previous version to this version.
* *
* @param input Metadata as of previous version. * @param input Metadata as of previous version.
* @return Metadata compatible with the version managed by this class * @return Metadata compatible with the version managed by this class
@@ -43,7 +43,7 @@ public interface VersionMigrator<T> extends Serializable {
T upgradeFrom(T input); T upgradeFrom(T input);
/** /**
* Downgrades metadata of type T from next version to this version * Downgrades metadata of type T from next version to this version.
* *
* @param input Metadata as of next highest version * @param input Metadata as of next highest version
* @return Metadata compatible with the version managed by this class * @return Metadata compatible with the version managed by this class

View File

@@ -25,7 +25,7 @@ import org.apache.hudi.common.versioning.MetadataMigrator;
import java.util.Arrays; import java.util.Arrays;
/** /**
* Responsible for handling different versions of compaction plan * Responsible for handling different versions of compaction plan.
*/ */
public class CompactionPlanMigrator extends MetadataMigrator<HoodieCompactionPlan> { public class CompactionPlanMigrator extends MetadataMigrator<HoodieCompactionPlan> {

Some files were not shown because too many files have changed in this diff Show More