1
0

[HUDI-535] Ensure Compaction Plan is always written in .aux folder to avoid 0.5.0/0.5.1 reader-writer compatibility issues (#1229)

This commit is contained in:
Balaji Varadarajan
2020-01-17 10:56:35 -08:00
committed by vinoth chandar
parent 0a07752dc0
commit 923e2b4a1e
7 changed files with 58 additions and 28 deletions

View File

@@ -107,13 +107,13 @@ public class CompactionCommand implements CommandMarker {
       try {
         // This could be a completed compaction. Assume a compaction request file is present but skip if fails
         compactionPlan = AvroUtils.deserializeCompactionPlan(
-            activeTimeline.readPlanAsBytes(
+            activeTimeline.readCompactionPlanAsBytes(
                 HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
       } catch (HoodieIOException ioe) {
         // SKIP
       }
     } else {
-      compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readPlanAsBytes(
+      compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
           HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
     }
@@ -156,7 +156,7 @@ public class CompactionCommand implements CommandMarker {
     HoodieTableMetaClient client = checkAndGetMetaClient();
     HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
     HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
-        activeTimeline.readPlanAsBytes(
+        activeTimeline.readCompactionPlanAsBytes(
            HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
     List<Comparable[]> rows = new ArrayList<>();

View File

@@ -220,7 +220,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
   private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant)
       throws IOException {
     return AvroUtils.deserializeCompactionPlan(
-        metaClient.getActiveTimeline().readPlanAsBytes(
+        metaClient.getActiveTimeline().readCompactionPlanAsBytes(
            HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
   }

View File

@@ -953,7 +953,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
     HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
-        timeline.readPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
+        timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
     // Merge extra meta-data passed by user with the one already in inflight compaction
     Option<Map<String, String>> mergedMetaData = extraMetadata.map(m -> {
       Map<String, String> merged = new HashMap<>();

View File

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
@@ -279,18 +280,28 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     return readDataFromPath(detailPath);
   }
 
+  public Option<byte[]> readCleanerInfoAsBytes(HoodieInstant instant) {
+    // Cleaner metadata are always stored only in timeline .hoodie
+    return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
+  }
+
   //-----------------------------------------------------------------
   // BEGIN - COMPACTION RELATED META-DATA MANAGEMENT.
   //-----------------------------------------------------------------
 
-  public Option<byte[]> readPlanAsBytes(HoodieInstant instant) {
-    Path detailPath = null;
-    if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
-      detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
-    } else {
-      detailPath = new Path(metaClient.getMetaPath(), instant.getFileName());
+  public Option<byte[]> readCompactionPlanAsBytes(HoodieInstant instant) {
+    try {
+      // Reading from auxiliary path first. In future release, we will cleanup compaction management
+      // to only write to timeline and skip auxiliary and this code will be able to handle it.
+      return readDataFromPath(new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()));
+    } catch (HoodieIOException e) {
+      // This will be removed in future release. See HUDI-546
+      if (e.getIOException() instanceof FileNotFoundException) {
+        return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
+      } else {
+        throw e;
+      }
     }
-    return readDataFromPath(detailPath);
   }
/** /**
@@ -344,14 +355,9 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   private void createFileInAuxiliaryFolder(HoodieInstant instant, Option<byte[]> data) {
-    if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
-      /**
-       * For latest version, since we write immutable files directly in timeline directory, there is no need to write
-       * additional immutable files in .aux folder
-       */
-      Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
-      createFileInPath(fullPath, data);
-    }
+    // This will be removed in future release. See HUDI-546
+    Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
+    createFileInPath(fullPath, data);
   }
//----------------------------------------------------------------- //-----------------------------------------------------------------
@@ -369,8 +375,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
     Preconditions.checkArgument(inflightInstant.isInflight());
     HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, CLEAN_ACTION, inflightInstant.getTimestamp());
-    // First write metadata to aux folder
-    createFileInAuxiliaryFolder(commitInstant, data);
     // Then write to timeline
     transitionState(inflightInstant, commitInstant, data);
     return commitInstant;
@@ -471,8 +475,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   public void saveToCleanRequested(HoodieInstant instant, Option<byte[]> content) {
     Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
     Preconditions.checkArgument(instant.getState().equals(State.REQUESTED));
-    // Write workload to auxiliary folder
-    createFileInAuxiliaryFolder(instant, content);
     // Plan is stored in meta path
     createFileInMetaPath(instant.getFileName(), content, false);
   }

View File

@@ -72,7 +72,7 @@ public class CleanerUtils {
       throws IOException {
     CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient);
     HoodieCleanMetadata cleanMetadata = AvroUtils.deserializeHoodieCleanMetadata(
-        metaClient.getActiveTimeline().readPlanAsBytes(cleanInstant).get());
+        metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get());
     return metadataMigrator.upgradeToLatest(cleanMetadata, cleanMetadata.getVersion());
   }
@@ -85,7 +85,7 @@ public class CleanerUtils {
    */
   public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant)
       throws IOException {
-    return AvroUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().readPlanAsBytes(cleanInstant).get(),
+    return AvroUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(),
        HoodieCleanerPlan.class);
   }
 }

View File

@@ -140,7 +140,7 @@ public class CompactionUtils {
       throws IOException {
     CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
     HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
-        metaClient.getActiveTimeline().readPlanAsBytes(
+        metaClient.getActiveTimeline().readCompactionPlanAsBytes(
            HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
     return migrator.upgradeToLatest(compactionPlan, compactionPlan.getVersion());
   }

View File

@@ -18,8 +18,11 @@
 package org.apache.hudi.common.table.string;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.HoodieCommonTestHarness;
 import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.model.TimelineLayoutVersion;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -31,6 +34,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.IOException;
 import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
@@ -52,7 +56,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
   }
 
   @Test
-  public void testLoadingInstantsFromFiles() {
+  public void testLoadingInstantsFromFiles() throws IOException {
     HoodieInstant instant1 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "1");
     HoodieInstant instant2 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "3");
     HoodieInstant instant3 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "5");
@@ -96,6 +100,30 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
         timeline.getCommitTimeline().filterCompletedInstants().getInstants());
     HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream.of(instant5),
         timeline.getCommitTimeline().filterPendingExcludingCompaction().getInstants());
+
+    // Backwards compatibility testing for reading compaction plans
+    HoodieInstant instant6 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "9");
+    byte[] dummy = new byte[5];
+    HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(new HoodieTableMetaClient(metaClient.getHadoopConf(),
+        metaClient.getBasePath(), true, metaClient.getConsistencyGuardConfig(),
+        Option.of(new TimelineLayoutVersion(TimelineLayoutVersion.VERSION_0))));
+    // Old Timeline writes both to aux and timeline folder
+    oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy));
+    // Now use latest timeline version
+    timeline = timeline.reload();
+    // Ensure aux file is present
+    assertTrue(metaClient.getFs().exists(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName())));
+    // Read 5 bytes
+    assertEquals(timeline.readCompactionPlanAsBytes(instant6).get().length, 5);
+    // Delete auxiliary file to mimic future release where we stop writing to aux
+    metaClient.getFs().delete(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName()));
+    // Ensure requested instant is not present in aux
+    assertFalse(metaClient.getFs().exists(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName())));
+    // Now read compaction plan again which should not throw exception
+    assertEquals(timeline.readCompactionPlanAsBytes(instant6).get().length, 5);
   }
 
   @Test