1
0

[HUDI-3357] MVP implementation of BigQuerySyncTool (#5125)

Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com>
This commit is contained in:
Vinoth Govindarajan
2022-04-02 13:18:06 -07:00
committed by GitHub
parent c19f505b5a
commit 20964df770
16 changed files with 1328 additions and 214 deletions

117
hudi-gcp/pom.xml Normal file
View File

@@ -0,0 +1,117 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
<version>0.11.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-gcp</artifactId>
<packaging>jar</packaging>
<!-- Using libraries-bom to manage versions; google-cloud-bigquery below therefore
declares no explicit version.
See https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/The-Google-Cloud-Platform-Libraries-BOM -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>25.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- Hoodie: table metadata and sync-tool base classes -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-sync-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- BigQuery client; version comes from the libraries-bom import above -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
</dependency>
<!-- Logging -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<!-- Test-only -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<plugins>
<!-- License-header check -->
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<!-- Also publish a test-jar so other modules can reuse test utilities -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Code-coverage instrumentation -->
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,46 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Assembly descriptor producing a single flattened ("uber") jar containing
this module's classes plus its unpacked dependencies. -->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
<id>jar-with-dependencies</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<!-- Runtime dependencies, unpacked into the jar root; test and
conflict-prone artifacts are excluded. -->
<dependencySet>
<outputDirectory>/</outputDirectory>
<unpack>true</unpack>
<scope>runtime</scope>
<excludes>
<exclude>junit:junit</exclude>
<exclude>com.google.code.findbugs:*</exclude>
<exclude>org.apache.hbase:*</exclude>
</excludes>
</dependencySet>
<!-- Provided-scope dependencies are also bundled so the tool can run
standalone outside a container that would normally supply them. -->
<dependencySet>
<unpack>true</unpack>
<scope>provided</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.gcp.bigquery;
import org.apache.hudi.common.config.TypedProperties;
import com.beust.jcommander.Parameter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
 * Configs needed to sync data into BigQuery.
 *
 * <p>Populated either from the command line (via JCommander {@code @Parameter}
 * annotations) or from a {@link TypedProperties} bag via {@link #fromProps}.
 */
public class BigQuerySyncConfig implements Serializable {

  // Property keys used when this config is carried around as TypedProperties.
  // Declared final: these are constants and must never be reassigned.
  public static final String BIGQUERY_SYNC_PROJECT_ID = "hoodie.gcp.bigquery.sync.project_id";
  public static final String BIGQUERY_SYNC_DATASET_NAME = "hoodie.gcp.bigquery.sync.dataset_name";
  public static final String BIGQUERY_SYNC_DATASET_LOCATION = "hoodie.gcp.bigquery.sync.dataset_location";
  public static final String BIGQUERY_SYNC_TABLE_NAME = "hoodie.gcp.bigquery.sync.table_name";
  public static final String BIGQUERY_SYNC_SOURCE_URI = "hoodie.gcp.bigquery.sync.source_uri";
  public static final String BIGQUERY_SYNC_SOURCE_URI_PREFIX = "hoodie.gcp.bigquery.sync.source_uri_prefix";
  public static final String BIGQUERY_SYNC_SYNC_BASE_PATH = "hoodie.gcp.bigquery.sync.base_path";
  public static final String BIGQUERY_SYNC_PARTITION_FIELDS = "hoodie.gcp.bigquery.sync.partition_fields";
  public static final String BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA = "hoodie.gcp.bigquery.sync.use_file_listing_from_metadata";
  public static final String BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING = "hoodie.gcp.bigquery.sync.assume_date_partitioning";

  @Parameter(names = {"--project-id"}, description = "name of the target project in BigQuery", required = true)
  public String projectId;
  @Parameter(names = {"--dataset-name"}, description = "name of the target dataset in BigQuery", required = true)
  public String datasetName;
  @Parameter(names = {"--dataset-location"}, description = "location of the target dataset in BigQuery", required = true)
  public String datasetLocation;
  @Parameter(names = {"--table-name"}, description = "name of the target table in BigQuery", required = true)
  public String tableName;
  @Parameter(names = {"--source-uri"}, description = "name of the source uri gcs path of the table", required = true)
  public String sourceUri;
  @Parameter(names = {"--source-uri-prefix"}, description = "name of the source uri gcs path prefix of the table", required = true)
  public String sourceUriPrefix;
  @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
  public String basePath;
  @Parameter(names = {"--partitioned-by"}, description = "Comma-delimited partition fields. Default to non-partitioned.")
  public List<String> partitionFields = new ArrayList<>();
  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
  public Boolean useFileListingFromMetadata = false;
  @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
      + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
  public Boolean assumeDatePartitioning = false;
  @Parameter(names = {"--help", "-h"}, help = true)
  public Boolean help = false;

  /**
   * Returns a field-by-field shallow copy of the given config.
   *
   * @param cfg config to copy; must not be null
   */
  public static BigQuerySyncConfig copy(BigQuerySyncConfig cfg) {
    BigQuerySyncConfig newConfig = new BigQuerySyncConfig();
    newConfig.projectId = cfg.projectId;
    newConfig.datasetName = cfg.datasetName;
    newConfig.datasetLocation = cfg.datasetLocation;
    newConfig.tableName = cfg.tableName;
    newConfig.sourceUri = cfg.sourceUri;
    newConfig.sourceUriPrefix = cfg.sourceUriPrefix;
    newConfig.basePath = cfg.basePath;
    newConfig.partitionFields = cfg.partitionFields;
    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
    newConfig.help = cfg.help;
    return newConfig;
  }

  /**
   * Serializes this config into a {@link TypedProperties} bag keyed by the
   * BIGQUERY_SYNC_* constants. Partition fields are joined with commas;
   * {@code help} is intentionally not propagated.
   */
  public TypedProperties toProps() {
    TypedProperties properties = new TypedProperties();
    properties.put(BIGQUERY_SYNC_PROJECT_ID, projectId);
    properties.put(BIGQUERY_SYNC_DATASET_NAME, datasetName);
    properties.put(BIGQUERY_SYNC_DATASET_LOCATION, datasetLocation);
    properties.put(BIGQUERY_SYNC_TABLE_NAME, tableName);
    properties.put(BIGQUERY_SYNC_SOURCE_URI, sourceUri);
    properties.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, sourceUriPrefix);
    properties.put(BIGQUERY_SYNC_SYNC_BASE_PATH, basePath);
    properties.put(BIGQUERY_SYNC_PARTITION_FIELDS, String.join(",", partitionFields));
    properties.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, useFileListingFromMetadata);
    properties.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, assumeDatePartitioning);
    return properties;
  }

  /**
   * Inverse of {@link #toProps()}: builds a config from a properties bag.
   * Missing boolean keys default to false; missing partition fields default
   * to an empty list.
   */
  public static BigQuerySyncConfig fromProps(TypedProperties props) {
    BigQuerySyncConfig config = new BigQuerySyncConfig();
    config.projectId = props.getString(BIGQUERY_SYNC_PROJECT_ID);
    config.datasetName = props.getString(BIGQUERY_SYNC_DATASET_NAME);
    config.datasetLocation = props.getString(BIGQUERY_SYNC_DATASET_LOCATION);
    config.tableName = props.getString(BIGQUERY_SYNC_TABLE_NAME);
    config.sourceUri = props.getString(BIGQUERY_SYNC_SOURCE_URI);
    config.sourceUriPrefix = props.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX);
    config.basePath = props.getString(BIGQUERY_SYNC_SYNC_BASE_PATH);
    config.partitionFields = props.getStringList(BIGQUERY_SYNC_PARTITION_FIELDS, ",", Collections.emptyList());
    config.useFileListingFromMetadata = props.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, false);
    config.assumeDatePartitioning = props.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, false);
    return config;
  }

  @Override
  public String toString() {
    // Fixed: the original printed the misspelled key "assumeDataPartitioning"
    // and had unbalanced quote delimiters around basePath/partitionFields.
    return "BigQuerySyncConfig{projectId='" + projectId
        + "', datasetName='" + datasetName
        + "', datasetLocation='" + datasetLocation
        + "', tableName='" + tableName
        + "', sourceUri='" + sourceUri
        + "', sourceUriPrefix='" + sourceUriPrefix
        + "', basePath='" + basePath + "'"
        + ", partitionFields=" + partitionFields
        + ", useFileListingFromMetadata=" + useFileListingFromMetadata
        + ", assumeDatePartitioning=" + assumeDatePartitioning
        + ", help=" + help + "}";
  }
}

View File

@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.gcp.bigquery;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.sync.common.util.ManifestFileWriter;
import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/**
 * Tool to sync a hoodie table with a BigQuery table. Either use it as an API
 * {@code BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig)} or from the command line:
 * {@code java -cp hoodie-hive.jar BigQuerySyncTool [args]}.
 * <p>
 * This utility will get the schema from the latest commit and will sync the BigQuery table schema.
 * <p>
 * Experimental: this API may change in incompatible ways in a future release.
 */
public class BigQuerySyncTool extends AbstractSyncTool {

  private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);

  public final BigQuerySyncConfig cfg;
  // Derived BigQuery object names: <table>_manifest, <table>_versions and the
  // snapshot view that joins them under the plain table name.
  public final String manifestTableName;
  public final String versionsTableName;
  public final String snapshotViewName;

  public BigQuerySyncTool(TypedProperties properties, Configuration conf, FileSystem fs) {
    super(properties, conf, fs);
    cfg = BigQuerySyncConfig.fromProps(properties);
    manifestTableName = cfg.tableName + "_manifest";
    versionsTableName = cfg.tableName + "_versions";
    snapshotViewName = cfg.tableName;
  }

  /**
   * Syncs the hoodie table to BigQuery. Only COPY_ON_WRITE tables are
   * supported; any failure is wrapped in {@link HoodieBigQuerySyncException}.
   */
  @Override
  public void syncHoodieTable() {
    try (HoodieBigQuerySyncClient bqSyncClient = new HoodieBigQuerySyncClient(BigQuerySyncConfig.fromProps(props), fs)) {
      switch (bqSyncClient.getTableType()) {
        case COPY_ON_WRITE:
          syncCoWTable(bqSyncClient);
          break;
        case MERGE_ON_READ:
        default:
          throw new UnsupportedOperationException(bqSyncClient.getTableType() + " table type is not supported yet.");
      }
    } catch (Exception e) {
      throw new HoodieBigQuerySyncException("Got runtime exception when big query syncing " + cfg.tableName, e);
    }
  }

  /**
   * Creates (if absent) the manifest table, versions table and snapshot view
   * for a COPY_ON_WRITE table, after (re)writing the manifest file.
   */
  private void syncCoWTable(HoodieBigQuerySyncClient bqSyncClient) {
    ValidationUtils.checkState(bqSyncClient.getTableType() == HoodieTableType.COPY_ON_WRITE);
    LOG.info("Sync hoodie table " + snapshotViewName + " at base path " + bqSyncClient.getBasePath());
    if (!bqSyncClient.datasetExists()) {
      throw new HoodieBigQuerySyncException("Dataset not found: " + cfg);
    }
    ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder()
        .setConf(conf)
        .setBasePath(cfg.basePath)
        .setUseFileListingFromMetadata(cfg.useFileListingFromMetadata)
        .setAssumeDatePartitioning(cfg.assumeDatePartitioning)
        .build();
    manifestFileWriter.writeManifestFile();
    // Each BigQuery object is created at most once; existing ones are reused.
    if (!bqSyncClient.tableExists(manifestTableName)) {
      bqSyncClient.createManifestTable(manifestTableName, manifestFileWriter.getManifestSourceUri());
      LOG.info("Manifest table creation complete for " + manifestTableName);
    }
    if (!bqSyncClient.tableExists(versionsTableName)) {
      bqSyncClient.createVersionsTable(versionsTableName, cfg.sourceUri, cfg.sourceUriPrefix, cfg.partitionFields);
      LOG.info("Versions table creation complete for " + versionsTableName);
    }
    if (!bqSyncClient.tableExists(snapshotViewName)) {
      bqSyncClient.createSnapshotView(snapshotViewName, versionsTableName, manifestTableName);
      LOG.info("Snapshot view creation complete for " + snapshotViewName);
    }
    // TODO: Implement automatic schema evolution when you add a new column.
    LOG.info("Sync table complete for " + snapshotViewName);
  }

  public static void main(String[] args) {
    BigQuerySyncConfig cfg = new BigQuerySyncConfig();
    if (args.length == 0) {
      // Fixed: in the original, this case was checked only AFTER
      // `new JCommander(cfg, null, args)` had already parsed the (empty)
      // arguments and thrown on the missing required parameters, so the
      // usage text was never printed. Show usage before parsing instead.
      new JCommander(cfg).usage();
      System.exit(1);
    }
    JCommander cmd = new JCommander(cfg, null, args);
    if (cfg.help) {
      cmd.usage();
      System.exit(1);
    }
    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
    new BigQuerySyncTool(cfg.toProps(), fs.getConf(), fs).syncHoodieTable();
  }
}

View File

@@ -0,0 +1,243 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.gcp.bigquery;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.HivePartitioningOptions;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.ViewDefinition;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * Sync client that materializes a Hudi table in BigQuery as external tables
 * (manifest + versions) plus a snapshot view. Most of the generic
 * AbstractSyncHoodieClient operations are unsupported because BigQuery has no
 * equivalent (e.g. no tblproperties) — those methods throw
 * UnsupportedOperationException.
 */
public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {

  private static final Logger LOG = LogManager.getLogger(HoodieBigQuerySyncClient.class);

  private final BigQuerySyncConfig syncConfig;
  // transient: excluded from serialization; initialized once in createBigQueryConnection().
  private transient BigQuery bigquery;

  public HoodieBigQuerySyncClient(final BigQuerySyncConfig syncConfig, final FileSystem fs) {
    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
        false, fs);
    this.syncConfig = syncConfig;
    this.createBigQueryConnection();
  }

  private void createBigQueryConnection() {
    if (bigquery == null) {
      try {
        // Initialize client that will be used to send requests. This client only needs to be created
        // once, and can be reused for multiple requests.
        bigquery = BigQueryOptions.newBuilder().setLocation(syncConfig.datasetLocation).build().getService();
        LOG.info("Successfully established BigQuery connection.");
      } catch (BigQueryException e) {
        throw new HoodieBigQuerySyncException("Cannot create bigQuery connection ", e);
      }
    }
  }

  @Override
  public void createTable(final String tableName, final MessageType storageSchema, final String inputFormatClass,
                          final String outputFormatClass, final String serdeClass,
                          final Map<String, String> serdeProperties, final Map<String, String> tableProperties) {
    // bigQuery create table arguments are different, so do nothing.
  }

  /**
   * Creates an external CSV-backed table over the manifest file. The manifest
   * has a single STRING column "filename" listing the live data files.
   */
  public void createManifestTable(String tableName, String sourceUri) {
    try {
      TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName);
      // Strict CSV options: single column, no header, no tolerance for bad rows.
      CsvOptions csvOptions = CsvOptions.newBuilder()
          .setFieldDelimiter(",")
          .setAllowJaggedRows(false)
          .setAllowQuotedNewLines(false)
          .setSkipLeadingRows(0)
          .build();
      Schema schema = Schema.of(
          Field.of("filename", StandardSQLTypeName.STRING));
      ExternalTableDefinition customTable =
          ExternalTableDefinition.newBuilder(sourceUri, schema, csvOptions)
              .setAutodetect(false)
              .setIgnoreUnknownValues(false)
              .setMaxBadRecords(0)
              .build();
      bigquery.create(TableInfo.of(tableId, customTable));
      LOG.info("Manifest External table created.");
    } catch (BigQueryException e) {
      throw new HoodieBigQuerySyncException("Manifest External table was not created ", e);
    }
  }

  /**
   * Creates an external Parquet-backed table over ALL file versions at the
   * source URI. If partition fields are given, hive-style partition columns
   * are auto-detected from the path under sourceUriPrefix.
   */
  public void createVersionsTable(String tableName, String sourceUri, String sourceUriPrefix, List<String> partitionFields) {
    try {
      ExternalTableDefinition customTable;
      TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName);
      if (partitionFields.isEmpty()) {
        customTable =
            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
                .setAutodetect(true)
                .setIgnoreUnknownValues(true)
                .setMaxBadRecords(0)
                .build();
      } else {
        // Configuring partitioning options for partitioned table.
        HivePartitioningOptions hivePartitioningOptions =
            HivePartitioningOptions.newBuilder()
                .setMode("AUTO")
                .setRequirePartitionFilter(false)
                .setSourceUriPrefix(sourceUriPrefix)
                .build();
        customTable =
            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
                .setAutodetect(true)
                .setHivePartitioningOptions(hivePartitioningOptions)
                .setIgnoreUnknownValues(true)
                .setMaxBadRecords(0)
                .build();
      }
      bigquery.create(TableInfo.of(tableId, customTable));
      LOG.info("External table created using hivepartitioningoptions");
    } catch (BigQueryException e) {
      throw new HoodieBigQuerySyncException("External table was not created ", e);
    }
  }

  /**
   * Creates the snapshot view: rows of the versions table whose
   * _hoodie_file_name appears in the manifest, i.e. only the latest file
   * version per file group.
   */
  public void createSnapshotView(String viewName, String versionsTableName, String manifestTableName) {
    try {
      TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, viewName);
      String query =
          String.format(
              "SELECT * FROM `%s.%s.%s` WHERE _hoodie_file_name IN "
                  + "(SELECT filename FROM `%s.%s.%s`)",
              syncConfig.projectId,
              syncConfig.datasetName,
              versionsTableName,
              syncConfig.projectId,
              syncConfig.datasetName,
              manifestTableName);
      ViewDefinition viewDefinition =
          ViewDefinition.newBuilder(query).setUseLegacySql(false).build();
      bigquery.create(TableInfo.of(tableId, viewDefinition));
      LOG.info("View created successfully");
    } catch (BigQueryException e) {
      throw new HoodieBigQuerySyncException("View was not created ", e);
    }
  }

  @Override
  public Map<String, String> getTableSchema(String tableName) {
    // TODO: Implement automatic schema evolution when you add a new column.
    return Collections.emptyMap();
  }

  @Override
  public void addPartitionsToTable(final String tableName, final List<String> partitionsToAdd) {
    // Not implemented: BigQuery discovers new partitions automatically.
    throw new UnsupportedOperationException("No support for addPartitionsToTable yet.");
  }

  @Override
  public void dropPartitionsToTable(final String tableName, final List<String> partitionsToDrop) {
    // Not implemented: BigQuery discovers dropped partitions automatically.
    throw new UnsupportedOperationException("No support for dropPartitionsToTable yet.");
  }

  public boolean datasetExists() {
    Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName));
    return dataset != null;
  }

  @Override
  public boolean doesTableExist(final String tableName) {
    return tableExists(tableName);
  }

  @Override
  public boolean tableExists(String tableName) {
    // Fetch no fields — we only care about existence, not table metadata.
    TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName);
    Table table = bigquery.getTable(tableId, BigQuery.TableOption.fields());
    return table != null && table.exists();
  }

  @Override
  public Option<String> getLastCommitTimeSynced(final String tableName) {
    // Not implemented: BigQuery doesn't support tblproperties.
    throw new UnsupportedOperationException("Not support getLastCommitTimeSynced yet.");
  }

  @Override
  public void updateLastCommitTimeSynced(final String tableName) {
    // Not implemented: BigQuery doesn't support tblproperties.
    throw new UnsupportedOperationException("No support for updateLastCommitTimeSynced yet.");
  }

  @Override
  public Option<String> getLastReplicatedTime(String tableName) {
    // Not implemented: BigQuery doesn't support tblproperties.
    throw new UnsupportedOperationException("Not support getLastReplicatedTime yet.");
  }

  @Override
  public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
    // Not implemented: BigQuery doesn't support tblproperties.
    throw new UnsupportedOperationException("No support for updateLastReplicatedTimeStamp yet.");
  }

  @Override
  public void deleteLastReplicatedTimeStamp(String tableName) {
    // Not implemented: BigQuery doesn't support tblproperties.
    throw new UnsupportedOperationException("No support for deleteLastReplicatedTimeStamp yet.");
  }

  @Override
  public void updatePartitionsToTable(final String tableName, final List<String> changedPartitions) {
    // Not implemented: BigQuery updates the partitions automatically.
    throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
  }

  @Override
  public void close() {
    // bigQuery has no connection close method, so do nothing.
  }
}

View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.gcp.bigquery;
/**
 * Unchecked exception thrown when syncing a Hudi table to BigQuery fails.
 */
public class HoodieBigQuerySyncException extends RuntimeException {

  // RuntimeException is Serializable; pin the serial form explicitly so
  // recompilation does not silently change it.
  private static final long serialVersionUID = 1L;

  public HoodieBigQuerySyncException() {
    super();
  }

  public HoodieBigQuerySyncException(String message) {
    super(message);
  }

  public HoodieBigQuerySyncException(String message, Throwable t) {
    super(message, t);
  }

  public HoodieBigQuerySyncException(Throwable t) {
    super(t);
  }

  /**
   * Formats a message via {@link String#format}; {@code String.valueOf}
   * makes a null message render as "null" instead of throwing.
   */
  protected static String format(String message, Object... args) {
    return String.format(String.valueOf(message), args);
  }
}

View File

@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.gcp.bigquery;
import org.apache.hudi.common.config.TypedProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Unit tests for {@link BigQuerySyncConfig}: copy(), toProps() and fromProps().
 */
public class TestBigQuerySyncConfig {

  BigQuerySyncConfig syncConfig;

  @BeforeEach
  void setUp() {
    // A fully populated config reused by every test below.
    syncConfig = new BigQuerySyncConfig();
    syncConfig.projectId = "fooproject";
    syncConfig.datasetName = "foodataset";
    syncConfig.datasetLocation = "US";
    syncConfig.tableName = "footable";
    syncConfig.sourceUri = "gs://test-bucket/dwh/table_name/dt=*";
    syncConfig.sourceUriPrefix = "gs://test-bucket/dwh/table_name/";
    syncConfig.basePath = "gs://test-bucket/dwh/table_name";
    syncConfig.partitionFields = Arrays.asList("a", "b");
    syncConfig.useFileListingFromMetadata = true;
    syncConfig.assumeDatePartitioning = true;
    syncConfig.help = true;
  }

  @Test
  public void testCopy() {
    BigQuerySyncConfig copied = BigQuerySyncConfig.copy(syncConfig);
    // JUnit convention: expected value first, actual second.
    assertEquals(syncConfig.partitionFields, copied.partitionFields);
    assertEquals(syncConfig.basePath, copied.basePath);
    assertEquals(syncConfig.projectId, copied.projectId);
    assertEquals(syncConfig.datasetName, copied.datasetName);
    assertEquals(syncConfig.datasetLocation, copied.datasetLocation);
    assertEquals(syncConfig.tableName, copied.tableName);
    assertEquals(syncConfig.sourceUri, copied.sourceUri);
    assertEquals(syncConfig.sourceUriPrefix, copied.sourceUriPrefix);
    assertEquals(syncConfig.useFileListingFromMetadata, copied.useFileListingFromMetadata);
    assertEquals(syncConfig.assumeDatePartitioning, copied.assumeDatePartitioning);
    assertEquals(syncConfig.help, copied.help);
  }

  @Test
  public void testToProps() {
    TypedProperties props = syncConfig.toProps();
    assertEquals("fooproject", props.getString(BIGQUERY_SYNC_PROJECT_ID));
    assertEquals("foodataset", props.getString(BIGQUERY_SYNC_DATASET_NAME));
    assertEquals("US", props.getString(BIGQUERY_SYNC_DATASET_LOCATION));
    assertEquals("footable", props.getString(BIGQUERY_SYNC_TABLE_NAME));
    assertEquals("gs://test-bucket/dwh/table_name/dt=*", props.getString(BIGQUERY_SYNC_SOURCE_URI));
    assertEquals("gs://test-bucket/dwh/table_name/", props.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX));
    assertEquals("gs://test-bucket/dwh/table_name", props.getString(BIGQUERY_SYNC_SYNC_BASE_PATH));
    assertEquals("a,b", props.getString(BIGQUERY_SYNC_PARTITION_FIELDS));
    assertEquals("true", props.getString(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA));
    assertEquals("true", props.getString(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING));
  }

  // Renamed from "fromProps" to follow the testXxx naming used by the other
  // tests in this class; discovery is driven by @Test so behavior is unchanged.
  @Test
  public void testFromProps() {
    TypedProperties props = new TypedProperties();
    props.put(BIGQUERY_SYNC_PROJECT_ID, "fooproject");
    props.put(BIGQUERY_SYNC_DATASET_NAME, "foodataset");
    props.put(BIGQUERY_SYNC_DATASET_LOCATION, "US");
    props.put(BIGQUERY_SYNC_TABLE_NAME, "footable");
    props.put(BIGQUERY_SYNC_SOURCE_URI, "gs://test-bucket/dwh/table_name/dt=*");
    props.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, "gs://test-bucket/dwh/table_name/");
    props.put(BIGQUERY_SYNC_SYNC_BASE_PATH, "gs://test-bucket/dwh/table_name");
    props.put(BIGQUERY_SYNC_PARTITION_FIELDS, "a,b");
    props.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, true);
    props.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, true);
    BigQuerySyncConfig cfg = BigQuerySyncConfig.fromProps(props);
    assertEquals(syncConfig.projectId, cfg.projectId);
    assertEquals(syncConfig.datasetName, cfg.datasetName);
    assertEquals(syncConfig.datasetLocation, cfg.datasetLocation);
    assertEquals(syncConfig.tableName, cfg.tableName);
    assertEquals(syncConfig.sourceUri, cfg.sourceUri);
    assertEquals(syncConfig.sourceUriPrefix, cfg.sourceUriPrefix);
    assertEquals(syncConfig.basePath, cfg.basePath);
    assertEquals(syncConfig.partitionFields, cfg.partitionFields);
    assertEquals(syncConfig.useFileListingFromMetadata, cfg.useFileListingFromMetadata);
    assertEquals(syncConfig.assumeDatePartitioning, cfg.assumeDatePartitioning);
  }
}

View File

@@ -0,0 +1,29 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=ERROR, CONSOLE
log4j.logger.org.apache.hudi=ERROR
# CONSOLE is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
# CONSOLE uses PatternLayout.
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL

View File

@@ -0,0 +1,29 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, CONSOLE
log4j.logger.org.apache.hudi=INFO
# CONSOLE is set to be a ConsoleAppender.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
# CONSOLE uses PatternLayout.
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL

View File

@@ -1,141 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
/**
 * Utility that writes a "manifest" file — a newline-delimited list of the
 * latest base file names across all partitions of a Hudi table — under the
 * table's meta path, for consumption by external readers of the table snapshot.
 */
public class ManifestFileUtil {
  private static final Logger LOG = LogManager.getLogger(ManifestFileUtil.class);
  // Folder (under the table meta path) and fixed file name of the manifest.
  private static final String MANIFEST_FOLDER_NAME = "manifest";
  private static final String MANIFEST_FILE_NAME = "latest-snapshot.csv";
  // One base file name per line in the manifest.
  private static final String DELIMITER = "\n";
  private final SerializableConfiguration hadoopConf;
  private final String basePath;
  // NOTE(review): marked transient, but this class is not Serializable — the
  // modifier currently has no effect.
  private final transient HoodieLocalEngineContext engineContext;
  private final HoodieTableMetaClient metaClient;
  // Private: instances are created via the Builder below.
  private ManifestFileUtil(Configuration conf, String basePath) {
    this.hadoopConf = new SerializableConfiguration(conf);
    this.basePath = basePath;
    this.engineContext = new HoodieLocalEngineContext(conf);
    this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
  }
  /**
   * Collects the latest base file names for every partition and writes them to
   * the manifest file, one name per line.
   *
   * @throws HoodieException if listing the files or writing the manifest fails
   */
  public synchronized void writeManifestFile() {
    try {
      Path manifestFilePath = new Path(getManifestFolder(), MANIFEST_FILE_NAME);
      Option<byte[]> content = Option.of(fetchLatestBaseFilesForAllPartitions().collect(Collectors.joining(DELIMITER)).getBytes());
      FileIOUtils.createFileInPath(metaClient.getFs(), manifestFilePath, content);
    } catch (Exception e) {
      String msg = "Error writing manifest file";
      LOG.error(msg, e);
      throw new HoodieException(msg, e);
    }
  }
  /**
   * @return stream of the file names of the latest base files across all
   *         partitions of the table
   * @throws HoodieException if partition or file listing fails
   */
  public Stream<String> fetchLatestBaseFilesForAllPartitions() {
    try {
      HoodieMetadataConfig metadataConfig = buildMetadataConfig(hadoopConf.get());
      List<String> partitions = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath);
      return partitions.parallelStream().flatMap(p -> {
        // A fresh engine context and file-system view is built inside the
        // lambda for each partition, so nothing mutable is shared across the
        // parallel stream's worker threads.
        HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf.get());
        HoodieMetadataFileSystemView fsView =
            new HoodieMetadataFileSystemView(engContext, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metadataConfig);
        return fsView.getLatestBaseFiles(p).map(HoodieBaseFile::getFileName);
      });
    } catch (Exception e) {
      String msg = "Error checking path :" + basePath;
      LOG.error(msg, e);
      throw new HoodieException(msg, e);
    }
  }
  // Metadata-table reads are enabled iff the Hadoop conf enables them
  // (falling back to the reader-side default).
  private static HoodieMetadataConfig buildMetadataConfig(Configuration conf) {
    return HoodieMetadataConfig.newBuilder()
        .enable(conf.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS))
        .build();
  }
  /**
   * @return Manifest File folder
   */
  public Path getManifestFolder() {
    return new Path(metaClient.getMetaPath(), MANIFEST_FOLDER_NAME);
  }
  /**
   * @return Manifest File Full Path
   */
  public Path getManifestFilePath() {
    return new Path(getManifestFolder(), MANIFEST_FILE_NAME);
  }
  /** @return a new builder for {@link ManifestFileUtil} */
  public static Builder builder() {
    return new Builder();
  }
  /**
   * Builder for {@link ManifestFileUtil}.
   */
  public static class Builder {
    private Configuration conf;
    private String basePath;
    public Builder setConf(Configuration conf) {
      this.conf = conf;
      return this;
    }
    public Builder setBasePath(String basePath) {
      this.basePath = basePath;
      return this;
    }
    // Both conf and basePath are mandatory.
    public ManifestFileUtil build() {
      ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init ManifestFileGenerator");
      ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init ManifestFileGenerator");
      return new ManifestFileUtil(conf, basePath);
    }
  }
}

View File

@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Writes a manifest file — a newline-delimited list of the latest base file
 * names across all partitions — under {@code <meta path>/manifest/}, for
 * consumption by external systems that read a snapshot listing of the table.
 */
public class ManifestFileWriter {
  public static final String MANIFEST_FOLDER_NAME = "manifest";
  public static final String MANIFEST_FILE_NAME = "latest-snapshot.csv";
  private static final Logger LOG = LogManager.getLogger(ManifestFileWriter.class);
  private final HoodieTableMetaClient metaClient;
  // Whether partition/file listing should use the metadata table.
  private final boolean useFileListingFromMetadata;
  // Forwarded to partition listing; assumes date-style partition layout when true.
  private final boolean assumeDatePartitioning;
  // Private: instances are created via the Builder below.
  private ManifestFileWriter(Configuration hadoopConf, String basePath, boolean useFileListingFromMetadata, boolean assumeDatePartitioning) {
    this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
    this.useFileListingFromMetadata = useFileListingFromMetadata;
    this.assumeDatePartitioning = assumeDatePartitioning;
  }
  /**
   * Write all the latest base file names to the manifest file.
   *
   * <p>If there are no base files, the method logs a warning and returns
   * without touching any existing manifest.
   *
   * @throws HoodieException if listing or writing fails
   */
  public synchronized void writeManifestFile() {
    try {
      List<String> baseFiles = fetchLatestBaseFilesForAllPartitions(metaClient, useFileListingFromMetadata, assumeDatePartitioning)
          .collect(Collectors.toList());
      if (baseFiles.isEmpty()) {
        LOG.warn("No base file to generate manifest file.");
        return;
      } else {
        LOG.info("Writing base file names to manifest file: " + baseFiles.size());
      }
      final Path manifestFilePath = getManifestFilePath();
      // create(path, true): overwrite any previously written manifest.
      try (FSDataOutputStream outputStream = metaClient.getFs().create(manifestFilePath, true);
           BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
        for (String f : baseFiles) {
          writer.write(f);
          writer.write("\n");
        }
      }
    } catch (Exception e) {
      throw new HoodieException("Error in writing manifest file.", e);
    }
  }
  /**
   * Lists every partition of the table and returns the file names of the
   * latest base files in each.
   *
   * @param metaClient                 meta client of the target table
   * @param useFileListingFromMetadata whether to list via the metadata table
   * @param assumeDatePartitioning    whether to assume date-style partition paths
   * @return stream of latest base file names across all partitions
   * @throws HoodieException if partition or file listing fails
   */
  public static Stream<String> fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient,
                                                                    boolean useFileListingFromMetadata, boolean assumeDatePartitioning) {
    try {
      List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()),
          metaClient.getBasePath(), useFileListingFromMetadata, assumeDatePartitioning);
      LOG.info("Retrieve all partitions: " + partitions.size());
      return partitions.parallelStream().flatMap(p -> {
        // A fresh engine context and file-system view is built per partition
        // inside the lambda, so nothing mutable is shared across the parallel
        // stream's worker threads.
        Configuration hadoopConf = metaClient.getHadoopConf();
        HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
        HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
            metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
            HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build());
        return fsView.getLatestBaseFiles(p).map(HoodieBaseFile::getFileName);
      });
    } catch (Exception e) {
      throw new HoodieException("Error in fetching latest base files.", e);
    }
  }
  /** @return folder (under the table meta path) holding the manifest file */
  public Path getManifestFolder() {
    return new Path(metaClient.getMetaPath(), MANIFEST_FOLDER_NAME);
  }
  /** @return full path of the manifest file */
  public Path getManifestFilePath() {
    return new Path(getManifestFolder(), MANIFEST_FILE_NAME);
  }
  /** @return a glob URI ({@code <manifest folder>/*}) matching the manifest file(s) */
  public String getManifestSourceUri() {
    return new Path(getManifestFolder(), "*").toUri().toString();
  }
  /** @return a new builder for {@link ManifestFileWriter} */
  public static Builder builder() {
    return new Builder();
  }
  /**
   * Builder for {@link ManifestFileWriter}.
   */
  public static class Builder {
    private Configuration conf;
    private String basePath;
    private boolean useFileListingFromMetadata;
    private boolean assumeDatePartitioning;
    public Builder setConf(Configuration conf) {
      this.conf = conf;
      return this;
    }
    public Builder setBasePath(String basePath) {
      this.basePath = basePath;
      return this;
    }
    public Builder setUseFileListingFromMetadata(boolean useFileListingFromMetadata) {
      this.useFileListingFromMetadata = useFileListingFromMetadata;
      return this;
    }
    public Builder setAssumeDatePartitioning(boolean assumeDatePartitioning) {
      this.assumeDatePartitioning = assumeDatePartitioning;
      return this;
    }
    // conf and basePath are mandatory; the two booleans default to false.
    public ManifestFileWriter build() {
      ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init ManifestFileGenerator");
      ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init ManifestFileGenerator");
      return new ManifestFileWriter(conf, basePath, useFileListingFromMetadata, assumeDatePartitioning);
    }
  }
}

View File

@@ -1,73 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.fs.FSUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
/** Tests for {@code ManifestFileUtil} against a multi-level partitioned test table. */
public class TestManifestFileUtil extends HoodieCommonTestHarness {
  private static final List<String> MULTI_LEVEL_PARTITIONS = Arrays.asList("2019/01", "2020/01", "2021/01");
  // Instance field (was static): static mutable test state is shared across
  // test instances and can leak between methods; @BeforeEach reassigns it
  // per test, so instance scope is the correct lifetime.
  private HoodieTestTable hoodieTestTable;
  @BeforeEach
  public void setUp() throws IOException {
    initMetaClient();
    hoodieTestTable = HoodieTestTable.of(metaClient);
  }
  /** Latest base files across all multi-level partitions should all be listed. */
  @Test
  public void testMultiLevelPartitionedTable() throws Exception {
    // Generate 10 files under each of the 3 partitions.
    createTestDataForPartitionedTable(10);
    ManifestFileUtil manifestFileUtil = ManifestFileUtil.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
    Assertions.assertEquals(30, manifestFileUtil.fetchLatestBaseFilesForAllPartitions().count());
  }
  /** Writing the manifest should produce a non-empty file at the expected path. */
  @Test
  public void testCreateManifestFile() throws Exception {
    // Generate 10 files under each partition
    createTestDataForPartitionedTable(10);
    ManifestFileUtil manifestFileUtil = ManifestFileUtil.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
    manifestFileUtil.writeManifestFile();
    Assertions.assertTrue(FSUtils.getFileSize(metaClient.getFs(), manifestFileUtil.getManifestFilePath()) > 0);
  }
  /** Creates one commit and {@code numOfFiles} base files in every test partition. */
  public void createTestDataForPartitionedTable(int numOfFiles) throws Exception {
    String instant = "100";
    hoodieTestTable = hoodieTestTable.addCommit(instant);
    for (String partition : MULTI_LEVEL_PARTITIONS) {
      hoodieTestTable = hoodieTestTable.withPartitionMetaFiles(partition)
          .withBaseFilesInPartition(partition, IntStream.range(0, numOfFiles).toArray());
    }
  }
}

View File

@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.stream.IntStream;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
import static org.apache.hudi.sync.common.util.ManifestFileWriter.fetchLatestBaseFilesForAllPartitions;
import static org.junit.jupiter.api.Assertions.assertEquals;
/** Tests for {@code ManifestFileWriter} against the default test partitions. */
public class TestManifestFileWriter extends HoodieCommonTestHarness {
  @BeforeEach
  public void setUp() throws IOException {
    initMetaClient();
  }
  /** All latest base files across partitions should be returned by the static helper. */
  @Test
  public void testMultiLevelPartitionedTable() throws Exception {
    // Generate 10 files under each of the default partitions (3 partitions).
    createTestDataForPartitionedTable(metaClient, 10);
    // fetchLatestBaseFilesForAllPartitions is static; no writer instance is
    // needed here (removed an unused ManifestFileWriter local).
    assertEquals(30, fetchLatestBaseFilesForAllPartitions(metaClient, false, false).count());
  }
  /** The written manifest should contain one line per latest base file. */
  @Test
  public void testCreateManifestFile() throws Exception {
    // Generate 3 files under each partition (9 in total).
    createTestDataForPartitionedTable(metaClient, 3);
    ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
    manifestFileWriter.writeManifestFile();
    Path manifestFilePath = manifestFileWriter.getManifestFilePath();
    try (InputStream is = metaClient.getFs().open(manifestFilePath)) {
      assertEquals(9, FileIOUtils.readAsUTFStringLines(is).size(), "there should be 9 base files in total; 3 per partition.");
    }
  }
  /** Creates one commit and {@code numFilesPerPartition} base files in every default partition. */
  private static void createTestDataForPartitionedTable(HoodieTableMetaClient metaClient, int numFilesPerPartition) throws Exception {
    final String instantTime = "100";
    HoodieTestTable testTable = HoodieTestTable.of(metaClient).addCommit(instantTime);
    for (String partition : DEFAULT_PARTITION_PATHS) {
      testTable.withPartitionMetaFiles(partition)
          .withBaseFilesInPartition(partition, IntStream.range(0, numFilesPerPartition).toArray());
    }
  }
  /** The manifest source URI should be a glob over the manifest folder. */
  @Test
  public void getManifestSourceUri() {
    ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
    String sourceUri = manifestFileWriter.getManifestSourceUri();
    assertEquals(new Path(basePath, ".hoodie/manifest/*").toUri().toString(), sourceUri);
  }
}

View File

@@ -0,0 +1,178 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
<version>0.11.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hudi-gcp-bundle</artifactId>
<packaging>jar</packaging>
<properties>
<checkstyle.skip>true</checkstyle.skip>
<main.basedir>${project.parent.basedir}</main.basedir>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>25.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hudi.gcp.bigquery.BigQuerySyncTool</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createSourcesJar>${shadeSources}</createSourcesJar>
<dependencyReducedPomLocation>${project.build.directory}/dependency-reduced-pom.xml
</dependencyReducedPomLocation>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
<addHeader>true</addHeader>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/LICENSE</resource>
<file>target/classes/META-INF/LICENSE</file>
</transformer>
</transformers>
<artifactSet>
<includes>
<include>org.apache.hudi:hudi-common</include>
<include>org.apache.hudi:hudi-hadoop-mr</include>
<include>org.apache.hudi:hudi-sync-common</include>
<include>org.apache.hudi:hudi-gcp</include>
<include>com.google.cloud:google-cloud-bigquery</include>
<include>com.beust:jcommander</include>
</includes>
</artifactSet>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/services/javax.*</exclude>
</excludes>
</filter>
</filters>
<finalName>${project.artifactId}-${project.version}</finalName>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/test/resources</directory>
</resource>
</resources>
</build>
<dependencies>
<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hadoop-mr-bundle</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-sync-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-gcp</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
</dependency>
<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${parquet.version}</version>
<scope>compile</scope>
</dependency>
<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.gcp.bigquery.bundle;
import org.apache.hudi.common.util.ReflectionUtils;
/**
* A simple main class to dump all classes loaded in current classpath.
*
* This is a workaround for generating sources and javadoc jars for packaging modules. The maven plugins for generating
* javadoc and sources plugins do not generate corresponding jars if there are no source files.
*
* This class does not have anything to do with Hudi but is there to keep mvn javadocs/source plugin happy.
*/
public class Main {
  public static void main(String[] args) {
    // Dump every top-level class reachable on the current classpath. The
    // output itself is irrelevant; this class exists only so the sources and
    // javadoc jars of this packaging module are non-empty (see class javadoc).
    ReflectionUtils.getTopLevelClassesInClasspath(Main.class).forEach(System.out::println);
  }
}

View File

@@ -39,6 +39,7 @@
<module>hudi-cli</module>
<module>hudi-client</module>
<module>hudi-aws</module>
<module>hudi-gcp</module>
<module>hudi-hadoop-mr</module>
<module>hudi-spark-datasource</module>
<module>hudi-timeline-service</module>
@@ -47,6 +48,7 @@
<module>packaging/hudi-hadoop-mr-bundle</module>
<module>packaging/hudi-datahub-sync-bundle</module>
<module>packaging/hudi-hive-sync-bundle</module>
<module>packaging/hudi-gcp-bundle</module>
<module>packaging/hudi-spark-bundle</module>
<module>packaging/hudi-presto-bundle</module>
<module>packaging/hudi-utilities-bundle</module>