1
0

Spark Stage retry handling

This commit is contained in:
Balaji Varadarajan
2019-03-08 15:05:33 -08:00
committed by vinoth chandar
parent 3fd2fd6e9d
commit 145034c5fa
53 changed files with 1664 additions and 967 deletions

View File

@@ -29,6 +29,7 @@ import com.uber.hoodie.common.model.HoodieFileGroupId;
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.log.HoodieLogFormat;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
@@ -245,7 +246,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
"Expect new log version to be sane");
HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(),
FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()),
compactionInstant, lf.getLogVersion() - maxVersion)));
compactionInstant, lf.getLogVersion() - maxVersion, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
return Pair.of(lf, newLogFile);
}).collect(Collectors.toList());
}
@@ -436,7 +437,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
.filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get();
List<HoodieLogFile> logFilesToRepair =
merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant))
.sorted(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed())
.sorted(HoodieLogFile.getLogFileComparator())
.collect(Collectors.toList());
FileSlice fileSliceForCompaction =
fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime())
@@ -451,7 +452,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
for (HoodieLogFile toRepair : logFilesToRepair) {
int version = maxUsedVersion + 1;
HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
logExtn, operation.getBaseInstantTime(), version)));
logExtn, operation.getBaseInstantTime(), version, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
result.add(Pair.of(toRepair, newLf));
maxUsedVersion = version;
}

View File

@@ -72,6 +72,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -333,9 +334,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
String commitTime, HoodieTable<T> table,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
final int parallelism = config.getBulkInsertShuffleParallelism();
if (bulkInsertPartitioner.isDefined()) {
repartitionedRecords = bulkInsertPartitioner.get()
.repartitionRecords(dedupedRecords, config.getBulkInsertShuffleParallelism());
.repartitionRecords(dedupedRecords, parallelism);
} else {
// Now, sort the records and line them up nicely for loading.
repartitionedRecords = dedupedRecords.sortBy(record -> {
@@ -343,10 +345,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// the records split evenly across RDD partitions, such that small partitions fit
// into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
}, true, config.getBulkInsertShuffleParallelism());
}, true, parallelism);
}
//generate new file ID prefixes for each output partition
final List<String> fileIDPrefixes = IntStream.range(0, parallelism)
.mapToObj(i -> FSUtils.createNewFileIdPfx())
.collect(Collectors.toList());
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table), true)
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true)
.flatMap(writeStatuses -> writeStatuses.iterator());
return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
@@ -498,20 +506,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
updateMetadataAndRollingStats(actionType, metadata, stats);
// Finalize write
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
try {
table.finalizeWrite(jsc, stats);
if (finalizeCtx != null) {
Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
durationInMs.ifPresent(duration -> {
logger.info("Finalize write elapsed time (milliseconds): " + duration);
metrics.updateFinalizeWriteMetrics(duration, stats.size());
});
}
} catch (HoodieIOException ioe) {
throw new HoodieCommitException(
"Failed to complete commit " + commitTime + " due to finalize errors.", ioe);
}
finalizeWrite(table, commitTime, stats);
// add in extra metadata
if (extraMetadata.isPresent()) {
@@ -1270,7 +1265,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
String compactionCommitTime, boolean autoCommit, Optional<Map<String, String>> extraMetadata) {
if (autoCommit) {
HoodieCommitMetadata metadata =
doCompactionCommit(compactedStatuses, table.getMetaClient(), compactionCommitTime, extraMetadata);
doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata);
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
@@ -1288,6 +1283,23 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
}
private void finalizeWrite(HoodieTable<T> table, String instantTime, List<HoodieWriteStat> stats) {
try {
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
table.finalizeWrite(jsc, instantTime, stats);
if (finalizeCtx != null) {
Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
durationInMs.ifPresent(duration -> {
logger.info("Finalize write elapsed time (milliseconds): " + duration);
metrics.updateFinalizeWriteMetrics(duration, stats.size());
});
}
} catch (HoodieIOException ioe) {
throw new HoodieCommitException(
"Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
}
}
/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
*
@@ -1301,8 +1313,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}
private HoodieCommitMetadata doCompactionCommit(JavaRDD<WriteStatus> writeStatuses,
HoodieTableMetaClient metaClient, String compactionCommitTime, Optional<Map<String, String>> extraMetadata) {
private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses,
String compactionCommitTime, Optional<Map<String, String>> extraMetadata) {
HoodieTableMetaClient metaClient = table.getMetaClient();
List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat)
.collect();
@@ -1311,6 +1324,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
metadata.addWriteStat(stat.getPartitionPath(), stat);
}
// Finalize write
List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
finalizeWrite(table, compactionCommitTime, stats);
// Copy extraMetadata
extraMetadata.ifPresent(m -> {
m.entrySet().stream().forEach(e -> {

View File

@@ -62,19 +62,26 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE =
"hoodie.copyonwrite.use" + ".temp.folder.for.create";
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE = "false";
private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE =
"hoodie.copyonwrite.use" + ".temp.folder.for.merge";
private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "false";
private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
private static final String CONSISTENCY_CHECK_ENABLED = "hoodie.consistency.check.enabled";
private static final String CONSISTENCY_CHECK_ENABLED_PROP = "hoodie.consistency.check.enabled";
private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "false";
// time between successive attempts to ensure written data's metadata is consistent on storage
private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
"hoodie.consistency.check.initial_interval_ms";
private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
// max interval time
private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
// maximum number of checks, for consistency of written data. Will wait upto 256 Secs
private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
// We keep track of original config and rewritten config
private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
@@ -148,31 +155,30 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
}
public boolean shouldUseTempFolderForCopyOnWriteForCreate() {
return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE));
}
public boolean shouldUseTempFolderForCopyOnWriteForMerge() {
return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE));
}
public boolean shouldUseTempFolderForCopyOnWrite() {
return shouldUseTempFolderForCopyOnWriteForCreate()
|| shouldUseTempFolderForCopyOnWriteForMerge();
}
public int getFinalizeWriteParallelism() {
return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
}
public boolean isConsistencyCheckEnabled() {
return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED));
return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP));
}
public boolean isEmbeddedTimelineServerEnabled() {
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
}
public int getMaxConsistencyChecks() {
return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECKS_PROP));
}
public int getInitialConsistencyCheckIntervalMs() {
return Integer.parseInt(props.getProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
}
public int getMaxConsistencyCheckIntervalMs() {
return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
}
/**
* compaction properties
**/
@@ -588,20 +594,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withUseTempFolderCopyOnWriteForCreate(
boolean shouldUseTempFolderCopyOnWriteForCreate) {
props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE,
String.valueOf(shouldUseTempFolderCopyOnWriteForCreate));
return this;
}
public Builder withUseTempFolderCopyOnWriteForMerge(
boolean shouldUseTempFolderCopyOnWriteForMerge) {
props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE,
String.valueOf(shouldUseTempFolderCopyOnWriteForMerge));
return this;
}
public Builder withFileSystemViewConfig(FileSystemViewStorageConfig viewStorageConfig) {
props.putAll(viewStorageConfig.getProps());
isViewConfigSet = true;
@@ -614,7 +606,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
}
public Builder withConsistencyCheckEnabled(boolean enabled) {
props.setProperty(CONSISTENCY_CHECK_ENABLED, String.valueOf(enabled));
props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP, String.valueOf(enabled));
return this;
}
@@ -623,6 +615,21 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
public Builder withInitialConsistencyCheckIntervalMs(int initialIntevalMs) {
props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(initialIntevalMs));
return this;
}
public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) {
props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(maxIntervalMs));
return this;
}
public Builder withMaxConsistencyChecks(int maxConsistencyChecks) {
props.setProperty(MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(maxConsistencyChecks));
return this;
}
public HoodieWriteConfig build() {
// Check for mandatory properties
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
@@ -643,18 +650,18 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE),
HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE,
DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE);
setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE),
HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE,
DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED),
CONSISTENCY_CHECK_ENABLED, DEFAULT_CONSISTENCY_CHECK_ENABLED);
setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP),
CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED);
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS));
setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS));
setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECKS_PROP),
MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));
// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet,

View File

@@ -35,17 +35,19 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload> implements
private String commitTime;
private HoodieWriteConfig config;
private HoodieTable<T> hoodieTable;
private List<String> fileIDPrefixes;
public BulkInsertMapFunction(String commitTime, HoodieWriteConfig config,
HoodieTable<T> hoodieTable) {
HoodieTable<T> hoodieTable, List<String> fileIDPrefixes) {
this.commitTime = commitTime;
this.config = config;
this.hoodieTable = hoodieTable;
this.fileIDPrefixes = fileIDPrefixes;
}
@Override
public Iterator<List<WriteStatus>> call(Integer partition,
Iterator<HoodieRecord<T>> sortedRecordItr) throws Exception {
return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable);
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable,
fileIDPrefixes.get(partition));
}
}

View File

@@ -27,16 +27,12 @@ import com.uber.hoodie.io.HoodieCreateHandle;
import com.uber.hoodie.io.HoodieIOHandle;
import com.uber.hoodie.table.HoodieTable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.TaskContext;
/**
* Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
@@ -48,15 +44,17 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
protected final HoodieWriteConfig hoodieConfig;
protected final String commitTime;
protected final HoodieTable<T> hoodieTable;
protected Set<String> partitionsCleaned;
protected final String idPrefix;
protected int numFilesWritten;
public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String commitTime, HoodieTable<T> hoodieTable) {
String commitTime, HoodieTable<T> hoodieTable, String idPrefix) {
super(sortedRecordItr);
this.partitionsCleaned = new HashSet<>();
this.hoodieConfig = config;
this.commitTime = commitTime;
this.hoodieTable = hoodieTable;
this.idPrefix = idPrefix;
this.numFilesWritten = 0;
}
// Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -113,7 +111,10 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
@Override
protected void end() {
}
protected String getNextFileId(String idPfx) {
return String.format("%s-%d", idPfx, numFilesWritten++);
}
protected CopyOnWriteInsertHandler getInsertHandler() {
@@ -133,20 +134,11 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
@Override
protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
final HoodieRecord insertPayload = payload.record;
// clean up any partial failures
if (!partitionsCleaned.contains(insertPayload.getPartitionPath())) {
// This insert task could fail multiple times, but Spark will faithfully retry with
// the same data again. Thus, before we open any files under a given partition, we
// first delete any files in the same partitionPath written by same Spark partition
HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig, commitTime, insertPayload.getPartitionPath(),
TaskContext.getPartitionId(), hoodieTable);
partitionsCleaned.add(insertPayload.getPartitionPath());
}
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID
.randomUUID().toString());
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(),
getNextFileId(idPrefix));
}
if (handle.canWrite(payload.record)) {
@@ -156,8 +148,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
// handle is full.
statuses.add(handle.close());
// Need to handle the rejected payload & open new handle
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID
.randomUUID().toString());
handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(),
getNextFileId(idPrefix));
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}

View File

@@ -34,8 +34,8 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
CopyOnWriteLazyInsertIterable<T> {
public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
String commitTime, HoodieTable<T> hoodieTable) {
super(sortedRecordItr, config, commitTime, hoodieTable);
String commitTime, HoodieTable<T> hoodieTable, String idPfx) {
super(sortedRecordItr, config, commitTime, hoodieTable, idPfx);
}
@Override
@@ -51,7 +51,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
List<WriteStatus> statuses = new ArrayList<>();
// lazily initialize the handle, for the first time
if (handle == null) {
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable);
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
}
if (handle.canWrite(insertPayload)) {
// write the payload, if the handle has capacity
@@ -61,7 +61,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
handle.close();
statuses.add(handle.getWriteStatus());
// Need to handle the rejected payload & open new handle
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable);
handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
}
}

View File

@@ -1,112 +0,0 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io;
import com.google.common.annotations.VisibleForTesting;
import com.uber.hoodie.common.SerializableConfiguration;
import com.uber.hoodie.common.util.FSUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Checks if all the written paths have their metadata consistent on storage and thus be listable to
* queries. This is important for cloud, stores like AWS S3 which are eventually consistent with
* their metadata. Without such checks, we may proceed to commit the written data, without the
* written data being made available to queries. In cases like incremental pull this can lead to
* downstream readers failing to ever see some data.
*/
public class ConsistencyCheck implements Serializable {
private static final transient Logger log = LogManager.getLogger(ConsistencyCheck.class);
private String basePath;
private List<String> relPaths;
private transient JavaSparkContext jsc;
private SerializableConfiguration hadoopConf;
private int parallelism;
public ConsistencyCheck(String basePath, List<String> relPaths, JavaSparkContext jsc,
int parallelism) {
this.basePath = basePath;
this.relPaths = relPaths;
this.jsc = jsc;
this.hadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration());
this.parallelism = parallelism;
}
@VisibleForTesting
void sleepSafe(long waitMs) {
try {
Thread.sleep(waitMs);
} catch (InterruptedException e) {
// ignore & continue next attempt
}
}
/**
* Repeatedly lists the filesystem on the paths, with exponential backoff and marks paths found as
* passing the check.
*
* @return list of (relative) paths failing the check
*/
public List<String> check(int maxAttempts, long initalDelayMs) {
long waitMs = initalDelayMs;
int attempt = 0;
List<String> remainingPaths = new ArrayList<>(relPaths);
while (attempt++ < maxAttempts) {
remainingPaths = jsc.parallelize(remainingPaths, parallelism)
.groupBy(p -> new Path(basePath, p).getParent()) // list by partition
.map(pair -> {
FileSystem fs = FSUtils.getFs(basePath, hadoopConf.get());
// list the partition path and obtain all file paths present
Set<String> fileNames = Arrays.stream(fs.listStatus(pair._1()))
.map(s -> s.getPath().getName())
.collect(Collectors.toSet());
// only return paths that can't be found
return StreamSupport.stream(pair._2().spliterator(), false)
.filter(p -> !fileNames.contains(new Path(basePath, p).getName()))
.collect(Collectors.toList());
})
.flatMap(List::iterator).collect();
if (remainingPaths.size() == 0) {
break; // we are done.
}
log.info("Consistency check, waiting for " + waitMs + " ms , after attempt :" + attempt);
sleepSafe(waitMs);
waitMs = waitMs * 2; // double check interval every attempt
}
return remainingPaths;
}
}

View File

@@ -32,6 +32,7 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.Option;
import com.uber.hoodie.config.HoodieWriteConfig;
@@ -40,12 +41,10 @@ import com.uber.hoodie.exception.HoodieUpsertException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -96,14 +95,14 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String fileId, Iterator<HoodieRecord<T>> recordItr) {
super(config, commitTime, hoodieTable);
super(config, commitTime, fileId, hoodieTable);
writeStatus.setStat(new HoodieDeltaWriteStat());
this.fileId = fileId;
this.recordItr = recordItr;
}
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable) {
this(config, commitTime, hoodieTable, UUID.randomUUID().toString(), null);
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String fileId) {
this(config, commitTime, hoodieTable, fileId, null);
}
private void init(HoodieRecord record) {
@@ -270,12 +269,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
throws IOException, InterruptedException {
Optional<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();
return HoodieLogFormat.newWriterBuilder()
.onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath))
.withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(
fileSlice.get().getLogFiles().map(logFile -> logFile.getLogVersion())
.max(Comparator.naturalOrder()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
.withLogWriteToken(
latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
.withRolloverLogWriteToken(writeToken)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}

View File

@@ -45,7 +45,6 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
private final HoodieStorageWriter<IndexedRecord> storageWriter;
private final Path path;
private Path tempPath = null;
private long recordsWritten = 0;
private long insertRecordsWritten = 0;
private long recordsDeleted = 0;
@@ -54,26 +53,22 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId) {
super(config, commitTime, hoodieTable);
super(config, commitTime, fileId, hoodieTable);
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
final int sparkPartitionId = TaskContext.getPartitionId();
this.path = makeNewPath(partitionPath, sparkPartitionId, writeStatus.getFileId());
if (config.shouldUseTempFolderForCopyOnWriteForCreate()) {
this.tempPath = makeTempPath(partitionPath, sparkPartitionId, writeStatus.getFileId(),
TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
}
this.path = makeNewPath(partitionPath);
try {
HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(TaskContext.getPartitionId());
createMarkerFile(partitionPath);
this.storageWriter = HoodieStorageWriterFactory
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
.getStorageWriter(commitTime, path, hoodieTable, config, writerSchema);
} catch (IOException e) {
throw new HoodieInsertException(
"Failed to initialize HoodieStorageWriter for path " + getStorageWriterPath(), e);
"Failed to initialize HoodieStorageWriter for path " + path, e);
}
logger.info("New InsertHandle for partition :" + partitionPath + " with fileId " + fileId);
}
@@ -138,7 +133,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
}
} catch (IOException io) {
throw new HoodieInsertException(
"Failed to insert records for path " + getStorageWriterPath(), io);
"Failed to insert records for path " + path, io);
}
}
@@ -165,8 +160,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
stat.setNumInserts(insertRecordsWritten);
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
stat.setPaths(new Path(config.getBasePath()), path, tempPath);
long fileSizeInBytes = FSUtils.getFileSize(fs, getStorageWriterPath());
stat.setPath(new Path(config.getBasePath()), path);
long fileSizeInBytes = FSUtils.getFileSize(fs, path);
stat.setTotalWriteBytes(fileSizeInBytes);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
@@ -180,9 +175,4 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
}
}
private Path getStorageWriterPath() {
// Use tempPath for storage writer if possible
return (this.tempPath == null) ? this.path : this.tempPath;
}
}

View File

@@ -17,14 +17,17 @@
package com.uber.hoodie.io;
import com.uber.hoodie.WriteStatus;
import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import com.uber.hoodie.common.util.HoodieTimer;
import com.uber.hoodie.common.util.NoOpConsistencyGuard;
import com.uber.hoodie.common.util.ReflectionUtils;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.table.HoodieTable;
import java.io.IOException;
@@ -32,16 +35,19 @@ import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.TaskContext;
public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
private static Logger logger = LogManager.getLogger(HoodieIOHandle.class);
protected final String commitTime;
protected final String fileId;
protected final String writeToken;
protected final HoodieWriteConfig config;
protected final FileSystem fs;
protected final HoodieTable<T> hoodieTable;
@@ -50,10 +56,13 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
protected HoodieTimer timer;
protected final WriteStatus writeStatus;
public HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable) {
public HoodieIOHandle(HoodieWriteConfig config, String commitTime, String fileId,
HoodieTable<T> hoodieTable) {
this.commitTime = commitTime;
this.fileId = fileId;
this.writeToken = makeSparkWriteToken();
this.config = config;
this.fs = hoodieTable.getMetaClient().getFs();
this.fs = getFileSystem(hoodieTable, config);
this.hoodieTable = hoodieTable;
this.originalSchema = new Schema.Parser().parse(config.getSchema());
this.writerSchema = createHoodieWriteSchema(originalSchema);
@@ -63,33 +72,26 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
config.getWriteStatusFailureFraction());
}
private static FileSystem getFileSystem(HoodieTable hoodieTable, HoodieWriteConfig config) {
return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled()
? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(),
config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard());
}
/**
* Deletes any new tmp files written during the current commit, into the partition
* Generate a write token based on the currently running spark task and its place in the spark dag.
*/
public static void cleanupTmpFilesFromCurrentCommit(HoodieWriteConfig config, String commitTime,
String partitionPath, int taskPartitionId, HoodieTable hoodieTable) {
FileSystem fs = hoodieTable.getMetaClient().getFs();
try {
FileStatus[] prevFailedFiles = fs.globStatus(new Path(String
.format("%s/%s/%s", config.getBasePath(), partitionPath,
FSUtils.maskWithoutFileId(commitTime, taskPartitionId))));
if (prevFailedFiles != null) {
logger.info(
"Deleting " + prevFailedFiles.length + " files generated by previous failed attempts.");
for (FileStatus status : prevFailedFiles) {
fs.delete(status.getPath(), false);
}
}
} catch (IOException e) {
throw new HoodieIOException("Failed to cleanup Temp files from commit " + commitTime, e);
}
private static String makeSparkWriteToken() {
return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
TaskContext.get().taskAttemptId());
}
public static Schema createHoodieWriteSchema(Schema originalSchema) {
return HoodieAvroUtils.addMetadataFields(originalSchema);
}
public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) {
public Path makeNewPath(String partitionPath) {
Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
try {
fs.mkdirs(path); // create a new partition as needed.
@@ -97,16 +99,37 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
throw new HoodieIOException("Failed to make dir " + path, e);
}
return new Path(path.toString(),
FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
return new Path(path.toString(), FSUtils.makeDataFileName(commitTime, writeToken, fileId));
}
public Path makeTempPath(String partitionPath, int taskPartitionId, String fileName, int stageId,
long taskAttemptId) {
Path path = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
return new Path(path.toString(),
FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId,
taskAttemptId));
/**
* Creates an empty marker file corresponding to storage writer path
* @param partitionPath Partition path
*/
protected void createMarkerFile(String partitionPath) {
Path markerPath = makeNewMarkerPath(partitionPath);
try {
logger.info("Creating Marker Path=" + markerPath);
fs.create(markerPath, false).close();
} catch (IOException e) {
throw new HoodieException("Failed to create marker file " + markerPath, e);
}
}
/**
* THe marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename
* @param partitionPath
* @return
*/
private Path makeNewMarkerPath(String partitionPath) {
Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime));
Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
try {
fs.mkdirs(path); // create a new partition as needed.
} catch (IOException e) {
throw new HoodieIOException("Failed to make dir " + path, e);
}
return new Path(path.toString(), FSUtils.makeMarkerFile(commitTime, writeToken, fileId));
}
public Schema getWriterSchema() {

View File

@@ -57,7 +57,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
private HoodieStorageWriter<IndexedRecord> storageWriter;
private Path newFilePath;
private Path oldFilePath;
private Path tempPath = null;
private long recordsWritten = 0;
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0;
@@ -66,7 +65,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String fileId) {
super(config, commitTime, hoodieTable);
super(config, commitTime, fileId, hoodieTable);
String partitionPath = init(fileId, recordItr);
init(fileId, partitionPath,
hoodieTable.getROFileSystemView().getLatestDataFile(partitionPath, fileId).get());
@@ -77,7 +76,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
*/
public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, HoodieDataFile dataFileToBeMerged) {
super(config, commitTime, hoodieTable);
super(config, commitTime, fileId, hoodieTable);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get())
@@ -101,30 +100,25 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
oldFilePath = new Path(
config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + FSUtils
.makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString();
.makeDataFileName(commitTime, writeToken, fileId)).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
if (config.shouldUseTempFolderForCopyOnWriteForMerge()) {
this.tempPath = makeTempPath(partitionPath, TaskContext.getPartitionId(), fileId,
TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
}
// handle cases of partial failures, for update task
if (fs.exists(newFilePath)) {
fs.delete(newFilePath, false);
}
logger.info(String
.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
getStorageWriterPath().toString()));
newFilePath.toString()));
// file name is same for all records, in this bunch
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.getStat().setPartitionPath(partitionPath);
writeStatus.getStat().setFileId(fileId);
writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath);
writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath);
// Create Marker file
createMarkerFile(partitionPath);
// Create the writer for writing the new version file
storageWriter = HoodieStorageWriterFactory
.getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
.getStorageWriter(commitTime, newFilePath, hoodieTable, config, writerSchema);
} catch (IOException io) {
logger.error("Error in update task at commit " + commitTime, io);
writeStatus.setGlobalError(io);
@@ -231,17 +225,17 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
if (copyOldRecord) {
// this should work as it is, since this is an existing record
String errMsg = "Failed to merge old record into new file for key " + key + " from old file "
+ getOldFilePath() + " to new file " + getStorageWriterPath();
+ getOldFilePath() + " to new file " + newFilePath;
try {
storageWriter.writeAvro(key, oldRecord);
} catch (ClassCastException e) {
logger.error("Schema mismatch when rewriting old record " + oldRecord + " from file "
+ getOldFilePath() + " to file " + getStorageWriterPath() + " with writerSchema " + writerSchema
+ getOldFilePath() + " to file " + newFilePath + " with writerSchema " + writerSchema
.toString(true));
throw new HoodieUpsertException(errMsg, e);
} catch (IOException e) {
logger.error("Failed to merge old record into new file for key " + key + " from old file "
+ getOldFilePath() + " to new file " + getStorageWriterPath(), e);
+ getOldFilePath() + " to new file " + newFilePath, e);
throw new HoodieUpsertException(errMsg, e);
}
recordsWritten++;
@@ -270,7 +264,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
storageWriter.close();
}
long fileSizeInBytes = FSUtils.getFileSize(fs, getStorageWriterPath());
long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
writeStatus.getStat().setTotalWriteBytes(fileSizeInBytes);
writeStatus.getStat().setFileSizeInBytes(fileSizeInBytes);
writeStatus.getStat().setNumWrites(recordsWritten);
@@ -291,13 +285,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
return oldFilePath;
}
private Path getStorageWriterPath() {
// Use tempPath for storage writer if possible
return (this.tempPath == null) ? this.newFilePath : this.tempPath;
}
@Override
public WriteStatus getWriteStatus() {
return writeStatus;
}
}
}

View File

@@ -201,7 +201,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
.map(
s -> {
List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile
.getBaseInstantAndLogVersionComparator().reversed()).collect(Collectors.toList());
.getLogFileComparator()).collect(Collectors.toList());
totalLogFiles.add((long) logFiles.size());
totalFileSlices.add(1L);
// Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO

View File

@@ -17,6 +17,7 @@
package com.uber.hoodie.io.storage;
import com.uber.hoodie.avro.HoodieAvroWriteSupport;
import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.util.FSUtils;

View File

@@ -1,777 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io.storage;
import com.uber.hoodie.common.storage.StorageSchemes;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
/**
* HoodieWrapperFileSystem wraps the default file system. It holds state about the open streams in
* the file system to support getting the written size to each of the open streams.
*/
public class HoodieWrapperFileSystem extends FileSystem {
public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new
ConcurrentHashMap<>();
private FileSystem fileSystem;
private URI uri;
public static Path convertToHoodiePath(Path file, Configuration conf) {
try {
String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
return convertPathWithScheme(file, getHoodieScheme(scheme));
} catch (HoodieIOException e) {
throw e;
}
}
private static Path convertPathWithScheme(Path oldPath, String newScheme) {
URI oldURI = oldPath.toUri();
URI newURI;
try {
newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(),
oldURI.getPath(), oldURI.getQuery(), oldURI.getFragment());
return new Path(newURI);
} catch (URISyntaxException e) {
// TODO - Better Exception handling
throw new RuntimeException(e);
}
}
public static String getHoodieScheme(String scheme) {
String newScheme;
if (StorageSchemes.isSchemeSupported(scheme)) {
newScheme = HOODIE_SCHEME_PREFIX + scheme;
} else {
throw new IllegalArgumentException(
"BlockAlignedAvroParquetWriter does not support scheme " + scheme);
}
return newScheme;
}
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
// Get the default filesystem to decorate
Path path = new Path(uri);
// Remove 'hoodie-' prefix from path
if (path.toString().startsWith(HOODIE_SCHEME_PREFIX)) {
path = new Path(path.toString().replace(HOODIE_SCHEME_PREFIX, ""));
}
this.fileSystem = FSUtils.getFs(path.toString(), conf);
// Do not need to explicitly initialize the default filesystem, its done already in the above
// FileSystem.get
// fileSystem.initialize(FileSystem.getDefaultUri(conf), conf);
// fileSystem.setConf(conf);
this.uri = uri;
}
@Override
public URI getUri() {
return uri;
}
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return fileSystem.open(convertToDefaultPath(f), bufferSize);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
final Path translatedPath = convertToDefaultPath(f);
return wrapOutputStream(f, fileSystem
.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize,
progress));
}
private FSDataOutputStream wrapOutputStream(final Path path,
FSDataOutputStream fsDataOutputStream) throws IOException {
if (fsDataOutputStream instanceof SizeAwareFSDataOutputStream) {
return fsDataOutputStream;
}
SizeAwareFSDataOutputStream os = new SizeAwareFSDataOutputStream(
fsDataOutputStream, () -> openStreams.remove(path.getName()));
openStreams.put(path.getName(), os);
return os;
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite));
}
@Override
public FSDataOutputStream create(Path f) throws IOException {
return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f)));
}
@Override
public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
return fileSystem.create(convertToDefaultPath(f), progress);
}
@Override
public FSDataOutputStream create(Path f, short replication) throws IOException {
return fileSystem.create(convertToDefaultPath(f), replication);
}
@Override
public FSDataOutputStream create(Path f, short replication, Progressable progress)
throws IOException {
return fileSystem.create(convertToDefaultPath(f), replication, progress);
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
return fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize);
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
throws IOException {
return fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress);
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
return fileSystem
.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags,
int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
return fileSystem
.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize,
progress);
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags,
int bufferSize, short replication, long blockSize, Progressable progress,
Options.ChecksumOpt checksumOpt) throws IOException {
return fileSystem
.create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize,
progress, checksumOpt);
}
@Override
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
long blockSize) throws IOException {
return fileSystem
.create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException {
return fileSystem.append(convertToDefaultPath(f), bufferSize, progress);
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
return fileSystem.rename(convertToDefaultPath(src), convertToDefaultPath(dst));
}
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return fileSystem.delete(convertToDefaultPath(f), recursive);
}
@Override
public FileStatus[] listStatus(Path f) throws IOException {
return fileSystem.listStatus(convertToDefaultPath(f));
}
@Override
public Path getWorkingDirectory() {
return convertToHoodiePath(fileSystem.getWorkingDirectory());
}
@Override
public void setWorkingDirectory(Path newDir) {
fileSystem.setWorkingDirectory(convertToDefaultPath(newDir));
}
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return fileSystem.mkdirs(convertToDefaultPath(f), permission);
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return fileSystem.getFileStatus(convertToDefaultPath(f));
}
@Override
public String getScheme() {
return uri.getScheme();
}
@Override
public String getCanonicalServiceName() {
return fileSystem.getCanonicalServiceName();
}
@Override
public String getName() {
return fileSystem.getName();
}
@Override
public Path makeQualified(Path path) {
return convertToHoodiePath(fileSystem.makeQualified(convertToDefaultPath(path)));
}
@Override
public Token<?> getDelegationToken(String renewer) throws IOException {
return fileSystem.getDelegationToken(renewer);
}
@Override
public Token<?>[] addDelegationTokens(String renewer, Credentials credentials)
throws IOException {
return fileSystem.addDelegationTokens(renewer, credentials);
}
@Override
public FileSystem[] getChildFileSystems() {
return fileSystem.getChildFileSystems();
}
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
throws IOException {
return fileSystem.getFileBlockLocations(file, start, len);
}
@Override
public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException {
return fileSystem.getFileBlockLocations(convertToDefaultPath(p), start, len);
}
@Override
public FsServerDefaults getServerDefaults() throws IOException {
return fileSystem.getServerDefaults();
}
@Override
public FsServerDefaults getServerDefaults(Path p) throws IOException {
return fileSystem.getServerDefaults(convertToDefaultPath(p));
}
@Override
public Path resolvePath(Path p) throws IOException {
return convertToHoodiePath(fileSystem.resolvePath(convertToDefaultPath(p)));
}
@Override
public FSDataInputStream open(Path f) throws IOException {
return fileSystem.open(convertToDefaultPath(f));
}
@Override
public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int bufferSize,
short replication, long blockSize, Progressable progress) throws IOException {
return fileSystem
.createNonRecursive(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize,
progress);
}
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, boolean overwrite,
int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
return fileSystem
.createNonRecursive(convertToDefaultPath(f), permission, overwrite, bufferSize, replication,
blockSize, progress);
}
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return fileSystem
.createNonRecursive(convertToDefaultPath(f), permission, flags, bufferSize, replication,
blockSize, progress);
}
@Override
public boolean createNewFile(Path f) throws IOException {
return fileSystem.createNewFile(convertToDefaultPath(f));
}
@Override
public FSDataOutputStream append(Path f) throws IOException {
return fileSystem.append(convertToDefaultPath(f));
}
@Override
public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
return fileSystem.append(convertToDefaultPath(f), bufferSize);
}
@Override
public void concat(Path trg, Path[] psrcs) throws IOException {
Path[] psrcsNew = convertDefaults(psrcs);
fileSystem.concat(convertToDefaultPath(trg), psrcsNew);
}
@Override
public short getReplication(Path src) throws IOException {
return fileSystem.getReplication(convertToDefaultPath(src));
}
@Override
public boolean setReplication(Path src, short replication) throws IOException {
return fileSystem.setReplication(convertToDefaultPath(src), replication);
}
@Override
public boolean delete(Path f) throws IOException {
return fileSystem.delete(convertToDefaultPath(f));
}
@Override
public boolean deleteOnExit(Path f) throws IOException {
return fileSystem.deleteOnExit(convertToDefaultPath(f));
}
@Override
public boolean cancelDeleteOnExit(Path f) {
return fileSystem.cancelDeleteOnExit(convertToDefaultPath(f));
}
@Override
public boolean exists(Path f) throws IOException {
return fileSystem.exists(convertToDefaultPath(f));
}
@Override
public boolean isDirectory(Path f) throws IOException {
return fileSystem.isDirectory(convertToDefaultPath(f));
}
@Override
public boolean isFile(Path f) throws IOException {
return fileSystem.isFile(convertToDefaultPath(f));
}
@Override
public long getLength(Path f) throws IOException {
return fileSystem.getLength(convertToDefaultPath(f));
}
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
return fileSystem.getContentSummary(convertToDefaultPath(f));
}
@Override
public RemoteIterator<Path> listCorruptFileBlocks(Path path) throws IOException {
return fileSystem.listCorruptFileBlocks(convertToDefaultPath(path));
}
@Override
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException {
return fileSystem.listStatus(convertToDefaultPath(f), filter);
}
@Override
public FileStatus[] listStatus(Path[] files) throws IOException {
return fileSystem.listStatus(convertDefaults(files));
}
@Override
public FileStatus[] listStatus(Path[] files, PathFilter filter) throws IOException {
return fileSystem.listStatus(convertDefaults(files), filter);
}
@Override
public FileStatus[] globStatus(Path pathPattern) throws IOException {
return fileSystem.globStatus(convertToDefaultPath(pathPattern));
}
@Override
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException {
return fileSystem.globStatus(convertToDefaultPath(pathPattern), filter);
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) throws IOException {
return fileSystem.listLocatedStatus(convertToDefaultPath(f));
}
@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
return fileSystem.listFiles(convertToDefaultPath(f), recursive);
}
@Override
public Path getHomeDirectory() {
return convertToHoodiePath(fileSystem.getHomeDirectory());
}
@Override
public boolean mkdirs(Path f) throws IOException {
return fileSystem.mkdirs(convertToDefaultPath(f));
}
@Override
public void copyFromLocalFile(Path src, Path dst) throws IOException {
fileSystem.copyFromLocalFile(convertToDefaultPath(src), convertToDefaultPath(dst));
}
@Override
public void moveFromLocalFile(Path[] srcs, Path dst) throws IOException {
fileSystem.moveFromLocalFile(convertDefaults(srcs), convertToDefaultPath(dst));
}
@Override
public void moveFromLocalFile(Path src, Path dst) throws IOException {
fileSystem.moveFromLocalFile(convertToDefaultPath(src), convertToDefaultPath(dst));
}
@Override
public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
fileSystem.copyFromLocalFile(delSrc, convertToDefaultPath(src), convertToDefaultPath(dst));
}
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] srcs, Path dst)
throws IOException {
fileSystem
.copyFromLocalFile(delSrc, overwrite, convertDefaults(srcs), convertToDefaultPath(dst));
}
@Override
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
throws IOException {
fileSystem
.copyFromLocalFile(delSrc, overwrite, convertToDefaultPath(src), convertToDefaultPath(dst));
}
@Override
public void copyToLocalFile(Path src, Path dst) throws IOException {
fileSystem.copyToLocalFile(convertToDefaultPath(src), convertToDefaultPath(dst));
}
@Override
public void moveToLocalFile(Path src, Path dst) throws IOException {
fileSystem.moveToLocalFile(convertToDefaultPath(src), convertToDefaultPath(dst));
}
@Override
public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), convertToDefaultPath(dst));
}
@Override
public void copyToLocalFile(boolean delSrc, Path src, Path dst, boolean useRawLocalFileSystem)
throws IOException {
fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), convertToDefaultPath(dst),
useRawLocalFileSystem);
}
@Override
public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException {
return convertToHoodiePath(fileSystem
.startLocalOutput(convertToDefaultPath(fsOutputFile), convertToDefaultPath(tmpLocalFile)));
}
@Override
public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) throws IOException {
fileSystem.completeLocalOutput(convertToDefaultPath(fsOutputFile),
convertToDefaultPath(tmpLocalFile));
}
@Override
public void close() throws IOException {
// Don't close the wrapped `fileSystem` object. This will end up closing it for every thread since it
// could be cached across jvm. We don't own that object anyway.
super.close();
}
@Override
public long getUsed() throws IOException {
return fileSystem.getUsed();
}
@Override
public long getBlockSize(Path f) throws IOException {
return fileSystem.getBlockSize(convertToDefaultPath(f));
}
@Override
public long getDefaultBlockSize() {
return fileSystem.getDefaultBlockSize();
}
@Override
public long getDefaultBlockSize(Path f) {
return fileSystem.getDefaultBlockSize(convertToDefaultPath(f));
}
@Override
public short getDefaultReplication() {
return fileSystem.getDefaultReplication();
}
@Override
public short getDefaultReplication(Path path) {
return fileSystem.getDefaultReplication(convertToDefaultPath(path));
}
@Override
public void access(Path path, FsAction mode) throws IOException {
fileSystem.access(convertToDefaultPath(path), mode);
}
@Override
public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
fileSystem
.createSymlink(convertToDefaultPath(target), convertToDefaultPath(link), createParent);
}
@Override
public FileStatus getFileLinkStatus(Path f) throws IOException {
return fileSystem.getFileLinkStatus(convertToDefaultPath(f));
}
@Override
public boolean supportsSymlinks() {
return fileSystem.supportsSymlinks();
}
@Override
public Path getLinkTarget(Path f) throws IOException {
return convertToHoodiePath(fileSystem.getLinkTarget(convertToDefaultPath(f)));
}
@Override
public FileChecksum getFileChecksum(Path f) throws IOException {
return fileSystem.getFileChecksum(convertToDefaultPath(f));
}
@Override
public FileChecksum getFileChecksum(Path f, long length) throws IOException {
return fileSystem.getFileChecksum(convertToDefaultPath(f), length);
}
@Override
public void setVerifyChecksum(boolean verifyChecksum) {
fileSystem.setVerifyChecksum(verifyChecksum);
}
@Override
public void setWriteChecksum(boolean writeChecksum) {
fileSystem.setWriteChecksum(writeChecksum);
}
@Override
public FsStatus getStatus() throws IOException {
return fileSystem.getStatus();
}
@Override
public FsStatus getStatus(Path p) throws IOException {
return fileSystem.getStatus(convertToDefaultPath(p));
}
@Override
public void setPermission(Path p, FsPermission permission) throws IOException {
fileSystem.setPermission(convertToDefaultPath(p), permission);
}
@Override
public void setOwner(Path p, String username, String groupname) throws IOException {
fileSystem.setOwner(convertToDefaultPath(p), username, groupname);
}
@Override
public void setTimes(Path p, long mtime, long atime) throws IOException {
fileSystem.setTimes(convertToDefaultPath(p), mtime, atime);
}
@Override
public Path createSnapshot(Path path, String snapshotName) throws IOException {
return convertToHoodiePath(fileSystem.createSnapshot(convertToDefaultPath(path), snapshotName));
}
@Override
public void renameSnapshot(Path path, String snapshotOldName, String snapshotNewName)
throws IOException {
fileSystem.renameSnapshot(convertToDefaultPath(path), snapshotOldName, snapshotNewName);
}
@Override
public void deleteSnapshot(Path path, String snapshotName) throws IOException {
fileSystem.deleteSnapshot(convertToDefaultPath(path), snapshotName);
}
@Override
public void modifyAclEntries(Path path, List<AclEntry> aclSpec) throws IOException {
fileSystem.modifyAclEntries(convertToDefaultPath(path), aclSpec);
}
@Override
public void removeAclEntries(Path path, List<AclEntry> aclSpec) throws IOException {
fileSystem.removeAclEntries(convertToDefaultPath(path), aclSpec);
}
@Override
public void removeDefaultAcl(Path path) throws IOException {
fileSystem.removeDefaultAcl(convertToDefaultPath(path));
}
@Override
public void removeAcl(Path path) throws IOException {
fileSystem.removeAcl(convertToDefaultPath(path));
}
@Override
public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
fileSystem.setAcl(convertToDefaultPath(path), aclSpec);
}
@Override
public AclStatus getAclStatus(Path path) throws IOException {
return fileSystem.getAclStatus(convertToDefaultPath(path));
}
@Override
public void setXAttr(Path path, String name, byte[] value) throws IOException {
fileSystem.setXAttr(convertToDefaultPath(path), name, value);
}
@Override
public void setXAttr(Path path, String name, byte[] value, EnumSet<XAttrSetFlag> flag)
throws IOException {
fileSystem.setXAttr(convertToDefaultPath(path), name, value, flag);
}
@Override
public byte[] getXAttr(Path path, String name) throws IOException {
return fileSystem.getXAttr(convertToDefaultPath(path), name);
}
@Override
public Map<String, byte[]> getXAttrs(Path path) throws IOException {
return fileSystem.getXAttrs(convertToDefaultPath(path));
}
@Override
public Map<String, byte[]> getXAttrs(Path path, List<String> names) throws IOException {
return fileSystem.getXAttrs(convertToDefaultPath(path), names);
}
@Override
public List<String> listXAttrs(Path path) throws IOException {
return fileSystem.listXAttrs(convertToDefaultPath(path));
}
@Override
public void removeXAttr(Path path, String name) throws IOException {
fileSystem.removeXAttr(convertToDefaultPath(path), name);
}
@Override
public Configuration getConf() {
return fileSystem.getConf();
}
@Override
public void setConf(Configuration conf) {
// ignore this. we will set conf on init
}
@Override
public int hashCode() {
return fileSystem.hashCode();
}
@Override
public boolean equals(Object obj) {
return fileSystem.equals(obj);
}
@Override
public String toString() {
return fileSystem.toString();
}
public Path convertToHoodiePath(Path oldPath) {
return convertPathWithScheme(oldPath, getHoodieScheme(fileSystem.getScheme()));
}
private Path convertToDefaultPath(Path oldPath) {
return convertPathWithScheme(oldPath, fileSystem.getScheme());
}
private Path[] convertDefaults(Path[] psrcs) {
Path[] psrcsNew = new Path[psrcs.length];
for (int i = 0; i < psrcs.length; i++) {
psrcsNew[i] = convertToDefaultPath(psrcs[i]);
}
return psrcsNew;
}
public long getBytesWritten(Path file) {
if (openStreams.containsKey(file.getName())) {
return openStreams.get(file.getName()).getBytesWritten();
}
// When the file is first written, we do not have a track of it
throw new IllegalArgumentException(file.toString()
+ " does not have a open stream. Cannot get the bytes written on the stream");
}
}

View File

@@ -1,62 +0,0 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io.storage;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FSDataOutputStream;
/**
* Wrapper over <code>FSDataOutputStream</code> to keep track of the size of the written bytes. This
* gives a cheap way to check on the underlying file size.
*/
public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
// A callback to call when the output stream is closed.
private final Runnable closeCallback;
// Keep track of the bytes written
private final AtomicLong bytesWritten = new AtomicLong(0L);
public SizeAwareFSDataOutputStream(FSDataOutputStream out, Runnable closeCallback)
throws IOException {
super(out);
this.closeCallback = closeCallback;
}
@Override
public synchronized void write(byte[] b, int off, int len) throws IOException {
bytesWritten.addAndGet(len);
super.write(b, off, len);
}
@Override
public void write(byte[] b) throws IOException {
bytesWritten.addAndGet(b.length);
super.write(b);
}
@Override
public void close() throws IOException {
super.close();
closeCallback.run();
}
public long getBytesWritten() {
return bytesWritten.get();
}
}

View File

@@ -28,8 +28,6 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordLocation;
import com.uber.hoodie.common.model.HoodieRecordPayload;
import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
import com.uber.hoodie.common.model.HoodieWriteStat;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -52,7 +50,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -234,14 +231,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged);
}
public Iterator<List<WriteStatus>> handleInsert(String commitTime,
public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
logger.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this);
return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx);
}
public Iterator<List<WriteStatus>> handleInsert(String commitTime, String partitionPath, String fileId,
@@ -261,9 +258,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
BucketType btype = binfo.bucketType;
try {
if (btype.equals(BucketType.INSERT)) {
return handleInsert(commitTime, recordItr);
return handleInsert(commitTime, binfo.fileIdPrefix, recordItr);
} else if (btype.equals(BucketType.UPDATE)) {
return handleUpdate(commitTime, binfo.fileLoc, recordItr);
return handleUpdate(commitTime, binfo.fileIdPrefix, recordItr);
} else {
throw new HoodieUpsertException(
"Unknown bucketType " + btype + " for partition :" + partition);
@@ -376,9 +373,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
.withDeletedFileResults(filesToDeletedStatus).build();
}).collect();
// clean temporary data files
cleanTemporaryDataFiles(jsc);
// Delete Inflight instant if enabled
deleteInflightInstant(deleteInstants, activeTimeline,
new HoodieInstant(true, actionType, commit));
@@ -391,99 +385,28 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted
*/
protected static void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
protected void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
HoodieInstant instantToBeDeleted) {
// Remove the rolled back inflight commits
if (deleteInstant) {
activeTimeline.deleteInflight(instantToBeDeleted);
logger.info("Deleted inflight commit " + instantToBeDeleted);
try {
//TODO: Cleanup Hoodie 1.0 rollback to simply call super.cleanFailedWrites with consistency check disabled
// and empty WriteStat list.
Path markerDir = new Path(metaClient.getMarkerFolderPath(instantToBeDeleted.getTimestamp()));
logger.info("Removing marker directory=" + markerDir);
if (metaClient.getFs().exists(markerDir)) {
metaClient.getFs().delete(markerDir, true);
}
activeTimeline.deleteInflight(instantToBeDeleted);
logger.info("Deleted inflight commit " + instantToBeDeleted);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
} else {
logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
}
}
/**
* Finalize the written data files
*
* @param stats List of HoodieWriteStats
* @return number of files finalized
*/
@Override
@SuppressWarnings("unchecked")
public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
throws HoodieIOException {
super.finalizeWrite(jsc, stats);
if (config.shouldUseTempFolderForCopyOnWrite()) {
// This is to rename each data file from temporary path to its final location
jsc.parallelize(stats, config.getFinalizeWriteParallelism())
.foreach(writeStat -> {
final FileSystem fs = getMetaClient().getFs();
final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
if (writeStat.getTempPath() != null) {
final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
boolean success;
try {
logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
success = fs.rename(tempPath, finalPath);
} catch (IOException e) {
throw new HoodieIOException(
"Failed to rename file: " + tempPath + " to " + finalPath);
}
if (!success) {
throw new HoodieIOException(
"Failed to rename file: " + tempPath + " to " + finalPath);
}
}
});
// clean temporary data files
cleanTemporaryDataFiles(jsc);
}
}
/**
* Clean temporary data files that are produced from previous failed commit or retried spark
* stages.
*/
private void cleanTemporaryDataFiles(JavaSparkContext jsc) {
if (!config.shouldUseTempFolderForCopyOnWrite()) {
return;
}
final FileSystem fs = getMetaClient().getFs();
final Path temporaryFolder = new Path(config.getBasePath(),
HoodieTableMetaClient.TEMPFOLDER_NAME);
try {
if (!fs.exists(temporaryFolder)) {
logger.info("Temporary folder does not exist: " + temporaryFolder);
return;
}
List<FileStatus> fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder));
List<Tuple2<String, Boolean>> results = jsc
.parallelize(fileStatusesList, config.getFinalizeWriteParallelism()).map(fileStatus -> {
FileSystem fs1 = getMetaClient().getFs();
boolean success = fs1.delete(fileStatus.getPath(), false);
logger
.info("Deleting file in temporary folder" + fileStatus.getPath() + "\t" + success);
return new Tuple2<>(fileStatus.getPath().toString(), success);
}).collect();
for (Tuple2<String, Boolean> result : results) {
if (!result._2()) {
logger.info("Failed to delete file: " + result._1());
throw new HoodieIOException("Failed to delete file in temporary folder: " + result._1());
}
}
} catch (IOException e) {
throw new HoodieIOException(
"Failed to clean data files in temporary folder: " + temporaryFolder);
}
}
private List<HoodieCleanStat> cleanPartitionPaths(List<String> partitionsToClean,
JavaSparkContext jsc) {
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
@@ -624,13 +547,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
class BucketInfo implements Serializable {
BucketType bucketType;
String fileLoc;
String fileIdPrefix;
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BucketInfo {");
sb.append("bucketType=").append(bucketType).append(", ");
sb.append("fileLoc=").append(fileLoc);
sb.append("fileIdPrefix=").append(fileIdPrefix);
sb.append('}');
return sb.toString();
}
@@ -697,12 +620,12 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
private int addUpdateBucket(String fileLoc) {
private int addUpdateBucket(String fileIdHint) {
int bucket = totalBuckets;
updateLocationToBucket.put(fileLoc, bucket);
updateLocationToBucket.put(fileIdHint, bucket);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.UPDATE;
bucketInfo.fileLoc = fileLoc;
bucketInfo.fileIdPrefix = fileIdHint;
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
return bucket;
@@ -764,6 +687,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
BucketInfo bucketInfo = new BucketInfo();
bucketInfo.bucketType = BucketType.INSERT;
bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
bucketInfoMap.put(totalBuckets, bucketInfo);
totalBuckets++;
}
@@ -784,7 +708,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
}
}
/**
* Returns a list of small files in the given partition path
*/

View File

@@ -121,13 +121,13 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String commitTime,
public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx,
Iterator<HoodieRecord<T>> recordItr) throws Exception {
// If canIndexLogFiles, write inserts to log files else write inserts to parquet files
if (index.canIndexLogFiles()) {
return new MergeOnReadLazyInsertIterable<>(recordItr, config, commitTime, this);
return new MergeOnReadLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx);
} else {
return super.handleInsert(commitTime, recordItr);
return super.handleInsert(commitTime, idPfx, recordItr);
}
}
@@ -325,10 +325,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
}
@Override
public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
throws HoodieIOException {
// delegate to base class for MOR tables
super.finalizeWrite(jsc, stats);
super.finalizeWrite(jsc, instantTs, stats);
}
@Override
@@ -362,6 +362,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
super(profile);
}
@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath

View File

@@ -34,19 +34,30 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.table.view.FileSystemViewManager;
import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.ConsistencyGuard;
import com.uber.hoodie.common.util.ConsistencyGuard.FileVisibility;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
import com.uber.hoodie.common.util.collection.Pair;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieIOException;
import com.uber.hoodie.exception.HoodieSavepointException;
import com.uber.hoodie.index.HoodieIndex;
import com.uber.hoodie.io.ConsistencyCheck;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -56,10 +67,7 @@ import org.apache.spark.api.java.JavaSparkContext;
*/
public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable {
// time between successive attempts to ensure written data's metadata is consistent on storage
private static long INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
// maximum number of checks, for consistency of written data. Will wait upto 256 Secs
private static int MAX_CONSISTENCY_CHECKS = 7;
private static Logger logger = LogManager.getLogger(HoodieTable.class);
protected final HoodieWriteConfig config;
protected final HoodieTableMetaClient metaClient;
@@ -279,20 +287,126 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
* @param stats List of HoodieWriteStats
* @throws HoodieIOException if some paths can't be finalized on storage
*/
public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
throws HoodieIOException {
if (config.isConsistencyCheckEnabled()) {
List<String> pathsToCheck = stats.stream()
.map(stat -> stat.getTempPath() != null
? stat.getTempPath() : stat.getPath())
.collect(Collectors.toList());
cleanFailedWrites(jsc, instantTs, stats, config.isConsistencyCheckEnabled());
}
List<String> failingPaths = new ConsistencyCheck(config.getBasePath(), pathsToCheck, jsc,
config.getFinalizeWriteParallelism())
.check(MAX_CONSISTENCY_CHECKS, INITIAL_CONSISTENCY_CHECK_INTERVAL_MS);
if (failingPaths.size() > 0) {
throw new HoodieIOException("Could not verify consistency of paths : " + failingPaths);
/**
* Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark
* retries.
*
* @param jsc Spark Context
* @param instantTs Instant Timestamp
* @param stats Hoodie Write Stat
* @param consistencyCheckEnabled Consistency Check Enabled
* @throws HoodieIOException
*/
protected void cleanFailedWrites(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats,
boolean consistencyCheckEnabled) throws HoodieIOException {
try {
// Reconcile marker and data files with WriteStats so that partially written data-files due to failed
// (but succeeded on retry) tasks are removed.
String basePath = getMetaClient().getBasePath();
FileSystem fs = getMetaClient().getFs();
Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
if (!fs.exists(markerDir)) {
// Happens when all writes are appends
return;
}
List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString());
List<String> validDataPaths = stats.stream().map(w -> String.format("%s/%s", basePath, w.getPath()))
.filter(p -> p.endsWith(".parquet")).collect(Collectors.toList());
// Contains list of partially created files. These needs to be cleaned up.
invalidDataPaths.removeAll(validDataPaths);
logger.warn("InValid data paths=" + invalidDataPaths);
Map<String, List<Pair<String, String>>> groupByPartition = invalidDataPaths.stream()
.map(dp -> Pair.of(new Path(dp).getParent().toString(), dp))
.collect(Collectors.groupingBy(Pair::getKey));
if (!groupByPartition.isEmpty()) {
// Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS.
// Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
if (consistencyCheckEnabled) {
// This will either ensure all files to be deleted are present.
waitForAllFiles(jsc, groupByPartition, FileVisibility.APPEAR);
}
// Now delete partially written files
jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism())
.map(partitionWithFileList -> {
final FileSystem fileSystem = metaClient.getFs();
logger.info("Deleting invalid data files=" + partitionWithFileList);
if (partitionWithFileList.isEmpty()) {
return true;
}
// Delete
partitionWithFileList.stream().map(Pair::getValue).forEach(file -> {
try {
fileSystem.delete(new Path(file), false);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
});
return true;
}).collect();
// Now ensure the deleted files disappear
if (consistencyCheckEnabled) {
// This will either ensure all files to be deleted are absent.
waitForAllFiles(jsc, groupByPartition, FileVisibility.DISAPPEAR);
}
}
// Now delete the marker directory
if (fs.exists(markerDir)) {
// For append only case, we do not write to marker dir. Hence, the above check
logger.info("Removing marker directory=" + markerDir);
fs.delete(markerDir, true);
}
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
/**
* Ensures all files passed either appear or disappear
* @param jsc JavaSparkContext
* @param groupByPartition Files grouped by partition
* @param visibility Appear/Disappear
*/
private void waitForAllFiles(JavaSparkContext jsc, Map<String, List<Pair<String, String>>> groupByPartition,
FileVisibility visibility) {
// This will either ensure all files to be deleted are present.
boolean checkPassed =
jsc.parallelize(new ArrayList<>(groupByPartition.entrySet()), config.getFinalizeWriteParallelism())
.map(partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),
partitionWithFileList.getValue().stream(), visibility))
.collect().stream().allMatch(x -> x);
if (!checkPassed) {
throw new HoodieIOException("Consistency check failed to ensure all files " + visibility);
}
}
private boolean waitForCondition(String partitionPath, Stream<Pair<String, String>> partitionFilePaths,
FileVisibility visibility) {
final FileSystem fileSystem = metaClient.getFs();
List<String> fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
try {
getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath, fileList, visibility);
} catch (IOException | TimeoutException ioe) {
logger.error("Got exception while waiting for files to show up", ioe);
return false;
}
return true;
}
private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
return new FailSafeConsistencyGuard(fileSystem, config.getMaxConsistencyChecks(),
config.getInitialConsistencyCheckIntervalMs(),
config.getMaxConsistencyCheckIntervalMs());
}
}