diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkCheckpoint.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkCheckpoint.java new file mode 100644 index 0000000..6811040 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkCheckpoint.java @@ -0,0 +1,127 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import com.fasterxml.jackson.annotation.JsonAlias; +import org.eclipse.collections.api.map.ImmutableMap; + +/** + * Flink checkpoint + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public final class FlinkCheckpoint { + private Long id; + private String status; + @JsonAlias("is_savepoint") + private boolean savePoint; + @JsonAlias("trigger_timestamp") + private Long triggerTimestamp; + @JsonAlias("latest_ack_timestamp") + private Long latestAckTimestamp; + @JsonAlias("state_size") + private Long stateSize; + @JsonAlias("end_to_end_duration") + private Long endToEndDuration; + @JsonAlias("alignment_buffered") + private Long alignmentBuffered; + @JsonAlias("processed_data") + private Long processedData; + @JsonAlias("persisted_data") + private Long persistedData; + @JsonAlias("num_subtasks") + private Long numSubtasks; + @JsonAlias("num_acknowledged_subtasks") + private Long numAcknowledgedSubtasks; + @JsonAlias("checkpoint_type") + private String checkpointType; + private ImmutableMap tasks; + @JsonAlias("external_path") + private String externalPath; + private boolean discarded; + + public Long getId() { + return id; + } + + public String getStatus() { + return status; + } + + public boolean isSavePoint() { + return savePoint; + } + + public Long getTriggerTimestamp() { + return triggerTimestamp; + } + + public Long getLatestAckTimestamp() { + return latestAckTimestamp; + } + + public Long getStateSize() { + return stateSize; + } + + public Long getEndToEndDuration() { + return endToEndDuration; + } + + public Long getAlignmentBuffered() { + return alignmentBuffered; + } + + public Long getProcessedData() { + return processedData; + } + + public Long getPersistedData() { + return persistedData; + } + + public Long getNumSubtasks() { + return numSubtasks; + } + + public Long getNumAcknowledgedSubtasks() { + return numAcknowledgedSubtasks; + } + + public String getCheckpointType() { + return checkpointType; + } + + public ImmutableMap getTasks() { + return tasks; + } + + public String getExternalPath() { + return externalPath; + } + + public boolean isDiscarded() { + return discarded; + } + + @Override + public String toString() { + return "FlinkCheckpoint{" + + "id=" + id + + ", status='" + status + '\'' + + ", savePoint=" + savePoint + + ", triggerTimestamp=" + triggerTimestamp + + ", latestAckTimestamp=" + latestAckTimestamp + + ", stateSize=" + stateSize + + ", endToEndDuration=" + endToEndDuration + + ", alignmentBuffered=" + alignmentBuffered + + ", processedData=" + processedData + + ", persistedData=" + persistedData + + ", numSubtasks=" + numSubtasks + + ", numAcknowledgedSubtasks=" + numAcknowledgedSubtasks + + ", checkpointType='" + checkpointType + '\'' + + ", tasks=" + tasks + + ", externalPath='" + externalPath + '\'' + + ", discarded=" + discarded + + '}'; + } +} diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkCheckpointConfig.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkCheckpointConfig.java new file mode 100644 index 0000000..6280888 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkCheckpointConfig.java @@ -0,0 +1,78 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import com.fasterxml.jackson.annotation.JsonAlias; + +/** + * Checkpoint config + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public class FlinkCheckpointConfig { + private String mode; + private Long interval; + private Long timeout; + @JsonAlias("min_pause") + private Long minPause; + @JsonAlias("max_concurrent") + private Long maxConcurrent; + @JsonAlias("state_backend") + private String stateBackend; + @JsonAlias("checkpoint_storage") + private String checkpointStorage; + @JsonAlias("unaligned_checkpoints") + private Boolean unalignedCheckpoints; + @JsonAlias("tolerable_failed_checkpoints") + private Integer tolerableFailedCheckpoints; + + public String getMode() { + return mode; + } + + public Long getInterval() { + return interval; + } + + public Long getTimeout() { + return timeout; + } + + public Long getMinPause() { + return minPause; + } + + public Long getMaxConcurrent() { + return maxConcurrent; + } + + public String getStateBackend() { + return stateBackend; + } + + public String getCheckpointStorage() { + return checkpointStorage; + } + + public Boolean isUnalignedCheckpoints() { + return unalignedCheckpoints; + } + + public Integer getTolerableFailedCheckpoints() { + return tolerableFailedCheckpoints; + } + + @Override + public String toString() { + return "FlinkCheckpointConfig{" + + "mode='" + mode + '\'' + + ", interval=" + interval + + ", timeout=" + timeout + + ", minPause=" + minPause + + ", maxConcurrent=" + maxConcurrent + + ", stateBackend='" + stateBackend + '\'' + + ", checkpointStorage='" + checkpointStorage + '\'' + + ", unalignedCheckpoints=" + unalignedCheckpoints + + ", tolerableFailedCheckpoints=" + tolerableFailedCheckpoints + + '}'; + } +} diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkCheckpointOverview.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkCheckpointOverview.java new file mode 100644 index 0000000..f6f264c --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkCheckpointOverview.java @@ -0,0 +1,152 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import com.fasterxml.jackson.annotation.JsonAlias; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public final class FlinkCheckpointOverview { + private Counts counts; + private Summary summary; + private FlinkCheckpoint latest; + private ImmutableList history; + + public Counts getCounts() { + return counts; + } + + public Summary getSummary() { + return summary; + } + + public FlinkCheckpoint getLatest() { + return latest; + } + + public ImmutableList getHistory() { + return history; + } + + @Override + public String toString() { + return "FlinkCheckpointOverview{" + + "counts=" + counts + + ", summary=" + summary + + ", latest=" + latest + + ", history=" + history + + '}'; + } + + public static final class Counts { + private Long restored; + private Long total; + @JsonAlias("in_progress") + private Long inProgress; + private Long completed; + private Long failed; + + public Long getRestored() { + return restored; + } + + public Long getTotal() { + return total; + } + + public Long getInProgress() { + return inProgress; + } + + public Long getCompleted() { + return completed; + } + + public Long getFailed() { + return failed; + } + + @Override + public String toString() { + return "Counts{" + + "restored=" + restored + + ", total=" + total + + ", inProgress=" + inProgress + + ", completed=" + completed + + ", failed=" + failed + + '}'; + } + } + + public static final class Summary { + @JsonAlias("state_size") + private Metric stateSize; + @JsonAlias("end_to_end_duration") + private Metric endToEndDuration; + @JsonAlias("alignment_buffered") + private Metric alignmentBuffered; + @JsonAlias("processed_data") + private Metric processedData; + @JsonAlias("persisted_data") + private Metric persistedData; + + public Metric getStateSize() { + return stateSize; + } + + public Metric getEndToEndDuration() { + return endToEndDuration; + } + + public Metric getAlignmentBuffered() { + return alignmentBuffered; + } + + public Metric getProcessedData() { + return processedData; + } + + public Metric getPersistedData() { + return persistedData; + } + + @Override + public String toString() { + return "Summary{" + + "stateSize=" + stateSize + + ", endToEndDuration=" + endToEndDuration + + ", alignmentBuffered=" + alignmentBuffered + + ", processedData=" + processedData + + ", persistedData=" + persistedData + + '}'; + } + + public static final class Metric { + private Long min; + private Long max; + private Long avg; + + public Long getMin() { + return min; + } + + public Long getMax() { + return max; + } + + public Long getAvg() { + return avg; + } + + @Override + public String toString() { + return "Metric{" + + "min=" + min + + ", max=" + max + + ", avg=" + avg + + '}'; + } + } + } +} diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkConfig.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkConfig.java new file mode 100644 index 0000000..1516378 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkConfig.java @@ -0,0 +1,50 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import org.eclipse.collections.api.map.ImmutableMap; + +/** + * Flink Config + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public final class FlinkConfig { + @JsonProperty(value = "flink-version", access = JsonProperty.Access.WRITE_ONLY) + private String flinkVersion; + @JsonProperty(value = "flink-revision", access = JsonProperty.Access.WRITE_ONLY) + private String flinkRevision; + private ImmutableMap features; + + public FlinkConfig() { + } + + public FlinkConfig(String flinkVersion, String flinkRevision, ImmutableMap features) { + this.flinkVersion = flinkVersion; + this.flinkRevision = flinkRevision; + this.features = features; + } + + public String getFlinkVersion() { + return flinkVersion; + } + + public String getFlinkRevision() { + return flinkRevision; + } + + public ImmutableMap getFeatures() { + return features; + } + + @Override + public String toString() { + return "FlinkConfig{" + + "flinkVersion='" + flinkVersion + '\'' + + ", flinkRevision='" + flinkRevision + '\'' + + ", features=" + features + + '}'; + } +} diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkOverview.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkOverview.java new file mode 100644 index 0000000..41e4850 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkOverview.java @@ -0,0 +1,81 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import com.fasterxml.jackson.annotation.JsonAlias; + +/** + * Flink overview + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public class FlinkOverview { + @JsonAlias("taskmanagers") + private Integer taskManagers; + @JsonAlias("slots-total") + private Integer slotsTotal; + @JsonAlias("slots-available") + private Integer slotsAvailable; + @JsonAlias("jobs-running") + private Integer jobsRunning; + @JsonAlias("jobs-finished") + private Integer jobsFinished; + @JsonAlias("jobs-cancelled") + private Integer jobsCanceled; + @JsonAlias("jobs-failed") + private Integer jobsFailed; + @JsonAlias("flink-version") + private String flinkVersion; + @JsonAlias("flink-commit") + private String flinkCommit; + + public Integer getTaskManagers() { + return taskManagers; + } + + public Integer getSlotsTotal() { + return slotsTotal; + } + + public Integer getSlotsAvailable() { + return slotsAvailable; + } + + public Integer getJobsRunning() { + return jobsRunning; + } + + public Integer getJobsFinished() { + return jobsFinished; + } + + public Integer getJobsCanceled() { + return jobsCanceled; + } + + public Integer getJobsFailed() { + return jobsFailed; + } + + public String getFlinkVersion() { + return flinkVersion; + } + + public String getFlinkCommit() { + return flinkCommit; + } + + @Override + public String toString() { + return "FlinkOverview{" + + "taskManagers=" + taskManagers + + ", slotsTotal=" + slotsTotal + + ", slotsAvailable=" + slotsAvailable + + ", jobsRunning=" + jobsRunning + + ", jobsFinished=" + jobsFinished + + ", jobsCanceled=" + jobsCanceled + + ", jobsFailed=" + jobsFailed + + ", flinkVersion='" + flinkVersion + '\'' + + ", flinkCommit='" + flinkCommit + '\'' + + '}'; + } +} diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkTaskManager.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkTaskManager.java new file mode 100644 index 0000000..55faabe --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkTaskManager.java @@ -0,0 +1,372 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import org.eclipse.collections.api.list.ImmutableList; + +/** + * Flink taskmanager + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public final class FlinkTaskManager { + private String id; + private String path; + private Integer dataPort; + private Integer jmxPort; + private Long timeSinceLastHeartbeat; + private Integer slotsNumber; + private Integer freeSlots; + private Resource totalResource; + private Resource freeResource; + private Resource hardware; + private MemoryConfiguration memoryConfiguration; + private Metrics metrics; + + public String getId() { + return id; + } + + public String getPath() { + return path; + } + + public Integer getDataPort() { + return dataPort; + } + + public Integer getJmxPort() { + return jmxPort; + } + + public Long getTimeSinceLastHeartbeat() { + return timeSinceLastHeartbeat; + } + + public Integer getSlotsNumber() { + return slotsNumber; + } + + public Integer getFreeSlots() { + return freeSlots; + } + + public Resource getTotalResource() { + return totalResource; + } + + public Resource getFreeResource() { + return freeResource; + } + + public Resource getHardware() { + return hardware; + } + + public MemoryConfiguration getMemoryConfiguration() { + return memoryConfiguration; + } + + public Metrics getMetrics() { + return metrics; + } + + @Override + public String toString() { + return "FlinkTaskManager{" + + "id='" + id + '\'' + + ", path='" + path + '\'' + + ", dataPort=" + dataPort + + ", jmxPort=" + jmxPort + + ", timeSinceLastHeartbeat=" + timeSinceLastHeartbeat + + ", slotsNumber=" + slotsNumber + + ", freeSlots=" + freeSlots + + ", totalResource=" + totalResource + + ", freeResource=" + freeResource + + ", hardware=" + hardware + + ", memoryConfiguration=" + memoryConfiguration + + ", metrics=" + metrics + + '}'; + } + + public static final class Resource { + private Double cpuCores; + private Long taskHeapMemory; + private Long taskOffHeapMemory; + private Long managedMemory; + private Long networkMemory; + private Long physicalMemory; + private Long freeMemory; + + public Double getCpuCores() { + return cpuCores; + } + + public Long getTaskHeapMemory() { + return taskHeapMemory; + } + + public Long getTaskOffHeapMemory() { + return taskOffHeapMemory; + } + + public Long getManagedMemory() { + return managedMemory; + } + + public Long getNetworkMemory() { + return networkMemory; + } + + public Long getPhysicalMemory() { + return physicalMemory; + } + + public Long getFreeMemory() { + return freeMemory; + } + + @Override + public String toString() { + return "Resource{" + + "cpuCores=" + cpuCores + + ", taskHeapMemory=" + taskHeapMemory + + ", taskOffHeapMemory=" + taskOffHeapMemory + + ", managedMemory=" + managedMemory + + ", networkMemory=" + networkMemory + + ", physicalMemory=" + physicalMemory + + ", freeMemory=" + freeMemory + + '}'; + } + } + + public static final class MemoryConfiguration { + private Long frameworkHeap; + private Long taskHeap; + private Long frameworkOffHeap; + private Long taskOffHeap; + private Long networkMemory; + private Long managedMemory; + private Long jvmMetaspace; + private Long jvmOverhead; + private Long totalFlinkMemory; + private Long totalProcessMemory; + + public Long getFrameworkHeap() { + return frameworkHeap; + } + + public Long getTaskHeap() { + return taskHeap; + } + + public Long getFrameworkOffHeap() { + return frameworkOffHeap; + } + + public Long getTaskOffHeap() { + return taskOffHeap; + } + + public Long getNetworkMemory() { + return networkMemory; + } + + public Long getManagedMemory() { + return managedMemory; + } + + public Long getJvmMetaspace() { + return jvmMetaspace; + } + + public Long getJvmOverhead() { + return jvmOverhead; + } + + public Long getTotalFlinkMemory() { + return totalFlinkMemory; + } + + public Long getTotalProcessMemory() { + return totalProcessMemory; + } + + @Override + public String toString() { + return "MemoryConfiguration{" + + "frameworkHeap=" + frameworkHeap + + ", taskHeap=" + taskHeap + + ", frameworkOffHeap=" + frameworkOffHeap + + ", taskOffHeap=" + taskOffHeap + + ", networkMemory=" + networkMemory + + ", managedMemory=" + managedMemory + + ", jvmMetaspace=" + jvmMetaspace + + ", jvmOverhead=" + jvmOverhead + + ", totalFlinkMemory=" + totalFlinkMemory + + ", totalProcessMemory=" + totalProcessMemory + + '}'; + } + } + + public static final class Metrics { + private Long heapUsed; + private Long heapCommitted; + private Long heapMax; + private Long nonHeapUsed; + private Long nonHeapCommitted; + private Long nonHeapMax; + private Long directCount; + private Long directUsed; + private Long directMax; + private Long mappedCount; + private Long mappedUsed; + private Long mappedMax; + private Long memorySegmentsAvailable; + private Long memorySegmentsTotal; + private Long nettyShuffleMemorySegmentsAvailable; + private Long nettyShuffleMemorySegmentsUsed; + private Long nettyShuffleMemorySegmentsTotal; + private Long nettyShuffleMemoryAvailable; + private Long nettyShuffleMemoryUsed; + private Long nettyShuffleMemoryTotal; + private ImmutableList garbageCollectors; + + public Long getHeapUsed() { + return heapUsed; + } + + public Long getHeapCommitted() { + return heapCommitted; + } + + public Long getHeapMax() { + return heapMax; + } + + public Long getNonHeapUsed() { + return nonHeapUsed; + } + + public Long getNonHeapCommitted() { + return nonHeapCommitted; + } + + public Long getNonHeapMax() { + return nonHeapMax; + } + + public Long getDirectCount() { + return directCount; + } + + public Long getDirectUsed() { + return directUsed; + } + + public Long getDirectMax() { + return directMax; + } + + public Long getMappedCount() { + return mappedCount; + } + + public Long getMappedUsed() { + return mappedUsed; + } + + public Long getMappedMax() { + return mappedMax; + } + + public Long getMemorySegmentsAvailable() { + return memorySegmentsAvailable; + } + + public Long getMemorySegmentsTotal() { + return memorySegmentsTotal; + } + + public Long getNettyShuffleMemorySegmentsAvailable() { + return nettyShuffleMemorySegmentsAvailable; + } + + public Long getNettyShuffleMemorySegmentsUsed() { + return nettyShuffleMemorySegmentsUsed; + } + + public Long getNettyShuffleMemorySegmentsTotal() { + return nettyShuffleMemorySegmentsTotal; + } + + public Long getNettyShuffleMemoryAvailable() { + return nettyShuffleMemoryAvailable; + } + + public Long getNettyShuffleMemoryUsed() { + return nettyShuffleMemoryUsed; + } + + public Long getNettyShuffleMemoryTotal() { + return nettyShuffleMemoryTotal; + } + + public ImmutableList getGarbageCollectors() { + return garbageCollectors; + } + + @Override + public String toString() { + return "Metrics{" + + "heapUsed=" + heapUsed + + ", heapCommitted=" + heapCommitted + + ", heapMax=" + heapMax + + ", nonHeapUsed=" + nonHeapUsed + + ", nonHeapCommitted=" + nonHeapCommitted + + ", nonHeapMax=" + nonHeapMax + + ", directCount=" + directCount + + ", directUsed=" + directUsed + + ", directMax=" + directMax + + ", mappedCount=" + mappedCount + + ", mappedUsed=" + mappedUsed + + ", mappedMax=" + mappedMax + + ", memorySegmentsAvailable=" + memorySegmentsAvailable + + ", memorySegmentsTotal=" + memorySegmentsTotal + + ", nettyShuffleMemorySegmentsAvailable=" + nettyShuffleMemorySegmentsAvailable + + ", nettyShuffleMemorySegmentsUsed=" + nettyShuffleMemorySegmentsUsed + + ", nettyShuffleMemorySegmentsTotal=" + nettyShuffleMemorySegmentsTotal + + ", nettyShuffleMemoryAvailable=" + nettyShuffleMemoryAvailable + + ", nettyShuffleMemoryUsed=" + nettyShuffleMemoryUsed + + ", nettyShuffleMemoryTotal=" + nettyShuffleMemoryTotal + + ", garbageCollectors=" + garbageCollectors + + '}'; + } + + public static final class Garbage { + private String name; + private Long count; + private Long time; + + public String getName() { + return name; + } + + public Long getCount() { + return count; + } + + public Long getTime() { + return time; + } + + @Override + public String toString() { + return "Garbage{" + + "name='" + name + '\'' + + ", count=" + count + + ", time=" + time + + '}'; + } + } + } +} diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkTaskManagerOverview.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkTaskManagerOverview.java new file mode 100644 index 0000000..64419a3 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkTaskManagerOverview.java @@ -0,0 +1,24 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import org.eclipse.collections.api.list.ImmutableList; + +/** + * Flink task manager overview + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public class FlinkTaskManagerOverview { + private ImmutableList taskmanagers; + + public ImmutableList getTaskmanagers() { + return taskmanagers; + } + + @Override + public String toString() { + return "FlinkTaskManagerOverview{" + + "taskmanagers=" + taskmanagers + + '}'; + } +} diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertex.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertex.java new file mode 100644 index 0000000..bcc86bf --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertex.java @@ -0,0 +1,327 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import com.fasterxml.jackson.annotation.JsonAlias; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.eclipse.collections.api.list.ImmutableList; + +/** + * Flink Vertices + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public class FlinkVertex { + private String jid; + private String name; + private String state; + @JsonAlias("start-time") + private Long startTime; + @JsonAlias("end-time") + private Long endTime; + private Long duration; + private Long lastModification; + private Tasks tasks; + @JsonAlias("maxParallelism") + private Integer maxParallelism; + private Long now; + private Timestamps timestamps; + private ImmutableList vertices; + private Metrics metrics; + + public String getJid() { + return jid; + } + + public String getName() { + return name; + } + + public String getState() { + return state; + } + + public Long getStartTime() { + return startTime; + } + + public Long getEndTime() { + return endTime; + } + + public Long getDuration() { + return duration; + } + + public Long getLastModification() { + return lastModification; + } + + public Tasks getTasks() { + return tasks; + } + + public Integer getMaxParallelism() { + return maxParallelism; + } + + public Long getNow() { + return now; + } + + public Timestamps getTimestamps() { + return timestamps; + } + + public ImmutableList getVertices() { + return vertices; + } + + public Metrics getMetrics() { + return metrics; + } + + public static final class Tasks { + private Integer total; + private Integer created; + private Integer scheduled; + private Integer deploying; + private Integer running; + private Integer finished; + private Integer canceling; + private Integer canceled; + private Integer failed; + private Integer reconciling; + private Integer initializing; + + public Integer getTotal() { + return total; + } + + public Integer getCreated() { + return created; + } + + public Integer getScheduled() { + return scheduled; + } + + public Integer getDeploying() { + return deploying; + } + + public Integer getRunning() { + return running; + } + + public Integer getFinished() { + return finished; + } + + public Integer getCanceling() { + return canceling; + } + + public Integer getCanceled() { + return canceled; + } + + public Integer getFailed() { + return failed; + } + + public Integer getReconciling() { + return reconciling; + } + + public Integer getInitializing() { + return initializing; + } + + @Override + public String toString() { + return "Tasks{" + + "total=" + total + + ", created=" + created + + ", scheduled=" + scheduled + + ", deploying=" + deploying + + ", running=" + running + + ", finished=" + finished + + ", canceling=" + canceling + + ", canceled=" + canceled + + ", failed=" + failed + + ", reconciling=" + reconciling + + ", initializing=" + initializing + + '}'; + } + } + + public static final class Timestamps { + @JsonAlias("RUNNING") + private Long running; + @JsonAlias("FAILING") + private Long failing; + @JsonAlias("FINISHED") + private Long finished; + @JsonAlias("RESTARTING") + private Long restarting; + @JsonAlias("RECONCILING") + private Long reconciling; + @JsonAlias("CREATED") + private Long created; + @JsonAlias("FAILED") + private Long failed; + @JsonAlias("INITIALIZING") + private Long initializing; + @JsonAlias("CANCELED") + private Long canceled; + @JsonAlias("CANCELLING") + private Long canceling; + @JsonAlias("SUSPENDED") + private Long suspended; + + public Long getRunning() { + return running; + } + + public Long getFailing() { + return failing; + } + + public Long getFinished() { + return finished; + } + + public Long getRestarting() { + return restarting; + } + + public Long getReconciling() { + return reconciling; + } + + public Long getCreated() { + return created; + } + + public Long getFailed() { + return failed; + } + + public Long getInitializing() { + return initializing; + } + + public Long getCanceled() { + return canceled; + } + + public Long getCanceling() { + return canceling; + } + + public Long getSuspended() { + return suspended; + } + + @Override + public String toString() { + return "Timestamps{" + + "running=" + running + + ", failing=" + failing + + ", finished=" + finished + + ", restarting=" + restarting + + ", reconciling=" + reconciling + + ", created=" + created + + ", failed=" + failed + + ", initializing=" + initializing + + ", canceled=" + canceled + + ", canceling=" + canceling + + ", suspended=" + suspended + + '}'; + } + } + + public static final class Metrics { + @JsonAlias("read-bytes") + private Long readBytes; + @JsonAlias("read-bytes-complete") + private Boolean readBytesComplete; + @JsonAlias("write-bytes") + private Long writeBytes; + @JsonAlias("write-bytes-complete") + private Boolean writeBytesComplete; + @JsonAlias("read-records") + private Long readRecords; + @JsonAlias("read-records-complete") + private Boolean readRecordsComplete; + @JsonAlias("write-records") + private Long writeRecords; + @JsonAlias("write-records-complete") + private Boolean writeRecordsComplete; + + public Long getReadBytes() { + return readBytes; + } + + public Boolean isReadBytesComplete() { + return readBytesComplete; + } + + public Long getWriteBytes() { + return writeBytes; + } + + public Boolean isWriteBytesComplete() { + return writeBytesComplete; + } + + public Long getReadRecords() { + return readRecords; + } + + public Boolean isReadRecordsComplete() { + return readRecordsComplete; + } + + public Long getWriteRecords() { + return writeRecords; + } + + public Boolean isWriteRecordsComplete() { + return writeRecordsComplete; + } + + @Override + public String toString() { + return "Metrics{" + + "readBytes=" + readBytes + + ", readBytesComplete=" + readBytesComplete + + ", writeBytes=" + writeBytes + + ", writeBytesComplete=" + writeBytesComplete + + ", readRecords=" + readRecords + + ", readRecordsComplete=" + readRecordsComplete + + ", writeRecords=" + writeRecords + + ", writeRecordsComplete=" + writeRecordsComplete + + '}'; + } + } + + @Override + public String toString() { + return "FlinkVertex{" + + "jid='" + jid + '\'' + + ", name='" + name + '\'' + + ", state='" + state + '\'' + + ", startTime=" + startTime + + ", endTime=" + endTime + + ", duration=" + duration + + ", lastModification=" + lastModification + + ", tasks=" + tasks + + ", maxParallelism=" + maxParallelism + + ", now=" + now + + ", timestamps=" + timestamps + + ", vertices=" + vertices + + ", metrics=" + metrics + + '}'; + } +} diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertexConfig.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertexConfig.java new file mode 100644 index 0000000..4acc898 --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertexConfig.java @@ -0,0 +1,67 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import com.fasterxml.jackson.annotation.JsonAlias; + +/** + * Flink job config + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public final class FlinkVertexConfig { + private String jid; + private String name; + @JsonAlias("execution-config") + private ExecutionConfig executionConfig; + + public String getJid() { + return jid; + } + + public String getName() { + return name; + } + + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + @Override + public String toString() { + return "FlinkVertexConfig{" + + "jid='" + jid + '\'' + + ", name='" + name + '\'' + + ", executionConfig=" + executionConfig + + '}'; + } + + public static final class ExecutionConfig { + @JsonAlias("execution-mode") + private String executionMode; + @JsonAlias("restart-strategy") + private String restartStrategy; + @JsonAlias("job-parallelism") + private Integer jobParallelism; + + public String getExecutionMode() { + return executionMode; + } + + public String getRestartStrategy() { + return restartStrategy; + } + + public Integer getJobParallelism() { + return jobParallelism; + } + + @Override + public String toString() { + return "ExecutionConfig{" + + "executionMode='" + executionMode + '\'' + + ", restartStrategy='" + restartStrategy + '\'' + + ", jobParallelism=" + jobParallelism + + '}'; + } + } +} diff --git a/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertexOverview.java b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertexOverview.java new file mode 100644 index 0000000..cd54cbd --- /dev/null +++ b/service-configuration/src/main/java/com/lanyuanxiaoyao/service/configuration/entity/flink/FlinkVertexOverview.java @@ -0,0 +1,24 @@ +package com.lanyuanxiaoyao.service.configuration.entity.flink; + +import org.eclipse.collections.api.list.ImmutableList; + +/** + * Flink job overview + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public final class FlinkVertexOverview { + private ImmutableList jobs; + + public ImmutableList getJobs() { + return jobs; + } + + @Override + public String toString() { + return "FlinkJobOverview{" + + "jobs=" + jobs + + '}'; + } +} diff --git a/service-configuration/src/main/resources/application-common.yml b/service-configuration/src/main/resources/application-common.yml index fa3c272..5ecb9ab 100644 --- a/service-configuration/src/main/resources/application-common.yml +++ b/service-configuration/src/main/resources/application-common.yml @@ -1,3 +1,8 @@ spring: main: - banner-mode: off \ No newline at end of file + banner-mode: off + jackson: + default-property-inclusion: non_empty + deserialization: + fail-on-ignored-properties: false + fail-on-unknown-properties: false \ No newline at end of file diff --git a/service-flink-query/pom.xml b/service-flink-query/pom.xml index 7eb36c4..08462cb 100644 --- a/service-flink-query/pom.xml +++ b/service-flink-query/pom.xml @@ -11,10 +11,12 @@ service-flink-query - - 8 - 8 - UTF-8 - + + + com.lanyuanxiaoyao + service-configuration + 1.0.0-SNAPSHOT + + \ No newline at end of file diff --git a/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/controller/FlinkController.java b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/controller/FlinkController.java new file mode 100644 index 0000000..0bb9e42 --- /dev/null +++ b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/controller/FlinkController.java @@ -0,0 +1,95 @@ +package com.lanyuanxiaoyao.service.flink.controller; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.lanyuanxiaoyao.service.configuration.entity.flink.*; +import com.lanyuanxiaoyao.service.flink.service.FlinkService; +import org.eclipse.collections.api.map.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +@RestController +@RequestMapping("flink") +public class FlinkController implements FlinkService { + private static final Logger logger = LoggerFactory.getLogger(FlinkController.class); + + private final FlinkService flinkService; + + @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") + public FlinkController(FlinkService flinkService) { + this.flinkService = flinkService; + } + + @GetMapping("/overview") + @Override + public FlinkOverview overview(@RequestParam("url") String url) throws JsonProcessingException { + return flinkService.overview(url); + } + + @GetMapping("/config") + @Override + public FlinkConfig config(@RequestParam("url") String url) throws JsonProcessingException { + return flinkService.config(url); + } + + @GetMapping("/job_manager_config") + @Override + public ImmutableMap jobManagerConfig(@RequestParam("url") String url) throws JsonProcessingException { + return flinkService.jobManagerConfig(url); + } + + @GetMapping("/vertex_overview") + @Override + public FlinkVertexOverview vertexOverview(@RequestParam("url") String url) throws JsonProcessingException { + return flinkService.vertexOverview(url); + } + + @GetMapping("/vertex") + @Override + public FlinkVertex vertex(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException { + return flinkService.vertex(url, vertexId); + } + + @GetMapping("/vertex_config") + @Override + public FlinkVertexConfig vertexConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException { + return flinkService.vertexConfig(url, vertexId); + } + + @GetMapping("/checkpoint_overview") + @Override + public FlinkCheckpointOverview checkpointOverview(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException { + return flinkService.checkpointOverview(url, vertexId); + } + + @GetMapping("/checkpoint") + @Override + public FlinkCheckpoint checkpoint(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId, @RequestParam("checkpoint_id") String checkpointId) throws JsonProcessingException { + return flinkService.checkpoint(url, vertexId, checkpointId); + } + + @GetMapping("/checkpoint_config") + @Override + public FlinkCheckpointConfig checkpointConfig(@RequestParam("url") String url, @RequestParam("vertex_id") String vertexId) throws JsonProcessingException { + return flinkService.checkpointConfig(url, vertexId); + } + + @GetMapping("/task_manager_overview") + @Override + public FlinkTaskManagerOverview taskManagerOverview(@RequestParam("url") String url) throws JsonProcessingException { + return flinkService.taskManagerOverview(url); + } + + @GetMapping("/task_manager") + @Override + public FlinkTaskManager taskManager(@RequestParam("url") String url, @RequestParam("task_manager_id") String taskManagerId) throws JsonProcessingException { + return flinkService.taskManager(url, taskManagerId); + } +} diff --git a/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/FlinkService.java b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/FlinkService.java new file mode 100644 index 0000000..df05952 --- /dev/null +++ b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/FlinkService.java @@ -0,0 +1,35 @@ +package com.lanyuanxiaoyao.service.flink.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.lanyuanxiaoyao.service.configuration.entity.flink.*; +import org.eclipse.collections.api.map.ImmutableMap; + +/** + * Flink 服务 + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public interface FlinkService { + FlinkOverview overview(String url) throws JsonProcessingException; + + FlinkConfig config(String url) throws JsonProcessingException; + + ImmutableMap jobManagerConfig(String url) throws JsonProcessingException; + + FlinkVertexOverview vertexOverview(String url) throws JsonProcessingException; + + FlinkVertex vertex(String url, String vertexId) throws JsonProcessingException; + + FlinkVertexConfig vertexConfig(String url, String vertexId) throws JsonProcessingException; + + FlinkCheckpointOverview checkpointOverview(String url, String vertexId) throws JsonProcessingException; + + FlinkCheckpoint checkpoint(String url, String vertexId, String checkpointId) throws JsonProcessingException; + + FlinkCheckpointConfig checkpointConfig(String url, String vertexId) throws JsonProcessingException; + + FlinkTaskManagerOverview taskManagerOverview(String url) throws JsonProcessingException; + + FlinkTaskManager taskManager(String url, String taskManagerId) throws JsonProcessingException; +} diff --git a/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/impl/FlinkServiceImpl.java b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/impl/FlinkServiceImpl.java new file mode 100644 index 0000000..d8b04f5 --- /dev/null +++ b/service-flink-query/src/main/java/com/lanyuanxiaoyao/service/flink/service/impl/FlinkServiceImpl.java @@ -0,0 +1,119 @@ +package com.lanyuanxiaoyao.service.flink.service.impl; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.URLUtil; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.HttpUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.lanyuanxiaoyao.service.configuration.entity.flink.*; +import com.lanyuanxiaoyao.service.flink.service.FlinkService; +import org.eclipse.collections.api.map.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; + +/** + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +@Service +public class FlinkServiceImpl implements FlinkService { + private static final Logger logger = LoggerFactory.getLogger(FlinkServiceImpl.class); + + private final ObjectMapper mapper; + + public FlinkServiceImpl(Jackson2ObjectMapperBuilder builder) { + mapper = builder.build(); + } + + private String get(String url, String path) { + try (HttpResponse response = HttpUtil.createGet(URLUtil.completeUrl(url, path)) + .setMaxRedirectCount(10) + .execute()) { + return response.body(); + } + } + + @Cacheable(value = "flink-overview", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkOverview overview(String url) throws JsonProcessingException { + return mapper.readValue(get(url, "/v1/overview"), FlinkOverview.class); + } + + @Cacheable(value = "flink-config", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkConfig config(String url) throws JsonProcessingException { + return mapper.readValue(get(url, "/v1/config"), FlinkConfig.class); + } + + @Cacheable(value = "flink-jobmanager-config", sync = true) + @Retryable(Throwable.class) + @Override + public ImmutableMap jobManagerConfig(String url) throws JsonProcessingException { + return mapper.readValue(get(url, "/v1/jobmanager/config"), new TypeReference>() { + }); + } + + @Cacheable(value = "flink-vertex-overview", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkVertexOverview vertexOverview(String url) throws JsonProcessingException { + return mapper.readValue(get(url, "/v1/jobs/overview"), FlinkVertexOverview.class); + } + + @Cacheable(value = "flink-vertex", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkVertex vertex(String url, String vertexId) throws JsonProcessingException { + return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}", vertexId)), FlinkVertex.class); + } + + @Cacheable(value = "flink-vertex-config", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkVertexConfig vertexConfig(String url, String vertexId) throws JsonProcessingException { + return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}/config", vertexId)), FlinkVertexConfig.class); + } + + @Cacheable(value = "flink-checkpoint-overview", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkCheckpointOverview checkpointOverview(String url, String vertexId) throws JsonProcessingException { + return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}/checkpoints", vertexId)), FlinkCheckpointOverview.class); + } + + @Cacheable(value = "flink-checkpoint", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkCheckpoint checkpoint(String url, String vertexId, String checkpointId) throws JsonProcessingException { + return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}/checkpoints/details/{}", vertexId, checkpointId)), FlinkCheckpoint.class); + } + + @Cacheable(value = "flink-checkpoint-config", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkCheckpointConfig checkpointConfig(String url, String vertexId) throws JsonProcessingException { + return mapper.readValue(get(url, StrUtil.format("/v1/jobs/{}/checkpoints/config", vertexId)), FlinkCheckpointConfig.class); + } + + @Cacheable(value = "flink-taskmanager-overview", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkTaskManagerOverview taskManagerOverview(String url) throws JsonProcessingException { + return mapper.readValue(get(url, "/v1/taskmanagers"), FlinkTaskManagerOverview.class); + } + + @Cacheable(value = "flink-taskmanager", sync = true) + @Retryable(Throwable.class) + @Override + public FlinkTaskManager taskManager(String url, String taskManagerId) throws JsonProcessingException { + return mapper.readValue(get(url, StrUtil.format("/v1/taskmanagers/{}", taskManagerId)), FlinkTaskManager.class); + } +} diff --git a/service-flink-query/src/test/java/com/lanyuanxiaoyao/service/flink/TestJsonParse.java b/service-flink-query/src/test/java/com/lanyuanxiaoyao/service/flink/TestJsonParse.java new file mode 100644 index 0000000..5e3ad1f --- /dev/null +++ b/service-flink-query/src/test/java/com/lanyuanxiaoyao/service/flink/TestJsonParse.java @@ -0,0 +1,36 @@ +package com.lanyuanxiaoyao.service.flink; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.URLUtil; +import cn.hutool.http.HttpUtil; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.eclipsecollections.EclipseCollectionsModule; +import com.lanyuanxiaoyao.service.configuration.entity.flink.FlinkTaskManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 测试 Flink 解析 + * + * @author lanyuanxiaoyao + * @date 2023-05-04 + */ +public class TestJsonParse { + private static final Logger logger = LoggerFactory.getLogger(TestJsonParse.class); + + public static void main(String[] args) throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new EclipseCollectionsModule()); + mapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + mapper.setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY); + String url = "b5s120.hdp.dc:8088/proxy/application_1672368973318_1814"; + String body = HttpUtil.get(StrUtil.format("http://{}/v1/taskmanagers/container_1672368973318_1814_01_000002", url)); + logger.info("{}", mapper.writeValueAsString(mapper.readValue(body, FlinkTaskManager.class))); + } +} diff --git a/service-flink-query/src/test/resources/flink.http b/service-flink-query/src/test/resources/flink.http new file mode 100644 index 0000000..7f54dfe --- /dev/null +++ b/service-flink-query/src/test/resources/flink.http @@ -0,0 +1,99 @@ +### Config +# { +# "refresh-interval": 3000, +# "timezone-name": "China Time", +# "timezone-offset": 28800000, +# "flink-version": "1.13.3", +# "flink-revision": "a4700e3 @ 2021-10-11T23:52:36+02:00", +# "features": { +# "web-submit": false +# } +# } +GET http://{{url}}/v1/config + +### Jobmnager config +# [{key: xxx, value: xxx}] +GET http://{{url}}/v1/jobmanager/config + +### jobs +# { +# "jobs": [ +# { +# "id": "320c6e7438afebea43fa0f0160319717", +# "status": "RUNNING" +# } +# ] +# } +GET http://{{url}}/v1/jobs + +### jobs overview +# { +# "jobs": [ +# { +# "jid": "320c6e7438afebea43fa0f0160319717", +# "name": "conf_center (ID: 1542097983881048064)", +# "state": "RUNNING", +# "start-time": 1680699756563, +# "end-time": -1, +# "duration": 2503052502, +# "last-modification": 1680699770420, +# "tasks": { +# "total": 8, +# "created": 0, +# "scheduled": 0, +# "deploying": 0, +# "running": 8, +# "finished": 0, +# "canceling": 0, +# "canceled": 0, +# "failed": 0, +# "reconciling": 0, +# "initializing": 0 +# } +# } +# ] +# } +GET http://{{url}}/v1/jobs/overview + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717 + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717/config + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717/checkpoints + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717/checkpoints/config + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717/checkpoints/details/2722 + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717/exceptions + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717/plan + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717/vertices/1171dea6747ab509fdaefbe74f7195af + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717/vertices/1171dea6747ab509fdaefbe74f7195af/backpressure + +### +GET http://{{url}}/v1/jobs/320c6e7438afebea43fa0f0160319717/vertices/1171dea6747ab509fdaefbe74f7195af/taskmanagers + +### +GET http://{{url}}/v1/overview + +### +GET http://{{url}}/v1/taskmanagers + +### +GET http://{{url}}/v1/taskmanagers/container_1672368973318_1814_01_000002 + +### +GET http://{{url}}/v1/taskmanagers/container_1672368973318_1814_01_000002/logs + diff --git a/service-flink-query/src/test/resources/http-client.env.json b/service-flink-query/src/test/resources/http-client.env.json new file mode 100644 index 0000000..65b5db1 --- /dev/null +++ b/service-flink-query/src/test/resources/http-client.env.json @@ -0,0 +1,5 @@ +{ + "production": { + "url": "b5s120.hdp.dc:8088/proxy/application_1672368973318_1814" + } +} \ No newline at end of file diff --git a/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/FlinkService.java b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/FlinkService.java new file mode 100644 index 0000000..e4e176d --- /dev/null +++ b/service-forest/src/main/java/com/lanyuanxiaoyao/service/forest/service/FlinkService.java @@ -0,0 +1,49 @@ +package com.lanyuanxiaoyao.service.forest.service; + +import com.dtflys.forest.annotation.BaseRequest; +import com.dtflys.forest.annotation.Get; +import com.dtflys.forest.annotation.Query; +import com.lanyuanxiaoyao.service.configuration.entity.flink.*; +import org.eclipse.collections.api.map.ImmutableMap; + +/** + * Flink 接口 + * + * @author lanyuanxiaoyao + * @date 2023-05-05 + */ +@BaseRequest(baseURL = "http://service-flink-query") +public interface FlinkService { + @Get("/overview") + public FlinkOverview overview(@Query("url") String url); + + @Get("/config") + public FlinkConfig config(@Query("url") String url); + + @Get("/job_manager_config") + public ImmutableMap jobManagerConfig(@Query("url") String url); + + @Get("/vertex_overview") + public FlinkVertexOverview vertexOverview(@Query("url") String url); + + @Get("/vertex") + public FlinkVertex vertex(@Query("url") String url, @Query("vertex_id") String vertexId); + + @Get("/vertex_config") + public FlinkVertexConfig vertexConfig(@Query("url") String url, @Query("vertex_id") String vertexId); + + @Get("/checkpoint_overview") + public FlinkCheckpointOverview checkpointOverview(@Query("url") String url, @Query("vertex_id") String vertexId); + + @Get("/checkpoint") + public FlinkCheckpoint checkpoint(@Query("url") String url, @Query("vertex_id") String vertexId, @Query("checkpoint_id") String checkpointId); + + @Get("/checkpoint_config") + public FlinkCheckpointConfig checkpointConfig(@Query("url") String url, @Query("vertex_id") String vertexId); + + @Get("/task_manager_overview") + public FlinkTaskManagerOverview taskManagerOverview(@Query("url") String url); + + @Get("/task_manager") + public FlinkTaskManager taskManager(@Query("url") String url, @Query("task_manager_id") String taskManagerId); +}