From 20964df7707b0cdb460cecfaa724537244adb0a4 Mon Sep 17 00:00:00 2001 From: Vinoth Govindarajan Date: Sat, 2 Apr 2022 13:18:06 -0700 Subject: [PATCH] [HUDI-3357] MVP implementation of BigQuerySyncTool (#5125) Co-authored-by: Raymond Xu <2701446+xushiyan@users.noreply.github.com> --- hudi-gcp/pom.xml | 117 +++++++++ hudi-gcp/src/assembly/src.xml | 46 ++++ .../hudi/gcp/bigquery/BigQuerySyncConfig.java | 131 ++++++++++ .../hudi/gcp/bigquery/BigQuerySyncTool.java | 119 +++++++++ .../bigquery/HoodieBigQuerySyncClient.java | 243 ++++++++++++++++++ .../bigquery/HoodieBigQuerySyncException.java | 43 ++++ .../gcp/bigquery/TestBigQuerySyncConfig.java | 118 +++++++++ .../resources/log4j-surefire-quiet.properties | 29 +++ .../test/resources/log4j-surefire.properties | 29 +++ .../sync/common/util/ManifestFileUtil.java | 141 ---------- .../sync/common/util/ManifestFileWriter.java | 156 +++++++++++ .../common/util/TestManifestFileUtil.java | 73 ------ .../common/util/TestManifestFileWriter.java | 80 ++++++ packaging/hudi-gcp-bundle/pom.xml | 178 +++++++++++++ .../apache/hudi/gcp/bigquery/bundle/Main.java | 37 +++ pom.xml | 2 + 16 files changed, 1328 insertions(+), 214 deletions(-) create mode 100644 hudi-gcp/pom.xml create mode 100644 hudi-gcp/src/assembly/src.xml create mode 100644 hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java create mode 100644 hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java create mode 100644 hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java create mode 100644 hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncException.java create mode 100644 hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java create mode 100644 hudi-gcp/src/test/resources/log4j-surefire-quiet.properties create mode 100644 hudi-gcp/src/test/resources/log4j-surefire.properties delete mode 100644 
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileUtil.java create mode 100644 hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java delete mode 100644 hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileUtil.java create mode 100644 hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java create mode 100644 packaging/hudi-gcp-bundle/pom.xml create mode 100644 packaging/hudi-gcp-bundle/src/main/java/org/apache/hudi/gcp/bigquery/bundle/Main.java diff --git a/hudi-gcp/pom.xml b/hudi-gcp/pom.xml new file mode 100644 index 000000000..aedca4f38 --- /dev/null +++ b/hudi-gcp/pom.xml @@ -0,0 +1,117 @@ + + + + + hudi + org.apache.hudi + 0.11.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + + hudi-gcp + jar + + + + + + com.google.cloud + libraries-bom + 25.1.0 + pom + import + + + + + + + + org.apache.hudi + hudi-common + ${project.version} + + + org.apache.hudi + hudi-sync-common + ${project.version} + + + + com.google.cloud + google-cloud-bigquery + + + + + log4j + log4j + + + + org.apache.parquet + parquet-avro + + + + + org.apache.hadoop + hadoop-common + + + + org.junit.jupiter + junit-jupiter-api + test + + + + + + + src/main/resources + + + + + org.apache.rat + apache-rat-plugin + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + + test-jar + + + + + + org.jacoco + jacoco-maven-plugin + + + + diff --git a/hudi-gcp/src/assembly/src.xml b/hudi-gcp/src/assembly/src.xml new file mode 100644 index 000000000..646e94c1a --- /dev/null +++ b/hudi-gcp/src/assembly/src.xml @@ -0,0 +1,46 @@ + + + + jar-with-dependencies + + jar + + + false + + + + / + true + runtime + + junit:junit + com.google.code.findbugs:* + org.apache.hbase:* + + + + + true + provided + + + diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java new file mode 100644 index 000000000..6aa9bc0b5 --- /dev/null +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.gcp.bigquery; + +import org.apache.hudi.common.config.TypedProperties; + +import com.beust.jcommander.Parameter; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Configs needed to sync data into BigQuery. 
+ */ +public class BigQuerySyncConfig implements Serializable { + + public static String BIGQUERY_SYNC_PROJECT_ID = "hoodie.gcp.bigquery.sync.project_id"; + public static String BIGQUERY_SYNC_DATASET_NAME = "hoodie.gcp.bigquery.sync.dataset_name"; + public static String BIGQUERY_SYNC_DATASET_LOCATION = "hoodie.gcp.bigquery.sync.dataset_location"; + public static String BIGQUERY_SYNC_TABLE_NAME = "hoodie.gcp.bigquery.sync.table_name"; + public static String BIGQUERY_SYNC_SOURCE_URI = "hoodie.gcp.bigquery.sync.source_uri"; + public static String BIGQUERY_SYNC_SOURCE_URI_PREFIX = "hoodie.gcp.bigquery.sync.source_uri_prefix"; + public static String BIGQUERY_SYNC_SYNC_BASE_PATH = "hoodie.gcp.bigquery.sync.base_path"; + public static String BIGQUERY_SYNC_PARTITION_FIELDS = "hoodie.gcp.bigquery.sync.partition_fields"; + public static String BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA = "hoodie.gcp.bigquery.sync.use_file_listing_from_metadata"; + public static String BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING = "hoodie.gcp.bigquery.sync.assume_date_partitioning"; + + @Parameter(names = {"--project-id"}, description = "name of the target project in BigQuery", required = true) + public String projectId; + @Parameter(names = {"--dataset-name"}, description = "name of the target dataset in BigQuery", required = true) + public String datasetName; + @Parameter(names = {"--dataset-location"}, description = "location of the target dataset in BigQuery", required = true) + public String datasetLocation; + @Parameter(names = {"--table-name"}, description = "name of the target table in BigQuery", required = true) + public String tableName; + @Parameter(names = {"--source-uri"}, description = "name of the source uri gcs path of the table", required = true) + public String sourceUri; + @Parameter(names = {"--source-uri-prefix"}, description = "name of the source uri gcs path prefix of the table", required = true) + public String sourceUriPrefix; + @Parameter(names = {"--base-path"}, 
description = "Base path of the hoodie table to sync", required = true) + public String basePath; + @Parameter(names = {"--partitioned-by"}, description = "Comma-delimited partition fields. Default to non-partitioned.") + public List partitionFields = new ArrayList<>(); + @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata") + public Boolean useFileListingFromMetadata = false; + @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this" + + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter") + public Boolean assumeDatePartitioning = false; + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + public static BigQuerySyncConfig copy(BigQuerySyncConfig cfg) { + BigQuerySyncConfig newConfig = new BigQuerySyncConfig(); + newConfig.projectId = cfg.projectId; + newConfig.datasetName = cfg.datasetName; + newConfig.datasetLocation = cfg.datasetLocation; + newConfig.tableName = cfg.tableName; + newConfig.sourceUri = cfg.sourceUri; + newConfig.sourceUriPrefix = cfg.sourceUriPrefix; + newConfig.basePath = cfg.basePath; + newConfig.partitionFields = cfg.partitionFields; + newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata; + newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning; + newConfig.help = cfg.help; + return newConfig; + } + + public TypedProperties toProps() { + TypedProperties properties = new TypedProperties(); + properties.put(BIGQUERY_SYNC_PROJECT_ID, projectId); + properties.put(BIGQUERY_SYNC_DATASET_NAME, datasetName); + properties.put(BIGQUERY_SYNC_DATASET_LOCATION, datasetLocation); + properties.put(BIGQUERY_SYNC_TABLE_NAME, tableName); + properties.put(BIGQUERY_SYNC_SOURCE_URI, sourceUri); + properties.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, sourceUriPrefix); + properties.put(BIGQUERY_SYNC_SYNC_BASE_PATH, basePath); + 
properties.put(BIGQUERY_SYNC_PARTITION_FIELDS, String.join(",", partitionFields)); + properties.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, useFileListingFromMetadata); + properties.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, assumeDatePartitioning); + return properties; + } + + public static BigQuerySyncConfig fromProps(TypedProperties props) { + BigQuerySyncConfig config = new BigQuerySyncConfig(); + config.projectId = props.getString(BIGQUERY_SYNC_PROJECT_ID); + config.datasetName = props.getString(BIGQUERY_SYNC_DATASET_NAME); + config.datasetLocation = props.getString(BIGQUERY_SYNC_DATASET_LOCATION); + config.tableName = props.getString(BIGQUERY_SYNC_TABLE_NAME); + config.sourceUri = props.getString(BIGQUERY_SYNC_SOURCE_URI); + config.sourceUriPrefix = props.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX); + config.basePath = props.getString(BIGQUERY_SYNC_SYNC_BASE_PATH); + config.partitionFields = props.getStringList(BIGQUERY_SYNC_PARTITION_FIELDS, ",", Collections.emptyList()); + config.useFileListingFromMetadata = props.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, false); + config.assumeDatePartitioning = props.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, false); + return config; + } + + @Override + public String toString() { + return "BigQuerySyncConfig{projectId='" + projectId + + "', datasetName='" + datasetName + + "', datasetLocation='" + datasetLocation + + "', tableName='" + tableName + + "', sourceUri='" + sourceUri + + "', sourceUriPrefix='" + sourceUriPrefix + + "', basePath='" + basePath + "'" + + ", partitionFields=" + partitionFields + + "', useFileListingFromMetadata='" + useFileListingFromMetadata + + "', assumeDatePartitioning='" + assumeDatePartitioning + + "', help=" + help + "}"; + } +} diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java new file mode 100644 index 000000000..0cb75eea8 --- /dev/null +++
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.gcp.bigquery; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.sync.common.AbstractSyncTool; +import org.apache.hudi.sync.common.util.ManifestFileWriter; + +import com.beust.jcommander.JCommander; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Tool to sync a hoodie table with a big query table. Either use it as an api + * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line java -cp hoodie-hive.jar BigQuerySyncTool [args] + *

+ * This utility will get the schema from the latest commit and will sync big query table schema. + * + * @Experimental + */ +public class BigQuerySyncTool extends AbstractSyncTool { + + private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class); + + public final BigQuerySyncConfig cfg; + public final String manifestTableName; + public final String versionsTableName; + public final String snapshotViewName; + + public BigQuerySyncTool(TypedProperties properties, Configuration conf, FileSystem fs) { + super(properties, conf, fs); + cfg = BigQuerySyncConfig.fromProps(properties); + manifestTableName = cfg.tableName + "_manifest"; + versionsTableName = cfg.tableName + "_versions"; + snapshotViewName = cfg.tableName; + } + + @Override + public void syncHoodieTable() { + try (HoodieBigQuerySyncClient bqSyncClient = new HoodieBigQuerySyncClient(BigQuerySyncConfig.fromProps(props), fs)) { + switch (bqSyncClient.getTableType()) { + case COPY_ON_WRITE: + syncCoWTable(bqSyncClient); + break; + case MERGE_ON_READ: + default: + throw new UnsupportedOperationException(bqSyncClient.getTableType() + " table type is not supported yet."); + } + } catch (Exception e) { + throw new HoodieBigQuerySyncException("Got runtime exception when big query syncing " + cfg.tableName, e); + } + } + + private void syncCoWTable(HoodieBigQuerySyncClient bqSyncClient) { + ValidationUtils.checkState(bqSyncClient.getTableType() == HoodieTableType.COPY_ON_WRITE); + LOG.info("Sync hoodie table " + snapshotViewName + " at base path " + bqSyncClient.getBasePath()); + + if (!bqSyncClient.datasetExists()) { + throw new HoodieBigQuerySyncException("Dataset not found: " + cfg); + } + + ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder() + .setConf(conf) + .setBasePath(cfg.basePath) + .setUseFileListingFromMetadata(cfg.useFileListingFromMetadata) + .setAssumeDatePartitioning(cfg.assumeDatePartitioning) + .build(); + manifestFileWriter.writeManifestFile(); + + if 
(!bqSyncClient.tableExists(manifestTableName)) { + bqSyncClient.createManifestTable(manifestTableName, manifestFileWriter.getManifestSourceUri()); + LOG.info("Manifest table creation complete for " + manifestTableName); + } + if (!bqSyncClient.tableExists(versionsTableName)) { + bqSyncClient.createVersionsTable(versionsTableName, cfg.sourceUri, cfg.sourceUriPrefix, cfg.partitionFields); + LOG.info("Versions table creation complete for " + versionsTableName); + } + if (!bqSyncClient.tableExists(snapshotViewName)) { + bqSyncClient.createSnapshotView(snapshotViewName, versionsTableName, manifestTableName); + LOG.info("Snapshot view creation complete for " + snapshotViewName); + } + + // TODO: Implement automatic schema evolution when you add a new column. + LOG.info("Sync table complete for " + snapshotViewName); + } + + public static void main(String[] args) { + BigQuerySyncConfig cfg = new BigQuerySyncConfig(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration()); + new BigQuerySyncTool(cfg.toProps(), fs.getConf(), fs).syncHoodieTable(); + } +} diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java new file mode 100644 index 000000000..4aafbf2e5 --- /dev/null +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.gcp.bigquery; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.CsvOptions; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.ExternalTableDefinition; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.HivePartitioningOptions; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.ViewDefinition; +import org.apache.hadoop.fs.FileSystem; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient { + private static final Logger LOG = LogManager.getLogger(HoodieBigQuerySyncClient.class); + + private final BigQuerySyncConfig syncConfig; + private transient BigQuery bigquery; + + public HoodieBigQuerySyncClient(final BigQuerySyncConfig syncConfig, final FileSystem fs) { + super(syncConfig.basePath, syncConfig.assumeDatePartitioning, 
syncConfig.useFileListingFromMetadata, + false, fs); + this.syncConfig = syncConfig; + this.createBigQueryConnection(); + } + + private void createBigQueryConnection() { + if (bigquery == null) { + try { + // Initialize client that will be used to send requests. This client only needs to be created + // once, and can be reused for multiple requests. + bigquery = BigQueryOptions.newBuilder().setLocation(syncConfig.datasetLocation).build().getService(); + LOG.info("Successfully established BigQuery connection."); + } catch (BigQueryException e) { + throw new HoodieBigQuerySyncException("Cannot create bigQuery connection ", e); + } + } + } + + @Override + public void createTable(final String tableName, final MessageType storageSchema, final String inputFormatClass, + final String outputFormatClass, final String serdeClass, + final Map serdeProperties, final Map tableProperties) { + // bigQuery create table arguments are different, so do nothing. + } + + public void createManifestTable(String tableName, String sourceUri) { + try { + TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName); + CsvOptions csvOptions = CsvOptions.newBuilder() + .setFieldDelimiter(",") + .setAllowJaggedRows(false) + .setAllowQuotedNewLines(false) + .setSkipLeadingRows(0) + .build(); + Schema schema = Schema.of( + Field.of("filename", StandardSQLTypeName.STRING)); + + ExternalTableDefinition customTable = + ExternalTableDefinition.newBuilder(sourceUri, schema, csvOptions) + .setAutodetect(false) + .setIgnoreUnknownValues(false) + .setMaxBadRecords(0) + .build(); + bigquery.create(TableInfo.of(tableId, customTable)); + LOG.info("Manifest External table created."); + } catch (BigQueryException e) { + throw new HoodieBigQuerySyncException("Manifest External table was not created ", e); + } + } + + public void createVersionsTable(String tableName, String sourceUri, String sourceUriPrefix, List partitionFields) { + try { + ExternalTableDefinition customTable; + 
TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName); + + if (partitionFields.isEmpty()) { + customTable = + ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet()) + .setAutodetect(true) + .setIgnoreUnknownValues(true) + .setMaxBadRecords(0) + .build(); + } else { + // Configuring partitioning options for partitioned table. + HivePartitioningOptions hivePartitioningOptions = + HivePartitioningOptions.newBuilder() + .setMode("AUTO") + .setRequirePartitionFilter(false) + .setSourceUriPrefix(sourceUriPrefix) + .build(); + customTable = + ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet()) + .setAutodetect(true) + .setHivePartitioningOptions(hivePartitioningOptions) + .setIgnoreUnknownValues(true) + .setMaxBadRecords(0) + .build(); + } + + bigquery.create(TableInfo.of(tableId, customTable)); + LOG.info("External table created using hivepartitioningoptions"); + } catch (BigQueryException e) { + throw new HoodieBigQuerySyncException("External table was not created ", e); + } + } + + public void createSnapshotView(String viewName, String versionsTableName, String manifestTableName) { + try { + TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, viewName); + String query = + String.format( + "SELECT * FROM `%s.%s.%s` WHERE _hoodie_file_name IN " + + "(SELECT filename FROM `%s.%s.%s`)", + syncConfig.projectId, + syncConfig.datasetName, + versionsTableName, + syncConfig.projectId, + syncConfig.datasetName, + manifestTableName); + + ViewDefinition viewDefinition = + ViewDefinition.newBuilder(query).setUseLegacySql(false).build(); + + bigquery.create(TableInfo.of(tableId, viewDefinition)); + LOG.info("View created successfully"); + } catch (BigQueryException e) { + throw new HoodieBigQuerySyncException("View was not created ", e); + } + } + + @Override + public Map getTableSchema(String tableName) { + // TODO: Implement automatic schema evolution when you add a new column. 
+ return Collections.emptyMap(); + } + + @Override + public void addPartitionsToTable(final String tableName, final List partitionsToAdd) { + // bigQuery discovers the new partitions automatically; explicit addition is unsupported. + throw new UnsupportedOperationException("No support for addPartitionsToTable yet."); + } + + @Override + public void dropPartitionsToTable(final String tableName, final List partitionsToDrop) { + // bigQuery manages partitions automatically; explicit removal is unsupported. + throw new UnsupportedOperationException("No support for dropPartitionsToTable yet."); + } + + public boolean datasetExists() { + Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName)); + return dataset != null; + } + + @Override + public boolean doesTableExist(final String tableName) { + return tableExists(tableName); + } + + @Override + public boolean tableExists(String tableName) { + TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName); + Table table = bigquery.getTable(tableId, BigQuery.TableOption.fields()); + return table != null && table.exists(); + } + + @Override + public Option getLastCommitTimeSynced(final String tableName) { + // bigQuery doesn't support tblproperties; this operation is unsupported. + throw new UnsupportedOperationException("Not support getLastCommitTimeSynced yet."); + } + + @Override + public void updateLastCommitTimeSynced(final String tableName) { + // bigQuery doesn't support tblproperties; this operation is unsupported. + throw new UnsupportedOperationException("No support for updateLastCommitTimeSynced yet."); + } + + @Override + public Option getLastReplicatedTime(String tableName) { + // bigQuery doesn't support tblproperties; this operation is unsupported. + throw new UnsupportedOperationException("Not support getLastReplicatedTime yet."); + } + + @Override + public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) { + // bigQuery doesn't support tblproperties; this operation is unsupported.
+ throw new UnsupportedOperationException("No support for updateLastReplicatedTimeStamp yet."); + } + + @Override + public void deleteLastReplicatedTimeStamp(String tableName) { + // bigQuery doesn't support tblproperties; this operation is unsupported. + throw new UnsupportedOperationException("No support for deleteLastReplicatedTimeStamp yet."); + } + + @Override + public void updatePartitionsToTable(final String tableName, final List changedPartitions) { + // bigQuery updates the partitions automatically; explicit updates are unsupported. + throw new UnsupportedOperationException("No support for updatePartitionsToTable yet."); + } + + @Override + public void close() { + // bigQuery has no connection close method, so do nothing. + } +} diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncException.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncException.java new file mode 100644 index 000000000..4d30b2faa --- /dev/null +++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.hudi.gcp.bigquery; + +public class HoodieBigQuerySyncException extends RuntimeException { + + public HoodieBigQuerySyncException() { + super(); + } + + public HoodieBigQuerySyncException(String message) { + super(message); + } + + public HoodieBigQuerySyncException(String message, Throwable t) { + super(message, t); + } + + public HoodieBigQuerySyncException(Throwable t) { + super(t); + } + + protected static String format(String message, Object... args) { + return String.format(String.valueOf(message), args); + } +} diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java new file mode 100644 index 000000000..8b3250ccd --- /dev/null +++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.gcp.bigquery; + +import org.apache.hudi.common.config.TypedProperties; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME; +import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestBigQuerySyncConfig { + + BigQuerySyncConfig syncConfig; + + @BeforeEach + void setUp() { + syncConfig = new BigQuerySyncConfig(); + syncConfig.projectId = "fooproject"; + syncConfig.datasetName = "foodataset"; + syncConfig.datasetLocation = "US"; + syncConfig.tableName = "footable"; + syncConfig.sourceUri = "gs://test-bucket/dwh/table_name/dt=*"; + syncConfig.sourceUriPrefix = "gs://test-bucket/dwh/table_name/"; + syncConfig.basePath = "gs://test-bucket/dwh/table_name"; + syncConfig.partitionFields = Arrays.asList("a", "b"); + syncConfig.useFileListingFromMetadata = true; + syncConfig.assumeDatePartitioning = true; + syncConfig.help = true; + } + + @Test + public void testCopy() { + BigQuerySyncConfig copied = 
BigQuerySyncConfig.copy(syncConfig); + assertEquals(copied.partitionFields, syncConfig.partitionFields); + assertEquals(copied.basePath, syncConfig.basePath); + assertEquals(copied.projectId, syncConfig.projectId); + assertEquals(copied.datasetName, syncConfig.datasetName); + assertEquals(copied.datasetLocation, syncConfig.datasetLocation); + assertEquals(copied.tableName, syncConfig.tableName); + assertEquals(copied.sourceUri, syncConfig.sourceUri); + assertEquals(copied.sourceUriPrefix, syncConfig.sourceUriPrefix); + assertEquals(copied.useFileListingFromMetadata, syncConfig.useFileListingFromMetadata); + assertEquals(copied.assumeDatePartitioning, syncConfig.assumeDatePartitioning); + assertEquals(copied.help, syncConfig.help); + } + + @Test + public void testToProps() { + TypedProperties props = syncConfig.toProps(); + assertEquals("fooproject", props.getString(BIGQUERY_SYNC_PROJECT_ID)); + assertEquals("foodataset", props.getString(BIGQUERY_SYNC_DATASET_NAME)); + assertEquals("US", props.getString(BIGQUERY_SYNC_DATASET_LOCATION)); + assertEquals("footable", props.getString(BIGQUERY_SYNC_TABLE_NAME)); + assertEquals("gs://test-bucket/dwh/table_name/dt=*", props.getString(BIGQUERY_SYNC_SOURCE_URI)); + assertEquals("gs://test-bucket/dwh/table_name/", props.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX)); + assertEquals("gs://test-bucket/dwh/table_name", props.getString(BIGQUERY_SYNC_SYNC_BASE_PATH)); + assertEquals("a,b", props.getString(BIGQUERY_SYNC_PARTITION_FIELDS)); + assertEquals("true", props.getString(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA)); + assertEquals("true", props.getString(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING)); + } + + @Test + public void fromProps() { + TypedProperties props = new TypedProperties(); + props.put(BIGQUERY_SYNC_PROJECT_ID, "fooproject"); + props.put(BIGQUERY_SYNC_DATASET_NAME, "foodataset"); + props.put(BIGQUERY_SYNC_DATASET_LOCATION, "US"); + props.put(BIGQUERY_SYNC_TABLE_NAME, "footable"); + 
props.put(BIGQUERY_SYNC_SOURCE_URI, "gs://test-bucket/dwh/table_name/dt=*"); + props.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, "gs://test-bucket/dwh/table_name/"); + props.put(BIGQUERY_SYNC_SYNC_BASE_PATH, "gs://test-bucket/dwh/table_name"); + props.put(BIGQUERY_SYNC_PARTITION_FIELDS, "a,b"); + props.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, true); + props.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, true); + BigQuerySyncConfig cfg = BigQuerySyncConfig.fromProps(props); + + assertEquals(syncConfig.projectId, cfg.projectId); + assertEquals(syncConfig.datasetName, cfg.datasetName); + assertEquals(syncConfig.datasetLocation, cfg.datasetLocation); + assertEquals(syncConfig.tableName, cfg.tableName); + assertEquals(syncConfig.sourceUri, cfg.sourceUri); + assertEquals(syncConfig.sourceUriPrefix, cfg.sourceUriPrefix); + assertEquals(syncConfig.basePath, cfg.basePath); + assertEquals(syncConfig.partitionFields, cfg.partitionFields); + assertEquals(syncConfig.useFileListingFromMetadata, cfg.useFileListingFromMetadata); + assertEquals(syncConfig.assumeDatePartitioning, cfg.assumeDatePartitioning); + } +} diff --git a/hudi-gcp/src/test/resources/log4j-surefire-quiet.properties b/hudi-gcp/src/test/resources/log4j-surefire-quiet.properties new file mode 100644 index 000000000..78d6cfe84 --- /dev/null +++ b/hudi-gcp/src/test/resources/log4j-surefire-quiet.properties @@ -0,0 +1,29 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=ERROR, CONSOLE +log4j.logger.org.apache.hudi=ERROR + +# CONSOLE is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# CONSOLE uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-gcp/src/test/resources/log4j-surefire.properties b/hudi-gcp/src/test/resources/log4j-surefire.properties new file mode 100644 index 000000000..7914f0a78 --- /dev/null +++ b/hudi-gcp/src/test/resources/log4j-surefire.properties @@ -0,0 +1,29 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache.hudi=INFO + +# CONSOLE is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# CONSOLE uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileUtil.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileUtil.java deleted file mode 100644 index 090fdd5dd..000000000 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileUtil.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.sync.common.util; - -import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.engine.HoodieLocalEngineContext; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.FileIOUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.metadata.HoodieMetadataFileSystemView; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS; -import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE; - -public class ManifestFileUtil { - private static final Logger LOG = LogManager.getLogger(ManifestFileUtil.class); - private static final String MANIFEST_FOLDER_NAME = "manifest"; - private static final String MANIFEST_FILE_NAME = "latest-snapshot.csv"; - private static final String DELIMITER = "\n"; - private final SerializableConfiguration hadoopConf; - private final String basePath; - private final transient HoodieLocalEngineContext engineContext; - private final HoodieTableMetaClient metaClient; - - private ManifestFileUtil(Configuration conf, String basePath) { - this.hadoopConf = new SerializableConfiguration(conf); - this.basePath = basePath; - this.engineContext = new HoodieLocalEngineContext(conf); - this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); - } - - public synchronized void writeManifestFile() 
{ - try { - Path manifestFilePath = new Path(getManifestFolder(), MANIFEST_FILE_NAME); - Option content = Option.of(fetchLatestBaseFilesForAllPartitions().collect(Collectors.joining(DELIMITER)).getBytes()); - FileIOUtils.createFileInPath(metaClient.getFs(), manifestFilePath, content); - } catch (Exception e) { - String msg = "Error writing manifest file"; - LOG.error(msg, e); - throw new HoodieException(msg, e); - } - } - - public Stream fetchLatestBaseFilesForAllPartitions() { - try { - HoodieMetadataConfig metadataConfig = buildMetadataConfig(hadoopConf.get()); - - List partitions = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath); - - return partitions.parallelStream().flatMap(p -> { - HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf.get()); - HoodieMetadataFileSystemView fsView = - new HoodieMetadataFileSystemView(engContext, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metadataConfig); - return fsView.getLatestBaseFiles(p).map(HoodieBaseFile::getFileName); - }); - } catch (Exception e) { - String msg = "Error checking path :" + basePath; - LOG.error(msg, e); - throw new HoodieException(msg, e); - } - } - - private static HoodieMetadataConfig buildMetadataConfig(Configuration conf) { - return HoodieMetadataConfig.newBuilder() - .enable(conf.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS)) - .build(); - } - - /** - * @return Manifest File folder - */ - public Path getManifestFolder() { - return new Path(metaClient.getMetaPath(), MANIFEST_FOLDER_NAME); - } - - /** - * @return Manifest File Full Path - */ - public Path getManifestFilePath() { - return new Path(getManifestFolder(), MANIFEST_FILE_NAME); - } - - public static Builder builder() { - return new Builder(); - } - - /** - * Builder for {@link ManifestFileUtil}. 
- */ - public static class Builder { - - private Configuration conf; - private String basePath; - - public Builder setConf(Configuration conf) { - this.conf = conf; - return this; - } - - public Builder setBasePath(String basePath) { - this.basePath = basePath; - return this; - } - - public ManifestFileUtil build() { - ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init ManifestFileGenerator"); - ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init ManifestFileGenerator"); - return new ManifestFileUtil(conf, basePath); - } - } -} diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java new file mode 100644 index 000000000..3ac238c89 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.common.util; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metadata.HoodieMetadataFileSystemView; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ManifestFileWriter { + + public static final String MANIFEST_FOLDER_NAME = "manifest"; + public static final String MANIFEST_FILE_NAME = "latest-snapshot.csv"; + private static final Logger LOG = LogManager.getLogger(ManifestFileWriter.class); + + private final HoodieTableMetaClient metaClient; + private final boolean useFileListingFromMetadata; + private final boolean assumeDatePartitioning; + + private ManifestFileWriter(Configuration hadoopConf, String basePath, boolean useFileListingFromMetadata, boolean assumeDatePartitioning) { + this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); + this.useFileListingFromMetadata = useFileListingFromMetadata; + this.assumeDatePartitioning = assumeDatePartitioning; + } + + /** + * Write all the latest base file names to the manifest file. 
+ */ + public synchronized void writeManifestFile() { + try { + List baseFiles = fetchLatestBaseFilesForAllPartitions(metaClient, useFileListingFromMetadata, assumeDatePartitioning) + .collect(Collectors.toList()); + if (baseFiles.isEmpty()) { + LOG.warn("No base file to generate manifest file."); + return; + } else { + LOG.info("Writing base file names to manifest file: " + baseFiles.size()); + } + final Path manifestFilePath = getManifestFilePath(); + try (FSDataOutputStream outputStream = metaClient.getFs().create(manifestFilePath, true); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) { + for (String f : baseFiles) { + writer.write(f); + writer.write("\n"); + } + } + } catch (Exception e) { + throw new HoodieException("Error in writing manifest file.", e); + } + } + + public static Stream fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient, + boolean useFileListingFromMetadata, boolean assumeDatePartitioning) { + try { + List partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()), + metaClient.getBasePath(), useFileListingFromMetadata, assumeDatePartitioning); + LOG.info("Retrieve all partitions: " + partitions.size()); + return partitions.parallelStream().flatMap(p -> { + Configuration hadoopConf = metaClient.getHadoopConf(); + HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf); + HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient, + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), + HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build()); + return fsView.getLatestBaseFiles(p).map(HoodieBaseFile::getFileName); + }); + } catch (Exception e) { + throw new HoodieException("Error in fetching latest base files.", e); + } + } + + public Path getManifestFolder() { + return new 
Path(metaClient.getMetaPath(), MANIFEST_FOLDER_NAME); + } + + public Path getManifestFilePath() { + return new Path(getManifestFolder(), MANIFEST_FILE_NAME); + } + + public String getManifestSourceUri() { + return new Path(getManifestFolder(), "*").toUri().toString(); + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link ManifestFileWriter}. + */ + public static class Builder { + + private Configuration conf; + private String basePath; + private boolean useFileListingFromMetadata; + private boolean assumeDatePartitioning; + + public Builder setConf(Configuration conf) { + this.conf = conf; + return this; + } + + public Builder setBasePath(String basePath) { + this.basePath = basePath; + return this; + } + + public Builder setUseFileListingFromMetadata(boolean useFileListingFromMetadata) { + this.useFileListingFromMetadata = useFileListingFromMetadata; + return this; + } + + public Builder setAssumeDatePartitioning(boolean assumeDatePartitioning) { + this.assumeDatePartitioning = assumeDatePartitioning; + return this; + } + + public ManifestFileWriter build() { + ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init ManifestFileGenerator"); + ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init ManifestFileGenerator"); + return new ManifestFileWriter(conf, basePath, useFileListingFromMetadata, assumeDatePartitioning); + } + } +} diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileUtil.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileUtil.java deleted file mode 100644 index f383cc996..000000000 --- a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileUtil.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.sync.common.util; - -import org.apache.hudi.common.fs.FSUtils; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.IntStream; - -import org.apache.hudi.common.testutils.HoodieCommonTestHarness; -import org.apache.hudi.common.testutils.HoodieTestTable; - -public class TestManifestFileUtil extends HoodieCommonTestHarness { - - private static final List MULTI_LEVEL_PARTITIONS = Arrays.asList("2019/01", "2020/01", "2021/01"); - private static HoodieTestTable hoodieTestTable; - - @BeforeEach - public void setUp() throws IOException { - initMetaClient(); - hoodieTestTable = HoodieTestTable.of(metaClient); - } - - @Test - public void testMultiLevelPartitionedTable() throws Exception { - // Generate 10 files under each partition - createTestDataForPartitionedTable(10); - ManifestFileUtil manifestFileUtil = ManifestFileUtil.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build(); - Assertions.assertEquals(30, manifestFileUtil.fetchLatestBaseFilesForAllPartitions().count()); - } - - @Test - public void testCreateManifestFile() throws Exception { - // Generate 10 
files under each partition - createTestDataForPartitionedTable(10); - ManifestFileUtil manifestFileUtil = ManifestFileUtil.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build(); - manifestFileUtil.writeManifestFile(); - Assertions.assertTrue(FSUtils.getFileSize(metaClient.getFs(), manifestFileUtil.getManifestFilePath()) > 0); - } - - public void createTestDataForPartitionedTable(int numOfFiles) throws Exception { - String instant = "100"; - hoodieTestTable = hoodieTestTable.addCommit(instant); - // Generate 10 files under each partition - for (String partition : MULTI_LEVEL_PARTITIONS) { - hoodieTestTable = hoodieTestTable.withPartitionMetaFiles(partition) - .withBaseFilesInPartition(partition, IntStream.range(0, numOfFiles).toArray()); - } - } -} diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java new file mode 100644 index 000000000..47b60f723 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.common.util; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.FileIOUtils; + +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.stream.IntStream; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS; +import static org.apache.hudi.sync.common.util.ManifestFileWriter.fetchLatestBaseFilesForAllPartitions; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestManifestFileWriter extends HoodieCommonTestHarness { + + @BeforeEach + public void setUp() throws IOException { + initMetaClient(); + } + + @Test + public void testMultiLevelPartitionedTable() throws Exception { + // Generate 10 files under each partition + createTestDataForPartitionedTable(metaClient, 10); + ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build(); + assertEquals(30, fetchLatestBaseFilesForAllPartitions(metaClient, false, false).count()); + } + + @Test + public void testCreateManifestFile() throws Exception { + // Generate 10 files under each partition + createTestDataForPartitionedTable(metaClient, 3); + ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build(); + manifestFileWriter.writeManifestFile(); + Path manifestFilePath = manifestFileWriter.getManifestFilePath(); + try (InputStream is = metaClient.getFs().open(manifestFilePath)) { + assertEquals(9, FileIOUtils.readAsUTFStringLines(is).size(), "there should be 9 base files in total; 3 per partition."); + } + } + + private static void 
createTestDataForPartitionedTable(HoodieTableMetaClient metaClient, int numFilesPerPartition) throws Exception { + final String instantTime = "100"; + HoodieTestTable testTable = HoodieTestTable.of(metaClient).addCommit(instantTime); + for (String partition : DEFAULT_PARTITION_PATHS) { + testTable.withPartitionMetaFiles(partition) + .withBaseFilesInPartition(partition, IntStream.range(0, numFilesPerPartition).toArray()); + } + } + + @Test + public void getManifestSourceUri() { + ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build(); + String sourceUri = manifestFileWriter.getManifestSourceUri(); + assertEquals(new Path(basePath, ".hoodie/manifest/*").toUri().toString(), sourceUri); + } +} diff --git a/packaging/hudi-gcp-bundle/pom.xml b/packaging/hudi-gcp-bundle/pom.xml new file mode 100644 index 000000000..dbb44595e --- /dev/null +++ b/packaging/hudi-gcp-bundle/pom.xml @@ -0,0 +1,178 @@ + + + + + hudi + org.apache.hudi + 0.11.0-SNAPSHOT + ../../pom.xml + + 4.0.0 + hudi-gcp-bundle + jar + + + true + ${project.parent.basedir} + + + + + com.google.cloud + libraries-bom + 25.1.0 + pom + import + + + + + + + org.apache.rat + apache-rat-plugin + + + maven-assembly-plugin + + + + org.apache.hudi.gcp.bigquery.BigQuerySyncTool + + + + jar-with-dependencies + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + + package + + shade + + + ${shadeSources} + ${project.build.directory}/dependency-reduced-pom.xml + + + + + + true + + + META-INF/LICENSE + target/classes/META-INF/LICENSE + + + + + org.apache.hudi:hudi-common + org.apache.hudi:hudi-hadoop-mr + org.apache.hudi:hudi-sync-common + org.apache.hudi:hudi-gcp + + com.google.cloud:google-cloud-bigquery + com.beust:jcommander + + + false + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + META-INF/services/javax.* + + + + ${project.artifactId}-${project.version} + + + + + + + + src/main/resources 
+ + + src/test/resources + + + + + + + + org.apache.hudi + hudi-common + ${project.version} + + + + org.apache.hudi + hudi-hadoop-mr-bundle + ${project.version} + + + + org.apache.hudi + hudi-sync-common + ${project.version} + + + + org.apache.hudi + hudi-gcp + ${project.version} + + + + com.google.cloud + google-cloud-bigquery + + + + + org.apache.parquet + parquet-avro + ${parquet.version} + compile + + + + + org.apache.avro + avro + ${avro.version} + compile + + + + diff --git a/packaging/hudi-gcp-bundle/src/main/java/org/apache/hudi/gcp/bigquery/bundle/Main.java b/packaging/hudi-gcp-bundle/src/main/java/org/apache/hudi/gcp/bigquery/bundle/Main.java new file mode 100644 index 000000000..75324f64f --- /dev/null +++ b/packaging/hudi-gcp-bundle/src/main/java/org/apache/hudi/gcp/bigquery/bundle/Main.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.gcp.bigquery.bundle; + +import org.apache.hudi.common.util.ReflectionUtils; + +/** + * A simple main class to dump all classes loaded in current classpath. + * + * This is a workaround for generating sources and javadoc jars for packaging modules. 
The maven plugins for generating + * javadoc and sources plugins do not generate corresponding jars if there are no source files. + * + * This class does not have anything to do with Hudi but is there to keep mvn javadocs/source plugin happy. + */ +public class Main { + + public static void main(String[] args) { + ReflectionUtils.getTopLevelClassesInClasspath(Main.class).forEach(System.out::println); + } +} diff --git a/pom.xml b/pom.xml index 6212b177b..169408ff9 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ hudi-cli hudi-client hudi-aws + hudi-gcp hudi-hadoop-mr hudi-spark-datasource hudi-timeline-service @@ -47,6 +48,7 @@ packaging/hudi-hadoop-mr-bundle packaging/hudi-datahub-sync-bundle packaging/hudi-hive-sync-bundle + packaging/hudi-gcp-bundle packaging/hudi-spark-bundle packaging/hudi-presto-bundle packaging/hudi-utilities-bundle