From 492ddcbb06e107618ae71fa369ba07af47a036fb Mon Sep 17 00:00:00 2001 From: Satish Kotha Date: Thu, 13 Aug 2020 17:14:25 -0700 Subject: [PATCH] [HUDI-1191] Add incremental meta client API to query partitions modified in a time window --- .../common/table/timeline/TimelineUtils.java | 101 +++++++++ .../hudi/common/table/TestTimelineUtils.java | 213 ++++++++++++++++++ .../sync/common/AbstractSyncHoodieClient.java | 26 +-- 3 files changed, 319 insertions(+), 21 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java new file mode 100644 index 000000000..fd30ee35c --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.timeline; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.exception.HoodieIOException; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * TimelineUtils provides a common way to query incremental meta-data changes for a hoodie table. + * + * This is useful in multiple places including: + * 1) HiveSync - this can be used to query partitions that changed since previous sync. + * 2) Incremental reads - InputFormats can use this API to query partitions modified in a time window. + */ +public class TimelineUtils { + + /** + * Returns partitions that have new data written by the completed instants in the passed timeline. + * Does not include internal operations such as clean in the timeline. + */ + public static List getPartitionsWritten(HoodieTimeline timeline) { + HoodieTimeline timelineToSync = timeline.getCommitsAndCompactionTimeline(); + return getAffectedPartitions(timelineToSync); + } + + /** + * Returns partitions that have been modified including internal operations such as clean in the passed timeline. 
+ */ + public static List getAffectedPartitions(HoodieTimeline timeline) { + return timeline.filterCompletedInstants().getInstants().flatMap(s -> { + switch (s.getAction()) { + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.DELTA_COMMIT_ACTION: + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(s).get(), HoodieCommitMetadata.class); + return commitMetadata.getPartitionToWriteStats().keySet().stream(); + } catch (IOException e) { + throw new HoodieIOException("Failed to get partitions written at " + s, e); + } + case HoodieTimeline.CLEAN_ACTION: + try { + HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(s).get()); + return cleanMetadata.getPartitionMetadata().keySet().stream(); + } catch (IOException e) { + throw new HoodieIOException("Failed to get partitions cleaned at " + s, e); + } + case HoodieTimeline.ROLLBACK_ACTION: + try { + return TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(s).get()).getPartitionMetadata().keySet().stream(); + } catch (IOException e) { + throw new HoodieIOException("Failed to get partitions rolledback at " + s, e); + } + case HoodieTimeline.RESTORE_ACTION: + try { + HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(s).get(), HoodieRestoreMetadata.class); + return restoreMetadata.getHoodieRestoreMetadata().values().stream() + .flatMap(Collection::stream) + .flatMap(rollbackMetadata -> rollbackMetadata.getPartitionMetadata().keySet().stream()); + } catch (IOException e) { + throw new HoodieIOException("Failed to get partitions restored at " + s, e); + } + case HoodieTimeline.SAVEPOINT_ACTION: + try { + return TimelineMetadataUtils.deserializeHoodieSavepointMetadata(timeline.getInstantDetails(s).get()).getPartitionMetadata().keySet().stream(); + } catch (IOException e) { + throw new HoodieIOException("Failed to 
get partitions savepoint at " + s, e); + } + case HoodieTimeline.COMPACTION_ACTION: + // compaction is not a completed instant. So no need to consider this action. + return Stream.empty(); + default: + throw new HoodieIOException("unknown action in timeline " + s.getAction()); + } + + }).distinct().filter(s -> !s.isEmpty()).collect(Collectors.toList()); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java new file mode 100644 index 000000000..1a1ac5461 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestTimelineUtils extends HoodieCommonTestHarness { + + @BeforeEach + public void setUp() throws Exception { + initMetaClient(); + } + + @Test + public void testGetPartitions() throws IOException { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + assertTrue(activeCommitTimeline.empty()); + + String olderPartition = "0"; // older partition that is modified by every clean + for (int i = 1; i <= 5; i++) { + String ts = i + 
""; + HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + activeTimeline.createNewInstant(instant); + activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2))); + + HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts); + activeTimeline.createNewInstant(cleanInstant); + activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(olderPartition, ts)); + } + + metaClient.reloadActiveTimeline(); + + // verify modified partitions included cleaned data + List partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); + assertEquals(5, partitions.size()); + assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4", "5"})); + + partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); + assertEquals(4, partitions.size()); + assertEquals(partitions, Arrays.asList(new String[]{"0", "2", "3", "4"})); + + // verify only commit actions + partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); + assertEquals(4, partitions.size()); + assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"})); + + partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); + assertEquals(3, partitions.size()); + assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"})); + } + + @Test + public void testGetPartitionsUnpartitioned() throws IOException { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + assertTrue(activeCommitTimeline.empty()); + + String partitionPath = ""; + for (int i = 1; i <= 5; i++) { + String ts = i + ""; + HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + activeTimeline.createNewInstant(instant); + 
+ activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2))); + + HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts); + activeTimeline.createNewInstant(cleanInstant); + activeTimeline.saveAsComplete(cleanInstant, getCleanMetadata(partitionPath, ts)); + } + + metaClient.reloadActiveTimeline(); + + // verify the empty partition path of an unpartitioned table is filtered out of the result + List partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); + assertTrue(partitions.isEmpty()); + + partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); + assertTrue(partitions.isEmpty()); + } + + @Test + public void testRestoreInstants() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + assertTrue(activeCommitTimeline.empty()); + + for (int i = 1; i <= 5; i++) { + String ts = i + ""; + HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.RESTORE_ACTION, ts); + activeTimeline.createNewInstant(instant); + activeTimeline.saveAsComplete(instant, Option.of(getRestoreMetadata(basePath, ts, ts, 2))); + } + + metaClient.reloadActiveTimeline(); + + // verify modified partitions include those touched by restore instants + List partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); + assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4", "5"})); + + partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); + assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"})); + } + + private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count) throws IOException { + HoodieRestoreMetadata metadata = new HoodieRestoreMetadata(); + List rollbackM = new ArrayList<>(); + 
rollbackM.add(getRollbackMetadataInstance(basePath, partition, commitTs, count)); + metadata.setHoodieRestoreMetadata(CollectionUtils.createImmutableMap(commitTs, rollbackM)); + List rollbackInstants = new ArrayList<>(); + rollbackInstants.add(commitTs); + metadata.setInstantsToRollback(rollbackInstants); + metadata.setStartRestoreTime(commitTs); + return TimelineMetadataUtils.serializeRestoreMetadata(metadata).get(); + } + + private HoodieRollbackMetadata getRollbackMetadataInstance(String basePath, String partition, String commitTs, int count) { + List deletedFiles = new ArrayList<>(); + for (int i = 1; i <= count; i++) { + deletedFiles.add("file-" + i); + } + List rollbacks = new ArrayList<>(); + rollbacks.add(commitTs); + + HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap()); + List rollbackStats = new ArrayList<>(); + rollbackStats.add(rollbackStat); + return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats); + } + + private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count) + throws IOException { + HoodieCommitMetadata commit = new HoodieCommitMetadata(); + for (int i = 1; i <= count; i++) { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setFileId(i + ""); + stat.setPartitionPath(Paths.get(basePath, partition).toString()); + stat.setPath(commitTs + "." 
+ i + ".parquet"); + commit.addWriteStat(partition, stat); + } + return commit.toJsonString().getBytes(StandardCharsets.UTF_8); + } + + private Option getCleanMetadata(String partition, String time) throws IOException { + Map partitionToFilesCleaned = new HashMap<>(); + List filesDeleted = new ArrayList<>(); + filesDeleted.add("file-" + partition + "-" + time + "1"); + filesDeleted.add("file-" + partition + "-" + time + "2"); + HoodieCleanPartitionMetadata partitionMetadata = HoodieCleanPartitionMetadata.newBuilder() + .setPartitionPath(partition) + .setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()) + .setFailedDeleteFiles(Collections.emptyList()) + .setDeletePathPatterns(Collections.emptyList()) + .setSuccessDeleteFiles(filesDeleted) + .build(); + partitionToFilesCleaned.putIfAbsent(partition, partitionMetadata); + HoodieCleanMetadata cleanMetadata = HoodieCleanMetadata.newBuilder() + .setVersion(1) + .setTimeTakenInMillis(100) + .setTotalFilesDeleted(1) + .setStartCleanTime(time) + .setEarliestCommitToRetain(time) + .setPartitionMetadata(partitionToFilesCleaned).build(); + + return TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata); + } + +} \ No newline at end of file diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index fe7f1e3cc..419ea16de 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -20,16 +20,14 @@ package org.apache.hudi.sync.common; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import 
org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.MessageType; @@ -40,12 +38,10 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public abstract class AbstractSyncHoodieClient { private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class); protected final HoodieTableMetaClient metaClient; - protected HoodieTimeline activeTimeline; protected final HoodieTableType tableType; protected final FileSystem fs; private String basePath; @@ -57,7 +53,6 @@ public abstract class AbstractSyncHoodieClient { this.basePath = basePath; this.assumeDatePartitioning = assumeDatePartitioning; this.fs = fs; - this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); } public abstract void createTable(String tableName, MessageType storageSchema, @@ -75,10 +70,6 @@ public abstract class AbstractSyncHoodieClient { public abstract Map getTableSchema(String tableName); - public HoodieTimeline getActiveTimeline() { - return activeTimeline; - } - public HoodieTableType getTableType() { return tableType; } @@ -135,21 +126,14 @@ public abstract class AbstractSyncHoodieClient { } } else { LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); - - HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE); - return timelineToSync.getInstants().map(s -> { - try { - 
return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class); - } catch (IOException e) { - throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e); - } - }).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList()); + return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline() + .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE)); } } /** * Read the schema from the log file on path. - */ + */ @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private MessageType readSchemaFromLogFile(Option lastCompactionCommitOpt, Path path) throws Exception { MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path);