1
0

Ensure Compaction workload is stored in write-once meta-data files separate from timeline files.

This avoids concurrency issues when compactor(s) and ingestor are running in parallel.
    In the next PR, the safety concern of the Cleaner retaining all metadata and file-slices for pending compactions will be addressed.
This commit is contained in:
Balaji Varadarajan
2018-05-31 14:11:43 -07:00
committed by vinoth chandar
parent 9d99942564
commit 0a0451a765
9 changed files with 207 additions and 47 deletions

View File

@@ -0,0 +1,42 @@
/*
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.uber.hoodie.io.compact;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
/**
 * Contains Hoodie Compaction instant along with workload.
 *
 * <p>Immutable value holder pairing a compaction instant time with the compaction
 * plan (workload) scheduled at that instant.
 */
public class HoodieCompactionInstantWithPlan {
// Timestamp of the compaction instant on the timeline
private final String compactionInstantTime;
// Compaction workload (plan) scheduled at the above instant
private final HoodieCompactionPlan compactionPlan;
public HoodieCompactionInstantWithPlan(String compactionInstantTime,
HoodieCompactionPlan compactionPlan) {
this.compactionInstantTime = compactionInstantTime;
this.compactionPlan = compactionPlan;
}
/** @return the instant time this compaction was scheduled at */
public String getCompactionInstantTime() {
return compactionInstantTime;
}
/** @return the compaction plan (workload) for this instant */
public HoodieCompactionPlan getCompactionPlan() {
return compactionPlan;
}
}

View File

@@ -16,6 +16,7 @@
package com.uber.hoodie.common;
import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.HoodieCommitMetadata;
import com.uber.hoodie.common.model.HoodieKey;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
@@ -23,6 +24,8 @@ import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.AvroUtils;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import java.io.IOException;
@@ -135,6 +138,21 @@ public class HoodieTestDataGenerator {
}
}
/**
 * Creates an auxiliary metadata file (an empty serialized compaction plan) for the
 * given instant under basePath/.hoodie/.aux.
 *
 * @param basePath dataset base path
 * @param instant instant whose file name is used for the auxiliary file
 * @param configuration hadoop configuration used to resolve the FileSystem
 * @throws IOException on failure to create or write the file
 */
public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant,
    Configuration configuration) throws IOException {
  Path commitFile = new Path(
      basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
  FileSystem fs = FSUtils.getFs(basePath, configuration);
  // Empty compaction plan serves as the placeholder payload
  HoodieCompactionPlan workload = new HoodieCompactionPlan();
  // try-with-resources guarantees the stream is closed even if the write fails
  try (FSDataOutputStream os = fs.create(commitFile, true)) {
    // Write the serialized avro bytes directly. The previous
    // writeBytes(new String(bytes, UTF_8)) round-trip is lossy: DataOutput.writeBytes
    // keeps only the low-order byte of each char, which can corrupt the payload.
    os.write(AvroUtils.serializeCompactionPlan(workload).get());
  }
}
/**
 * Creates a savepoint file for the given commit time using the default Hadoop configuration.
 */
public static void createSavepointFile(String basePath, String commitTime) throws IOException {
createSavepointFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf());
}

View File

@@ -22,14 +22,20 @@ import com.uber.hoodie.common.SerializableConfiguration;
import com.uber.hoodie.common.model.HoodieTableType;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.table.timeline.HoodieArchivedTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.exception.HoodieException;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -54,6 +60,7 @@ public class HoodieTableMetaClient implements Serializable {
private static final transient Logger log = LogManager.getLogger(HoodieTableMetaClient.class);
public static String METAFOLDER_NAME = ".hoodie";
public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
private String basePath;
private transient FileSystem fs;
@@ -135,6 +142,13 @@ public class HoodieTableMetaClient implements Serializable {
return metaPath;
}
/**
 * @return Auxiliary Meta path (under the .hoodie meta folder) where write-once
 *         metadata files such as compaction plans are kept, separate from timeline files
 */
public String getMetaAuxiliaryPath() {
return basePath + File.separator + AUXILIARYFOLDER_NAME;
}
/**
* @return path where archived timeline is stored
*/
@@ -242,6 +256,13 @@ public class HoodieTableMetaClient implements Serializable {
if (!fs.exists(temporaryFolder)) {
fs.mkdirs(temporaryFolder);
}
// Always create auxiliary folder which is needed to track compaction workloads (stats and any metadata in future)
final Path auxiliaryFolder = new Path(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
if (!fs.exists(auxiliaryFolder)) {
fs.mkdirs(auxiliaryFolder);
}
HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
// We should not use fs.getConf as this might be different from the original configuration
// used to create the fs in unit tests
@@ -321,6 +342,30 @@ public class HoodieTableMetaClient implements Serializable {
}
/**
 * Helper method to scan all hoodie-instant metafiles and construct HoodieInstant objects.
 *
 * @param fs FileSystem
 * @param metaPath Meta Path where hoodie instants are present
 * @param includedExtensions Included hoodie extensions
 * @return List of Hoodie Instants, ordered by instant time
 * @throws IOException in case of failure
 */
public static List<HoodieInstant> scanHoodieInstantsFromFileSystem(
    FileSystem fs, Path metaPath, Set<String> includedExtensions) throws IOException {
  // Keep only the meta files whose extension is in the included set
  FileStatus[] metaFiles = HoodieTableMetaClient.scanFiles(fs, metaPath,
      path -> includedExtensions.contains(FSUtils.getFileExtension(path.getName())));
  return Arrays.stream(metaFiles)
      // Instant time (first part of the file name) defines the ordering
      .sorted(Comparator.comparing(status -> FSUtils.getInstantTime(status.getPath().getName())))
      // HoodieInstant's FileStatus constructor extracts the instant properties
      .map(HoodieInstant::new)
      .collect(Collectors.toList());
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@@ -20,18 +20,16 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.time.FastDateFormat;
@@ -54,6 +52,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
public static final FastDateFormat COMMIT_FORMATTER = FastDateFormat
.getInstance("yyyyMMddHHmmss");
public static final Set<String> VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(
new String[]{COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION,
CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION}));
private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class);
private HoodieTableMetaClient metaClient;
@@ -64,22 +67,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
}
protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, String[] includedExtensions) {
protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set<String> includedExtensions) {
// Filter all the filter in the metapath and include only the extensions passed and
// convert them into HoodieInstant
try {
this.instants =
Arrays.stream(
HoodieTableMetaClient
.scanFiles(metaClient.getFs(), new Path(metaClient.getMetaPath()), path -> {
// Include only the meta files with extensions that needs to be included
String extension = FSUtils.getFileExtension(path.getName());
return Arrays.stream(includedExtensions).anyMatch(Predicate.isEqual(extension));
})).sorted(Comparator.comparing(
// Sort the meta-data by the instant time (first part of the file name)
fileStatus -> FSUtils.getInstantTime(fileStatus.getPath().getName())))
// create HoodieInstantMarkers from FileStatus, which extracts properties
.map(HoodieInstant::new).collect(Collectors.toList());
this.instants = HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(),
new Path(metaClient.getMetaPath()), includedExtensions);
log.info("Loaded instants " + instants);
} catch (IOException e) {
throw new HoodieIOException("Failed to scan metadata", e);
@@ -92,10 +85,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
/**
 * Builds the active timeline using every valid active-timeline extension.
 * (The removed inline String[] literal from the old version is dropped; it was
 * diff residue and would not compile alongside the Set-based delegation.)
 */
public HoodieActiveTimeline(HoodieTableMetaClient metaClient) {
  this(metaClient, VALID_EXTENSIONS_IN_ACTIVE_TIMELINE);
}
/**
@@ -218,7 +208,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
/**
 * Reverts a completed instant back to inflight state.
 *
 * @param instant completed instant to revert
 */
public void revertToInflight(HoodieInstant instant) {
  log.info("Reverting instant to inflight " + instant);
  // revertCompleteToInflight replaced the old moveCompleteToInflight helper;
  // the duplicated old call line (diff residue) is removed
  revertCompleteToInflight(instant, HoodieTimeline.getInflightInstant(instant));
  log.info("Reverted " + instant + " to inflight");
}
@@ -255,23 +245,67 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return readDataFromPath(detailPath);
}
public void revertFromInflightToRequested(HoodieInstant inflightInstant, HoodieInstant requestedInstant,
Optional<byte[]> data) {
/** BEGIN - COMPACTION RELATED META-DATA MANAGEMENT **/
public Optional<byte[]> getInstantAuxiliaryDetails(HoodieInstant instant) {
Path detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
return readDataFromPath(detailPath);
}
/**
 * Revert compaction State from inflight to requested.
 *
 * @param inflightInstant Inflight Instant
 * @return requested instant
 */
public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) {
  Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
  Preconditions.checkArgument(inflightInstant.isInflight());
  HoodieInstant requestedInstant =
      new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp());
  // No metadata is carried on the revert; the requested marker file is created empty.
  // The stray old call referencing the undefined 'data' variable (diff residue) is removed.
  transitionState(inflightInstant, requestedInstant, Optional.empty());
  return requestedInstant;
}
public void transitionFromRequestedToInflight(HoodieInstant requestedInstant, HoodieInstant inflightInstant,
Optional<byte[]> data) {
/**
* Transition Compaction State from requested to inflight
*
* @param requestedInstant Requested instant
* @return inflight instant
*/
public HoodieInstant transitionCompactionRequestedToInflight(HoodieInstant requestedInstant) {
Preconditions.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
transitionState(requestedInstant, inflightInstant, data);
Preconditions.checkArgument(requestedInstant.isRequested());
HoodieInstant inflightInstant =
new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, requestedInstant.getTimestamp());
transitionState(requestedInstant, inflightInstant, Optional.empty());
return inflightInstant;
}
protected void moveInflightToComplete(HoodieInstant inflightInstant, HoodieInstant commitInstant,
Optional<byte[]> data) {
/**
* Transition Compaction State from inflight to Committed
*
* @param inflightInstant Inflight instant
* @param data Extra Metadata
* @return commit instant
*/
public HoodieInstant transitionCompactionInflightToComplete(HoodieInstant inflightInstant, Optional<byte[]> data) {
Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
Preconditions.checkArgument(inflightInstant.isInflight());
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, COMMIT_ACTION, inflightInstant.getTimestamp());
transitionState(inflightInstant, commitInstant, data);
return commitInstant;
}
// Writes the given content to <aux-folder>/<instant-file-name>
private void createFileInAuxiliaryFolder(HoodieInstant instant, Optional<byte[]> data) {
Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
createFileInPath(fullPath, data);
}
/**
 * END - COMPACTION RELATED META-DATA MANAGEMENT
 **/
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant,
Optional<byte[]> data) {
Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
@@ -290,7 +324,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
}
protected void moveCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) {
Preconditions.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp()));
Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), inflight.getFileName());
try {
@@ -308,35 +342,44 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
/**
 * Persists the given content as the inflight marker file for the instant in the meta path.
 */
public void saveToInflight(HoodieInstant instant, Optional<byte[]> content) {
Preconditions.checkArgument(instant.isInflight());
createFileInMetaPath(instant.getFileName(), content);
}
public void saveToRequested(HoodieInstant instant, Optional<byte[]> content) {
public void saveToCompactionRequested(HoodieInstant instant, Optional<byte[]> content) {
Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
// Write workload to auxiliary folder
createFileInAuxiliaryFolder(instant, content);
createFileInMetaPath(instant.getFileName(), content);
}
protected void createFileInMetaPath(String filename, Optional<byte[]> content) {
private void createFileInMetaPath(String filename, Optional<byte[]> content) {
Path fullPath = new Path(metaClient.getMetaPath(), filename);
createFileInPath(fullPath, content);
}
private void createFileInPath(Path fullPath, Optional<byte[]> content) {
try {
if (!content.isPresent()) {
// If the path does not exist, create it first
if (!metaClient.getFs().exists(fullPath)) {
if (metaClient.getFs().createNewFile(fullPath)) {
log.info("Created a new file in meta path: " + fullPath);
return;
} else {
throw new HoodieIOException("Failed to create file " + fullPath);
}
} else {
}
if (content.isPresent()) {
FSDataOutputStream fsout = metaClient.getFs().create(fullPath, true);
fsout.write(content.get());
fsout.close();
return;
}
throw new HoodieIOException("Failed to create file " + fullPath);
} catch (IOException e) {
throw new HoodieIOException("Failed to create file " + fullPath, e);
}
}
protected Optional<byte[]> readDataFromPath(Path detailPath) {
private Optional<byte[]> readDataFromPath(Path detailPath) {
try (FSDataInputStream is = metaClient.getFs().open(detailPath)) {
return Optional.of(IOUtils.toByteArray(is));
} catch (IOException e) {

View File

@@ -84,6 +84,10 @@ public class HoodieInstant implements Serializable {
this.timestamp = timestamp;
}
/** @return true if this instant is in COMPLETED state */
public boolean isCompleted() {
return state == State.COMPLETED;
}
/** @return true if this instant is in INFLIGHT state */
public boolean isInflight() {
return state == State.INFLIGHT;
}

View File

@@ -143,7 +143,7 @@ public class AvroUtils {
partitionMetadataBuilder.build());
}
public static Optional<byte[]> serializeCompactionWorkload(HoodieCompactionPlan compactionWorkload)
public static Optional<byte[]> serializeCompactionPlan(HoodieCompactionPlan compactionWorkload)
throws IOException {
return serializeAvroMetadata(compactionWorkload, HoodieCompactionPlan.class);
}

View File

@@ -21,6 +21,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan;
import com.uber.hoodie.common.model.CompactionOperation;
import com.uber.hoodie.common.model.FileSlice;
import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.exception.HoodieException;
import java.io.IOException;
@@ -114,7 +115,8 @@ public class CompactionUtils {
return pendingCompactionInstants.stream().map(instant -> {
try {
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().getInstantDetails(instant).get());
metaClient.getActiveTimeline().getInstantAuxiliaryDetails(
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
return Pair.of(instant, compactionPlan);
} catch (IOException e) {
throw new HoodieException(e);

View File

@@ -252,10 +252,12 @@ public class HoodieTableFileSystemViewTest {
// Create a Data-file but this should be skipped by view
new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile();
compactionInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
commitTimeline.saveToInflight(compactionInstant, AvroUtils.serializeCompactionWorkload(compactionPlan));
HoodieInstant requested = HoodieTimeline.getCompactionRequestedInstant(compactionInstant.getTimestamp());
commitTimeline.saveToCompactionRequested(requested, AvroUtils.serializeCompactionPlan(compactionPlan));
commitTimeline.transitionCompactionRequestedToInflight(requested);
} else {
compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime);
commitTimeline.saveToRequested(compactionInstant, AvroUtils.serializeCompactionWorkload(compactionPlan));
commitTimeline.saveToCompactionRequested(compactionInstant, AvroUtils.serializeCompactionPlan(compactionPlan));
}
// Fake delta-ingestion after compaction-requested

View File

@@ -241,13 +241,17 @@ public class TestCompactionUtils {
}
// Persists the plan as a requested compaction on the timeline.
// Stray old lines calling the removed saveToRequested/serializeCompactionWorkload
// APIs (diff residue) are dropped.
private void scheduleCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException {
  metaClient.getActiveTimeline().saveToCompactionRequested(
      new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime),
      AvroUtils.serializeCompactionPlan(compactionPlan));
}
// Persists the plan as a requested compaction, then transitions it to inflight.
// Stray old lines calling the removed saveToInflight/serializeCompactionWorkload
// APIs (diff residue) are dropped.
private void scheduleInflightCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException {
  metaClient.getActiveTimeline().saveToCompactionRequested(
      new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime),
      AvroUtils.serializeCompactionPlan(compactionPlan));
  metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(
      new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime));
}
private HoodieCompactionPlan createCompactionPlan(String instantId, int numFileIds) {