From bff18280f3032d19d99438e418c856a11849cc02 Mon Sep 17 00:00:00 2001 From: v-zhangjc9 Date: Wed, 22 May 2024 13:11:50 +0800 Subject: [PATCH] =?UTF-8?q?perf(hudi-query):=20=E4=BC=98=E5=8C=96hudi?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E7=BA=BF=E7=9A=84=E6=89=AB=E6=8F=8F=E9=80=9F?= =?UTF-8?q?=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/httpRequests/http-requests-log.http | 16 +-- .../command/pro/commands/HudiCommand.java | 48 ++++++++ .../entity/hudi/HudiInstant.java | 51 +++++--- .../service/forest/service/HudiService.java | 19 ++- .../hudi/controller/TimelineController.java | 19 +++ .../service/hudi/service/TimelineService.java | 81 +++++++------ .../service/hudi/utils/HoodieUtils.java | 109 ++++++++++++++---- 7 files changed, 261 insertions(+), 82 deletions(-) diff --git a/.idea/httpRequests/http-requests-log.http b/.idea/httpRequests/http-requests-log.http index d5c5e9d..c20f2a4 100644 --- a/.idea/httpRequests/http-requests-log.http +++ b/.idea/httpRequests/http-requests-log.http @@ -1,3 +1,11 @@ +GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre +Connection: Keep-Alive +User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) +Cookie: JSESSIONID=8516C92140B5118AF9AA61025D0F8C93 +Accept-Encoding: br,deflate,gzip,x-gzip + +### + GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/service_scheduler/schedule/all Connection: Keep-Alive User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10) @@ -448,11 +456,3 @@ Accept-Encoding: br,deflate,gzip,x-gzip ### -GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre -Connection: Keep-Alive -User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9) -Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877 -Accept-Encoding: br,deflate,gzip,x-gzip - -### - diff --git a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java index ed8a2c0..b6c8f97 100644 --- a/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java +++ b/service-command-pro/src/main/java/com/lanyuanxiaoyao/service/command/pro/commands/HudiCommand.java @@ -1,6 +1,10 @@ package com.lanyuanxiaoyao.service.command.pro.commands; import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.common.entity.TableMeta; +import com.lanyuanxiaoyao.service.configuration.ExecutorProvider; +import com.lanyuanxiaoyao.service.forest.service.HudiService; +import com.lanyuanxiaoyao.service.forest.service.InfoService; import java.io.IOException; import java.time.Instant; import java.util.List; @@ -50,10 +54,19 @@ import org.springframework.shell.standard.ShellOption; * @author lanyuanxiaoyao * @date 2024-03-19 */ +@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @ShellComponent("Hudi相关操作") public class HudiCommand { private static final Logger logger = LoggerFactory.getLogger(HudiCommand.class); + private final InfoService infoService; + private final HudiService hudiService; + + public HudiCommand(InfoService infoService, HudiService hudiService) { + this.infoService = infoService; + this.hudiService = hudiService; + } + public static void time(String name, Runnable runnable) { logger.info(name); long startTime = Instant.now().toEpochMilli(); @@ -158,6 +171,41 @@ public class HudiCommand { time("reader 3", counter -> reader3(counter, configuration, root)); } + @ShellMethod("Count meta files") + public void countMetaFiles(@ShellOption(help = "alias", defaultValue = "") String alias) throws IOException { + LongAdder count = new LongAdder(); + FileSystem fileSystem = FileSystem.get(new Configuration()); + infoService + .tableMetaList() + .select(meta -> StrUtil.isBlank(alias) || StrUtil.equals(meta.getAlias(), alias)) + .collect(TableMeta::getHudi) + .collect(TableMeta.HudiMeta::getTargetHdfsPath) + .asParallel(ExecutorProvider.EXECUTORS_5, 1) + .forEach(hdfs -> { + Path root = new Path(hdfs, ".hoodie"); + try { + FileStatus[] statuses = fileSystem.listStatus(root); + for (FileStatus status : statuses) { + if (status.isFile()) { + count.increment(); + } + if (StrUtil.containsIgnoreCase(status.getPath().toString(), "INVALID")) { + logger.info("{}", status.getPath().toString()); + } + } + } catch (IOException e) { + logger.warn("List file error", e); + } + }); + logger.info("Count: {}", count.longValue()); + fileSystem.close(); + } + + @ShellMethod("Get timeline instants") + public void timelineInstant(@ShellOption(help = "root hdfs path") String hdfs) { + hudiService.timelineHdfsAllActive(hdfs).forEach(instant -> logger.info(instant.toString())); + } + public interface Runnable { void run(LongAdder counter); } diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java index a564d61..024c779 100644 --- a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/hudi/HudiInstant.java @@ -1,5 +1,6 @@ package com.lanyuanxiaoyao.service.configuration.entity.hudi; +import cn.hutool.core.util.StrUtil; import java.util.Comparator; import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.map.ImmutableMap; @@ -18,11 +19,6 @@ public class HudiInstant implements Comparable { Comparator.comparing(HudiInstant::getTimestamp) .thenComparing(ACTION_COMPARATOR) .thenComparing(HudiInstant::getState); - - private static String getComparableAction(String action) { - return COMPARABLE_ACTIONS.getOrDefault(action, action); - } - private String action; // REQUESTED, INFLIGHT, COMPLETED, INVALID private String state; @@ -43,24 +39,49 @@ public class HudiInstant implements Comparable { this.type = type; } + public HudiInstant(String action, String state, String timestamp, String fileName, Long fileTime, String type) { + this.action = action; + this.state = state; + this.timestamp = timestamp; + this.fileName = fileName; + this.fileTime = fileTime; + this.type = type; + } + + private static String getComparableAction(String action) { + return COMPARABLE_ACTIONS.getOrDefault(action, action); + } + public String getAction() { return action; } + public void setAction(String action) { + this.action = action; + } + public String getState() { return state; } + public void setState(String state) { + this.state = state; + } + public String getTimestamp() { return timestamp; } + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + public String getFileName() { return fileName; } - public String getType() { - return type; + public void setFileName(String fileName) { + this.fileName = fileName; } public Long getFileTime() { @@ -71,15 +92,17 @@ public class HudiInstant implements Comparable { this.fileTime = fileTime; } + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + @Override public String toString() { - return "HudiInstant{" + - "action='" + action + '\'' + - ", state='" + state + '\'' + - ", timestamp='" + timestamp + '\'' + - ", fileName='" + fileName + '\'' + - ", type='" + type + '\'' + - '}'; + return StrUtil.format("{},{},{},{},{},{}", action, state, timestamp, fileName, fileTime, type); } @Override diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java index 33882b4..b787bf1 100644 --- a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/HudiService.java @@ -2,7 +2,6 @@ package com.lanyuanxiaoyao.service.forest.service; import com.dtflys.forest.annotation.BaseRequest; import com.dtflys.forest.annotation.Body; -import com.dtflys.forest.annotation.BodyType; import com.dtflys.forest.annotation.Get; import com.dtflys.forest.annotation.Post; import com.dtflys.forest.annotation.Query; @@ -31,6 +30,24 @@ public interface HudiService { @Get("/timeline/list_hdfs") PageResponse timelineHdfsList(@Query Map queryMap); + @Get("/timeline/all?active=true&archive=false") + ImmutableList timelineAllActive(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + + @Get("/timeline/all_hdfs?active=true&archive=false") + ImmutableList timelineHdfsAllActive(@Query("hdfs") String hdfs); + + @Get("/timeline/all?active=false&archive=true") + ImmutableList timelineAllArchive(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias); + + @Get("/timeline/all_hdfs?active=false&archive=true") + ImmutableList timelineHdfsAllArchive(@Query("hdfs") String hdfs); + + @Get("/timeline/all") + ImmutableList timelineAll(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("active") Boolean active, @Query("archive") Boolean archive); + + @Get("/timeline/all_hdfs") + ImmutableList timelineHdfsAll(@Query("hdfs") String hdfs, @Query("active") Boolean active, @Query("archive") Boolean archive); + @Get("/timeline/read_compaction_plan") HudiCompactionPlan readCompactionPlan(@Query("flink_job_id") Long flinkJobId, @Query("alias") String alias, @Query("instant") String instant); diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java index a8f2c8a..0f5d90f 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/controller/TimelineController.java @@ -82,6 +82,25 @@ public class TimelineController { ); } + @GetMapping("all") + public ImmutableList allInstants( + @RequestParam("flink_job_id") Long flinkJobId, + @RequestParam("alias") String alias, + @RequestParam(value = "active", defaultValue = "true") Boolean active, + @RequestParam(value = "archive", defaultValue = "false") Boolean archive + ) throws IOException { + return timelineService.timeline(flinkJobId, alias, active, archive); + } + + @GetMapping("all_hdfs") + public ImmutableList allInstants( + @RequestParam("hdfs") String hdfs, + @RequestParam(value = "active", defaultValue = "true") Boolean active, + @RequestParam(value = "archive", defaultValue = "false") Boolean archive + ) throws IOException { + return timelineService.timeline(hdfs, active, archive); + } + @GetMapping("read_compaction_plan") public HudiCompactionPlan readCompactionPlan( @RequestParam("flink_job_id") Long flinkJobId, diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java index 02d44f2..0d0d8f3 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/service/TimelineService.java @@ -30,7 +30,6 @@ import org.apache.hudi.table.action.rollback.RollbackUtils; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.factory.Maps; import org.eclipse.collections.api.list.ImmutableList; -import org.eclipse.collections.api.list.MutableList; import org.eclipse.collections.api.map.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,33 +99,11 @@ public class TimelineService { ) throws IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(configuration); - if (!fileSystem.exists(new Path(hdfs))) { - throw new IOException(StrUtil.format("Path {} is not exists", hdfs)); - } HoodieTableMetaClient client = HoodieTableMetaClient.builder() .setConf(configuration) .setBasePath(hdfs) .build(); - MutableList instants = Lists.mutable.empty(); - if (ObjectUtil.isEmpty(filterType)) { - filterType = Lists.immutable.of(INSTANT_TYPE_ARCHIVE, INSTANT_TYPE_ACTIVE); - } - if (filterType.contains(INSTANT_TYPE_ARCHIVE)) { - HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getArchivedTimeline) - .collect(instant -> covert(INSTANT_TYPE_ARCHIVE, instant)) - .forEach(instants::add); - } - if (filterType.contains(INSTANT_TYPE_ACTIVE)) { - HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline) - .collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant)) - .forEach(instants::add); - } - ImmutableList hudiInstants = instants - .toSortedList(HudiInstant::compareTo) - .select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction())) - .select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState())) - .toSortedList(ComparatorUtil.stringComparator(order, direction, TIMELINE_SORT_MAP)) - .toImmutable(); + ImmutableList hudiInstants = timeline(order, direction, hdfs, filterType, filterAction, filterState); ImmutableList result = hudiInstants .drop(Math.max(page - 1, 0) * count) .take(count) @@ -148,6 +125,47 @@ public class TimelineService { return new PageResponse<>(result.toList(), hudiInstants.size()); } + public ImmutableList timeline( + String order, + String direction, + String hdfs, + ImmutableList filterType, + ImmutableList filterAction, + ImmutableList filterState + ) throws IOException { + HoodieTableMetaClient client = HoodieTableMetaClient.builder() + .setConf(new Configuration()) + .setBasePath(hdfs) + .build(); + FileSystem fileSystem = client.getRawFs(); + if (!fileSystem.exists(new Path(hdfs))) { + throw new IOException(StrUtil.format("Path {} is not exists", hdfs)); + } + if (ObjectUtil.isEmpty(filterType)) { + filterType = Lists.immutable.of(INSTANT_TYPE_ARCHIVE, INSTANT_TYPE_ACTIVE); + } + ImmutableList instants = HoodieUtils.getAllInstants(client, filterType.contains(INSTANT_TYPE_ACTIVE), filterType.contains(INSTANT_TYPE_ARCHIVE)); + return instants + .toSortedList(HudiInstant::compareTo) + .select(instant -> ObjectUtil.isEmpty(filterAction) || filterAction.contains(instant.getAction())) + .select(instant -> ObjectUtil.isEmpty(filterState) || filterState.contains(instant.getState())) + .toSortedList(ComparatorUtil.stringComparator(order, direction, TIMELINE_SORT_MAP)) + .toImmutable(); + } + + public ImmutableList timeline(Long flinkJobId, String alias, Boolean active, Boolean archive) throws IOException { + TableMeta meta = infoService.tableMetaDetail(flinkJobId, alias); + return timeline(meta.getHudi().getTargetHdfsPath(), active, archive); + } + + public ImmutableList timeline(String hdfs, Boolean active, Boolean archive) throws IOException { + HoodieTableMetaClient client = HoodieTableMetaClient.builder() + .setConf(new Configuration()) + .setBasePath(hdfs) + .build(); + return HoodieUtils.getAllInstants(client, active, archive); + } + @Cacheable(value = "read-compaction-plan", sync = true) @Retryable(Throwable.class) public HudiCompactionPlan readCompactionPlan(Long flinkJobId, String alias, String instant) throws IOException { @@ -255,19 +273,8 @@ public class TimelineService { .setConf(new Configuration()) .setBasePath(meta.getHudi().getTargetHdfsPath()) .build(); - return HoodieUtils.getAllInstants(client, HoodieTableMetaClient::getActiveTimeline) + return HoodieUtils.getAllActiveInstants(client) .select(instant -> StrUtil.equals(instant.getAction(), HoodieTimeline.COMPACTION_ACTION)) - .reject(instant -> ObjectUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED)) - .collect(instant -> covert(INSTANT_TYPE_ACTIVE, instant)); - } - - private HudiInstant covert(String type, HoodieInstant instant) { - return new HudiInstant( - instant.getAction(), - instant.getState().name(), - instant.getTimestamp(), - instant.getFileName(), - type - ); + .reject(instant -> StrUtil.equals(instant.getState(), HoodieInstant.State.COMPLETED.name())); } } diff --git a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java index cfa1c20..e3070fd 100644 --- a/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java +++ b/service-hudi-query/src/main/java/com/lanyuanxiaoyao/service/hudi/utils/HoodieUtils.java @@ -1,24 +1,28 @@ package com.lanyuanxiaoyao.service.hudi.utils; -import cn.hutool.core.util.ReUtil; +import cn.hutool.core.util.StrUtil; +import com.lanyuanxiaoyao.service.configuration.entity.hudi.HudiInstant; import java.io.IOException; -import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.org.apache.avro.Schema; import org.eclipse.collections.api.factory.Lists; import org.eclipse.collections.api.list.ImmutableList; +import org.eclipse.collections.api.list.MutableList; +import org.eclipse.collections.api.map.MutableMap; +import org.eclipse.collections.api.multimap.list.MutableListMultimap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,37 +48,98 @@ public class HoodieUtils { return schemaUtil.getTableAvroSchema(true); } - public static ImmutableList getAllInstants(HoodieTableMetaClient client, Function getTimeline) throws IOException { + public static ImmutableList getAllActiveInstants(HoodieTableMetaClient client) throws IOException { + return getAllInstants(client, true, false); + } + + public static ImmutableList getAllArchiveInstants(HoodieTableMetaClient client) throws IOException { + return getAllInstants(client, false, true); + } + + public static ImmutableList getAllInstants(HoodieTableMetaClient client) throws IOException { + return getAllInstants(client, true, true); + } + + public static ImmutableList getAllInstants(HoodieTableMetaClient client, Boolean active, Boolean archive) throws IOException { FileSystem fileSystem = client.getRawFs(); - // 直接使用 toString 方法得到的值是被缓存的 - String hdfs = client.getBasePathV2().toUri().toString(); - Path metadataPath = new Path(hdfs + "/.hoodie"); - return getAllInstants(getTimeline.apply(client), fileSystem, metadataPath) - .toSortedList(HoodieInstant::compareTo) + Path metadataPath = new Path(client.getBasePathV2().toString(), HoodieTableMetaClient.METAFOLDER_NAME); + MutableList instants = Lists.mutable.empty(); + + if (active) { + instants.addAllIterable(activeInstants(client.getActiveTimeline())); + } + if (archive) { + instants.addAllIterable(archiveInstants(client.getArchivedTimeline())); + } + + MutableMap fileModifiedTimeMap = Lists.immutable.of(fileSystem.listStatus(metadataPath)) + .toMap(status -> status.getPath().toString(), FileStatus::getModificationTime); + return instants + .collect(instant -> { + String instantPath = StrUtil.format("{}/{}", client.getMetaPath(), instant.getFileName()); + instant.setFileTime(fileModifiedTimeMap.getOrDefault(instantPath, 0L)); + return instant; + }) + .sortThis(Comparator + .comparingLong(instant -> Long.parseLong(instant.getTimestamp())) + .thenComparingLong(HudiInstant::getFileTime)) .toImmutable(); } - private static ImmutableList getAllInstants(HoodieDefaultTimeline timeline, FileSystem fileSystem, Path metadataPath) throws IOException { + private static HudiInstant convert(HoodieInstant instant, String type) { + return new HudiInstant( + instant.getAction(), + instant.getState().name(), + instant.getTimestamp(), + instant.getFileName(), + 0L, + type + ); + } + + private static ImmutableList activeInstants(HoodieActiveTimeline timeline) { Set committedTimestamps = timeline.getCommitsTimeline() .filterCompletedInstants() .getInstants() .map(HoodieInstant::getTimestamp) .collect(Collectors.toSet()); - List compactionRequestedTimestamps = Arrays.stream(fileSystem.listStatus(metadataPath)) - .filter(status -> status.getPath().toString().endsWith(HoodieTimeline.REQUESTED_COMPACTION_EXTENSION)) - .map(status -> status.getPath().getName()) - .map(name -> ReUtil.get("^(\\d+)\\..+", name, 1)) - .filter(committedTimestamps::contains) - .collect(Collectors.toList()); - return Lists.immutable.ofAll(timeline.getInstants() + List instants = timeline.getInstants() + .map(instant -> convert(instant, "active")) .map(instant -> { - if (compactionRequestedTimestamps.contains(instant.getTimestamp())) { - return new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, instant.getTimestamp()); + if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()) && committedTimestamps.contains(instant.getTimestamp())) { + return new HudiInstant( + HoodieTimeline.COMPACTION_ACTION, + HoodieInstant.State.COMPLETED.name(), + instant.getTimestamp(), + instant.getFileName(), + instant.getFileTime(), + instant.getType() + ); } return instant; }) - .sorted(Comparator.comparingLong(i -> Long.parseLong(i.getTimestamp()))) - .collect(Collectors.toList())); + .collect(Collectors.toList()); + return Lists.immutable.ofAll(instants); + } + + private static ImmutableList archiveInstants(HoodieArchivedTimeline timeline) { + MutableList instants = Lists.mutable.ofAll(timeline.getInstants().collect(Collectors.toList())); + instants.forEach(instant -> logger.info(instant.toString())); + MutableListMultimap stateMap = instants.groupBy(HoodieInstant::getState).collectValues(HoodieInstant::getTimestamp); + return instants + .select(instant -> HoodieInstant.State.REQUESTED.equals(instant.getState())) + .collect(instant -> convert(instant, "archive")) + .collect(instant -> { + if (stateMap.containsKeyAndValue(HoodieInstant.State.INVALID, instant.getTimestamp())) { + instant.setState(HoodieInstant.State.INVALID.name()); + } else if (stateMap.containsKeyAndValue(HoodieInstant.State.COMPLETED, instant.getTimestamp())) { + instant.setState(HoodieInstant.State.COMPLETED.name()); + } else if (stateMap.containsKeyAndValue(HoodieInstant.State.INFLIGHT, instant.getTimestamp())) { + instant.setState(HoodieInstant.State.INFLIGHT.name()); + } + return instant; + }) + .toImmutable(); } }