[HUDI-629]: Replace Guava's Hashing with an equivalent in NumericUtils.java (#1350)
* [HUDI-629]: Replace Guava's Hashing with an equivalent in NumericUtils.java
This commit is contained in:
@@ -18,7 +18,7 @@
|
||||
|
||||
package org.apache.hudi.common.model;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Objects;
|
||||
@@ -37,8 +37,8 @@ public class TimelineLayoutVersion implements Serializable, Comparable<TimelineL
|
||||
private Integer version;
|
||||
|
||||
public TimelineLayoutVersion(Integer version) {
|
||||
Preconditions.checkArgument(version <= CURR_VERSION);
|
||||
Preconditions.checkArgument(version >= VERSION_0);
|
||||
ValidationUtils.checkArgument(version <= CURR_VERSION);
|
||||
ValidationUtils.checkArgument(version >= VERSION_0);
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
|
||||
@@ -26,14 +26,14 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.ConsistencyGuardConfig;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.common.util.FailSafeConsistencyGuard;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.common.util.NoOpConsistencyGuard;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.TableNotFoundException;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
@@ -119,7 +119,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
Option<TimelineLayoutVersion> tableConfigVersion = tableConfig.getTimelineLayoutVersion();
|
||||
if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
|
||||
// Ensure layout version passed in config is not lower than the one seen in hoodie.properties
|
||||
Preconditions.checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get()) >= 0,
|
||||
ValidationUtils.checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get()) >= 0,
|
||||
"Layout Version defined in hoodie properties has higher version (" + tableConfigVersion.get()
|
||||
+ ") than the one passed in config (" + layoutVersion.get() + ")");
|
||||
}
|
||||
@@ -233,7 +233,7 @@ public class HoodieTableMetaClient implements Serializable {
|
||||
public HoodieWrapperFileSystem getFs() {
|
||||
if (fs == null) {
|
||||
FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy());
|
||||
Preconditions.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
|
||||
ValidationUtils.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
|
||||
"File System not expected to be that of HoodieWrapperFileSystem");
|
||||
fs = new HoodieWrapperFileSystem(fileSystem,
|
||||
consistencyGuardConfig.isConsistencyCheckEnabled()
|
||||
|
||||
@@ -28,11 +28,11 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.CorruptedLogFileException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.fs.BufferedFSInputStream;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
@@ -61,7 +61,6 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
||||
private final HoodieLogFile logFile;
|
||||
private static final byte[] MAGIC_BUFFER = new byte[6];
|
||||
private final Schema readerSchema;
|
||||
private HoodieLogFormat.LogFormatVersion nextBlockVersion;
|
||||
private boolean readBlockLazily;
|
||||
private long reverseLogFilePosition;
|
||||
private long lastReverseLogFilePosition;
|
||||
@@ -145,13 +144,13 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
|
||||
}
|
||||
|
||||
// 2. Read the version for this log format
|
||||
this.nextBlockVersion = readVersion();
|
||||
HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
|
||||
|
||||
// 3. Read the block type for a log block
|
||||
if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
|
||||
type = inputStream.readInt();
|
||||
|
||||
Preconditions.checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
|
||||
ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
|
||||
blockType = HoodieLogBlockType.values()[type];
|
||||
}
|
||||
|
||||
|
||||
@@ -23,9 +23,9 @@ import org.apache.hudi.common.table.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
@@ -140,7 +140,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
|
||||
public void saveAsComplete(HoodieInstant instant, Option<byte[]> data) {
|
||||
LOG.info("Marking instant complete " + instant);
|
||||
Preconditions.checkArgument(instant.isInflight(),
|
||||
ValidationUtils.checkArgument(instant.isInflight(),
|
||||
"Could not mark an already completed instant as complete again " + instant);
|
||||
transitionState(instant, HoodieTimeline.getCompletedInstant(instant), data);
|
||||
LOG.info("Completed " + instant);
|
||||
@@ -155,18 +155,18 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
}
|
||||
|
||||
public void deleteInflight(HoodieInstant instant) {
|
||||
Preconditions.checkArgument(instant.isInflight());
|
||||
ValidationUtils.checkArgument(instant.isInflight());
|
||||
deleteInstantFile(instant);
|
||||
}
|
||||
|
||||
public void deletePending(HoodieInstant instant) {
|
||||
Preconditions.checkArgument(!instant.isCompleted());
|
||||
ValidationUtils.checkArgument(!instant.isCompleted());
|
||||
deleteInstantFile(instant);
|
||||
}
|
||||
|
||||
public void deleteCompactionRequested(HoodieInstant instant) {
|
||||
Preconditions.checkArgument(instant.isRequested());
|
||||
Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
ValidationUtils.checkArgument(instant.isRequested());
|
||||
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
deleteInstantFile(instant);
|
||||
}
|
||||
|
||||
@@ -222,8 +222,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
* @return requested instant
|
||||
*/
|
||||
public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) {
|
||||
Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
Preconditions.checkArgument(inflightInstant.isInflight());
|
||||
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
ValidationUtils.checkArgument(inflightInstant.isInflight());
|
||||
HoodieInstant requestedInstant =
|
||||
new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp());
|
||||
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
|
||||
@@ -242,8 +242,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
* @return inflight instant
|
||||
*/
|
||||
public HoodieInstant transitionCompactionRequestedToInflight(HoodieInstant requestedInstant) {
|
||||
Preconditions.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
Preconditions.checkArgument(requestedInstant.isRequested());
|
||||
ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
ValidationUtils.checkArgument(requestedInstant.isRequested());
|
||||
HoodieInstant inflightInstant =
|
||||
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, requestedInstant.getTimestamp());
|
||||
transitionState(requestedInstant, inflightInstant, Option.empty());
|
||||
@@ -258,8 +258,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
* @return commit instant
|
||||
*/
|
||||
public HoodieInstant transitionCompactionInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
|
||||
Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
Preconditions.checkArgument(inflightInstant.isInflight());
|
||||
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
ValidationUtils.checkArgument(inflightInstant.isInflight());
|
||||
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, COMMIT_ACTION, inflightInstant.getTimestamp());
|
||||
transitionState(inflightInstant, commitInstant, data);
|
||||
return commitInstant;
|
||||
@@ -283,8 +283,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
* @return commit instant
|
||||
*/
|
||||
public HoodieInstant transitionCleanInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
|
||||
Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
|
||||
Preconditions.checkArgument(inflightInstant.isInflight());
|
||||
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
|
||||
ValidationUtils.checkArgument(inflightInstant.isInflight());
|
||||
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, CLEAN_ACTION, inflightInstant.getTimestamp());
|
||||
// Then write to timeline
|
||||
transitionState(inflightInstant, commitInstant, data);
|
||||
@@ -299,15 +299,15 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
* @return commit instant
|
||||
*/
|
||||
public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedInstant, Option<byte[]> data) {
|
||||
Preconditions.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
|
||||
Preconditions.checkArgument(requestedInstant.isRequested());
|
||||
ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
|
||||
ValidationUtils.checkArgument(requestedInstant.isRequested());
|
||||
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, CLEAN_ACTION, requestedInstant.getTimestamp());
|
||||
transitionState(requestedInstant, inflight, data);
|
||||
return inflight;
|
||||
}
|
||||
|
||||
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
|
||||
Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
|
||||
ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
|
||||
try {
|
||||
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
|
||||
// Re-create the .inflight file by opening a new file and write the commit metadata in
|
||||
@@ -321,7 +321,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
} else {
|
||||
// Ensures old state exists in timeline
|
||||
LOG.info("Checking for file exists ?" + new Path(metaClient.getMetaPath(), fromInstant.getFileName()));
|
||||
Preconditions.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
|
||||
ValidationUtils.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
|
||||
fromInstant.getFileName())));
|
||||
// Use Write Once to create Target File
|
||||
createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
|
||||
@@ -333,7 +333,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
}
|
||||
|
||||
private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
|
||||
Preconditions.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp()));
|
||||
ValidationUtils.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp()));
|
||||
Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), inflight.getFileName());
|
||||
Path commitFilePath = new Path(metaClient.getMetaPath(), completed.getFileName());
|
||||
try {
|
||||
@@ -359,7 +359,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
}
|
||||
|
||||
boolean success = metaClient.getFs().delete(commitFilePath, false);
|
||||
Preconditions.checkArgument(success, "State Reverting failed");
|
||||
ValidationUtils.checkArgument(success, "State Reverting failed");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Could not complete revert " + completed, e);
|
||||
@@ -368,7 +368,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
|
||||
public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content) {
|
||||
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp());
|
||||
Preconditions.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
|
||||
ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
|
||||
transitionState(requested, inflight, content);
|
||||
}
|
||||
|
||||
@@ -377,15 +377,15 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
||||
}
|
||||
|
||||
public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> content, boolean overwrite) {
|
||||
Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
|
||||
// Write workload to auxiliary folder
|
||||
createFileInAuxiliaryFolder(instant, content);
|
||||
createFileInMetaPath(instant.getFileName(), content, overwrite);
|
||||
}
|
||||
|
||||
public void saveToCleanRequested(HoodieInstant instant, Option<byte[]> content) {
|
||||
Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
|
||||
Preconditions.checkArgument(instant.getState().equals(State.REQUESTED));
|
||||
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
|
||||
ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED));
|
||||
// Plan is stored in meta path
|
||||
createFileInMetaPath(instant.getFileName(), content, false);
|
||||
}
|
||||
|
||||
@@ -32,10 +32,10 @@ import org.apache.hudi.common.util.CompactionUtils;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.common.util.HoodieTimer;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -116,10 +116,9 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
long fgBuildTimeTakenMs = timer.endTimer();
|
||||
timer.startTimer();
|
||||
// Group by partition for efficient updates for both InMemory and DiskBased stuctures.
|
||||
fileGroups.stream().collect(Collectors.groupingBy(HoodieFileGroup::getPartitionPath)).entrySet().forEach(entry -> {
|
||||
String partition = entry.getKey();
|
||||
fileGroups.stream().collect(Collectors.groupingBy(HoodieFileGroup::getPartitionPath)).forEach((partition, value) -> {
|
||||
if (!isPartitionAvailableInStore(partition)) {
|
||||
storePartitionView(partition, entry.getValue());
|
||||
storePartitionView(partition, value);
|
||||
}
|
||||
});
|
||||
long storePartitionsTs = timer.endTimer();
|
||||
@@ -209,7 +208,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
*/
|
||||
private void ensurePartitionLoadedCorrectly(String partition) {
|
||||
|
||||
Preconditions.checkArgument(!isClosed(), "View is already closed");
|
||||
ValidationUtils.checkArgument(!isClosed(), "View is already closed");
|
||||
|
||||
// ensure we list files only once even in the face of concurrency
|
||||
addedPartitions.computeIfAbsent(partition, (partitionPathStr) -> {
|
||||
@@ -397,11 +396,9 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commitsToReturn) {
|
||||
try {
|
||||
readLock.lock();
|
||||
return fetchAllStoredFileGroups().map(fileGroup -> {
|
||||
return Option.fromJavaOptional(
|
||||
fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
|
||||
&& !isBaseFileDueToPendingCompaction(baseFile)).findFirst());
|
||||
}).filter(Option::isPresent).map(Option::get);
|
||||
return fetchAllStoredFileGroups().map(fileGroup -> Option.fromJavaOptional(
|
||||
fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
|
||||
&& !isBaseFileDueToPendingCompaction(baseFile)).findFirst())).filter(Option::isPresent).map(Option::get);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
@@ -443,7 +440,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
String partitionPath = formatPartitionKey(partitionStr);
|
||||
ensurePartitionLoadedCorrectly(partitionPath);
|
||||
Option<FileSlice> fs = fetchLatestFileSlice(partitionPath, fileId);
|
||||
return fs.map(f -> filterBaseFileAfterPendingCompaction(f));
|
||||
return fs.map(this::filterBaseFileAfterPendingCompaction);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
@@ -480,7 +477,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
ensurePartitionLoadedCorrectly(partitionPath);
|
||||
Stream<FileSlice> fileSliceStream = fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime);
|
||||
if (includeFileSlicesInPendingCompaction) {
|
||||
return fileSliceStream.map(fs -> filterBaseFileAfterPendingCompaction(fs));
|
||||
return fileSliceStream.map(this::filterBaseFileAfterPendingCompaction);
|
||||
} else {
|
||||
return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId()));
|
||||
}
|
||||
@@ -815,7 +812,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
|
||||
/**
|
||||
* Return Only Commits and Compaction timeline for building file-groups.
|
||||
*
|
||||
* @return
|
||||
* @return {@code HoodieTimeline}
|
||||
*/
|
||||
public HoodieTimeline getVisibleCommitsAndCompactionTimeline() {
|
||||
return visibleCommitsAndCompactionTimeline;
|
||||
|
||||
@@ -18,10 +18,9 @@
|
||||
|
||||
package org.apache.hudi.common.table.view;
|
||||
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.config.DefaultHoodieConfig;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
@@ -192,7 +191,7 @@ public class FileSystemViewStorageConfig extends DefaultHoodieConfig {
|
||||
// Validations
|
||||
FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_VIEW_STORAGE_TYPE));
|
||||
FileSystemViewStorageType.valueOf(props.getProperty(FILESYSTEM_SECONDARY_VIEW_STORAGE_TYPE));
|
||||
Preconditions.checkArgument(Integer.parseInt(props.getProperty(FILESYSTEM_VIEW_REMOTE_PORT)) > 0);
|
||||
ValidationUtils.checkArgument(Integer.parseInt(props.getProperty(FILESYSTEM_VIEW_REMOTE_PORT)) > 0);
|
||||
return new FileSystemViewStorageConfig(props);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,15 +25,16 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.TableFileSystemView;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@@ -134,15 +135,14 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
|
||||
@Override
|
||||
protected void resetPendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations) {
|
||||
// Build fileId to Pending Compaction Instants
|
||||
this.fgIdToPendingCompaction = createFileIdToPendingCompactionMap(operations.map(entry -> {
|
||||
return Pair.of(entry.getValue().getFileGroupId(), Pair.of(entry.getKey(), entry.getValue()));
|
||||
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
|
||||
this.fgIdToPendingCompaction = createFileIdToPendingCompactionMap(operations.map(entry ->
|
||||
Pair.of(entry.getValue().getFileGroupId(), Pair.of(entry.getKey(), entry.getValue()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue)));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addPendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations) {
|
||||
operations.forEach(opInstantPair -> {
|
||||
Preconditions.checkArgument(!fgIdToPendingCompaction.containsKey(opInstantPair.getValue().getFileGroupId()),
|
||||
ValidationUtils.checkArgument(!fgIdToPendingCompaction.containsKey(opInstantPair.getValue().getFileGroupId()),
|
||||
"Duplicate FileGroupId found in pending compaction operations. FgId :"
|
||||
+ opInstantPair.getValue().getFileGroupId());
|
||||
fgIdToPendingCompaction.put(opInstantPair.getValue().getFileGroupId(),
|
||||
@@ -153,7 +153,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
|
||||
@Override
|
||||
protected void removePendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations) {
|
||||
operations.forEach(opInstantPair -> {
|
||||
Preconditions.checkArgument(fgIdToPendingCompaction.containsKey(opInstantPair.getValue().getFileGroupId()),
|
||||
ValidationUtils.checkArgument(fgIdToPendingCompaction.containsKey(opInstantPair.getValue().getFileGroupId()),
|
||||
"Trying to remove a FileGroupId which is not found in pending compaction operations. FgId :"
|
||||
+ opInstantPair.getValue().getFileGroupId());
|
||||
fgIdToPendingCompaction.remove(opInstantPair.getValue().getFileGroupId());
|
||||
@@ -166,8 +166,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
|
||||
*/
|
||||
@Override
|
||||
Stream<HoodieFileGroup> fetchAllStoredFileGroups(String partition) {
|
||||
final List<HoodieFileGroup> fileGroups = new ArrayList<>();
|
||||
fileGroups.addAll(partitionToFileGroupsMap.get(partition));
|
||||
final List<HoodieFileGroup> fileGroups = new ArrayList<>(partitionToFileGroupsMap.get(partition));
|
||||
return fileGroups.stream();
|
||||
}
|
||||
|
||||
@@ -200,9 +199,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
|
||||
|
||||
@Override
|
||||
public Stream<HoodieFileGroup> fetchAllStoredFileGroups() {
|
||||
return partitionToFileGroupsMap.values().stream().flatMap(fg -> {
|
||||
return fg.stream();
|
||||
});
|
||||
return partitionToFileGroupsMap.values().stream().flatMap(Collection::stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -34,12 +34,12 @@ import org.apache.hudi.common.table.timeline.dto.InstantDTO;
|
||||
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieRemoteException;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.http.client.fluent.Request;
|
||||
import org.apache.http.client.fluent.Response;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
@@ -134,14 +134,12 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
|
||||
|
||||
private <T> T executeRequest(String requestPath, Map<String, String> queryParameters, TypeReference reference,
|
||||
RequestMethod method) throws IOException {
|
||||
Preconditions.checkArgument(!closed, "View already closed");
|
||||
ValidationUtils.checkArgument(!closed, "View already closed");
|
||||
|
||||
URIBuilder builder =
|
||||
new URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme("http");
|
||||
|
||||
queryParameters.entrySet().stream().forEach(entry -> {
|
||||
builder.addParameter(entry.getKey(), entry.getValue());
|
||||
});
|
||||
queryParameters.forEach(builder::addParameter);
|
||||
|
||||
// Adding mandatory parameters - Last instants affecting file-slice
|
||||
timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
|
||||
@@ -149,7 +147,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
|
||||
|
||||
String url = builder.toString();
|
||||
LOG.info("Sending request : (" + url + ")");
|
||||
Response response = null;
|
||||
Response response;
|
||||
int timeout = 1000 * 300; // 5 min timeout
|
||||
switch (method) {
|
||||
case GET:
|
||||
@@ -197,7 +195,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
|
||||
Map<String, String> paramsMap = new HashMap<>();
|
||||
paramsMap.put(BASEPATH_PARAM, basePath);
|
||||
paramsMap.put(PARTITION_PARAM, partitionPath);
|
||||
Preconditions.checkArgument(paramNames.length == paramVals.length);
|
||||
ValidationUtils.checkArgument(paramNames.length == paramVals.length);
|
||||
for (int i = 0; i < paramNames.length; i++) {
|
||||
paramsMap.put(paramNames[i], paramVals[i]);
|
||||
}
|
||||
|
||||
@@ -29,9 +29,9 @@ import org.apache.hudi.common.table.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.RocksDBDAO;
|
||||
import org.apache.hudi.common.util.RocksDBSchemaHelper;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
@@ -110,7 +110,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
|
||||
protected void addPendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations) {
|
||||
rocksDB.writeBatch(batch ->
|
||||
operations.forEach(opInstantPair -> {
|
||||
Preconditions.checkArgument(!isPendingCompactionScheduledForFileId(opInstantPair.getValue().getFileGroupId()),
|
||||
ValidationUtils.checkArgument(!isPendingCompactionScheduledForFileId(opInstantPair.getValue().getFileGroupId()),
|
||||
"Duplicate FileGroupId found in pending compaction operations. FgId :"
|
||||
+ opInstantPair.getValue().getFileGroupId());
|
||||
rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(),
|
||||
@@ -123,7 +123,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
|
||||
void removePendingCompactionOperations(Stream<Pair<String, CompactionOperation>> operations) {
|
||||
rocksDB.writeBatch(batch ->
|
||||
operations.forEach(opInstantPair -> {
|
||||
Preconditions.checkArgument(
|
||||
ValidationUtils.checkArgument(
|
||||
getPendingCompactionOperationWithInstant(opInstantPair.getValue().getFileGroupId()) != null,
|
||||
"Trying to remove a FileGroupId which is not found in pending compaction operations. FgId :"
|
||||
+ opInstantPair.getValue().getFileGroupId());
|
||||
|
||||
@@ -28,7 +28,6 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
|
||||
import org.apache.hudi.common.HoodieRollbackStat;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.avro.file.DataFileReader;
|
||||
import org.apache.avro.file.DataFileWriter;
|
||||
@@ -145,7 +144,7 @@ public class AvroUtils {
|
||||
throws IOException {
|
||||
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
|
||||
FileReader<T> fileReader = DataFileReader.openReader(new SeekableByteArrayInput(bytes), reader);
|
||||
Preconditions.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
|
||||
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
|
||||
return fileReader.next();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@ import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.exception.InvalidHoodiePathException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@@ -115,11 +114,11 @@ public class FSUtils {
|
||||
}
|
||||
|
||||
public static String translateMarkerToDataPath(String basePath, String markerPath, String instantTs) {
|
||||
Preconditions.checkArgument(markerPath.endsWith(HoodieTableMetaClient.MARKER_EXTN));
|
||||
ValidationUtils.checkArgument(markerPath.endsWith(HoodieTableMetaClient.MARKER_EXTN));
|
||||
String markerRootPath = Path.getPathWithoutSchemeAndAuthority(
|
||||
new Path(String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTs))).toString();
|
||||
int begin = markerPath.indexOf(markerRootPath);
|
||||
Preconditions.checkArgument(begin >= 0,
|
||||
ValidationUtils.checkArgument(begin >= 0,
|
||||
"Not in marker dir. Marker Path=" + markerPath + ", Expected Marker Root=" + markerRootPath);
|
||||
String rPath = markerPath.substring(begin + markerRootPath.length() + 1);
|
||||
return String.format("%s/%s%s", basePath, rPath.replace(HoodieTableMetaClient.MARKER_EXTN, ""),
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.common.util;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@@ -47,7 +46,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
|
||||
/**
 * Builds a fail-safe consistency guard over the given file system.
 *
 * @param fs file system the guard will probe
 * @param consistencyGuardConfig configuration; consistency checking must be enabled
 * @throws IllegalArgumentException if consistency checking is disabled in the config
 */
public FailSafeConsistencyGuard(FileSystem fs, ConsistencyGuardConfig consistencyGuardConfig) {
  this.fs = fs;
  this.consistencyGuardConfig = consistencyGuardConfig;
  // A guard only makes sense when consistency checking is turned on.
  ValidationUtils.checkArgument(consistencyGuardConfig.isConsistencyCheckEnabled());
}
|
||||
|
||||
@Override
|
||||
@@ -115,8 +114,8 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
|
||||
*
|
||||
* @param filePath File Path
|
||||
* @param visibility Visibility
|
||||
* @return
|
||||
* @throws IOException
|
||||
* @return true (if file visible in Path), false (otherwise)
|
||||
* @throws IOException -
|
||||
*/
|
||||
private boolean checkFileVisibility(Path filePath, FileVisibility visibility) throws IOException {
|
||||
try {
|
||||
|
||||
@@ -18,6 +18,13 @@
|
||||
|
||||
package org.apache.hudi.common.util;
|
||||
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A utility class for numeric.
|
||||
*/
|
||||
@@ -31,4 +38,27 @@ public class NumericUtils {
|
||||
String pre = "KMGTPE".charAt(exp - 1) + "";
|
||||
return String.format("%.1f %sB", bytes / Math.pow(1024, exp), pre);
|
||||
}
|
||||
|
||||
public static long getMessageDigestHash(final String algorithmName, final String string) {
|
||||
MessageDigest md;
|
||||
try {
|
||||
md = MessageDigest.getInstance(algorithmName);
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new HoodieException(e);
|
||||
}
|
||||
return asLong(Objects.requireNonNull(md).digest(string.getBytes(StandardCharsets.UTF_8)));
|
||||
}
|
||||
|
||||
public static long asLong(byte[] bytes) {
|
||||
ValidationUtils.checkState(bytes.length >= 8, "HashCode#asLong() requires >= 8 bytes.");
|
||||
return padToLong(bytes);
|
||||
}
|
||||
|
||||
public static long padToLong(byte[] bytes) {
|
||||
long retVal = (bytes[0] & 0xFF);
|
||||
for (int i = 1; i < Math.min(bytes.length, 8); i++) {
|
||||
retVal |= (bytes[i] & 0xFFL) << (i * 8);
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.rocksdb.AbstractImmutableNativeReference;
|
||||
@@ -105,7 +104,7 @@ public class RocksDBDAO {
|
||||
FileIOUtils.mkdir(new File(rocksDBBasePath));
|
||||
rocksDB = RocksDB.open(dbOptions, rocksDBBasePath, managedColumnFamilies, managedHandles);
|
||||
|
||||
Preconditions.checkArgument(managedHandles.size() == managedColumnFamilies.size(),
|
||||
ValidationUtils.checkArgument(managedHandles.size() == managedColumnFamilies.size(),
|
||||
"Unexpected number of handles are returned");
|
||||
for (int index = 0; index < managedHandles.size(); index++) {
|
||||
ColumnFamilyHandle handle = managedHandles.get(index);
|
||||
@@ -113,7 +112,7 @@ public class RocksDBDAO {
|
||||
String familyNameFromHandle = new String(handle.getName());
|
||||
String familyNameFromDescriptor = new String(descriptor.getName());
|
||||
|
||||
Preconditions.checkArgument(familyNameFromDescriptor.equals(familyNameFromHandle),
|
||||
ValidationUtils.checkArgument(familyNameFromDescriptor.equals(familyNameFromHandle),
|
||||
"Family Handles not in order with descriptors");
|
||||
managedHandlesMap.put(familyNameFromHandle, handle);
|
||||
managedDescriptorMap.put(familyNameFromDescriptor, descriptor);
|
||||
@@ -297,7 +296,7 @@ public class RocksDBDAO {
|
||||
* @param <T> Type of object stored.
|
||||
*/
|
||||
public <T extends Serializable> T get(String columnFamilyName, String key) {
|
||||
Preconditions.checkArgument(!closed);
|
||||
ValidationUtils.checkArgument(!closed);
|
||||
try {
|
||||
byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), key.getBytes());
|
||||
return val == null ? null : SerializationUtils.deserialize(val);
|
||||
@@ -314,7 +313,7 @@ public class RocksDBDAO {
|
||||
* @param <T> Type of object stored.
|
||||
*/
|
||||
public <K extends Serializable, T extends Serializable> T get(String columnFamilyName, K key) {
|
||||
Preconditions.checkArgument(!closed);
|
||||
ValidationUtils.checkArgument(!closed);
|
||||
try {
|
||||
byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), SerializationUtils.serialize(key));
|
||||
return val == null ? null : SerializationUtils.deserialize(val);
|
||||
@@ -331,7 +330,7 @@ public class RocksDBDAO {
|
||||
* @param <T> Type of value stored
|
||||
*/
|
||||
public <T extends Serializable> Stream<Pair<String, T>> prefixSearch(String columnFamilyName, String prefix) {
|
||||
Preconditions.checkArgument(!closed);
|
||||
ValidationUtils.checkArgument(!closed);
|
||||
final HoodieTimer timer = new HoodieTimer();
|
||||
timer.startTimer();
|
||||
long timeTakenMicro = 0;
|
||||
@@ -360,7 +359,7 @@ public class RocksDBDAO {
|
||||
* @param <T> Type of value stored
|
||||
*/
|
||||
public <T extends Serializable> void prefixDelete(String columnFamilyName, String prefix) {
|
||||
Preconditions.checkArgument(!closed);
|
||||
ValidationUtils.checkArgument(!closed);
|
||||
LOG.info("Prefix DELETE (query=" + prefix + ") on " + columnFamilyName);
|
||||
final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName));
|
||||
it.seek(prefix.getBytes());
|
||||
@@ -396,7 +395,7 @@ public class RocksDBDAO {
|
||||
* @param columnFamilyName Column family name
|
||||
*/
|
||||
public void addColumnFamily(String columnFamilyName) {
|
||||
Preconditions.checkArgument(!closed);
|
||||
ValidationUtils.checkArgument(!closed);
|
||||
|
||||
managedDescriptorMap.computeIfAbsent(columnFamilyName, colFamilyName -> {
|
||||
try {
|
||||
@@ -416,7 +415,7 @@ public class RocksDBDAO {
|
||||
* @param columnFamilyName Column Family Name
|
||||
*/
|
||||
public void dropColumnFamily(String columnFamilyName) {
|
||||
Preconditions.checkArgument(!closed);
|
||||
ValidationUtils.checkArgument(!closed);
|
||||
|
||||
managedDescriptorMap.computeIfPresent(columnFamilyName, (colFamilyName, descriptor) -> {
|
||||
ColumnFamilyHandle handle = managedHandlesMap.get(colFamilyName);
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.common.util;
|
||||
|
||||
/*
|
||||
* Simple utility to test validation conditions (to replace Guava's PreConditions)
|
||||
*/
|
||||
public class ValidationUtils {

  // Static utility class; prevent instantiation.
  private ValidationUtils() {
  }

  /**
   * Ensures the truth of an expression.
   *
   * @param expression a boolean expression
   * @throws IllegalArgumentException if {@code expression} is false
   */
  public static void checkArgument(final boolean expression) {
    if (!expression) {
      throw new IllegalArgumentException();
    }
  }

  /**
   * Ensures the truth of an expression, throwing the custom errorMessage otherwise.
   *
   * @param expression a boolean expression
   * @param errorMessage message used for the thrown exception when the check fails
   * @throws IllegalArgumentException if {@code expression} is false
   */
  public static void checkArgument(final boolean expression, final String errorMessage) {
    if (!expression) {
      throw new IllegalArgumentException(errorMessage);
    }
  }

  /**
   * Ensures the truth of an expression involving the state of the calling instance, but not
   * involving any parameters to the calling method.
   *
   * @param expression a boolean expression
   * @throws IllegalStateException if {@code expression} is false
   */
  public static void checkState(final boolean expression) {
    if (!expression) {
      throw new IllegalStateException();
    }
  }

  /**
   * Ensures the truth of an expression involving the state of the calling instance, but not
   * involving any parameters to the calling method.
   *
   * @param expression a boolean expression
   * @param errorMessage message used for the thrown exception when the check fails
   * @throws IllegalStateException if {@code expression} is false
   */
  public static void checkState(final boolean expression, String errorMessage) {
    if (!expression) {
      throw new IllegalStateException(errorMessage);
    }
  }
}
|
||||
@@ -21,9 +21,9 @@ package org.apache.hudi.common.util.queue;
|
||||
import org.apache.hudi.common.util.DefaultSizeEstimator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.SizeEstimator;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
@@ -262,7 +262,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
|
||||
|
||||
@Override
|
||||
public O next() {
|
||||
Preconditions.checkState(hasNext() && this.nextRecord != null);
|
||||
ValidationUtils.checkState(hasNext() && this.nextRecord != null);
|
||||
final O ret = this.nextRecord;
|
||||
this.nextRecord = null;
|
||||
return ret;
|
||||
|
||||
@@ -19,10 +19,9 @@
|
||||
package org.apache.hudi.common.versioning;
|
||||
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -75,8 +74,8 @@ public class MetadataMigrator<T> {
|
||||
* @return Metadata conforming to the target version
|
||||
*/
|
||||
public T migrateToVersion(T metadata, int metadataVersion, int targetVersion) {
|
||||
Preconditions.checkArgument(targetVersion >= oldestVersion);
|
||||
Preconditions.checkArgument(targetVersion <= latestVersion);
|
||||
ValidationUtils.checkArgument(targetVersion >= oldestVersion);
|
||||
ValidationUtils.checkArgument(targetVersion <= latestVersion);
|
||||
if (metadataVersion == targetVersion) {
|
||||
return metadata;
|
||||
} else if (metadataVersion > targetVersion) {
|
||||
|
||||
@@ -22,10 +22,10 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.common.versioning.AbstractMigratorBase;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.util.Map;
|
||||
@@ -52,7 +52,7 @@ public class CleanV1MigrationHandler extends AbstractMigratorBase<HoodieCleanMet
|
||||
|
||||
@Override
|
||||
public HoodieCleanMetadata downgradeFrom(HoodieCleanMetadata input) {
|
||||
Preconditions.checkArgument(input.getVersion() == 2,
|
||||
ValidationUtils.checkArgument(input.getVersion() == 2,
|
||||
"Input version is " + input.getVersion() + ". Must be 2");
|
||||
final Path basePath = new Path(metaClient.getBasePath());
|
||||
|
||||
@@ -81,16 +81,13 @@ public class CleanV1MigrationHandler extends AbstractMigratorBase<HoodieCleanMet
|
||||
return Pair.of(partitionPath, cleanPartitionMetadata);
|
||||
}).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
|
||||
|
||||
HoodieCleanMetadata metadata = HoodieCleanMetadata.newBuilder()
|
||||
return HoodieCleanMetadata.newBuilder()
|
||||
.setEarliestCommitToRetain(input.getEarliestCommitToRetain())
|
||||
.setStartCleanTime(input.getStartCleanTime())
|
||||
.setTimeTakenInMillis(input.getTimeTakenInMillis())
|
||||
.setTotalFilesDeleted(input.getTotalFilesDeleted())
|
||||
.setPartitionMetadata(partitionMetadataMap)
|
||||
.setVersion(getManagedVersion()).build();
|
||||
|
||||
return metadata;
|
||||
|
||||
}
|
||||
|
||||
private static String convertToV1Path(Path basePath, String partitionPath, String fileName) {
|
||||
|
||||
@@ -21,10 +21,10 @@ package org.apache.hudi.common.versioning.clean;
|
||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.common.versioning.AbstractMigratorBase;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.util.List;
|
||||
@@ -46,7 +46,7 @@ public class CleanV2MigrationHandler extends AbstractMigratorBase<HoodieCleanMet
|
||||
|
||||
@Override
|
||||
public HoodieCleanMetadata upgradeFrom(HoodieCleanMetadata input) {
|
||||
Preconditions.checkArgument(input.getVersion() == 1,
|
||||
ValidationUtils.checkArgument(input.getVersion() == 1,
|
||||
"Input version is " + input.getVersion() + ". Must be 1");
|
||||
HoodieCleanMetadata metadata = new HoodieCleanMetadata();
|
||||
metadata.setEarliestCommitToRetain(input.getEarliestCommitToRetain());
|
||||
|
||||
@@ -22,9 +22,9 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.versioning.AbstractMigratorBase;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@@ -54,19 +54,18 @@ public class CompactionV1MigrationHandler extends AbstractMigratorBase<HoodieCom
|
||||
|
||||
@Override
|
||||
public HoodieCompactionPlan downgradeFrom(HoodieCompactionPlan input) {
|
||||
Preconditions.checkArgument(input.getVersion() == 2, "Input version is " + input.getVersion() + ". Must be 2");
|
||||
ValidationUtils.checkArgument(input.getVersion() == 2, "Input version is " + input.getVersion() + ". Must be 2");
|
||||
HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan();
|
||||
final Path basePath = new Path(metaClient.getBasePath());
|
||||
List<HoodieCompactionOperation> v1CompactionOperationList = new ArrayList<>();
|
||||
if (null != input.getOperations()) {
|
||||
v1CompactionOperationList = input.getOperations().stream().map(inp -> {
|
||||
return HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime())
|
||||
v1CompactionOperationList = input.getOperations().stream().map(inp ->
|
||||
HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime())
|
||||
.setFileId(inp.getFileId()).setPartitionPath(inp.getPartitionPath()).setMetrics(inp.getMetrics())
|
||||
.setDataFilePath(convertToV1Path(basePath, inp.getPartitionPath(), inp.getDataFilePath()))
|
||||
.setDeltaFilePaths(inp.getDeltaFilePaths().stream()
|
||||
.map(s -> convertToV1Path(basePath, inp.getPartitionPath(), s)).collect(Collectors.toList()))
|
||||
.build();
|
||||
}).collect(Collectors.toList());
|
||||
.build()).collect(Collectors.toList());
|
||||
}
|
||||
compactionPlan.setOperations(v1CompactionOperationList);
|
||||
compactionPlan.setExtraMetadata(input.getExtraMetadata());
|
||||
|
||||
@@ -21,9 +21,9 @@ package org.apache.hudi.common.versioning.compaction;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionOperation;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.versioning.AbstractMigratorBase;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@@ -48,17 +48,16 @@ public class CompactionV2MigrationHandler extends AbstractMigratorBase<HoodieCom
|
||||
|
||||
@Override
|
||||
public HoodieCompactionPlan upgradeFrom(HoodieCompactionPlan input) {
|
||||
Preconditions.checkArgument(input.getVersion() == 1, "Input version is " + input.getVersion() + ". Must be 1");
|
||||
ValidationUtils.checkArgument(input.getVersion() == 1, "Input version is " + input.getVersion() + ". Must be 1");
|
||||
HoodieCompactionPlan compactionPlan = new HoodieCompactionPlan();
|
||||
List<HoodieCompactionOperation> v2CompactionOperationList = new ArrayList<>();
|
||||
if (null != input.getOperations()) {
|
||||
v2CompactionOperationList = input.getOperations().stream().map(inp -> {
|
||||
return HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime())
|
||||
v2CompactionOperationList = input.getOperations().stream().map(inp ->
|
||||
HoodieCompactionOperation.newBuilder().setBaseInstantTime(inp.getBaseInstantTime())
|
||||
.setFileId(inp.getFileId()).setPartitionPath(inp.getPartitionPath()).setMetrics(inp.getMetrics())
|
||||
.setDataFilePath(new Path(inp.getDataFilePath()).getName()).setDeltaFilePaths(
|
||||
inp.getDeltaFilePaths().stream().map(s -> new Path(s).getName()).collect(Collectors.toList()))
|
||||
.build();
|
||||
}).collect(Collectors.toList());
|
||||
.build()).collect(Collectors.toList());
|
||||
}
|
||||
compactionPlan.setOperations(v2CompactionOperationList);
|
||||
compactionPlan.setExtraMetadata(input.getExtraMetadata());
|
||||
|
||||
@@ -21,7 +21,6 @@ package org.apache.hudi.common.minicluster;
|
||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@@ -33,6 +32,7 @@ import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.net.ServerSocket;
|
||||
|
||||
/**
|
||||
@@ -70,7 +70,7 @@ public class HdfsTestService {
|
||||
}
|
||||
|
||||
public MiniDFSCluster start(boolean format) throws IOException {
|
||||
Preconditions.checkState(workDir != null, "The work dir must be set before starting cluster.");
|
||||
Objects.requireNonNull(workDir, "The work dir must be set before starting cluster.");
|
||||
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
|
||||
|
||||
// If clean, then remove the work dir so we can start fresh.
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.common.minicluster;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
@@ -36,6 +35,7 @@ import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A Zookeeper minicluster service implementation.
|
||||
@@ -85,7 +85,7 @@ public class ZookeeperTestService {
|
||||
}
|
||||
|
||||
public ZooKeeperServer start() throws IOException, InterruptedException {
|
||||
Preconditions.checkState(workDir != null, "The localBaseFsLocation must be set before starting cluster.");
|
||||
Objects.requireNonNull(workDir, "The localBaseFsLocation must be set before starting cluster.");
|
||||
|
||||
setupTestEnv();
|
||||
stop();
|
||||
@@ -171,13 +171,10 @@ public class ZookeeperTestService {
|
||||
long start = System.currentTimeMillis();
|
||||
while (true) {
|
||||
try {
|
||||
Socket sock = new Socket("localhost", port);
|
||||
try {
|
||||
try (Socket sock = new Socket("localhost", port)) {
|
||||
OutputStream outstream = sock.getOutputStream();
|
||||
outstream.write("stat".getBytes());
|
||||
outstream.flush();
|
||||
} finally {
|
||||
sock.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
return true;
|
||||
|
||||
@@ -57,8 +57,8 @@ import org.junit.runners.Parameterized;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
||||
@@ -44,10 +44,10 @@ import org.apache.hudi.common.util.CleanerUtils;
|
||||
import org.apache.hudi.common.util.CompactionUtils;
|
||||
import org.apache.hudi.common.util.FSUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterators;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@@ -201,7 +201,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
view1.sync();
|
||||
Map<String, List<String>> instantsToFiles;
|
||||
|
||||
/**
|
||||
/*
|
||||
* Case where incremental syncing is catching up on more than one ingestion at a time
|
||||
*/
|
||||
// Run 1 ingestion on MOR table (1 delta commits). View1 is now sync up to this point
|
||||
@@ -222,7 +222,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
view3.sync();
|
||||
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size());
|
||||
|
||||
/**
|
||||
/*
|
||||
* Case where a compaction is scheduled and then unscheduled
|
||||
*/
|
||||
scheduleCompaction(view2, "15");
|
||||
@@ -233,7 +233,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||
view4.sync();
|
||||
|
||||
/**
|
||||
/*
|
||||
* Case where a compaction is scheduled, 2 ingestion happens and then a compaction happens
|
||||
*/
|
||||
scheduleCompaction(view2, "16");
|
||||
@@ -247,7 +247,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||
view5.sync();
|
||||
|
||||
/**
|
||||
/*
|
||||
* Case where a clean happened and then rounds of ingestion and compaction happened
|
||||
*/
|
||||
testCleans(view2, Collections.singletonList("19"),
|
||||
@@ -266,7 +266,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
|
||||
view6.sync();
|
||||
|
||||
/**
|
||||
/*
|
||||
* Case where multiple restores and ingestions happened
|
||||
*/
|
||||
testRestore(view2, Collections.singletonList("25"), true, new HashMap<>(), Collections.singletonList("24"), "29", true);
|
||||
@@ -528,7 +528,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
String newBaseInstant) throws IOException {
|
||||
HoodieInstant instant = new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, compactionInstantTime);
|
||||
boolean deleted = metaClient.getFs().delete(new Path(metaClient.getMetaPath(), instant.getFileName()), false);
|
||||
Preconditions.checkArgument(deleted, "Unable to delete compaction instant.");
|
||||
ValidationUtils.checkArgument(deleted, "Unable to delete compaction instant.");
|
||||
|
||||
view.sync();
|
||||
Assert.assertEquals(newLastInstant, view.getLastInstant().get().getTimestamp());
|
||||
@@ -719,7 +719,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
metaClient.getActiveTimeline().createNewInstant(inflightInstant);
|
||||
metaClient.getActiveTimeline().saveAsComplete(inflightInstant,
|
||||
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
/**
|
||||
/*
|
||||
// Delete pending compaction if present
|
||||
metaClient.getFs().delete(new Path(metaClient.getMetaPath(),
|
||||
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instant).getFileName()));
|
||||
|
||||
@@ -20,7 +20,10 @@ package org.apache.hudi.common.util;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
/**
|
||||
* Tests numeric utils.
|
||||
@@ -37,6 +40,29 @@ public class TestNumericUtils {
|
||||
assertEquals("27.0 GB", NumericUtils.humanReadableByteCount(28991029248L));
|
||||
assertEquals("1.7 TB", NumericUtils.humanReadableByteCount(1855425871872L));
|
||||
assertEquals("8.0 EB", NumericUtils.humanReadableByteCount(9223372036854775807L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMessageDigestHash() {
|
||||
assertEquals(6808551913422584641L, NumericUtils.getMessageDigestHash("MD5", "This is a string"));
|
||||
assertEquals(2549749777095932358L, NumericUtils.getMessageDigestHash("MD5", "This is a test string"));
|
||||
assertNotEquals(1L, NumericUtils.getMessageDigestHash("MD5", "This"));
|
||||
assertNotEquals(6808551913422584641L, NumericUtils.getMessageDigestHash("SHA-256", "This is a string"));
|
||||
}
|
||||
|
||||
private static byte[] byteArrayWithNum(int size, int num) {
|
||||
byte[] bytez = new byte[size];
|
||||
Arrays.fill(bytez, (byte) num);
|
||||
return bytez;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPadToLong() {
|
||||
assertEquals(0x0000000099999999L, NumericUtils.padToLong(byteArrayWithNum(4, 0x99)));
|
||||
assertEquals(0x0000999999999999L, NumericUtils.padToLong(byteArrayWithNum(6, 0x99)));
|
||||
assertEquals(0x9999999999999999L, NumericUtils.padToLong(byteArrayWithNum(8, 0x99)));
|
||||
assertEquals(0x1111111111111111L, NumericUtils.padToLong(byteArrayWithNum(8, 0x11)));
|
||||
assertEquals(0x0000000011111111L, NumericUtils.padToLong(byteArrayWithNum(4, 0x11)));
|
||||
assertEquals(0x0000181818181818L, NumericUtils.padToLong(byteArrayWithNum(6, 0x18)));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user