[HUDI-2941] Show _hoodie_operation in spark sql results (#4649)
This commit is contained in:
@@ -679,9 +679,9 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
|
|||||||
Schema writerSchema;
|
Schema writerSchema;
|
||||||
boolean isValid;
|
boolean isValid;
|
||||||
try {
|
try {
|
||||||
TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
|
TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
|
||||||
writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
|
writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
|
||||||
tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields());
|
tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchemaWithoutMetadataFields());
|
||||||
isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
|
isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
|
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
|
||||||
|
|||||||
@@ -111,13 +111,13 @@ public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> im
|
|||||||
table.getMetaClient().reloadActiveTimeline();
|
table.getMetaClient().reloadActiveTimeline();
|
||||||
|
|
||||||
HoodieTableMetaClient metaClient = table.getMetaClient();
|
HoodieTableMetaClient metaClient = table.getMetaClient();
|
||||||
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
|
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
|
||||||
|
|
||||||
// Here we firstly use the table schema as the reader schema to read
|
// Here we firstly use the table schema as the reader schema to read
|
||||||
// log file.That is because in the case of MergeInto, the config.getSchema may not
|
// log file.That is because in the case of MergeInto, the config.getSchema may not
|
||||||
// the same with the table schema.
|
// the same with the table schema.
|
||||||
try {
|
try {
|
||||||
Schema readerSchema = schemaUtil.getTableAvroSchema(false);
|
Schema readerSchema = schemaResolver.getTableAvroSchema(false);
|
||||||
config.setSchema(readerSchema.toString());
|
config.setSchema(readerSchema.toString());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// If there is no commit in the table, just ignore the exception.
|
// If there is no commit in the table, just ignore the exception.
|
||||||
|
|||||||
@@ -21,14 +21,13 @@ package org.apache.hudi.common.table;
|
|||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.Schema.Field;
|
import org.apache.avro.Schema.Field;
|
||||||
import org.apache.avro.SchemaCompatibility;
|
import org.apache.avro.SchemaCompatibility;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||||
import org.apache.hudi.common.model.HoodieLogFile;
|
import org.apache.hudi.common.model.HoodieLogFile;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||||
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
|
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
|
||||||
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
|
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
|
||||||
@@ -42,10 +41,8 @@ import org.apache.hudi.common.util.StringUtils;
|
|||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.exception.InvalidTableException;
|
import org.apache.hudi.exception.InvalidTableException;
|
||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||||
import org.apache.parquet.format.converter.ParquetMetadataConverter;
|
import org.apache.parquet.format.converter.ParquetMetadataConverter;
|
||||||
import org.apache.parquet.hadoop.ParquetFileReader;
|
import org.apache.parquet.hadoop.ParquetFileReader;
|
||||||
@@ -61,15 +58,11 @@ public class TableSchemaResolver {
|
|||||||
|
|
||||||
private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
|
private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
|
||||||
private final HoodieTableMetaClient metaClient;
|
private final HoodieTableMetaClient metaClient;
|
||||||
private final boolean withOperationField;
|
private final boolean hasOperationField;
|
||||||
|
|
||||||
public TableSchemaResolver(HoodieTableMetaClient metaClient) {
|
public TableSchemaResolver(HoodieTableMetaClient metaClient) {
|
||||||
this(metaClient, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public TableSchemaResolver(HoodieTableMetaClient metaClient, boolean withOperationField) {
|
|
||||||
this.metaClient = metaClient;
|
this.metaClient = metaClient;
|
||||||
this.withOperationField = withOperationField;
|
this.hasOperationField = hasOperationField();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -122,7 +115,7 @@ public class TableSchemaResolver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Schema getTableAvroSchemaFromDataFile() throws Exception {
|
public Schema getTableAvroSchemaFromDataFile() {
|
||||||
return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
|
return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -151,7 +144,7 @@ public class TableSchemaResolver {
|
|||||||
Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
|
Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
|
||||||
if (schemaFromTableConfig.isPresent()) {
|
if (schemaFromTableConfig.isPresent()) {
|
||||||
if (includeMetadataFields) {
|
if (includeMetadataFields) {
|
||||||
return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), withOperationField);
|
return HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
|
||||||
} else {
|
} else {
|
||||||
return schemaFromTableConfig.get();
|
return schemaFromTableConfig.get();
|
||||||
}
|
}
|
||||||
@@ -176,7 +169,7 @@ public class TableSchemaResolver {
|
|||||||
}
|
}
|
||||||
Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
|
Option<Schema> schemaFromTableConfig = metaClient.getTableConfig().getTableCreateSchema();
|
||||||
if (schemaFromTableConfig.isPresent()) {
|
if (schemaFromTableConfig.isPresent()) {
|
||||||
Schema schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), withOperationField);
|
Schema schema = HoodieAvroUtils.addMetadataFields(schemaFromTableConfig.get(), hasOperationField);
|
||||||
return convertAvroSchemaToParquet(schema);
|
return convertAvroSchemaToParquet(schema);
|
||||||
}
|
}
|
||||||
return getTableParquetSchemaFromDataFile();
|
return getTableParquetSchemaFromDataFile();
|
||||||
@@ -244,7 +237,7 @@ public class TableSchemaResolver {
|
|||||||
|
|
||||||
Schema schema = new Schema.Parser().parse(existingSchemaStr);
|
Schema schema = new Schema.Parser().parse(existingSchemaStr);
|
||||||
if (includeMetadataFields) {
|
if (includeMetadataFields) {
|
||||||
schema = HoodieAvroUtils.addMetadataFields(schema, withOperationField);
|
schema = HoodieAvroUtils.addMetadataFields(schema, hasOperationField);
|
||||||
}
|
}
|
||||||
return Option.of(schema);
|
return Option.of(schema);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@@ -477,4 +470,18 @@ public class TableSchemaResolver {
|
|||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isHasOperationField() {
|
||||||
|
return hasOperationField;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean hasOperationField() {
|
||||||
|
try {
|
||||||
|
Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
|
||||||
|
return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null;
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed to read operation field from avro schema", e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -452,8 +452,8 @@ public class HoodieTableSource implements
|
|||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public Schema getTableAvroSchema() {
|
public Schema getTableAvroSchema() {
|
||||||
try {
|
try {
|
||||||
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
|
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
|
||||||
return schemaUtil.getTableAvroSchema();
|
return schemaResolver.getTableAvroSchema();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
// table exists but has no written data
|
// table exists but has no written data
|
||||||
LOG.warn("Get table avro schema error, use schema from the DDL instead", e);
|
LOG.warn("Get table avro schema error, use schema from the DDL instead", e);
|
||||||
|
|||||||
@@ -245,10 +245,10 @@ public class TestStreamReadOperator {
|
|||||||
final List<String> partitionKeys = Collections.singletonList("partition");
|
final List<String> partitionKeys = Collections.singletonList("partition");
|
||||||
|
|
||||||
// This input format is used to opening the emitted split.
|
// This input format is used to opening the emitted split.
|
||||||
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
|
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
|
||||||
final Schema tableAvroSchema;
|
final Schema tableAvroSchema;
|
||||||
try {
|
try {
|
||||||
tableAvroSchema = schemaUtil.getTableAvroSchema();
|
tableAvroSchema = schemaResolver.getTableAvroSchema();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieException("Get table avro schema error", e);
|
throw new HoodieException("Get table avro schema error", e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,8 +74,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
|
|||||||
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp))
|
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, lastInstant.getTimestamp))
|
||||||
log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
|
log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
|
||||||
private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
|
private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
|
||||||
private val schemaUtil = new TableSchemaResolver(metaClient)
|
private val schemaResolver = new TableSchemaResolver(metaClient)
|
||||||
private val tableAvroSchema = schemaUtil.getTableAvroSchema
|
private val tableAvroSchema = schemaResolver.getTableAvroSchema
|
||||||
private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
|
private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
|
||||||
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
|
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
|
||||||
private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
|
private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
|
||||||
|
|||||||
@@ -65,10 +65,10 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
|
|||||||
private val conf = sqlContext.sparkContext.hadoopConfiguration
|
private val conf = sqlContext.sparkContext.hadoopConfiguration
|
||||||
private val jobConf = new JobConf(conf)
|
private val jobConf = new JobConf(conf)
|
||||||
// use schema from latest metadata, if not present, read schema from the data file
|
// use schema from latest metadata, if not present, read schema from the data file
|
||||||
private val schemaUtil = new TableSchemaResolver(metaClient)
|
private val schemaResolver = new TableSchemaResolver(metaClient)
|
||||||
private lazy val tableAvroSchema = {
|
private lazy val tableAvroSchema = {
|
||||||
try {
|
try {
|
||||||
schemaUtil.getTableAvroSchema
|
schemaResolver.getTableAvroSchema
|
||||||
} catch {
|
} catch {
|
||||||
case _: Throwable => // If there is no commit in the table, we cann't get the schema
|
case _: Throwable => // If there is no commit in the table, we cann't get the schema
|
||||||
// with schemaUtil, use the userSchema instead.
|
// with schemaUtil, use the userSchema instead.
|
||||||
|
|||||||
@@ -17,8 +17,12 @@
|
|||||||
|
|
||||||
package org.apache.spark.sql.hudi
|
package org.apache.spark.sql.hudi
|
||||||
|
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient
|
import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
|
||||||
|
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
|
||||||
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
import org.apache.hudi.exception.HoodieDuplicateKeyException
|
import org.apache.hudi.exception.HoodieDuplicateKeyException
|
||||||
|
import org.apache.hudi.keygen.ComplexKeyGenerator
|
||||||
|
import org.apache.spark.sql.SaveMode
|
||||||
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
@@ -582,8 +586,48 @@ class TestInsertTable extends TestHoodieSqlBase {
|
|||||||
checkAnswer(s"select id, name, price, ts from $tableName")(
|
checkAnswer(s"select id, name, price, ts from $tableName")(
|
||||||
Seq(1, "a1", 11.0, 1000)
|
Seq(1, "a1", 11.0, 1000)
|
||||||
)
|
)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Test For read operation's field") {
|
||||||
|
withTempDir { tmp => {
|
||||||
|
val tableName = generateTableName
|
||||||
|
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
|
||||||
|
import spark.implicits._
|
||||||
|
val day = "2021-08-02"
|
||||||
|
val df = Seq((1, "a1", 10, 1000, day, 12)).toDF("id", "name", "value", "ts", "day", "hh")
|
||||||
|
// Write a table by spark dataframe.
|
||||||
|
df.write.format("hudi")
|
||||||
|
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
|
||||||
|
.option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
|
||||||
|
.option(RECORDKEY_FIELD.key, "id")
|
||||||
|
.option(PRECOMBINE_FIELD.key, "ts")
|
||||||
|
.option(PARTITIONPATH_FIELD.key, "day,hh")
|
||||||
|
.option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
|
||||||
|
.option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
|
||||||
|
.option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
|
||||||
|
.option(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key, "true")
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.save(tablePath)
|
||||||
|
|
||||||
|
val metaClient = HoodieTableMetaClient.builder()
|
||||||
|
.setBasePath(tablePath)
|
||||||
|
.setConf(spark.sessionState.newHadoopConf())
|
||||||
|
.build()
|
||||||
|
|
||||||
|
assertResult(true)(new TableSchemaResolver(metaClient).isHasOperationField)
|
||||||
|
|
||||||
|
spark.sql(
|
||||||
|
s"""
|
||||||
|
|create table $tableName using hudi
|
||||||
|
|location '${tablePath}'
|
||||||
|
|""".stripMargin)
|
||||||
|
|
||||||
|
// Note: spark sql batch write currently does not write actual content to the operation field
|
||||||
|
checkAnswer(s"select id, _hoodie_operation from $tableName")(
|
||||||
|
Seq(1, null)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,8 @@
|
|||||||
|
|
||||||
package org.apache.hudi.sync.common;
|
package org.apache.hudi.sync.common;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
|
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
@@ -29,9 +31,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
|||||||
import org.apache.hudi.common.table.timeline.TimelineUtils;
|
import org.apache.hudi.common.table.timeline.TimelineUtils;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.ValidationUtils;
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.parquet.schema.MessageType;
|
import org.apache.parquet.schema.MessageType;
|
||||||
@@ -149,11 +148,7 @@ public abstract class AbstractSyncHoodieClient {
|
|||||||
*/
|
*/
|
||||||
public MessageType getDataSchema() {
|
public MessageType getDataSchema() {
|
||||||
try {
|
try {
|
||||||
if (withOperationField) {
|
|
||||||
return new TableSchemaResolver(metaClient, true).getTableParquetSchema();
|
|
||||||
} else {
|
|
||||||
return new TableSchemaResolver(metaClient).getTableParquetSchema();
|
return new TableSchemaResolver(metaClient).getTableParquetSchema();
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new HoodieSyncException("Failed to read data schema", e);
|
throw new HoodieSyncException("Failed to read data schema", e);
|
||||||
}
|
}
|
||||||
@@ -162,11 +157,7 @@ public abstract class AbstractSyncHoodieClient {
|
|||||||
public boolean isDropPartition() {
|
public boolean isDropPartition() {
|
||||||
try {
|
try {
|
||||||
Option<HoodieCommitMetadata> hoodieCommitMetadata;
|
Option<HoodieCommitMetadata> hoodieCommitMetadata;
|
||||||
if (withOperationField) {
|
|
||||||
hoodieCommitMetadata = new TableSchemaResolver(metaClient, true).getLatestCommitMetadata();
|
|
||||||
} else {
|
|
||||||
hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata();
|
hoodieCommitMetadata = new TableSchemaResolver(metaClient).getLatestCommitMetadata();
|
||||||
}
|
|
||||||
|
|
||||||
if (hoodieCommitMetadata.isPresent()
|
if (hoodieCommitMetadata.isPresent()
|
||||||
&& WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
|
&& WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
|
||||||
|
|||||||
@@ -189,11 +189,11 @@ public class HoodieClusteringJob {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private String getSchemaFromLatestInstant() throws Exception {
|
private String getSchemaFromLatestInstant() throws Exception {
|
||||||
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
|
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
|
||||||
if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
|
if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
|
||||||
throw new HoodieException("Cannot run clustering without any completed commits");
|
throw new HoodieException("Cannot run clustering without any completed commits");
|
||||||
}
|
}
|
||||||
Schema schema = schemaUtil.getTableAvroSchema(false);
|
Schema schema = schemaResolver.getTableAvroSchema(false);
|
||||||
return schema.toString();
|
return schema.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user