feature(hudi-query): 增加单独的Schema查询方法

This commit is contained in:
2023-11-30 12:29:28 +08:00
parent b69485aaee
commit 68c0abb8e3
2 changed files with 18 additions and 9 deletions

View File

@@ -4,9 +4,7 @@ import com.eshore.odcp.hudi.connector.entity.TableMeta;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lanyuanxiaoyao.service.forest.service.InfoService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import com.lanyuanxiaoyao.service.hudi.utils.HoodieUtils;
import org.apache.hudi.org.apache.avro.Schema;
import org.eclipse.collections.api.map.ImmutableMap;
import org.slf4j.Logger;
@@ -45,12 +43,7 @@ public class TableService {
@Cacheable(value = "table-schema", sync = true)
@Retryable(Throwable.class)
/**
 * Returns the Avro schema of the Hudi table at the given HDFS base path as a map.
 *
 * <p>Schema resolution is delegated to {@code HoodieUtils.getSchema(String)} so the
 * meta-client construction lives in one place; the pre-refactor inline build of
 * {@code HoodieTableMetaClient} + {@code TableSchemaResolver} is removed here —
 * keeping both paths would redeclare the local {@code schema} and not compile.
 *
 * @param hdfs base path of the Hudi table on HDFS
 * @return the schema parsed from its Avro JSON representation into an immutable map
 * @throws Exception if schema resolution or JSON parsing fails
 */
public ImmutableMap<String, Object> schema(String hdfs) throws Exception {
    Schema schema = HoodieUtils.getSchema(hdfs);
    // Avro Schema#toString() emits the schema as a JSON document; parse that
    // JSON into the cached immutable map returned to callers.
    return mapper.readValue(schema.toString(), new TypeReference<ImmutableMap<String, Object>>() {
    });
}

View File

@@ -8,12 +8,15 @@ import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.org.apache.avro.Schema;
import org.eclipse.collections.api.factory.Lists;
import org.eclipse.collections.api.list.ImmutableList;
import org.slf4j.Logger;
@@ -28,6 +31,19 @@ import org.slf4j.LoggerFactory;
public class HoodieUtils {
private static final Logger logger = LoggerFactory.getLogger(HoodieUtils.class);
/**
 * Resolves the Avro schema of the Hudi table rooted at the given HDFS path.
 *
 * <p>Builds a fresh {@code HoodieTableMetaClient} for the path and delegates to
 * {@link #getSchema(HoodieTableMetaClient)}.
 *
 * @param hdfs base path of the Hudi table on HDFS
 * @return the table's resolved Avro schema
 * @throws Exception if the meta client cannot be built or resolution fails
 */
public static Schema getSchema(String hdfs) throws Exception {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setBasePath(hdfs)
            .setConf(new Configuration())
            .build();
    return getSchema(metaClient);
}
/**
 * Resolves the table's Avro schema from an already-constructed meta client.
 *
 * @param client meta client pointing at the target Hudi table
 * @return the resolved Avro schema
 * @throws Exception if schema resolution fails
 */
public static Schema getSchema(HoodieTableMetaClient client) throws Exception {
    // NOTE(review): the boolean flag presumably means "include Hudi metadata
    // fields" — confirm against the TableSchemaResolver API for this Hudi version.
    return new TableSchemaResolver(client).getTableAvroSchema(true);
}
public static ImmutableList<HoodieInstant> getAllInstants(HoodieTableMetaClient client, Function<HoodieTableMetaClient, HoodieDefaultTimeline> getTimeline) throws IOException {
FileSystem fileSystem = client.getRawFs();
// 直接使用 toString 方法得到的值是被缓存的