Incorporating code review feedback for finalizeWrite for COW #2
This commit is contained in:
@@ -848,7 +848,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
|||||||
String commitActionType = table.getCommitActionType();
|
String commitActionType = table.getCommitActionType();
|
||||||
activeTimeline.createInflight(
|
activeTimeline.createInflight(
|
||||||
new HoodieInstant(true, commitActionType, commitTime));
|
new HoodieInstant(true, commitActionType, commitTime));
|
||||||
table.initializeFinalizeWrite();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -130,11 +130,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
|||||||
stat.setNumDeletes(recordsDeleted);
|
stat.setNumDeletes(recordsDeleted);
|
||||||
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
|
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
|
||||||
stat.setFileId(status.getFileId());
|
stat.setFileId(status.getFileId());
|
||||||
String relativePath = path.toString().replace(new Path(config.getBasePath()) + "/", "");
|
stat.setPaths(new Path(config.getBasePath()), path, tempPath);
|
||||||
stat.setPath(relativePath);
|
|
||||||
if (tempPath != null) {
|
|
||||||
stat.setTempPath(tempPath.toString().replace(new Path(config.getBasePath()) + "/", ""));
|
|
||||||
}
|
|
||||||
stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
|
stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
|
||||||
stat.setTotalWriteErrors(status.getFailedRecords().size());
|
stat.setTotalWriteErrors(status.getFailedRecords().size());
|
||||||
status.setStat(stat);
|
status.setStat(stat);
|
||||||
|
|||||||
@@ -576,24 +576,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
|
|||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void initializeFinalizeWrite() {
|
|
||||||
if (!config.shouldUseTempFolderForCopyOnWrite()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// create temporary folder if needed
|
|
||||||
final FileSystem fs = FSUtils.getFs();
|
|
||||||
final Path temporaryFolder = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
|
|
||||||
try {
|
|
||||||
if (!fs.exists(temporaryFolder)) {
|
|
||||||
fs.mkdirs(temporaryFolder);
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new HoodieIOException("Failed to create temporary folder: " + temporaryFolder);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
|
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
|
||||||
|
|||||||
@@ -250,11 +250,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
|
|||||||
return allRollbackStats;
|
return allRollbackStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void initializeFinalizeWrite() {
|
|
||||||
// do nothing for MOR tables
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
|
public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) {
|
||||||
// do nothing for MOR tables
|
// do nothing for MOR tables
|
||||||
|
|||||||
@@ -273,11 +273,6 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
|
|||||||
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits)
|
public abstract List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize resources needed for finalize write.
|
|
||||||
*/
|
|
||||||
public abstract void initializeFinalizeWrite();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finalize the written data files
|
* Finalize the written data files
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
|||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Statistics about a single Hoodie write operation.
|
* Statistics about a single Hoodie write operation.
|
||||||
@@ -212,6 +213,16 @@ public class HoodieWriteStat implements Serializable {
|
|||||||
return this.tempPath;
|
return this.tempPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set path and tempPath relative to the given basePath.
|
||||||
|
*/
|
||||||
|
public void setPaths(Path basePath, Path path, Path tempPath) {
|
||||||
|
this.path = path.toString().replace(basePath + "/", "");
|
||||||
|
if (tempPath != null) {
|
||||||
|
this.tempPath = tempPath.toString().replace(basePath + "/", "");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return new StringBuilder()
|
return new StringBuilder()
|
||||||
|
|||||||
@@ -233,6 +233,11 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Always create temporaryFolder which is needed for finalizeWrite for Hoodie tables
|
||||||
|
final Path temporaryFolder = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
|
||||||
|
if (!fs.exists(temporaryFolder)) {
|
||||||
|
fs.mkdirs(temporaryFolder);
|
||||||
|
}
|
||||||
HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
|
HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
|
||||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
|
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
|
||||||
log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType()
|
log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType()
|
||||||
|
|||||||
@@ -0,0 +1,62 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.common.model;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||||
|
import com.uber.hoodie.common.util.FSUtils;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.UUID;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestHoodieWriteStat {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetPaths() {
|
||||||
|
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
|
||||||
|
String basePathString = "/data/tables/some-hoodie-table";
|
||||||
|
String partitionPathString = "2017/12/31";
|
||||||
|
String fileName = UUID.randomUUID().toString();
|
||||||
|
int taskPartitionId = Integer.MAX_VALUE;
|
||||||
|
int stageId = Integer.MAX_VALUE;
|
||||||
|
long taskAttemptId = Long.MAX_VALUE;
|
||||||
|
|
||||||
|
Path basePath = new Path(basePathString);
|
||||||
|
Path partitionPath = new Path(basePath, partitionPathString);
|
||||||
|
Path tempPath = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
|
||||||
|
|
||||||
|
Path finalizeFilePath = new Path(partitionPath, FSUtils.makeDataFileName(commitTime,
|
||||||
|
taskPartitionId, fileName));
|
||||||
|
Path tempFilePath = new Path(tempPath, FSUtils.makeTempDataFileName(partitionPathString,
|
||||||
|
commitTime, taskPartitionId, fileName, stageId, taskAttemptId));
|
||||||
|
|
||||||
|
HoodieWriteStat writeStat = new HoodieWriteStat();
|
||||||
|
writeStat.setPaths(basePath, finalizeFilePath, tempFilePath);
|
||||||
|
assertEquals(finalizeFilePath, new Path(basePath, writeStat.getPath()));
|
||||||
|
assertEquals(tempFilePath, new Path(basePath, writeStat.getTempPath()));
|
||||||
|
|
||||||
|
// test for null tempFilePath
|
||||||
|
writeStat = new HoodieWriteStat();
|
||||||
|
writeStat.setPaths(basePath, finalizeFilePath, null);
|
||||||
|
assertEquals(finalizeFilePath, new Path(basePath, writeStat.getPath()));
|
||||||
|
assertNull(writeStat.getTempPath());
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user