1
0

[HUDI-2757] Implement Hudi AWS Glue sync (#5076)

This commit is contained in:
Raymond Xu
2022-03-28 11:54:59 -07:00
committed by GitHub
parent 4ed84b216d
commit 6ccbae4d2a
25 changed files with 1151 additions and 204 deletions

View File

@@ -40,6 +40,11 @@
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hive-sync</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Logging -->
<dependency>
@@ -75,6 +80,28 @@
<version>${dynamodb.lockclient.version}</version>
</dependency>
<!-- Hive -->
<dependency>
<groupId>${hive.groupid}</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- AWS SDK -->
<dependency>
<groupId>com.amazonaws</groupId>
@@ -103,6 +130,12 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-glue -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-glue</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<!-- Test -->
<dependency>

View File

@@ -0,0 +1,479 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.aws.sync;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.AbstractHiveSyncHoodieClient;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.model.Partition;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.AWSGlueClientBuilder;
import com.amazonaws.services.glue.model.AlreadyExistsException;
import com.amazonaws.services.glue.model.BatchCreatePartitionRequest;
import com.amazonaws.services.glue.model.BatchCreatePartitionResult;
import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest;
import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry;
import com.amazonaws.services.glue.model.BatchUpdatePartitionResult;
import com.amazonaws.services.glue.model.Column;
import com.amazonaws.services.glue.model.CreateDatabaseRequest;
import com.amazonaws.services.glue.model.CreateDatabaseResult;
import com.amazonaws.services.glue.model.CreateTableRequest;
import com.amazonaws.services.glue.model.CreateTableResult;
import com.amazonaws.services.glue.model.DatabaseInput;
import com.amazonaws.services.glue.model.EntityNotFoundException;
import com.amazonaws.services.glue.model.GetDatabaseRequest;
import com.amazonaws.services.glue.model.GetPartitionsRequest;
import com.amazonaws.services.glue.model.GetPartitionsResult;
import com.amazonaws.services.glue.model.GetTableRequest;
import com.amazonaws.services.glue.model.PartitionInput;
import com.amazonaws.services.glue.model.SerDeInfo;
import com.amazonaws.services.glue.model.StorageDescriptor;
import com.amazonaws.services.glue.model.Table;
import com.amazonaws.services.glue.model.TableInput;
import com.amazonaws.services.glue.model.UpdateTableRequest;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.apache.hudi.aws.utils.S3Utils.s3aToS3;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType;
import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;
/**
* This class implements all the AWS APIs to enable syncing of a Hudi Table with the
* AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html).
*/
public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
// Logger shared by all instances of this sync client.
private static final Logger LOG = LogManager.getLogger(AWSGlueCatalogSyncClient.class);
// Upper bound on the number of partitions placed in a single batch create/update request.
private static final int MAX_PARTITIONS_PER_REQUEST = 100;
// Pause, in milliseconds, applied after each batch request before the next one is sent.
private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L;
// Glue client used for every catalog call made by this class; shut down in close().
private final AWSGlue awsGlue;
// Name of the Glue database, copied from the sync config at construction time.
private final String databaseName;
/**
 * Creates a sync client backed by a Glue client built from {@code AWSGlueClientBuilder.standard()}.
 *
 * @param syncConfig sync configuration; its {@code databaseName} becomes the target Glue database
 * @param hadoopConf Hadoop configuration handed to the parent client
 * @param fs         file system handed to the parent client
 */
public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
  super(syncConfig, hadoopConf, fs);
  this.databaseName = syncConfig.databaseName;
  this.awsGlue = AWSGlueClientBuilder.standard().build();
}
/**
 * Lists every partition registered in Glue for the given table.
 *
 * <p>Glue hands partitions back one page at a time, so the request is repeated with the
 * continuation token of the previous response until no token is returned. Reading only the
 * first response would silently truncate the listing for tables with many partitions.
 *
 * @param tableName name of the table inside the configured database
 * @return all partitions of the table, each with its values and storage location
 * @throws HoodieGlueSyncException if any Glue call fails
 */
@Override
public List<Partition> getAllPartitions(String tableName) {
  try {
    List<Partition> partitions = new ArrayList<>();
    String nextToken = null;
    do {
      GetPartitionsRequest request = new GetPartitionsRequest()
          .withDatabaseName(databaseName)
          .withTableName(tableName)
          .withNextToken(nextToken);
      GetPartitionsResult result = awsGlue.getPartitions(request);
      result.getPartitions().forEach(p ->
          partitions.add(new Partition(p.getValues(), p.getStorageDescriptor().getLocation())));
      // A null token means the last page has been consumed.
      nextToken = result.getNextToken();
    } while (nextToken != null);
    return partitions;
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e);
  }
}
/**
 * Registers new partitions in Glue for the given table.
 *
 * <p>Each partition gets a copy of the table's storage descriptor with the location replaced
 * by the partition's full path. Partitions are sent in batches of at most
 * {@code MAX_PARTITIONS_PER_REQUEST}, with a pause of {@code BATCH_REQUEST_SLEEP_MILLIS}
 * after every batch.
 *
 * @param tableName       name of the table inside the configured database
 * @param partitionsToAdd relative partition paths to register; nothing is done when empty
 * @throws HoodieGlueSyncException if Glue reports errors, a call fails, or the thread is
 *                                 interrupted while pausing between batches
 */
@Override
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
  if (partitionsToAdd.isEmpty()) {
    LOG.info("No partitions to add for " + tableId(databaseName, tableName));
    return;
  }
  LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " + tableId(databaseName, tableName));
  try {
    Table table = getTable(awsGlue, databaseName, tableName);
    StorageDescriptor sd = table.getStorageDescriptor();
    List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
      // Clone so every partition carries its own location without touching the table descriptor.
      StorageDescriptor partitionSd = sd.clone();
      String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
      List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
      partitionSd.setLocation(fullPartitionPath);
      return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
    }).collect(Collectors.toList());
    for (List<PartitionInput> batch : CollectionUtils.batches(partitionInputs, MAX_PARTITIONS_PER_REQUEST)) {
      BatchCreatePartitionRequest request = new BatchCreatePartitionRequest();
      request.withDatabaseName(databaseName).withTableName(tableName).withPartitionInputList(batch);
      BatchCreatePartitionResult result = awsGlue.batchCreatePartition(request);
      if (CollectionUtils.nonEmpty(result.getErrors())) {
        throw new HoodieGlueSyncException("Fail to add partitions to " + tableId(databaseName, tableName)
            + " with error(s): " + result.getErrors());
      }
      Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS);
    }
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new HoodieGlueSyncException("Fail to add partitions to " + tableId(databaseName, tableName), e);
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to add partitions to " + tableId(databaseName, tableName), e);
  }
}
/**
 * Updates existing partitions in Glue for the given table.
 *
 * <p>Each partition gets a copy of the table's storage descriptor with the location replaced
 * by the partition's full path. The location is set on the copy, not on the shared table
 * descriptor; setting it on the shared descriptor would leave every update entry with the
 * table's original location. Entries are sent in batches of at most
 * {@code MAX_PARTITIONS_PER_REQUEST}, with a pause of {@code BATCH_REQUEST_SLEEP_MILLIS}
 * after every batch.
 *
 * @param tableName         name of the table inside the configured database
 * @param changedPartitions relative partition paths to update; nothing is done when empty
 * @throws HoodieGlueSyncException if Glue reports errors, a call fails, or the thread is
 *                                 interrupted while pausing between batches
 */
@Override
public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
  if (changedPartitions.isEmpty()) {
    LOG.info("No partitions to change for " + tableId(databaseName, tableName));
    return;
  }
  LOG.info("Updating " + changedPartitions.size() + " partition(s) in table " + tableId(databaseName, tableName));
  try {
    Table table = getTable(awsGlue, databaseName, tableName);
    StorageDescriptor sd = table.getStorageDescriptor();
    List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
      StorageDescriptor partitionSd = sd.clone();
      String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
      List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
      // The per-partition copy receives the new location; the table descriptor stays untouched.
      partitionSd.setLocation(fullPartitionPath);
      PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
      return new BatchUpdatePartitionRequestEntry().withPartitionInput(partitionInput).withPartitionValueList(partitionValues);
    }).collect(Collectors.toList());
    for (List<BatchUpdatePartitionRequestEntry> batch : CollectionUtils.batches(updatePartitionEntries, MAX_PARTITIONS_PER_REQUEST)) {
      BatchUpdatePartitionRequest request = new BatchUpdatePartitionRequest();
      request.withDatabaseName(databaseName).withTableName(tableName).withEntries(batch);
      BatchUpdatePartitionResult result = awsGlue.batchUpdatePartition(request);
      if (CollectionUtils.nonEmpty(result.getErrors())) {
        throw new HoodieGlueSyncException("Fail to update partitions to " + tableId(databaseName, tableName)
            + " with error(s): " + result.getErrors());
      }
      Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS);
    }
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new HoodieGlueSyncException("Fail to update partitions to " + tableId(databaseName, tableName), e);
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to update partitions to " + tableId(databaseName, tableName), e);
  }
}
/**
 * Dropping partitions is not implemented for the Glue catalog.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
throw new UnsupportedOperationException("Not support dropPartitionsToTable yet.");
}
/**
 * Update the table properties to the table.
 *
 * <p>Does nothing when {@code tableProperties} is null or empty. Otherwise the properties are
 * written with the replace flag set, so they become the table's complete parameter map.
 * Skipping the empty case matters: replacing with an empty map would wipe every existing
 * table parameter.
 *
 * @param tableName       name of the table inside the configured database
 * @param tableProperties properties to store on the table
 * @throws HoodieGlueSyncException if the update fails
 */
@Override
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
  if (!nonEmpty(tableProperties)) {
    return;
  }
  try {
    updateTableParameters(awsGlue, databaseName, tableName, tableProperties, true);
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to update properties for table " + tableId(databaseName, tableName), e);
  }
}
/**
 * Replaces the column list of an existing Glue table with the columns of {@code newSchema}.
 *
 * <p>Table type, parameters and partition keys are carried over from the current table.
 * Partition fields are left out of the column list, matching how {@code createTable}
 * builds it: in Glue the partition keys are kept separately from the data columns.
 *
 * @param tableName name of the table inside the configured database
 * @param newSchema schema whose fields become the table's columns
 * @throws HoodieGlueSyncException if reading or updating the table fails
 */
@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
  // ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
  try {
    Table table = getTable(awsGlue, databaseName, tableName);
    Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
    List<Column> newColumns = newSchemaMap.keySet().stream()
        // In Glue, the full schema should exclude the partition keys
        .filter(key -> !syncConfig.partitionFields.contains(key))
        .map(key -> {
          String keyType = getPartitionKeyType(newSchemaMap, key);
          return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
        }).collect(Collectors.toList());
    StorageDescriptor sd = table.getStorageDescriptor();
    sd.setColumns(newColumns);
    final Date now = new Date();
    TableInput updatedTableInput = new TableInput()
        .withName(tableName)
        .withTableType(table.getTableType())
        .withParameters(table.getParameters())
        .withPartitionKeys(table.getPartitionKeys())
        .withStorageDescriptor(sd)
        .withLastAccessTime(now)
        .withLastAnalyzedTime(now);
    UpdateTableRequest request = new UpdateTableRequest()
        .withDatabaseName(databaseName)
        .withTableInput(updatedTableInput);
    awsGlue.updateTable(request);
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to update definition for table " + tableId(databaseName, tableName), e);
  }
}
/**
 * Reading column comments is not implemented for the Glue catalog.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
throw new UnsupportedOperationException("Not supported: `getTableCommentUsingMetastoreClient`");
}
/**
 * Updating column comments from Avro fields is not implemented for the Glue catalog.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
throw new UnsupportedOperationException("Not supported: `updateTableComments`");
}
/**
 * Updating column comments from a name-to-comment map is not implemented for the Glue catalog.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
throw new UnsupportedOperationException("Not supported: `updateTableComments`");
}
/**
 * Creates the table in Glue unless it already exists.
 *
 * <p>Fields listed in {@code syncConfig.partitionFields} become partition keys; all other
 * fields become storage-descriptor columns. The serde parameters are assembled in a private
 * copy, so the map passed by the caller is never modified. Null property maps are treated
 * as empty.
 *
 * @param tableName         name of the table inside the configured database
 * @param storageSchema     schema the columns and partition keys are derived from
 * @param inputFormatClass  input format class name stored on the table
 * @param outputFormatClass output format class name stored on the table
 * @param serdeClass        serialization library class name stored on the table
 * @param serdeProperties   serde parameters; {@code serialization.format=1} is added to the copy
 * @param tableProperties   table parameters stored alongside the optional {@code EXTERNAL} flag
 * @throws HoodieGlueSyncException if the table cannot be created for any reason other than
 *                                 already existing
 */
@Override
public void createTable(String tableName,
    MessageType storageSchema,
    String inputFormatClass,
    String outputFormatClass,
    String serdeClass,
    Map<String, String> serdeProperties,
    Map<String, String> tableProperties) {
  if (tableExists(tableName)) {
    return;
  }
  CreateTableRequest request = new CreateTableRequest();
  Map<String, String> params = new HashMap<>();
  if (!syncConfig.createManagedTable) {
    params.put("EXTERNAL", "TRUE");
  }
  if (tableProperties != null) {
    params.putAll(tableProperties);
  }
  try {
    Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
    List<Column> schemaPartitionKeys = new ArrayList<>();
    List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
    for (String key : mapSchema.keySet()) {
      String keyType = getPartitionKeyType(mapSchema, key);
      Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
      // In Glue, the full schema should exclude the partition keys
      if (syncConfig.partitionFields.contains(key)) {
        schemaPartitionKeys.add(column);
      } else {
        schemaWithoutPartitionKeys.add(column);
      }
    }
    // Work on a copy: the caller's map may be shared or unmodifiable.
    Map<String, String> serdeParams = new HashMap<>();
    if (serdeProperties != null) {
      serdeParams.putAll(serdeProperties);
    }
    serdeParams.put("serialization.format", "1");
    StorageDescriptor storageDescriptor = new StorageDescriptor();
    storageDescriptor
        .withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeParams))
        .withLocation(s3aToS3(syncConfig.basePath))
        .withInputFormat(inputFormatClass)
        .withOutputFormat(outputFormatClass)
        .withColumns(schemaWithoutPartitionKeys);
    final Date now = new Date();
    TableInput tableInput = new TableInput()
        .withName(tableName)
        .withTableType(TableType.EXTERNAL_TABLE.toString())
        .withParameters(params)
        .withPartitionKeys(schemaPartitionKeys)
        .withStorageDescriptor(storageDescriptor)
        .withLastAccessTime(now)
        .withLastAnalyzedTime(now);
    request.withDatabaseName(databaseName)
        .withTableInput(tableInput);
    CreateTableResult result = awsGlue.createTable(request);
    LOG.info("Created table " + tableId(databaseName, tableName) + " : " + result);
  } catch (AlreadyExistsException e) {
    // Another writer created the table between the existence check and this call.
    LOG.warn("Table " + tableId(databaseName, tableName) + " already exists.", e);
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to create " + tableId(databaseName, tableName), e);
  }
}
/**
 * Returns the table's schema as a map from field name to upper-cased type name.
 *
 * <p>Glue keeps partition keys apart from the regular columns, so both lists are read and
 * combined; on a name clash the partition key's type is the one kept.
 *
 * @param tableName name of the table inside the configured database
 * @return field name to type mapping covering columns and partition keys
 * @throws HoodieGlueSyncException if the table cannot be read or the schema cannot be built
 */
@Override
public Map<String, String> getTableSchema(String tableName) {
  try {
    final Table glueTable = getTable(awsGlue, databaseName, tableName);
    final Map<String, String> partitionKeyTypes = glueTable.getPartitionKeys()
        .stream()
        .collect(Collectors.toMap(col -> col.getName(), col -> col.getType().toUpperCase()));
    final Map<String, String> dataColumnTypes = glueTable.getStorageDescriptor().getColumns()
        .stream()
        .collect(Collectors.toMap(col -> col.getName(), col -> col.getType().toUpperCase()));
    // Start from the data columns, then layer the partition keys on top.
    final Map<String, String> merged = new HashMap<>(dataColumnTypes);
    merged.putAll(partitionKeyTypes);
    return merged;
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to get schema for table " + tableId(databaseName, tableName), e);
  }
}
/**
 * Checks whether the table exists by delegating to {@link #tableExists(String)}.
 *
 * @param tableName name of the table inside the configured database
 * @return whatever {@code tableExists(tableName)} returns
 */
@Override
public boolean doesTableExist(String tableName) {
return tableExists(tableName);
}
/**
 * Asks Glue whether the table is present in the configured database.
 *
 * @param tableName name of the table to look up
 * @return {@code true} when Glue returns a table, {@code false} when Glue reports it as not found
 * @throws HoodieGlueSyncException if the lookup fails for any other reason
 */
@Override
public boolean tableExists(String tableName) {
  final GetTableRequest lookup = new GetTableRequest();
  lookup.setDatabaseName(databaseName);
  lookup.setName(tableName);
  try {
    final Table found = awsGlue.getTable(lookup).getTable();
    return found != null;
  } catch (EntityNotFoundException e) {
    LOG.info("Table not found: " + tableId(databaseName, tableName), e);
    return false;
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to get table: " + tableId(databaseName, tableName), e);
  }
}
/**
 * Asks Glue whether a database with the given name is present.
 *
 * @param databaseName name of the database to look up
 * @return {@code true} when Glue returns a database, {@code false} when Glue reports it as not found
 * @throws HoodieGlueSyncException if the lookup fails for any other reason
 */
@Override
public boolean databaseExists(String databaseName) {
  final GetDatabaseRequest lookup = new GetDatabaseRequest().withName(databaseName);
  try {
    return awsGlue.getDatabase(lookup).getDatabase() != null;
  } catch (EntityNotFoundException e) {
    LOG.info("Database not found: " + databaseName, e);
    return false;
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to check if database exists " + databaseName, e);
  }
}
/**
 * Creates the database in Glue unless it already exists.
 *
 * <p>The database is created with a generated description and without parameters or a
 * location URI. A concurrent creation reported by Glue is logged and otherwise ignored.
 *
 * @param databaseName name of the database to create
 * @throws HoodieGlueSyncException if the creation fails for any reason other than already existing
 */
@Override
public void createDatabase(String databaseName) {
  if (!databaseExists(databaseName)) {
    final DatabaseInput definition = new DatabaseInput()
        .withName(databaseName)
        .withDescription("Automatically created by " + this.getClass().getName())
        .withParameters(null)
        .withLocationUri(null);
    final CreateDatabaseRequest creation = new CreateDatabaseRequest();
    creation.setDatabaseInput(definition);
    try {
      final CreateDatabaseResult outcome = awsGlue.createDatabase(creation);
      LOG.info("Successfully created database in AWS Glue: " + outcome.toString());
    } catch (AlreadyExistsException e) {
      LOG.warn("AWS Glue Database " + databaseName + " already exists", e);
    } catch (Exception e) {
      throw new HoodieGlueSyncException("Fail to create database " + databaseName, e);
    }
  }
}
/**
 * Reads the last synced commit time stored in the table's parameters.
 *
 * <p>A table that has never been synced has no such parameter; that case yields an empty
 * option instead of wrapping a null value.
 *
 * @param tableName name of the table inside the configured database
 * @return the value stored under {@code HOODIE_LAST_COMMIT_TIME_SYNC}, or empty when absent
 * @throws HoodieGlueSyncException if the table cannot be read
 */
@Override
public Option<String> getLastCommitTimeSynced(String tableName) {
  try {
    Table table = getTable(awsGlue, databaseName, tableName);
    Map<String, String> parameters = table.getParameters();
    String lastSynced = parameters == null ? null : parameters.get(HOODIE_LAST_COMMIT_TIME_SYNC);
    return Option.ofNullable(lastSynced);
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to get last sync commit time for " + tableId(databaseName, tableName), e);
  }
}
/**
 * Releases the Glue client by calling its {@code shutdown()} method.
 */
@Override
public void close() {
awsGlue.shutdown();
}
/**
 * Stores the timestamp of the active timeline's last instant on the table as the last synced
 * commit time. Existing table parameters are kept; only this one entry is added or overwritten.
 *
 * <p>When the active timeline has no instant, a warning is logged and nothing is written.
 *
 * @param tableName name of the table inside the configured database
 * @throws HoodieGlueSyncException if the table parameters cannot be updated
 */
@Override
public void updateLastCommitTimeSynced(String tableName) {
  if (activeTimeline.lastInstant().isPresent()) {
    final String latestInstantTime = activeTimeline.lastInstant().get().getTimestamp();
    try {
      final Map<String, String> syncMarker = Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, latestInstantTime);
      updateTableParameters(awsGlue, databaseName, tableName, syncMarker, false);
    } catch (Exception e) {
      throw new HoodieGlueSyncException("Fail to update last sync commit time for " + tableId(databaseName, tableName), e);
    }
  } else {
    LOG.warn("No commit in active timeline.");
  }
}
/**
 * Reading the last replicated time is not implemented for the Glue catalog.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public Option<String> getLastReplicatedTime(String tableName) {
throw new UnsupportedOperationException("Not supported: `getLastReplicatedTime`");
}
/**
 * Storing the last replicated timestamp is not implemented for the Glue catalog.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
throw new UnsupportedOperationException("Not supported: `updateLastReplicatedTimeStamp`");
}
/**
 * Deleting the last replicated timestamp is not implemented for the Glue catalog.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void deleteLastReplicatedTimeStamp(String tableName) {
throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
}
/**
 * Table type names written to Glue as strings via {@code toString()}.
 * Within this class only {@code EXTERNAL_TABLE} is referenced (by {@code createTable}).
 * NOTE(review): the names look like Hive's table types - confirm before relying on that.
 */
private enum TableType {
MANAGED_TABLE,
EXTERNAL_TABLE,
VIRTUAL_VIEW,
INDEX_TABLE,
MATERIALIZED_VIEW
}
/**
 * Fetches a table definition from Glue.
 *
 * @param awsGlue      Glue client used for the call
 * @param databaseName database that holds the table
 * @param tableName    name of the table to fetch
 * @return the table as returned by Glue
 * @throws HoodieGlueSyncException if Glue reports the table as not found or the call fails
 */
private static Table getTable(AWSGlue awsGlue, String databaseName, String tableName) throws HoodieGlueSyncException {
  final GetTableRequest lookup = new GetTableRequest();
  lookup.setDatabaseName(databaseName);
  lookup.setName(tableName);
  try {
    final Table fetched = awsGlue.getTable(lookup).getTable();
    return fetched;
  } catch (EntityNotFoundException e) {
    throw new HoodieGlueSyncException("Table not found: " + tableId(databaseName, tableName), e);
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to get table " + tableId(databaseName, tableName), e);
  }
}
/**
 * Writes table parameters to Glue, either merged into or replacing the existing ones.
 *
 * <p>The table is read first so that its type, partition keys and storage descriptor can be
 * sent back unchanged together with the new parameter map.
 *
 * @param awsGlue        Glue client used for the calls
 * @param databaseName   database that holds the table
 * @param tableName      name of the table to update
 * @param updatingParams parameters to write
 * @param shouldReplace  when {@code true} only {@code updatingParams} are kept; when
 *                       {@code false} they are layered on top of the table's current parameters
 * @throws HoodieGlueSyncException if reading or updating the table fails
 */
private static void updateTableParameters(AWSGlue awsGlue, String databaseName, String tableName, Map<String, String> updatingParams, boolean shouldReplace) {
  // Declared outside the try block because the failure message reports it.
  final Map<String, String> mergedParams = new HashMap<>();
  try {
    final Table current = getTable(awsGlue, databaseName, tableName);
    final boolean keepExisting = !shouldReplace;
    if (keepExisting) {
      mergedParams.putAll(current.getParameters());
    }
    mergedParams.putAll(updatingParams);
    final Date timestamp = new Date();
    final TableInput replacement = new TableInput()
        .withName(tableName)
        .withTableType(current.getTableType())
        .withParameters(mergedParams)
        .withPartitionKeys(current.getPartitionKeys())
        .withStorageDescriptor(current.getStorageDescriptor())
        .withLastAccessTime(timestamp)
        .withLastAnalyzedTime(timestamp);
    awsGlue.updateTable(new UpdateTableRequest()
        .withDatabaseName(databaseName)
        .withTableInput(replacement));
  } catch (Exception e) {
    throw new HoodieGlueSyncException("Fail to update params for table " + tableId(databaseName, tableName) + ": " + mergedParams, e);
  }
}
}

View File

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.aws.sync;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import com.beust.jcommander.JCommander;
import jdk.jfr.Experimental;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
/**
* Currently Experimental. Utility class that implements syncing a Hudi Table with the
* AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
* to enable querying via Glue ETLs, Athena etc.
*
* Extends HiveSyncTool since most logic is similar to Hive syncing,
* except using a different client {@link AWSGlueCatalogSyncClient} that implements
* the necessary functionality using Glue APIs.
*/
@Experimental
public class AwsGlueCatalogSyncTool extends HiveSyncTool {

  /**
   * Creates the tool from typed properties, wrapping the Hadoop configuration in a {@link HiveConf}.
   *
   * @param props sync properties handed to the parent tool
   * @param conf  Hadoop configuration the {@link HiveConf} is built from
   * @param fs    file system handed to the parent tool
   */
  public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
    super(props, new HiveConf(conf, HiveConf.class), fs);
  }

  /**
   * Creates the tool from an already parsed sync config; used by {@link #main(String[])}.
   */
  private AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
    super(hiveSyncConfig, hiveConf, fs);
  }

  /**
   * Installs an {@link AWSGlueCatalogSyncClient} as the client used by the sync logic.
   */
  @Override
  protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
    hoodieHiveClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, hiveConf, fs);
  }

  /**
   * Command line entry point: parses the arguments into a {@link HiveSyncConfig} and runs one sync.
   * Prints the usage and exits with status 1 when help is requested or no arguments are given.
   */
  public static void main(String[] args) {
    // parse the params
    final HiveSyncConfig syncConfig = new HiveSyncConfig();
    final JCommander commander = new JCommander(syncConfig, null, args);
    final boolean usageRequested = syncConfig.help || args.length == 0;
    if (usageRequested) {
      commander.usage();
      System.exit(1);
    }
    final FileSystem fileSystem = FSUtils.getFs(syncConfig.basePath, new Configuration());
    final HiveConf hiveConfiguration = new HiveConf();
    hiveConfiguration.addResource(fileSystem.getConf());
    final AwsGlueCatalogSyncTool syncTool = new AwsGlueCatalogSyncTool(syncConfig, hiveConfiguration, fileSystem);
    syncTool.syncHoodieTable();
  }
}

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.aws.sync;
import org.apache.hudi.hive.HoodieHiveSyncException;
/**
 * Exception type thrown by the AWS Glue sync classes; a specialization of
 * {@link HoodieHiveSyncException}.
 */
public class HoodieGlueSyncException extends HoodieHiveSyncException {

  /**
   * @param message description of the failure
   */
  public HoodieGlueSyncException(String message) {
    super(message);
  }

  /**
   * @param message description of the failure
   * @param cause   underlying error that triggered this exception
   */
  public HoodieGlueSyncException(String message, Throwable cause) {
    super(message, cause);
  }
}

View File

@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.aws.utils;
/**
 * Helpers for working with S3 URLs.
 */
public final class S3Utils {

  // Compiled once: matches a leading "s3a://" scheme, ignoring case.
  private static final java.util.regex.Pattern S3A_SCHEME_PREFIX =
      java.util.regex.Pattern.compile("(?i)^s3a://");

  /**
   * Rewrites a leading {@code s3a://} scheme (any letter case) to {@code s3://}.
   * Strings that do not start with that scheme are returned unchanged.
   *
   * @param s3aUrl URL to convert; must not be null
   * @return the URL with its {@code s3a://} prefix replaced by {@code s3://}
   */
  public static String s3aToS3(String s3aUrl) {
    return S3A_SCHEME_PREFIX.matcher(s3aUrl).replaceFirst("s3://");
  }
}

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.common.util.collection.Pair;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -32,12 +33,21 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class CollectionUtils {
public static final Properties EMPTY_PROPERTIES = new Properties();
/**
 * Tells whether the collection is missing or holds no elements.
 *
 * @param c collection to inspect, may be null
 * @return {@code true} for null or an empty collection
 */
public static boolean isNullOrEmpty(Collection<?> c) {
  return c == null || c.isEmpty();
}
/**
 * Tells whether the collection is present and holds at least one element.
 *
 * @param c collection to inspect, may be null
 * @return the negation of {@code isNullOrEmpty(c)}
 */
public static boolean nonEmpty(Collection<?> c) {
  final boolean missingOrEmpty = isNullOrEmpty(c);
  return !missingOrEmpty;
}
/**
* Combines provided arrays into one
*/
@@ -105,6 +115,21 @@ public class CollectionUtils {
return diff;
}
/**
 * Splits a list into consecutive batches of at most {@code batchSize} elements, as a stream.
 *
 * <p>Every batch is a {@code subList} view of the input; only the last batch may be shorter
 * than {@code batchSize}. An empty list produces an empty stream.
 *
 * @param list      list to split
 * @param batchSize maximum number of elements per batch; must be positive
 * @return stream of batches in list order
 */
public static <E> Stream<List<E>> batchesAsStream(List<E> list, int batchSize) {
  ValidationUtils.checkArgument(batchSize > 0, "batch size must be positive.");
  final int size = list.size();
  if (size <= 0) {
    return Stream.empty();
  }
  // Index of the final batch, which is the only one allowed to be shorter than batchSize.
  final int lastBatchIndex = (size - 1) / batchSize;
  return IntStream.rangeClosed(0, lastBatchIndex).mapToObj(index -> {
    final int from = index * batchSize;
    final int to = (index == lastBatchIndex) ? size : from + batchSize;
    return list.subList(from, to);
  });
}
/**
 * Splits a list into consecutive batches of at most {@code batchSize} elements.
 *
 * @param list      list to split
 * @param batchSize maximum number of elements per batch; must be positive
 * @return the batches produced by {@code batchesAsStream}, collected into a list
 */
public static <E> List<List<E>> batches(List<E> list, int batchSize) {
  final Stream<List<E>> batchStream = batchesAsStream(list, batchSize);
  return batchStream.collect(Collectors.toList());
}
/**
* Determines whether two iterators contain equal elements in the same order. More specifically,
* this method returns {@code true} if {@code iterator1} and {@code iterator2} contain the same

View File

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.util;
import java.util.Map;
import java.util.Objects;
/**
 * Null-safe emptiness checks for maps.
 */
public class MapUtils {

  /**
   * Tells whether the map is missing or holds no entries.
   *
   * @param m map to inspect, may be null
   * @return {@code true} for null or an empty map
   */
  public static boolean isNullOrEmpty(Map<?, ?> m) {
    return m == null || m.isEmpty();
  }

  /**
   * Tells whether the map is present and holds at least one entry.
   *
   * @param m map to inspect, may be null
   * @return {@code true} only for a non-null, non-empty map
   */
  public static boolean nonEmpty(Map<?, ?> m) {
    return m != null && !m.isEmpty();
  }
}

View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.common.util;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.apache.hudi.common.util.CollectionUtils.batches;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
 * Tests for the batching helpers in {@link CollectionUtils}.
 */
class TestCollectionUtils {

  /**
   * Covers invalid batch sizes, an empty input, an even split and a split with a short tail.
   */
  @Test
  void getBatchesFromList() {
    // Non-positive batch sizes are rejected.
    assertThrows(IllegalArgumentException.class, () -> batches(Collections.emptyList(), -1));
    assertThrows(IllegalArgumentException.class, () -> batches(Collections.emptyList(), 0));

    // An empty list yields no batches.
    assertEquals(Collections.emptyList(), batches(Collections.emptyList(), 1));

    final List<Integer> oneToSix = Arrays.asList(1, 2, 3, 4, 5, 6);

    // Six elements in threes: two full batches.
    final List<List<Integer>> evenSplit = batches(oneToSix, 3);
    assertEquals(2, evenSplit.size());
    assertEquals(Arrays.asList(1, 2, 3), evenSplit.get(0));
    assertEquals(Arrays.asList(4, 5, 6), evenSplit.get(1));

    // Six elements in fives: one full batch followed by a single leftover.
    final List<List<Integer>> unevenSplit = batches(oneToSix, 5);
    assertEquals(2, unevenSplit.size());
    assertEquals(Arrays.asList(1, 2, 3, 4, 5), unevenSplit.get(0));
    assertEquals(Collections.singletonList(6), unevenSplit.get(1));
  }
}

View File

@@ -163,7 +163,9 @@ public class KafkaConnectTransactionServices implements ConnectTransactionServic
Arrays.asList(connectConfigs.getMetaSyncClasses().split(",")));
FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration());
for (String impl : syncClientToolClasses) {
SyncUtilHelpers.runHoodieMetaSync(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue());
// TODO kafka connect config needs to support setting base file format
String baseFileFormat = connectConfigs.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT);
SyncUtilHelpers.runHoodieMetaSync(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, baseFileFormat);
}
}
}

View File

@@ -562,6 +562,7 @@ object HoodieSparkSqlWriter {
if (metaSyncEnabled) {
val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf())
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT);
val properties = new TypedProperties()
properties.putAll(hoodieConfig.getProps)
properties.put(HiveSyncConfig.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key, spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD).toString)
@@ -572,7 +573,7 @@ object HoodieSparkSqlWriter {
hiveConf.addResource(fs.getConf)
syncClientToolClassSet.foreach(impl => {
SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, hiveConf, fs, basePath.toString, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue)
SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, hiveConf, fs, basePath.toString, baseFileFormat)
})
}
true

View File

@@ -114,7 +114,7 @@ public class DLASyncTool extends AbstractSyncTool {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+ " of type " + hoodieDLAClient.getTableType());
// Check if the necessary table exists
boolean tableExists = hoodieDLAClient.doesTableExist(tableName);
boolean tableExists = hoodieDLAClient.tableExists(tableName);
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieDLAClient.getDataSchema();
// Sync schema if needed

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.dla;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -31,14 +29,17 @@ import org.apache.hudi.hive.PartitionValueExtractor;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -115,7 +116,7 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
}
public Map<String, String> getTableSchema(String tableName) {
if (!doesTableExist(tableName)) {
if (!tableExists(tableName)) {
throw new IllegalArgumentException(
"Failed to get schema for table " + tableName + " does not exist");
}
@@ -222,6 +223,11 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
@Override
public boolean doesTableExist(String tableName) {
return tableExists(tableName);
}
@Override
public boolean tableExists(String tableName) {
String sql = consutructShowCreateTableSQL(tableName);
Statement stmt = null;
ResultSet rs = null;
@@ -274,6 +280,22 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
// TODO : dla do not support update tblproperties, so do nothing.
}
@Override
public Option<String> getLastReplicatedTime(String tableName) {
// no op; unsupported
return Option.empty();
}
@Override
public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
// no op; unsupported
}
@Override
public void deleteLastReplicatedTimeStamp(String tableName) {
// no op; unsupported
}
@Override
public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
if (changedPartitions.isEmpty()) {
@@ -370,6 +392,7 @@ public class HoodieDLAClient extends AbstractSyncHoodieClient {
}
}
@Override
public void close() {
try {
if (connection != null) {

View File

@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hudi.sync.common.HoodieSyncException;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.parquet.schema.MessageType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Common parent for sync clients that target Hive based metastores, such as Hive server, HMS or
 * managed Hive services.
 *
 * <p>It keeps the sync configuration, the Hadoop configuration, the configured
 * {@link PartitionValueExtractor} and the completed-commits view of the active timeline, and it
 * provides the shared logic that turns storage partitions into {@link PartitionEvent}s.
 */
public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieClient {

  /** Separator used when flattening a list of partition values into a single lookup key. */
  private static final String PARTITION_VALUE_SEPARATOR = ", ";

  protected final HoodieTimeline activeTimeline;
  protected final HiveSyncConfig syncConfig;
  protected final Configuration hadoopConf;
  protected final PartitionValueExtractor partitionValueExtractor;

  /**
   * Builds the client from the given sync configuration.
   *
   * @param syncConfig sync configuration; supplies the base path, listing flags and the
   *                   partition value extractor class name
   * @param hadoopConf Hadoop configuration retained for subclasses
   * @param fs         file system handed to the parent sync client
   */
  public AbstractHiveSyncHoodieClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
    super(
        syncConfig.basePath,
        syncConfig.assumeDatePartitioning,
        syncConfig.useFileListingFromMetadata,
        syncConfig.withOperationField,
        fs);
    this.syncConfig = syncConfig;
    this.hadoopConf = hadoopConf;
    // The extractor is instantiated reflectively from the class name given in the config.
    this.partitionValueExtractor = ReflectionUtils.loadClass(syncConfig.partitionValueExtractorClass);
    // Only completed instants of the commits timeline are kept.
    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
  }

  /**
   * @return the completed-commits timeline captured when this client was constructed
   */
  public HoodieTimeline getActiveTimeline() {
    return activeTimeline;
  }

  /**
   * Compares the partitions found on storage against those registered in the metastore and
   * produces the list of {@link PartitionEvent}s describing the required changes.
   *
   * <p>When {@code isDropPartition} is set, every storage partition yields a drop event.
   * Otherwise a partition whose values are unknown to the metastore yields an add event, a
   * partition whose values are known but whose location differs yields an update event, and a
   * partition for which the extractor returns no values is ignored.
   *
   * @param tablePartitions            partitions currently registered in the metastore
   * @param partitionStoragePartitions relative partition paths found on storage
   * @param isDropPartition            whether the partitions are to be dropped
   * @return the partition events to apply, in the iteration order of
   *         {@code partitionStoragePartitions}
   */
  protected List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
    // Index the metastore side: flattened partition values -> location without scheme/authority.
    Map<String, String> locationByValues = new HashMap<>();
    for (Partition registered : tablePartitions) {
      List<String> registeredValues = registered.getValues();
      Path registeredLocation = new Path(registered.getStorageLocation());
      String registeredPath = Path.getPathWithoutSchemeAndAuthority(registeredLocation).toUri().getPath();
      locationByValues.put(String.join(PARTITION_VALUE_SEPARATOR, registeredValues), registeredPath);
    }

    List<PartitionEvent> result = new ArrayList<>();
    for (String relativePartition : partitionStoragePartitions) {
      Path absolutePartition = FSUtils.getPartitionPath(syncConfig.basePath, relativePartition);
      String storagePath = Path.getPathWithoutSchemeAndAuthority(absolutePartition).toUri().getPath();
      // Values are extracted up front, before the drop check, for every storage partition.
      List<String> extractedValues = partitionValueExtractor.extractPartitionValuesInPath(relativePartition);

      if (isDropPartition) {
        result.add(PartitionEvent.newPartitionDropEvent(relativePartition));
        continue;
      }
      if (extractedValues.isEmpty()) {
        continue;
      }

      String lookupKey = String.join(PARTITION_VALUE_SEPARATOR, extractedValues);
      boolean registeredInMetastore = locationByValues.containsKey(lookupKey);
      if (!registeredInMetastore) {
        // Values unknown to the metastore: the partition has to be added.
        result.add(PartitionEvent.newPartitionAddEvent(relativePartition));
      } else if (!locationByValues.get(lookupKey).equals(storagePath)) {
        // Same values but a different location: the partition has to be updated.
        result.add(PartitionEvent.newPartitionUpdateEvent(relativePartition));
      }
    }
    return result;
  }

  /**
   * Get all partitions for the table in the metastore.
   *
   * @param tableName name of the table to list partitions for
   * @return the partitions registered for the table
   */
  public abstract List<Partition> getAllPartitions(String tableName);

  /**
   * Check if a database already exists in the metastore.
   *
   * @param databaseName name of the database to look up
   * @return {@code true} if the database exists
   */
  public abstract boolean databaseExists(String databaseName);

  /**
   * Create a database in the metastore.
   *
   * @param databaseName name of the database to create
   */
  public abstract void createDatabase(String databaseName);

  /**
   * Update schema for the table in the metastore.
   *
   * @param tableName name of the table to update
   * @param newSchema schema the table definition should be updated to
   */
  public abstract void updateTableDefinition(String tableName, MessageType newSchema);

  /*
   * APIs below need to be re-worked by modeling field comment in hudi-sync-common,
   * instead of relying on Avro or Hive schema class.
   */

  /**
   * Resolves the table's Avro schema without the metadata fields.
   *
   * @return the Avro schema resolved through {@link TableSchemaResolver}
   * @throws HoodieSyncException if the schema cannot be read
   */
  public Schema getAvroSchemaWithoutMetadataFields() {
    TableSchemaResolver resolver;
    try {
      resolver = new TableSchemaResolver(metaClient);
      return resolver.getTableAvroSchemaWithoutMetadataFields();
    } catch (Exception e) {
      throw new HoodieSyncException("Failed to read avro schema", e);
    }
  }

  public abstract List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName);

  public abstract void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema);

  public abstract void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments);

  /*
   * APIs above need to be re-worked by modeling field comment in hudi-sync-common,
   * instead of relying on Avro or Hive schema class.
   */
}

View File

@@ -18,14 +18,6 @@
package org.apache.hudi.hive;
import com.beust.jcommander.JCommander;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -41,7 +33,14 @@ import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.sync.common.model.Partition;
import com.beust.jcommander.JCommander;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.GroupType;
@@ -66,35 +65,33 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
* partitions incrementally (all the partitions modified since the last commit)
*/
@SuppressWarnings("WeakerAccess")
public class HiveSyncTool extends AbstractSyncTool {
public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
protected final HiveSyncConfig hiveSyncConfig;
protected HoodieHiveClient hoodieHiveClient = null;
protected HiveSyncConfig hiveSyncConfig;
protected AbstractHiveSyncHoodieClient hoodieHiveClient;
protected String snapshotTableName = null;
protected Option<String> roTableName = null;
public HiveSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
super(props, conf, fs);
this.hiveSyncConfig = new HiveSyncConfig(props);
init(hiveSyncConfig, new HiveConf(conf, HiveConf.class));
this(new HiveSyncConfig(props), new HiveConf(conf, HiveConf.class), fs);
}
@Deprecated
public HiveSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
super(hiveSyncConfig.getProps(), hiveConf, fs);
this.hiveSyncConfig = hiveSyncConfig;
init(hiveSyncConfig, hiveConf);
// TODO: reconcile the way to set METASTOREURIS
if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) {
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris);
}
initClient(hiveSyncConfig, hiveConf);
initConfig(hiveSyncConfig);
}
private void init(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) {
try {
if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) {
hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris);
}
this.hoodieHiveClient = new HoodieHiveClient(hiveSyncConfig, hiveConf, fs);
} catch (RuntimeException e) {
if (hiveSyncConfig.ignoreExceptions) {
@@ -103,12 +100,16 @@ public class HiveSyncTool extends AbstractSyncTool {
throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
}
}
}
private void initConfig(HiveSyncConfig hiveSyncConfig) {
// Set partitionFields to empty, when the NonPartitionedExtractor is used
// TODO: HiveSyncConfig should be responsible for inferring config value
if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.partitionValueExtractorClass)) {
LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
hiveSyncConfig.partitionFields = new ArrayList<>();
}
this.hiveSyncConfig = hiveSyncConfig;
if (hoodieHiveClient != null) {
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
@@ -139,9 +140,7 @@ public class HiveSyncTool extends AbstractSyncTool {
} catch (RuntimeException re) {
throw new HoodieException("Got runtime exception when hive syncing " + hiveSyncConfig.tableName, re);
} finally {
if (hoodieHiveClient != null) {
hoodieHiveClient.close();
}
close();
}
}
@@ -162,6 +161,17 @@ public class HiveSyncTool extends AbstractSyncTool {
}
}
@Override
public void close() {
if (hoodieHiveClient != null) {
try {
hoodieHiveClient.close();
} catch (Exception e) {
throw new HoodieHiveSyncException("Fail to close sync client.", e);
}
}
}
protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
boolean readAsOptimized) {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath()
@@ -170,7 +180,7 @@ public class HiveSyncTool extends AbstractSyncTool {
// check if the database exists else create it
if (hiveSyncConfig.autoCreateDatabase) {
try {
if (!hoodieHiveClient.doesDataBaseExist(hiveSyncConfig.databaseName)) {
if (!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) {
hoodieHiveClient.createDatabase(hiveSyncConfig.databaseName);
}
} catch (Exception e) {
@@ -178,14 +188,14 @@ public class HiveSyncTool extends AbstractSyncTool {
LOG.warn("Unable to create database", e);
}
} else {
if (!hoodieHiveClient.doesDataBaseExist(hiveSyncConfig.databaseName)) {
if (!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) {
LOG.error("Hive database does not exist " + hiveSyncConfig.databaseName);
throw new HoodieHiveSyncException("hive database does not exist " + hiveSyncConfig.databaseName);
}
}
// Check if the necessary table exists
boolean tableExists = hoodieHiveClient.doesTableExist(tableName);
boolean tableExists = hoodieHiveClient.tableExists(tableName);
// check if isDropPartition
boolean isDropPartition = hoodieHiveClient.isDropPartition();
@@ -375,7 +385,7 @@ public class HiveSyncTool extends AbstractSyncTool {
private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
boolean partitionsChanged;
try {
List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
List<Partition> hivePartitions = hoodieHiveClient.getAllPartitions(tableName);
List<PartitionEvent> partitionEvents =
hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);

View File

@@ -18,10 +18,7 @@
package org.apache.hudi.hive;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -30,18 +27,14 @@ import org.apache.hudi.hive.ddl.HMSDDLExecutor;
import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.hive.ddl.JDBCExecutor;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hudi.sync.common.HoodieSyncException;
import org.apache.hudi.sync.common.model.Partition;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.log4j.LogManager;
@@ -49,7 +42,6 @@ import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -57,22 +49,19 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;
public class HoodieHiveClient extends AbstractSyncHoodieClient {
private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
private static final String HIVE_ESCAPE_CHARACTER = HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;
/**
* This class implements logic to sync a Hudi table with either the Hive server or the Hive Metastore.
*/
public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);
private final PartitionValueExtractor partitionValueExtractor;
private final HoodieTimeline activeTimeline;
DDLExecutor ddlExecutor;
private IMetaStoreClient client;
private final HiveSyncConfig syncConfig;
public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.withOperationField, fs);
this.syncConfig = cfg;
super(cfg, configuration, fs);
// Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
// disable jdbc and depend on metastore client for all hive registrations
@@ -99,20 +88,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e);
}
try {
this.partitionValueExtractor =
(PartitionValueExtractor) Class.forName(cfg.partitionValueExtractorClass).newInstance();
} catch (Exception e) {
throw new HoodieHiveSyncException(
"Failed to initialize PartitionValueExtractor class " + cfg.partitionValueExtractorClass, e);
}
activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
}
public HoodieTimeline getActiveTimeline() {
return activeTimeline;
}
/**
@@ -159,61 +134,33 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
}
/**
* Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
* Generate a list of PartitionEvent based on the changes required.
*/
List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
return getPartitionEvents(tablePartitions, partitionStoragePartitions, false);
}
/**
* Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
* Generate a list of PartitionEvent based on the changes required.
*/
List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
String fullTablePartitionPath =
Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
}
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : partitionStoragePartitions) {
Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
if (isDropPartition) {
events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
} else {
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
}
}
}
}
return events;
}
/**
* Scan table partitions.
*
* @deprecated Use {@link #getAllPartitions} instead.
*/
public List<Partition> scanTablePartitions(String tableName) throws TException {
@Deprecated
public List<org.apache.hadoop.hive.metastore.api.Partition> scanTablePartitions(String tableName) throws TException {
return client.listPartitions(syncConfig.databaseName, tableName, (short) -1);
}
void updateTableDefinition(String tableName, MessageType newSchema) {
@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
ddlExecutor.updateTableDefinition(tableName, newSchema);
}
@Override
public List<Partition> getAllPartitions(String tableName) {
try {
return client.listPartitions(syncConfig.databaseName, tableName, (short) -1)
.stream()
.map(p -> new Partition(p.getValues(), p.getSd().getLocation()))
.collect(Collectors.toList());
} catch (TException e) {
throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(syncConfig.databaseName, tableName), e);
}
}
@Override
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
String outputFormatClass, String serdeClass,
@@ -226,18 +173,21 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
*/
@Override
public Map<String, String> getTableSchema(String tableName) {
if (!doesTableExist(tableName)) {
if (!tableExists(tableName)) {
throw new IllegalArgumentException(
"Failed to get schema for table " + tableName + " does not exist");
}
return ddlExecutor.getTableSchema(tableName);
}
/**
* @return true if the configured table exists
*/
@Deprecated
@Override
public boolean doesTableExist(String tableName) {
return tableExists(tableName);
}
@Override
public boolean tableExists(String tableName) {
try {
return client.tableExists(syncConfig.databaseName, tableName);
} catch (TException e) {
@@ -245,11 +195,13 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
}
/**
* @param databaseName
* @return true if the configured database exists
*/
@Deprecated
public boolean doesDataBaseExist(String databaseName) {
return databaseExists(databaseName);
}
@Override
public boolean databaseExists(String databaseName) {
try {
client.getDatabase(databaseName);
return true;
@@ -261,6 +213,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
}
@Override
public void createDatabase(String databaseName) {
ddlExecutor.createDatabase(databaseName);
}
@@ -321,6 +274,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
}
@Override
public void close() {
try {
ddlExecutor.close();
@@ -333,10 +287,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
}
List<String> getAllTables(String db) throws Exception {
return client.getAllTables(db);
}
@Override
public void updateLastCommitTimeSynced(String tableName) {
// Set the last commit time from the TBLproperties
@@ -352,14 +302,7 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
}
public Schema getAvroSchemaWithoutMetadataFields() {
try {
return new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields();
} catch (Exception e) {
throw new HoodieSyncException("Failed to read avro schema", e);
}
}
@Override
public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
try {
return client.getSchema(syncConfig.databaseName, tableName);
@@ -368,11 +311,13 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
}
}
@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
Map<String,String> newComments = newSchema.stream().collect(Collectors.toMap(field -> field.name().toLowerCase(Locale.ROOT), field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
updateTableComments(tableName,oldSchema,newComments);
}
@Override
public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String,String> newComments) {
Map<String,String> oldComments = oldSchema.stream().collect(Collectors.toMap(fieldSchema -> fieldSchema.getName().toLowerCase(Locale.ROOT),
fieldSchema -> StringUtils.isNullOrEmpty(fieldSchema.getComment()) ? "" : fieldSchema.getComment()));

View File

@@ -20,10 +20,6 @@ package org.apache.hudi.hive;
public class HoodieHiveSyncException extends RuntimeException {
public HoodieHiveSyncException() {
super();
}
public HoodieHiveSyncException(String message) {
super(message);
}
@@ -32,11 +28,4 @@ public class HoodieHiveSyncException extends RuntimeException {
super(message, t);
}
public HoodieHiveSyncException(Throwable t) {
super(t);
}
protected static String format(String message, Object... args) {
return String.format(String.valueOf(message), (Object[]) args);
}
}

View File

@@ -30,11 +30,12 @@ import java.util.Map;
* There are two main implementations one is QueryBased other is based on HiveMetaStore
* QueryBasedDDLExecutor also has two implementations namely HiveQL based and other JDBC based.
*/
public interface DDLExecutor {
public interface DDLExecutor extends AutoCloseable {
/**
* @param databaseName name of database to be created.
*/
public void createDatabase(String databaseName);
void createDatabase(String databaseName);
/**
* Creates a table with the following properties.
@@ -47,9 +48,9 @@ public interface DDLExecutor {
* @param serdeProperties
* @param tableProperties
*/
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
String outputFormatClass, String serdeClass,
Map<String, String> serdeProperties, Map<String, String> tableProperties);
void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
String outputFormatClass, String serdeClass,
Map<String, String> serdeProperties, Map<String, String> tableProperties);
/**
* Updates the table with the newSchema.
@@ -57,7 +58,7 @@ public interface DDLExecutor {
* @param tableName
* @param newSchema
*/
public void updateTableDefinition(String tableName, MessageType newSchema);
void updateTableDefinition(String tableName, MessageType newSchema);
/**
* Fetches tableSchema for a table.
@@ -65,7 +66,7 @@ public interface DDLExecutor {
* @param tableName
* @return
*/
public Map<String, String> getTableSchema(String tableName);
Map<String, String> getTableSchema(String tableName);
/**
* Adds partition to table.
@@ -73,7 +74,7 @@ public interface DDLExecutor {
* @param tableName
* @param partitionsToAdd
*/
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
/**
* Updates partitions for a given table.
@@ -81,7 +82,7 @@ public interface DDLExecutor {
* @param tableName
* @param changedPartitions
*/
public void updatePartitionsToTable(String tableName, List<String> changedPartitions);
void updatePartitionsToTable(String tableName, List<String> changedPartitions);
/**
* Drop partitions for a given table.
@@ -89,15 +90,13 @@ public interface DDLExecutor {
* @param tableName
* @param partitionsToDrop
*/
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
/**
* update table comments
*
* @param tableName
* @param newSchema
* @param newSchema Map key: field name, Map value: [field type, field comment]
*/
public void updateTableComments(String tableName, Map<String, ImmutablePair<String,String>> newSchema);
public void close();
void updateTableComments(String tableName, Map<String, ImmutablePair<String, String>> newSchema);
}

View File

@@ -55,10 +55,6 @@ public class GlobalHiveSyncTool extends HiveSyncTool {
LOG.info("Sync complete for " + tableName);
}
public void close() {
hoodieHiveClient.close();
}
public Map<String, Option<String>> getLastReplicatedTimeStampMap() {
Map<String, Option<String>> timeStampMap = new HashMap<>();
Option<String> timeStamp = hoodieHiveClient.getLastReplicatedTime(snapshotTableName);

View File

@@ -138,12 +138,12 @@ public class TestHiveSyncTool {
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
reinitHiveSyncClient();
assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(),
hiveClient.getDataSchema().getColumns().size() + 1,
@@ -176,9 +176,9 @@ public class TestHiveSyncTool {
ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME
+ "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");
List<Partition> hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.TABLE_NAME);
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
"The one partition event must of type UPDATE");
@@ -211,20 +211,20 @@ public class TestHiveSyncTool {
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key(), "true");
reinitHiveSyncClient();
assertDoesNotThrow((this::reSyncHiveTable));
assertTrue(hiveClient.doesDataBaseExist(HiveTestUtil.DB_NAME),
assertTrue(hiveClient.databaseExists(HiveTestUtil.DB_NAME),
"DataBases " + HiveTestUtil.DB_NAME + " should exist after sync completes");
// while autoCreateDatabase is false and database exists;
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key(), "false");
reinitHiveSyncClient();
assertDoesNotThrow((this::reSyncHiveTable));
assertTrue(hiveClient.doesDataBaseExist(HiveTestUtil.DB_NAME),
assertTrue(hiveClient.databaseExists(HiveTestUtil.DB_NAME),
"DataBases " + HiveTestUtil.DB_NAME + " should exist after sync completes");
// while autoCreateDatabase is true and database exists;
hiveSyncProps.setProperty(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key(), "true");
assertDoesNotThrow((this::reSyncHiveTable));
assertTrue(hiveClient.doesDataBaseExist(HiveTestUtil.DB_NAME),
assertTrue(hiveClient.databaseExists(HiveTestUtil.DB_NAME),
"DataBases " + HiveTestUtil.DB_NAME + " should exist after sync completes");
}
@@ -457,8 +457,8 @@ public class TestHiveSyncTool {
reSyncHiveTable();
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<Partition> hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.TABLE_NAME);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
@@ -581,11 +581,11 @@ public class TestHiveSyncTool {
String roTableName = HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
reinitHiveSyncClient();
assertFalse(hiveClient.doesTableExist(roTableName), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
assertFalse(hiveClient.tableExists(roTableName), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes");
assertTrue(hiveClient.tableExists(roTableName), "Table " + roTableName + " should exist after sync completes");
if (useSchemaFromCommitMetadata) {
assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -643,14 +643,14 @@ public class TestHiveSyncTool {
String snapshotTableName = HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata);
reinitHiveSyncClient();
assertFalse(hiveClient.doesTableExist(snapshotTableName),
assertFalse(hiveClient.tableExists(snapshotTableName),
"Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should not exist initially");
// Lets do the sync
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(snapshotTableName),
assertTrue(hiveClient.tableExists(snapshotTableName),
"Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should exist after sync completes");
@@ -713,11 +713,11 @@ public class TestHiveSyncTool {
HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME);
reinitHiveSyncClient();
assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(),
hiveClient.getDataSchema().getColumns().size() + 3,
@@ -736,8 +736,8 @@ public class TestHiveSyncTool {
reinitHiveSyncClient();
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<Partition> hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.TABLE_NAME);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
List<org.apache.hudi.sync.common.model.Partition> hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
@@ -755,7 +755,7 @@ public class TestHiveSyncTool {
reinitHiveSyncClient();
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(),
hiveClient.getDataSchema().getColumns().size() + 3,
@@ -776,12 +776,12 @@ public class TestHiveSyncTool {
HiveTestUtil.createCOWTable(instantTime, 1, true);
reinitHiveSyncClient();
assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(),
hiveClient.getDataSchema().getColumns().size() + 1,
@@ -820,11 +820,11 @@ public class TestHiveSyncTool {
HiveTestUtil.createCOWTable(instantTime, 1, true);
reinitHiveSyncClient();
assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(),
hiveClient.getDataSchema().getColumns().size() + 1,
@@ -860,11 +860,11 @@ public class TestHiveSyncTool {
HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME);
reinitHiveSyncClient();
assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(),
hiveClient.getDataSchema().getColumns().size(),
@@ -882,13 +882,13 @@ public class TestHiveSyncTool {
HiveTestUtil.createMORTable(commitTime, "", 5, false, true);
reinitHiveSyncClient();
assertFalse(hiveClient.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
assertFalse(hiveClient.tableExists(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should not exist initially");
// Lets do the sync
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
assertTrue(hiveClient.tableExists(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should exist after sync completes");
// Schema being read from compacted base files
@@ -925,7 +925,7 @@ public class TestHiveSyncTool {
HiveTestUtil.createCOWTable(instantTime, 5, false);
reinitHiveSyncClient();
HoodieHiveClient prevHiveClient = hiveClient;
assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
@@ -936,12 +936,12 @@ public class TestHiveSyncTool {
reSyncHiveTable();
assertNull(hiveClient);
assertFalse(prevHiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
assertFalse(prevHiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
}
private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String emptyCommitTime) throws Exception {
assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(),
hiveClient.getDataSchema().getColumns().size() + 1,
"Hive Schema should match the table schema + partition field");
@@ -973,7 +973,7 @@ public class TestHiveSyncTool {
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime, true);
reinitHiveSyncClient();
assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
reinitHiveSyncClient();
reSyncHiveTable();
@@ -1000,7 +1000,7 @@ public class TestHiveSyncTool {
reinitHiveSyncClient();
assertFalse(
hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
HiveSyncTool tool = new HiveSyncTool(hiveSyncProps, getHiveConf(), fileSystem);
// now delete the evolved commit instant
@@ -1017,7 +1017,7 @@ public class TestHiveSyncTool {
}
// table should not be synced yet
assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist at all");
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist at all");
}
@ParameterizedTest
@@ -1033,7 +1033,7 @@ public class TestHiveSyncTool {
//HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
reinitHiveSyncClient();
assertFalse(
hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
reSyncHiveTable();
@@ -1120,7 +1120,7 @@ public class TestHiveSyncTool {
reinitHiveSyncClient();
reSyncHiveTable();
assertTrue(hiveClient.doesTableExist(tableName));
assertTrue(hiveClient.tableExists(tableName));
assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get());
HiveTestUtil.addMORPartitions(0, true, true, true, ZonedDateTime.now().plusDays(2), commitTime1, commitTime2);
@@ -1138,7 +1138,7 @@ public class TestHiveSyncTool {
/**
 * Rebuilds {@code hiveSyncTool} from the current {@code hiveSyncProps} and re-points
 * {@code hiveClient} at the client held by the freshly built tool.
 */
private void reinitHiveSyncClient() {
hiveSyncTool = new HiveSyncTool(hiveSyncProps, HiveTestUtil.getHiveConf(), fileSystem);
hiveClient = hiveSyncTool.hoodieHiveClient;
// The tool's client field is downcast to HoodieHiveClient before being stored in the test field.
hiveClient = (HoodieHiveClient) hiveSyncTool.hoodieHiveClient;
}
private int getPartitionFieldSize() {

View File

@@ -18,8 +18,6 @@
package org.apache.hudi.sync.common;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -31,6 +29,9 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -43,10 +44,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
public abstract class AbstractSyncHoodieClient {
public abstract class AbstractSyncHoodieClient implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
public static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
public static final TypeConverter TYPE_CONVERTOR = new TypeConverter() {};
protected final HoodieTableMetaClient metaClient;
@@ -89,12 +91,24 @@ public abstract class AbstractSyncHoodieClient {
String serdeClass, Map<String, String> serdeProperties,
Map<String, String> tableProperties);
/**
* @deprecated Use {@link #tableExists} instead.
*/
@Deprecated
public abstract boolean doesTableExist(String tableName);
public abstract boolean tableExists(String tableName);
public abstract Option<String> getLastCommitTimeSynced(String tableName);
public abstract void updateLastCommitTimeSynced(String tableName);
public abstract Option<String> getLastReplicatedTime(String tableName);
public abstract void updateLastReplicatedTimeStamp(String tableName, String timeStamp);
public abstract void deleteLastReplicatedTimeStamp(String tableName);
public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.sync.common.model;
import java.util.List;
/**
 * Value holder that pairs the values of a table partition with the location where
 * that partition is stored.
 *
 * <p>Both fields are assigned once in the constructor. The list of values is kept by
 * reference (no copy is made), so the caller's list and the one returned by
 * {@link #getValues()} are the same object.
 */
public class Partition {

  /** Partition values, exactly as handed to the constructor. */
  private final List<String> values;

  /** Storage location of the partition, exactly as handed to the constructor. */
  private final String storageLocation;

  /**
   * Creates a partition descriptor.
   *
   * @param values the partition values
   * @param storageLocation the location where the partition is stored
   */
  public Partition(List<String> values, String storageLocation) {
    this.storageLocation = storageLocation;
    this.values = values;
  }

  /**
   * @return the partition values given at construction time
   */
  public List<String> getValues() {
    return this.values;
  }

  /**
   * @return the storage location given at construction time
   */
  public String getStorageLocation() {
    return this.storageLocation;
  }
}

View File

@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.sync.common.util;
/**
 * Static helpers for building identifiers of synced tables.
 */
public final class TableUtils {

  private TableUtils() {
    // Static-only utility class; instantiation is not meaningful.
  }

  /**
   * Builds the qualified identifier of a table in the form {@code database.table}.
   *
   * @param database name of the database
   * @param table name of the table
   * @return the two names joined by a single dot
   */
  public static String tableId(String database, String table) {
    return String.format("%s.%s", database, table);
  }
}

View File

@@ -167,7 +167,7 @@ public class TestHiveIncrementalPuller {
puller.saveDelta();
HoodieHiveClient assertingClient = new HoodieHiveClient(new HiveSyncConfig(getAssertionSyncConfig(cfg.tmpDb)), HiveTestUtil.getHiveConf(), fileSystem);
String tmpTable = cfg.targetTable + "__" + cfg.sourceTable;
assertTrue(assertingClient.doesTableExist(tmpTable));
assertTrue(assertingClient.tableExists(tmpTable));
}
}

View File

@@ -1240,8 +1240,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("year", "month", "day");
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist");
assertEquals(3, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
assertTrue(hiveClient.tableExists(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist");
assertEquals(3, hiveClient.getAllPartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(lastInstantForUpstreamTable,
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),

View File

@@ -201,7 +201,7 @@ public class UtilitiesTestBase {
*
* @throws Exception
*/
private static void clearHiveDb() throws IOException {
private static void clearHiveDb() throws Exception {
HiveConf hiveConf = new HiveConf();
// Create Dummy hive sync config
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");