From 0a0451a7655a2ffd998cd5302bf3e4272cd895ed Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Thu, 31 May 2018 14:11:43 -0700 Subject: [PATCH] Ensure Compaction workload is stored in write-once meta-data files separate from timeline files. This avoids concurrency issues when compactor(s) and ingestor are running in parallel. In the Next PR -> Safety concern regarding Cleaner retaining all meta-data and file-slices for pending compactions will be addressed --- .../HoodieCompactionInstantWithPlan.java | 42 ++++++ .../common/HoodieTestDataGenerator.java | 18 +++ .../common/table/HoodieTableMetaClient.java | 45 +++++++ .../table/timeline/HoodieActiveTimeline.java | 121 ++++++++++++------ .../common/table/timeline/HoodieInstant.java | 4 + .../uber/hoodie/common/util/AvroUtils.java | 2 +- .../hoodie/common/util/CompactionUtils.java | 4 +- .../view/HoodieTableFileSystemViewTest.java | 6 +- .../common/util/TestCompactionUtils.java | 12 +- 9 files changed, 207 insertions(+), 47 deletions(-) create mode 100644 hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionInstantWithPlan.java diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionInstantWithPlan.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionInstantWithPlan.java new file mode 100644 index 000000000..ac0b5b331 --- /dev/null +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieCompactionInstantWithPlan.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.io.compact; + +import com.uber.hoodie.avro.model.HoodieCompactionPlan; + +/** + * Contains Hoodie Compaction instant along with workload + */ +public class HoodieCompactionInstantWithPlan { + + private final String compactionInstantTime; + private final HoodieCompactionPlan compactionPlan; + + public HoodieCompactionInstantWithPlan(String compactionInstantTime, + HoodieCompactionPlan compactionPlan) { + this.compactionInstantTime = compactionInstantTime; + this.compactionPlan = compactionPlan; + } + + public String getCompactionInstantTime() { + return compactionInstantTime; + } + + public HoodieCompactionPlan getCompactionPlan() { + return compactionPlan; + } +} diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index 83161aa2c..1d3f6a7ee 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -16,6 +16,7 @@ package com.uber.hoodie.common; +import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.common.model.HoodieCommitMetadata; import com.uber.hoodie.common.model.HoodieKey; import com.uber.hoodie.common.model.HoodiePartitionMetadata; @@ -23,6 +24,8 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieTestUtils; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; +import com.uber.hoodie.common.util.AvroUtils; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import java.io.IOException; @@ -135,6 +138,21 @@ public class HoodieTestDataGenerator { } } + public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant, + Configuration configuration) throws IOException { + Path commitFile = new Path( + basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName()); + FileSystem fs = FSUtils.getFs(basePath, configuration); + FSDataOutputStream os = fs.create(commitFile, true); + HoodieCompactionPlan workload = new HoodieCompactionPlan(); + try { + // Write empty commit metadata + os.writeBytes(new String(AvroUtils.serializeCompactionPlan(workload).get(), StandardCharsets.UTF_8)); + } finally { + os.close(); + } + } + public static void createSavepointFile(String basePath, String commitTime) throws IOException { createSavepointFile(basePath, commitTime, HoodieTestUtils.getDefaultHadoopConf()); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java index 1a0edfc76..12649c527 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java @@ -22,14 +22,20 @@ import com.uber.hoodie.common.SerializableConfiguration; import com.uber.hoodie.common.model.HoodieTableType; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; import com.uber.hoodie.common.table.timeline.HoodieArchivedTimeline; +import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.exception.DatasetNotFoundException; import com.uber.hoodie.exception.HoodieException; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; import java.util.Objects; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -54,6 +60,7 @@ public class HoodieTableMetaClient implements Serializable { private static final transient Logger log = LogManager.getLogger(HoodieTableMetaClient.class); public static String METAFOLDER_NAME = ".hoodie"; public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp"; + public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux"; private String basePath; private transient FileSystem fs; @@ -135,6 +142,13 @@ public class HoodieTableMetaClient implements Serializable { return metaPath; } + /** + * @return Auxiliary Meta path + */ + public String getMetaAuxiliaryPath() { + return basePath + File.separator + AUXILIARYFOLDER_NAME; + } + /** * @return path where archived timeline is stored */ @@ -242,6 +256,13 @@ public class HoodieTableMetaClient implements Serializable { if (!fs.exists(temporaryFolder)) { fs.mkdirs(temporaryFolder); } + + // Always create auxiliary folder which is needed to track compaction workloads (stats and any metadata in future) + final Path auxiliaryFolder = new Path(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); + if (!fs.exists(auxiliaryFolder)) { + fs.mkdirs(auxiliaryFolder); + } + HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props); // We should not use fs.getConf as this might be different from the original configuration // used to create the fs in unit tests @@ -321,6 +342,30 @@ public class HoodieTableMetaClient implements Serializable { } + /** + * Helper method to scan all hoodie-instant metafiles and construct HoodieInstant objects + * + * @param fs FileSystem + * @param metaPath Meta Path where hoodie instants are present + * @param includedExtensions Included hoodie extensions + * @return List of Hoodie Instants generated + * @throws IOException in case of failure + */ + public static List scanHoodieInstantsFromFileSystem( + FileSystem fs, Path metaPath, Set includedExtensions) throws IOException { + return Arrays.stream( + HoodieTableMetaClient + .scanFiles(fs, metaPath, path -> { + // Include only the meta files with extensions that needs to be included + String extension = FSUtils.getFileExtension(path.getName()); + return includedExtensions.contains(extension); + })).sorted(Comparator.comparing( + // Sort the meta-data by the instant time (first part of the file name) + fileStatus -> FSUtils.getInstantTime(fileStatus.getPath().getName()))) + // create HoodieInstantMarkers from FileStatus, which extracts properties + .map(HoodieInstant::new).collect(Collectors.toList()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index 20dea2805..f2c1f3432 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -20,18 +20,16 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; -import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.common.table.timeline.HoodieInstant.State; import com.uber.hoodie.exception.HoodieIOException; import java.io.IOException; import java.io.Serializable; import java.util.Arrays; -import java.util.Comparator; import java.util.Date; +import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.time.FastDateFormat; @@ -54,6 +52,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public static final FastDateFormat COMMIT_FORMATTER = FastDateFormat .getInstance("yyyyMMddHHmmss"); + public static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList( + new String[]{COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, + INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, + CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION})); + private static final transient Logger log = LogManager.getLogger(HoodieActiveTimeline.class); private HoodieTableMetaClient metaClient; @@ -64,22 +67,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date()); } - protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, String[] includedExtensions) { + protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set includedExtensions) { // Filter all the filter in the metapath and include only the extensions passed and // convert them into HoodieInstant try { - this.instants = - Arrays.stream( - HoodieTableMetaClient - .scanFiles(metaClient.getFs(), new Path(metaClient.getMetaPath()), path -> { - // Include only the meta files with extensions that needs to be included - String extension = FSUtils.getFileExtension(path.getName()); - return Arrays.stream(includedExtensions).anyMatch(Predicate.isEqual(extension)); - })).sorted(Comparator.comparing( - // Sort the meta-data by the instant time (first part of the file name) - fileStatus -> FSUtils.getInstantTime(fileStatus.getPath().getName()))) - // create HoodieInstantMarkers from FileStatus, which extracts properties - .map(HoodieInstant::new).collect(Collectors.toList()); + this.instants = HoodieTableMetaClient.scanHoodieInstantsFromFileSystem(metaClient.getFs(), + new Path(metaClient.getMetaPath()), includedExtensions); log.info("Loaded instants " + instants); } catch (IOException e) { throw new HoodieIOException("Failed to scan metadata", e); @@ -92,10 +85,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } public HoodieActiveTimeline(HoodieTableMetaClient metaClient) { - this(metaClient, - new String[] {COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, - INFLIGHT_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, - CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION}); + this(metaClient, VALID_EXTENSIONS_IN_ACTIVE_TIMELINE); } /** @@ -218,7 +208,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public void revertToInflight(HoodieInstant instant) { log.info("Reverting instant to inflight " + instant); - moveCompleteToInflight(instant, HoodieTimeline.getInflightInstant(instant)); + revertCompleteToInflight(instant, HoodieTimeline.getInflightInstant(instant)); log.info("Reverted " + instant + " to inflight"); } @@ -255,23 +245,67 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return readDataFromPath(detailPath); } - public void revertFromInflightToRequested(HoodieInstant inflightInstant, HoodieInstant requestedInstant, - Optional data) { + /** BEGIN - COMPACTION RELATED META-DATA MANAGEMENT **/ + + public Optional getInstantAuxiliaryDetails(HoodieInstant instant) { + Path detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); + return readDataFromPath(detailPath); + } + + /** + * Revert compaction State from inflight to requested + * + * @param inflightInstant Inflight Instant + * @return requested instant + */ + public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) { Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); - transitionState(inflightInstant, requestedInstant, data); + Preconditions.checkArgument(inflightInstant.isInflight()); + HoodieInstant requestedInstant = + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp()); + transitionState(inflightInstant, requestedInstant, Optional.empty()); + return requestedInstant; } - public void transitionFromRequestedToInflight(HoodieInstant requestedInstant, HoodieInstant inflightInstant, - Optional data) { + /** + * Transition Compaction State from requested to inflight + * + * @param requestedInstant Requested instant + * @return inflight instant + */ + public HoodieInstant transitionCompactionRequestedToInflight(HoodieInstant requestedInstant) { Preconditions.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); - transitionState(requestedInstant, inflightInstant, data); + Preconditions.checkArgument(requestedInstant.isRequested()); + HoodieInstant inflightInstant = + new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, requestedInstant.getTimestamp()); + transitionState(requestedInstant, inflightInstant, Optional.empty()); + return inflightInstant; } - protected void moveInflightToComplete(HoodieInstant inflightInstant, HoodieInstant commitInstant, - Optional data) { + /** + * Transition Compaction State from inflight to Committed + * + * @param inflightInstant Inflight instant + * @param data Extra Metadata + * @return commit instant + */ + public HoodieInstant transitionCompactionInflightToComplete(HoodieInstant inflightInstant, Optional data) { + Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + Preconditions.checkArgument(inflightInstant.isInflight()); + HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, COMMIT_ACTION, inflightInstant.getTimestamp()); transitionState(inflightInstant, commitInstant, data); + return commitInstant; } + private void createFileInAuxiliaryFolder(HoodieInstant instant, Optional data) { + Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()); + createFileInPath(fullPath, data); + } + + /** + * END - COMPACTION RELATED META-DATA MANAGEMENT + **/ + private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Optional data) { Preconditions.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp())); @@ -290,7 +324,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } } - protected void moveCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) { + private void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) { Preconditions.checkArgument(completed.getTimestamp().equals(inflight.getTimestamp())); Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), inflight.getFileName()); try { @@ -308,35 +342,44 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } public void saveToInflight(HoodieInstant instant, Optional content) { + Preconditions.checkArgument(instant.isInflight()); createFileInMetaPath(instant.getFileName(), content); } - public void saveToRequested(HoodieInstant instant, Optional content) { + public void saveToCompactionRequested(HoodieInstant instant, Optional content) { Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)); + // Write workload to auxiliary folder + createFileInAuxiliaryFolder(instant, content); createFileInMetaPath(instant.getFileName(), content); } - protected void createFileInMetaPath(String filename, Optional content) { + private void createFileInMetaPath(String filename, Optional content) { Path fullPath = new Path(metaClient.getMetaPath(), filename); + createFileInPath(fullPath, content); + } + + private void createFileInPath(Path fullPath, Optional content) { try { - if (!content.isPresent()) { + // If the path does not exist, create it first + if (!metaClient.getFs().exists(fullPath)) { if (metaClient.getFs().createNewFile(fullPath)) { log.info("Created a new file in meta path: " + fullPath); - return; + } else { + throw new HoodieIOException("Failed to create file " + fullPath); } - } else { + } + + if (content.isPresent()) { FSDataOutputStream fsout = metaClient.getFs().create(fullPath, true); fsout.write(content.get()); fsout.close(); - return; } - throw new HoodieIOException("Failed to create file " + fullPath); } catch (IOException e) { throw new HoodieIOException("Failed to create file " + fullPath, e); } } - protected Optional readDataFromPath(Path detailPath) { + private Optional readDataFromPath(Path detailPath) { try (FSDataInputStream is = metaClient.getFs().open(detailPath)) { return Optional.of(IOUtils.toByteArray(is)); } catch (IOException e) { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java index f87d614da..a21683f0a 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieInstant.java @@ -84,6 +84,10 @@ public class HoodieInstant implements Serializable { this.timestamp = timestamp; } + public boolean isCompleted() { + return state == State.COMPLETED; + } + public boolean isInflight() { return state == State.INFLIGHT; } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java index 1cde27f31..93fee8cb4 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/AvroUtils.java @@ -143,7 +143,7 @@ public class AvroUtils { partitionMetadataBuilder.build()); } - public static Optional serializeCompactionWorkload(HoodieCompactionPlan compactionWorkload) + public static Optional serializeCompactionPlan(HoodieCompactionPlan compactionWorkload) throws IOException { return serializeAvroMetadata(compactionWorkload, HoodieCompactionPlan.class); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java index cadc9d7bd..2ee99eb8d 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java @@ -21,6 +21,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan; import com.uber.hoodie.common.model.CompactionOperation; import com.uber.hoodie.common.model.FileSlice; import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieInstant; import com.uber.hoodie.exception.HoodieException; import java.io.IOException; @@ -114,7 +115,8 @@ public class CompactionUtils { return pendingCompactionInstants.stream().map(instant -> { try { HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan( - metaClient.getActiveTimeline().getInstantDetails(instant).get()); + metaClient.getActiveTimeline().getInstantAuxiliaryDetails( + HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get()); return Pair.of(instant, compactionPlan); } catch (IOException e) { throw new HoodieException(e); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java index 713b6b802..3035712ea 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java @@ -252,10 +252,12 @@ public class HoodieTableFileSystemViewTest { // Create a Data-file but this should be skipped by view new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile(); compactionInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime); - commitTimeline.saveToInflight(compactionInstant, AvroUtils.serializeCompactionWorkload(compactionPlan)); + HoodieInstant requested = HoodieTimeline.getCompactionRequestedInstant(compactionInstant.getTimestamp()); + commitTimeline.saveToCompactionRequested(requested, AvroUtils.serializeCompactionPlan(compactionPlan)); + commitTimeline.transitionCompactionRequestedToInflight(requested); } else { compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionRequestedTime); - commitTimeline.saveToRequested(compactionInstant, AvroUtils.serializeCompactionWorkload(compactionPlan)); + commitTimeline.saveToCompactionRequested(compactionInstant, AvroUtils.serializeCompactionPlan(compactionPlan)); } // Fake delta-ingestion after compaction-requested diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java index 9faaca3e5..65edb2de5 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java @@ -241,13 +241,17 @@ public class TestCompactionUtils { } private void scheduleCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException { - metaClient.getActiveTimeline().saveToRequested(new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime), - AvroUtils.serializeCompactionWorkload(compactionPlan)); + metaClient.getActiveTimeline().saveToCompactionRequested( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime), + AvroUtils.serializeCompactionPlan(compactionPlan)); } private void scheduleInflightCompaction(String instantTime, HoodieCompactionPlan compactionPlan) throws IOException { - metaClient.getActiveTimeline().saveToInflight(new HoodieInstant(State.INFLIGHT, COMPACTION_ACTION, instantTime), - AvroUtils.serializeCompactionWorkload(compactionPlan)); + metaClient.getActiveTimeline().saveToCompactionRequested( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime), + AvroUtils.serializeCompactionPlan(compactionPlan)); + metaClient.getActiveTimeline().transitionCompactionRequestedToInflight( + new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, instantTime)); } private HoodieCompactionPlan createCompactionPlan(String instantId, int numFileIds) {