[HUDI-3106] Fix HiveSyncTool not sync schema (#4452)

Author: ForwardXu
Date: 2021-12-28 14:11:14 +08:00
Committed by: GitHub
Parent: 1f7afba5e4
Commit: 32505d5adb

4 changed files with 63 additions and 32 deletions

TableSchemaResolver.java

@@ -18,6 +18,11 @@
 package org.apache.hudi.common.table;
 
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -35,12 +40,6 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.InvalidTableException;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.SchemaCompatibility;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -85,7 +84,8 @@ public class TableSchemaResolver {
           // If this is COW, get the last commit and read the schema from a file written in the
           // last commit
           HoodieInstant lastCommit =
-              activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
+              activeTimeline.getCommitsTimeline().filterCompletedInstantsWithCommitMetadata()
+                  .lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
           HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
               .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
           String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
@@ -97,8 +97,8 @@ public class TableSchemaResolver {
           // If this is MOR, depending on whether the latest commit is a delta commit or
           // compaction commit
           // Get a datafile written and get the schema from that file
-          Option<HoodieInstant> lastCompactionCommit =
-              metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
+          Option<HoodieInstant> lastCompactionCommit = metaClient.getActiveTimeline().getCommitTimeline()
+              .filterCompletedInstantsWithCommitMetadata().lastInstant();
           LOG.info("Found the last compaction commit as " + lastCompactionCommit);
           Option<HoodieInstant> lastDeltaCommit;

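Note on the TableSchemaResolver change above: the schema used to be read from the very last completed commit, and when that commit came from a DELETE_PARTITION operation it references no newly written data file, so schema resolution (and therefore Hive schema sync) could fail or be skipped. filterCompletedInstantsWithCommitMetadata() skips such instants. Below is a minimal caller sketch, not part of this commit; it assumes the 0.10.x-era HoodieTableMetaClient / TableSchemaResolver API, and the table base path is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.parquet.schema.MessageType;

public class SchemaLookupExample {
  public static void main(String[] args) throws Exception {
    // Placeholder base path of an existing Hudi table.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath("/tmp/hudi_trips_cow")
        .build();

    // With this patch the resolver picks the latest completed instant that is not a
    // delete_partition commit, so a trailing "drop partition" no longer hides the data schema.
    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
    MessageType parquetSchema = resolver.getTableParquetSchema();
    System.out.println(parquetSchema);
  }
}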
HoodieDefaultTimeline.java

@@ -18,6 +18,8 @@
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
@@ -100,6 +102,12 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
     return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted), details);
   }
 
+  @Override
+  public HoodieTimeline filterCompletedInstantsWithCommitMetadata() {
+    return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted)
+        .filter(i -> !isDeletePartitionType(i)), details);
+  }
+
   @Override
   public HoodieTimeline filterCompletedAndCompactionInstants() {
     return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted()
@@ -351,6 +359,21 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
     return details.apply(instant);
   }
 
+  @Override
+  public boolean isDeletePartitionType(HoodieInstant instant) {
+    Option<WriteOperationType> operationType;
+    try {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+      operationType = Option.of(commitMetadata.getOperationType());
+    } catch (Exception e) {
+      operationType = Option.empty();
+    }
+    return operationType.isPresent() && WriteOperationType.DELETE_PARTITION.equals(operationType.get());
+  }
+
   @Override
   public String toString() {
     return this.getClass().getName() + ": " + instants.stream().map(Object::toString).collect(Collectors.joining(","));

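For illustration (not part of the commit), here is how the two new timeline methods might be exercised side by side; the sketch assumes the 0.10.x-era timeline API and a placeholder base path.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

public class TimelineFilterExample {
  public static void main(String[] args) {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath("/tmp/hudi_trips_cow")   // placeholder table path
        .build();

    HoodieTimeline commits = metaClient.getActiveTimeline().getCommitsTimeline();

    // Previous behavior: the very last completed instant, even if it is a delete_partition commit.
    Option<HoodieInstant> lastCompleted = commits.filterCompletedInstants().lastInstant();

    // New behavior: the last completed instant whose commit metadata is not a delete_partition,
    // i.e. one that actually points at data files carrying the table schema.
    Option<HoodieInstant> lastWithData = commits.filterCompletedInstantsWithCommitMetadata().lastInstant();

    if (lastCompleted.isPresent()) {
      HoodieInstant instant = lastCompleted.get();
      // isDeletePartitionType reads the instant's commit metadata and is true only for DELETE_PARTITION.
      System.out.println(instant.getTimestamp() + " deletePartition=" + commits.isDeletePartitionType(instant));
    }
    if (lastWithData.isPresent()) {
      System.out.println("schema source instant: " + lastWithData.get().getTimestamp());
    }
  }
}

Note that isDeletePartitionType() deliberately swallows metadata parse failures and returns false in that case, so malformed or unreadable commit metadata never blocks schema resolution.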
HoodieTimeline.java

@@ -131,6 +131,14 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline filterCompletedAndCompactionInstants();
 
+  /**
+   * Filter this timeline to include the completed and exclude operation type is delete partition instants.
+   *
+   * @return New instance of HoodieTimeline with include the completed and
+   * exclude operation type is delete partition instants
+   */
+  HoodieTimeline filterCompletedInstantsWithCommitMetadata();
+
   /**
    * Timeline to just include commits (commit/deltacommit), compaction and replace actions.
    *
@@ -281,6 +289,11 @@ public interface HoodieTimeline extends Serializable {
    */
   Option<byte[]> getInstantDetails(HoodieInstant instant);
 
+  /**
+   * Check WriteOperationType is DeletePartition.
+   */
+  boolean isDeletePartitionType(HoodieInstant instant);
+
   /**
    * Helper methods to compare instants.
    **/

HiveSyncTool.java

@@ -18,6 +18,11 @@
 package org.apache.hudi.hive;
 
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -31,12 +36,6 @@ import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 import org.apache.hudi.sync.common.AbstractSyncTool;
-import com.beust.jcommander.JCommander;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.GroupType;
@@ -168,26 +167,22 @@ public class HiveSyncTool extends AbstractSyncTool {
     // check if isDropPartition
     boolean isDropPartition = hoodieHiveClient.isDropPartition();
 
-    // check if schemaChanged
-    boolean schemaChanged = false;
-    if (!isDropPartition) {
-      // Get the parquet schema for this table looking at the latest commit
-      MessageType schema = hoodieHiveClient.getDataSchema();
-
-      // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
-      // so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
-      // by the data source way (which will use the HoodieBootstrapRelation).
-      // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
-      if (hoodieHiveClient.isBootstrap()
-          && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
-          && !readAsOptimized) {
-        cfg.syncAsSparkDataSourceTable = false;
-      }
-
-      // Sync schema if needed
-      schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
+    // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = hoodieHiveClient.getDataSchema();
+
+    // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
+    // so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
+    // by the data source way (which will use the HoodieBootstrapRelation).
+    // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
+    if (hoodieHiveClient.isBootstrap()
+        && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
+        && !readAsOptimized) {
+      cfg.syncAsSparkDataSourceTable = false;
     }
 
+    // Sync schema if needed
+    boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
+
     LOG.info("Schema sync complete. Syncing partitions for " + tableName);
     // Get the last time we successfully synced partitions
     Option<String> lastCommitTimeSynced = Option.empty();
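End-to-end, the HiveSyncTool change means schema sync is no longer skipped when the latest commit only dropped partitions. A hedged standalone-invocation sketch follows; it is not part of the commit, the config fields mirror the public fields of the 0.10.x-era HiveSyncConfig, and every path and URL is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

import java.util.Collections;

public class HiveSyncExample {
  public static void main(String[] args) {
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.databaseName = "default";
    cfg.tableName = "hudi_trips";
    cfg.basePath = "/tmp/hudi_trips_cow";               // placeholder table path
    cfg.partitionFields = Collections.singletonList("partitionpath");
    cfg.jdbcUrl = "jdbc:hive2://localhost:10000";       // placeholder HiveServer2 URL

    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
    HiveConf hiveConf = new HiveConf();
    hiveConf.addResource(fs.getConf());

    // With this patch, syncHoodieTable() always runs syncSchema() before syncing partitions,
    // even when the most recent commit on the table is a drop-partition operation.
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
  }
}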