feat(hudi-query): 新增写hdfs文件接口
This commit is contained in:
101
.idea/httpRequests/http-requests-log.http
generated
101
.idea/httpRequests/http-requests-log.http
generated
@@ -1,3 +1,64 @@
|
||||
POST http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s8.hdp.dc:15391/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt&overwrite=true
|
||||
Content-Length: 11
|
||||
Content-Type: */*; charset=UTF-8
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10)
|
||||
Cookie: JSESSIONID=D12E206603C453F1429C0B7DF1519A4B
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
Hello world
|
||||
|
||||
###
|
||||
|
||||
POST http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s8.hdp.dc:34469/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt&overwrite=true
|
||||
Content-Length: 11
|
||||
Content-Type: */*; charset=UTF-8
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10)
|
||||
Cookie: JSESSIONID=D12E206603C453F1429C0B7DF1519A4B
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
Hello world
|
||||
|
||||
###
|
||||
|
||||
POST http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s8.hdp.dc:34469/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt
|
||||
Content-Length: 11
|
||||
Content-Type: */*; charset=UTF-8
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10)
|
||||
Cookie: JSESSIONID=D12E206603C453F1429C0B7DF1519A4B
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
Hello world
|
||||
|
||||
<> 2024-05-08T095641.500.txt
|
||||
|
||||
###
|
||||
|
||||
POST http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s8.hdp.dc:34469/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt
|
||||
Content-Length: 11
|
||||
Content-Type: */*; charset=UTF-8
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10)
|
||||
Cookie: JSESSIONID=21482112F88BCF63D4FE4F5D2A6681FF
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
Hello world
|
||||
|
||||
###
|
||||
|
||||
POST http://b12s8.hdp.dc:34469/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt
|
||||
Content-Length: 11
|
||||
Content-Type: */*; charset=UTF-8
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10)
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
Hello world
|
||||
|
||||
###
|
||||
|
||||
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@b12s10.hdp.dc:16695/hdfs/list?root=hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.10)
|
||||
@@ -373,43 +434,3 @@ Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
###
|
||||
|
||||
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9)
|
||||
Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
###
|
||||
|
||||
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9)
|
||||
Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
###
|
||||
|
||||
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9)
|
||||
Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
###
|
||||
|
||||
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9)
|
||||
Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
###
|
||||
|
||||
GET http://AxhEbscwsJDbYMH2:cYxg3b4PtWoVD5SjFayWxtnSVsjzRsg4@132.126.207.130:35690/hudi_services/queue/queue/clear?name=compaction-queue-pre
|
||||
Connection: Keep-Alive
|
||||
User-Agent: Apache-HttpClient/4.5.14 (Java/17.0.9)
|
||||
Cookie: JSESSIONID=9AB8D98C10FACE15EA1CB758D79F8877
|
||||
Accept-Encoding: br,deflate,gzip,x-gzip
|
||||
|
||||
###
|
||||
|
||||
|
||||
@@ -28,6 +28,12 @@
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.javalin</groupId>
|
||||
<artifactId>javalin</artifactId>
|
||||
<version>4.6.8</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.lanyuanxiaoyao.service.forest.service;
|
||||
|
||||
import com.dtflys.forest.annotation.BaseRequest;
|
||||
import com.dtflys.forest.annotation.Get;
|
||||
import com.dtflys.forest.annotation.Post;
|
||||
import com.dtflys.forest.annotation.Query;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.PageResponse;
|
||||
import com.lanyuanxiaoyao.service.configuration.entity.hudi.HPath;
|
||||
@@ -76,6 +77,12 @@ public interface HudiService {
|
||||
@Get("/hdfs/read")
|
||||
String read(@Query("root") String root);
|
||||
|
||||
@Post("/hdfs/write")
|
||||
String write(@Query("root") String root, String text);
|
||||
|
||||
@Post("/hdfs/write")
|
||||
String write(@Query("root") String root, String text, @Query("overwrite") Boolean overwrite);
|
||||
|
||||
@Get("/hdfs/download")
|
||||
InputStream download(@Query("root") String root);
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package com.lanyuanxiaoyao.service.forest;
|
||||
|
||||
import com.dtflys.forest.Forest;
|
||||
import com.dtflys.forest.annotation.Body;
|
||||
import com.dtflys.forest.annotation.Get;
|
||||
import com.dtflys.forest.annotation.Post;
|
||||
import com.dtflys.forest.config.ForestConfiguration;
|
||||
import com.dtflys.forest.converter.text.DefaultTextConverter;
|
||||
import com.dtflys.forest.utils.ForestDataType;
|
||||
@@ -21,7 +23,8 @@ public class TestClient {
|
||||
TestService testService = Forest.client(TestService.class);
|
||||
// System.out.println(testService.success());
|
||||
// System.out.println(testService.error());
|
||||
System.out.println(testService.number());
|
||||
// System.out.println(testService.number());
|
||||
System.out.println(testService.sendText("Hello world"));
|
||||
}
|
||||
|
||||
public interface TestService {
|
||||
@@ -31,8 +34,11 @@ public class TestClient {
|
||||
@Get("http://localhost:8000/error")
|
||||
String error();
|
||||
|
||||
@Get(value = "http://localhost:8000/number")
|
||||
@Get("http://localhost:8000/number")
|
||||
String number();
|
||||
|
||||
@Post("http://localhost:8000/receive_text")
|
||||
String sendText(@Body String text);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -1,55 +1,19 @@
|
||||
package com.lanyuanxiaoyao.service.forest;
|
||||
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import io.javalin.Javalin;
|
||||
|
||||
/**
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-24
|
||||
*/
|
||||
@SuppressWarnings("resource")
|
||||
public class TestServer {
|
||||
public static void main(String[] args) throws IOException {
|
||||
HttpServer server = HttpServer.create(new InetSocketAddress(8000), 0);
|
||||
server.createContext("/success", new SuccessHandler());
|
||||
server.createContext("/error", new ErrorHandler());
|
||||
server.createContext("/number", new NumberHandler());
|
||||
server.start();
|
||||
}
|
||||
|
||||
public static class SuccessHandler implements HttpHandler {
|
||||
@Override
|
||||
public void handle(HttpExchange exchange) throws IOException {
|
||||
byte[] result = "Success".getBytes();
|
||||
exchange.sendResponseHeaders(200, result.length);
|
||||
OutputStream body = exchange.getResponseBody();
|
||||
body.write(result);
|
||||
body.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static class ErrorHandler implements HttpHandler {
|
||||
@Override
|
||||
public void handle(HttpExchange exchange) throws IOException {
|
||||
byte[] result = "Error".getBytes();
|
||||
exchange.sendResponseHeaders(500, result.length);
|
||||
OutputStream body = exchange.getResponseBody();
|
||||
body.write(result);
|
||||
body.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static class NumberHandler implements HttpHandler {
|
||||
@Override
|
||||
public void handle(HttpExchange exchange) throws IOException {
|
||||
byte[] result = "-1".getBytes();
|
||||
exchange.sendResponseHeaders(200, result.length);
|
||||
OutputStream body = exchange.getResponseBody();
|
||||
body.write(result);
|
||||
body.close();
|
||||
}
|
||||
public static void main(String[] args) {
|
||||
Javalin.create()
|
||||
.get("/success", ctx -> ctx.status(200).result("Success"))
|
||||
.get("/error", ctx -> ctx.status(500).result("Error"))
|
||||
.get("/number", ctx -> ctx.status(200).result("-1"))
|
||||
.post("/receive_text", ctx -> ctx.status(200).result(ctx.body()))
|
||||
.start(8000);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ import org.eclipse.collections.api.list.ImmutableList;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
@@ -48,27 +50,36 @@ public class HdfsController {
|
||||
}
|
||||
|
||||
@GetMapping("get")
|
||||
public HPath get(@RequestParam("root")String root) throws IOException {
|
||||
public HPath get(@RequestParam("root") String root) throws IOException {
|
||||
return hdfsService.get(root);
|
||||
}
|
||||
|
||||
@GetMapping("list")
|
||||
public ImmutableList<HPath> list(@RequestParam("root")String root) throws IOException {
|
||||
public ImmutableList<HPath> list(@RequestParam("root") String root) throws IOException {
|
||||
return hdfsService.list(root);
|
||||
}
|
||||
|
||||
@GetMapping("read")
|
||||
public String read(@RequestParam("root")String root) throws IOException {
|
||||
public String read(@RequestParam("root") String root) throws IOException {
|
||||
return hdfsService.read(root);
|
||||
}
|
||||
|
||||
@PostMapping("write")
|
||||
public void write(
|
||||
@RequestParam("root") String root,
|
||||
@RequestParam(value = "overwrite", defaultValue = "false") Boolean overwrite,
|
||||
@RequestBody String text
|
||||
) throws IOException {
|
||||
hdfsService.write(root, text, overwrite);
|
||||
}
|
||||
|
||||
@GetMapping("download")
|
||||
public void download(@RequestParam("root")String root, HttpServletResponse response) throws IOException {
|
||||
public void download(@RequestParam("root") String root, HttpServletResponse response) throws IOException {
|
||||
hdfsService.download(root, response.getOutputStream());
|
||||
}
|
||||
|
||||
@GetMapping("size")
|
||||
public Long size(@RequestParam("root")String root) throws IOException {
|
||||
public Long size(@RequestParam("root") String root) throws IOException {
|
||||
return hdfsService.size(root);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@@ -25,6 +26,7 @@ import org.springframework.stereotype.Service;
|
||||
* @author lanyuanxiaoyao
|
||||
* @date 2024-01-24
|
||||
*/
|
||||
@SuppressWarnings("SpringCacheableMethodCallsInspection")
|
||||
@Service
|
||||
public class HdfsService {
|
||||
private static final Logger logger = LoggerFactory.getLogger(HdfsService.class);
|
||||
@@ -132,6 +134,14 @@ public class HdfsService {
|
||||
}
|
||||
}
|
||||
|
||||
public void write(String root, String text, Boolean overwrite) throws IOException {
|
||||
try (FileSystem fileSystem = FileSystem.get(new Configuration())) {
|
||||
try (FSDataOutputStream stream = fileSystem.create(new Path(root), overwrite)) {
|
||||
stream.writeBytes(text);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("SpringCacheableMethodCallsInspection")
|
||||
public void download(String root, OutputStream outputStream) throws IOException {
|
||||
if (!existsPath(root)) {
|
||||
|
||||
@@ -99,3 +99,8 @@ GET http://{{username}}:{{password}}@b12s15.hdp.dc:21685/pulsar/backlog?name=mai
|
||||
|
||||
### Test HDFS list
|
||||
GET http://{{username}}:{{password}}@b12s10.hdp.dc:16695/hdfs/list?root=hdfs://b2/apps/datalake/hive/dws_test/external_table_hudi/dws_ord_prod_inst_attr
|
||||
|
||||
### Test HDFS write
|
||||
POST http://{{username}}:{{password}}@b12s8.hdp.dc:15391/hdfs/write?root=hdfs://b2/apps/datalake/test/test.txt&overwrite=true
|
||||
|
||||
Hello world
|
||||
|
||||
Reference in New Issue
Block a user