partitionFields = new ArrayList<>();
+ @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+ public Boolean useFileListingFromMetadata = false;
+ @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+ + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
+ public Boolean assumeDatePartitioning = false;
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ public static BigQuerySyncConfig copy(BigQuerySyncConfig cfg) {
+ BigQuerySyncConfig newConfig = new BigQuerySyncConfig();
+ newConfig.projectId = cfg.projectId;
+ newConfig.datasetName = cfg.datasetName;
+ newConfig.datasetLocation = cfg.datasetLocation;
+ newConfig.tableName = cfg.tableName;
+ newConfig.sourceUri = cfg.sourceUri;
+ newConfig.sourceUriPrefix = cfg.sourceUriPrefix;
+ newConfig.basePath = cfg.basePath;
+ newConfig.partitionFields = cfg.partitionFields;
+ newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+ newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
+ newConfig.help = cfg.help;
+ return newConfig;
+ }
+
+ public TypedProperties toProps() {
+ TypedProperties properties = new TypedProperties();
+ properties.put(BIGQUERY_SYNC_PROJECT_ID, projectId);
+ properties.put(BIGQUERY_SYNC_DATASET_NAME, datasetName);
+ properties.put(BIGQUERY_SYNC_DATASET_LOCATION, datasetLocation);
+ properties.put(BIGQUERY_SYNC_TABLE_NAME, tableName);
+ properties.put(BIGQUERY_SYNC_SOURCE_URI, sourceUri);
+ properties.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, sourceUriPrefix);
+ properties.put(BIGQUERY_SYNC_SYNC_BASE_PATH, basePath);
+ properties.put(BIGQUERY_SYNC_PARTITION_FIELDS, String.join(",", partitionFields));
+ properties.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, useFileListingFromMetadata);
+ properties.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, assumeDatePartitioning);
+ return properties;
+ }
+
+ public static BigQuerySyncConfig fromProps(TypedProperties props) {
+ BigQuerySyncConfig config = new BigQuerySyncConfig();
+ config.projectId = props.getString(BIGQUERY_SYNC_PROJECT_ID);
+ config.datasetName = props.getString(BIGQUERY_SYNC_DATASET_NAME);
+ config.datasetLocation = props.getString(BIGQUERY_SYNC_DATASET_LOCATION);
+ config.tableName = props.getString(BIGQUERY_SYNC_TABLE_NAME);
+ config.sourceUri = props.getString(BIGQUERY_SYNC_SOURCE_URI);
+ config.sourceUriPrefix = props.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX);
+ config.basePath = props.getString(BIGQUERY_SYNC_SYNC_BASE_PATH);
+ config.partitionFields = props.getStringList(BIGQUERY_SYNC_PARTITION_FIELDS, ",", Collections.emptyList());
+ config.useFileListingFromMetadata = props.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, false);
+ config.assumeDatePartitioning = props.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, false);
+ return config;
+ }
+
+ @Override
+ public String toString() {
+ return "BigQuerySyncConfig{projectId='" + projectId
+ + "', datasetName='" + datasetName
+ + "', datasetLocation='" + datasetLocation
+ + "', tableName='" + tableName
+ + "', sourceUri='" + sourceUri
+ + "', sourceUriPrefix='" + sourceUriPrefix
+ + "', basePath='" + basePath + "'"
+ + ", partitionFields=" + partitionFields
+ + "', useFileListingFromMetadata='" + useFileListingFromMetadata
+ + "', assumeDatePartitioning='" + assumeDatePartitioning
+ + "', help=" + help + "}";
+ }
+}
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
new file mode 100644
index 000000000..0cb75eea8
--- /dev/null
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.util.ManifestFileWriter;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Tool to sync a hoodie table with a big query table. Either use it as an api
+ * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line java -cp <hudi jars> BigQuerySyncTool [args]
+ *
+ * This utility will get the schema from the latest commit and will sync big query table schema.
+ *
+ * <p>Note: this class is experimental.
+ */
+public class BigQuerySyncTool extends AbstractSyncTool {
+
+ private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);
+
+ public final BigQuerySyncConfig cfg;
+ public final String manifestTableName;
+ public final String versionsTableName;
+ public final String snapshotViewName;
+
+ public BigQuerySyncTool(TypedProperties properties, Configuration conf, FileSystem fs) {
+ super(properties, conf, fs);
+ cfg = BigQuerySyncConfig.fromProps(properties);
+ manifestTableName = cfg.tableName + "_manifest";
+ versionsTableName = cfg.tableName + "_versions";
+ snapshotViewName = cfg.tableName;
+ }
+
+ @Override
+ public void syncHoodieTable() {
+ try (HoodieBigQuerySyncClient bqSyncClient = new HoodieBigQuerySyncClient(BigQuerySyncConfig.fromProps(props), fs)) {
+ switch (bqSyncClient.getTableType()) {
+ case COPY_ON_WRITE:
+ syncCoWTable(bqSyncClient);
+ break;
+ case MERGE_ON_READ:
+ default:
+ throw new UnsupportedOperationException(bqSyncClient.getTableType() + " table type is not supported yet.");
+ }
+ } catch (Exception e) {
+ throw new HoodieBigQuerySyncException("Got runtime exception when big query syncing " + cfg.tableName, e);
+ }
+ }
+
+ private void syncCoWTable(HoodieBigQuerySyncClient bqSyncClient) {
+ ValidationUtils.checkState(bqSyncClient.getTableType() == HoodieTableType.COPY_ON_WRITE);
+ LOG.info("Sync hoodie table " + snapshotViewName + " at base path " + bqSyncClient.getBasePath());
+
+ if (!bqSyncClient.datasetExists()) {
+ throw new HoodieBigQuerySyncException("Dataset not found: " + cfg);
+ }
+
+ ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder()
+ .setConf(conf)
+ .setBasePath(cfg.basePath)
+ .setUseFileListingFromMetadata(cfg.useFileListingFromMetadata)
+ .setAssumeDatePartitioning(cfg.assumeDatePartitioning)
+ .build();
+ manifestFileWriter.writeManifestFile();
+
+ if (!bqSyncClient.tableExists(manifestTableName)) {
+ bqSyncClient.createManifestTable(manifestTableName, manifestFileWriter.getManifestSourceUri());
+ LOG.info("Manifest table creation complete for " + manifestTableName);
+ }
+ if (!bqSyncClient.tableExists(versionsTableName)) {
+ bqSyncClient.createVersionsTable(versionsTableName, cfg.sourceUri, cfg.sourceUriPrefix, cfg.partitionFields);
+ LOG.info("Versions table creation complete for " + versionsTableName);
+ }
+ if (!bqSyncClient.tableExists(snapshotViewName)) {
+ bqSyncClient.createSnapshotView(snapshotViewName, versionsTableName, manifestTableName);
+ LOG.info("Snapshot view creation complete for " + snapshotViewName);
+ }
+
+ // TODO: Implement automatic schema evolution when you add a new column.
+ LOG.info("Sync table complete for " + snapshotViewName);
+ }
+
+ public static void main(String[] args) {
+ BigQuerySyncConfig cfg = new BigQuerySyncConfig();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+ FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+ new BigQuerySyncTool(cfg.toProps(), fs.getConf(), fs).syncHoodieTable();
+ }
+}
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
new file mode 100644
index 000000000..4aafbf2e5
--- /dev/null
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.CsvOptions;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.HivePartitioningOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.ViewDefinition;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
+ private static final Logger LOG = LogManager.getLogger(HoodieBigQuerySyncClient.class);
+
+ private final BigQuerySyncConfig syncConfig;
+ private transient BigQuery bigquery;
+
+ public HoodieBigQuerySyncClient(final BigQuerySyncConfig syncConfig, final FileSystem fs) {
+ super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
+ false, fs);
+ this.syncConfig = syncConfig;
+ this.createBigQueryConnection();
+ }
+
+ private void createBigQueryConnection() {
+ if (bigquery == null) {
+ try {
+ // Initialize client that will be used to send requests. This client only needs to be created
+ // once, and can be reused for multiple requests.
+ bigquery = BigQueryOptions.newBuilder().setLocation(syncConfig.datasetLocation).build().getService();
+ LOG.info("Successfully established BigQuery connection.");
+ } catch (BigQueryException e) {
+ throw new HoodieBigQuerySyncException("Cannot create bigQuery connection ", e);
+ }
+ }
+ }
+
+ @Override
+ public void createTable(final String tableName, final MessageType storageSchema, final String inputFormatClass,
+ final String outputFormatClass, final String serdeClass,
+ final Map serdeProperties, final Map tableProperties) {
+ // bigQuery create table arguments are different, so do nothing.
+ }
+
+ public void createManifestTable(String tableName, String sourceUri) {
+ try {
+ TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName);
+ CsvOptions csvOptions = CsvOptions.newBuilder()
+ .setFieldDelimiter(",")
+ .setAllowJaggedRows(false)
+ .setAllowQuotedNewLines(false)
+ .setSkipLeadingRows(0)
+ .build();
+ Schema schema = Schema.of(
+ Field.of("filename", StandardSQLTypeName.STRING));
+
+ ExternalTableDefinition customTable =
+ ExternalTableDefinition.newBuilder(sourceUri, schema, csvOptions)
+ .setAutodetect(false)
+ .setIgnoreUnknownValues(false)
+ .setMaxBadRecords(0)
+ .build();
+ bigquery.create(TableInfo.of(tableId, customTable));
+ LOG.info("Manifest External table created.");
+ } catch (BigQueryException e) {
+ throw new HoodieBigQuerySyncException("Manifest External table was not created ", e);
+ }
+ }
+
+ public void createVersionsTable(String tableName, String sourceUri, String sourceUriPrefix, List partitionFields) {
+ try {
+ ExternalTableDefinition customTable;
+ TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName);
+
+ if (partitionFields.isEmpty()) {
+ customTable =
+ ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+ .setAutodetect(true)
+ .setIgnoreUnknownValues(true)
+ .setMaxBadRecords(0)
+ .build();
+ } else {
+ // Configuring partitioning options for partitioned table.
+ HivePartitioningOptions hivePartitioningOptions =
+ HivePartitioningOptions.newBuilder()
+ .setMode("AUTO")
+ .setRequirePartitionFilter(false)
+ .setSourceUriPrefix(sourceUriPrefix)
+ .build();
+ customTable =
+ ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+ .setAutodetect(true)
+ .setHivePartitioningOptions(hivePartitioningOptions)
+ .setIgnoreUnknownValues(true)
+ .setMaxBadRecords(0)
+ .build();
+ }
+
+ bigquery.create(TableInfo.of(tableId, customTable));
+ LOG.info("External table created using hivepartitioningoptions");
+ } catch (BigQueryException e) {
+ throw new HoodieBigQuerySyncException("External table was not created ", e);
+ }
+ }
+
+ public void createSnapshotView(String viewName, String versionsTableName, String manifestTableName) {
+ try {
+ TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, viewName);
+ String query =
+ String.format(
+ "SELECT * FROM `%s.%s.%s` WHERE _hoodie_file_name IN "
+ + "(SELECT filename FROM `%s.%s.%s`)",
+ syncConfig.projectId,
+ syncConfig.datasetName,
+ versionsTableName,
+ syncConfig.projectId,
+ syncConfig.datasetName,
+ manifestTableName);
+
+ ViewDefinition viewDefinition =
+ ViewDefinition.newBuilder(query).setUseLegacySql(false).build();
+
+ bigquery.create(TableInfo.of(tableId, viewDefinition));
+ LOG.info("View created successfully");
+ } catch (BigQueryException e) {
+ throw new HoodieBigQuerySyncException("View was not created ", e);
+ }
+ }
+
+ @Override
+ public Map getTableSchema(String tableName) {
+ // TODO: Implement automatic schema evolution when you add a new column.
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void addPartitionsToTable(final String tableName, final List partitionsToAdd) {
+ // BigQuery discovers new partitions automatically; explicit partition addition is not supported.
+ throw new UnsupportedOperationException("No support for addPartitionsToTable yet.");
+ }
+
+ @Override
+ public void dropPartitionsToTable(final String tableName, final List partitionsToDrop) {
+ // BigQuery discovers partition changes automatically; explicit partition drop is not supported.
+ throw new UnsupportedOperationException("No support for dropPartitionsToTable yet.");
+ }
+
+ public boolean datasetExists() {
+ Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName));
+ return dataset != null;
+ }
+
+ @Override
+ public boolean doesTableExist(final String tableName) {
+ return tableExists(tableName);
+ }
+
+ @Override
+ public boolean tableExists(String tableName) {
+ TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName);
+ Table table = bigquery.getTable(tableId, BigQuery.TableOption.fields());
+ return table != null && table.exists();
+ }
+
+ @Override
+ public Option getLastCommitTimeSynced(final String tableName) {
+ // BigQuery doesn't support tblproperties, so this operation is unsupported.
+ throw new UnsupportedOperationException("Not support getLastCommitTimeSynced yet.");
+ }
+
+ @Override
+ public void updateLastCommitTimeSynced(final String tableName) {
+ // BigQuery doesn't support tblproperties, so this operation is unsupported.
+ throw new UnsupportedOperationException("No support for updateLastCommitTimeSynced yet.");
+ }
+
+ @Override
+ public Option getLastReplicatedTime(String tableName) {
+ // BigQuery doesn't support tblproperties, so this operation is unsupported.
+ throw new UnsupportedOperationException("Not support getLastReplicatedTime yet.");
+ }
+
+ @Override
+ public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
+ // BigQuery doesn't support tblproperties, so this operation is unsupported.
+ throw new UnsupportedOperationException("No support for updateLastReplicatedTimeStamp yet.");
+ }
+
+ @Override
+ public void deleteLastReplicatedTimeStamp(String tableName) {
+ // BigQuery doesn't support tblproperties, so this operation is unsupported.
+ throw new UnsupportedOperationException("No support for deleteLastReplicatedTimeStamp yet.");
+ }
+
+ @Override
+ public void updatePartitionsToTable(final String tableName, final List changedPartitions) {
+ // BigQuery updates partitions automatically; explicit partition update is not supported.
+ throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
+ }
+
+ @Override
+ public void close() {
+ // bigQuery has no connection close method, so do nothing.
+ }
+}
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncException.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncException.java
new file mode 100644
index 000000000..4d30b2faa
--- /dev/null
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery;
+
+public class HoodieBigQuerySyncException extends RuntimeException {
+
+ public HoodieBigQuerySyncException() {
+ super();
+ }
+
+ public HoodieBigQuerySyncException(String message) {
+ super(message);
+ }
+
+ public HoodieBigQuerySyncException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public HoodieBigQuerySyncException(Throwable t) {
+ super(t);
+ }
+
+ protected static String format(String message, Object... args) {
+ return String.format(String.valueOf(message), args);
+ }
+}
diff --git a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
new file mode 100644
index 000000000..8b3250ccd
--- /dev/null
+++ b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_TABLE_NAME;
+import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestBigQuerySyncConfig {
+
+ BigQuerySyncConfig syncConfig;
+
+ @BeforeEach
+ void setUp() {
+ syncConfig = new BigQuerySyncConfig();
+ syncConfig.projectId = "fooproject";
+ syncConfig.datasetName = "foodataset";
+ syncConfig.datasetLocation = "US";
+ syncConfig.tableName = "footable";
+ syncConfig.sourceUri = "gs://test-bucket/dwh/table_name/dt=*";
+ syncConfig.sourceUriPrefix = "gs://test-bucket/dwh/table_name/";
+ syncConfig.basePath = "gs://test-bucket/dwh/table_name";
+ syncConfig.partitionFields = Arrays.asList("a", "b");
+ syncConfig.useFileListingFromMetadata = true;
+ syncConfig.assumeDatePartitioning = true;
+ syncConfig.help = true;
+ }
+
+ @Test
+ public void testCopy() {
+ BigQuerySyncConfig copied = BigQuerySyncConfig.copy(syncConfig);
+ assertEquals(copied.partitionFields, syncConfig.partitionFields);
+ assertEquals(copied.basePath, syncConfig.basePath);
+ assertEquals(copied.projectId, syncConfig.projectId);
+ assertEquals(copied.datasetName, syncConfig.datasetName);
+ assertEquals(copied.datasetLocation, syncConfig.datasetLocation);
+ assertEquals(copied.tableName, syncConfig.tableName);
+ assertEquals(copied.sourceUri, syncConfig.sourceUri);
+ assertEquals(copied.sourceUriPrefix, syncConfig.sourceUriPrefix);
+ assertEquals(copied.useFileListingFromMetadata, syncConfig.useFileListingFromMetadata);
+ assertEquals(copied.assumeDatePartitioning, syncConfig.assumeDatePartitioning);
+ assertEquals(copied.help, syncConfig.help);
+ }
+
+ @Test
+ public void testToProps() {
+ TypedProperties props = syncConfig.toProps();
+ assertEquals("fooproject", props.getString(BIGQUERY_SYNC_PROJECT_ID));
+ assertEquals("foodataset", props.getString(BIGQUERY_SYNC_DATASET_NAME));
+ assertEquals("US", props.getString(BIGQUERY_SYNC_DATASET_LOCATION));
+ assertEquals("footable", props.getString(BIGQUERY_SYNC_TABLE_NAME));
+ assertEquals("gs://test-bucket/dwh/table_name/dt=*", props.getString(BIGQUERY_SYNC_SOURCE_URI));
+ assertEquals("gs://test-bucket/dwh/table_name/", props.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX));
+ assertEquals("gs://test-bucket/dwh/table_name", props.getString(BIGQUERY_SYNC_SYNC_BASE_PATH));
+ assertEquals("a,b", props.getString(BIGQUERY_SYNC_PARTITION_FIELDS));
+ assertEquals("true", props.getString(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA));
+ assertEquals("true", props.getString(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING));
+ }
+
+ @Test
+ public void fromProps() {
+ TypedProperties props = new TypedProperties();
+ props.put(BIGQUERY_SYNC_PROJECT_ID, "fooproject");
+ props.put(BIGQUERY_SYNC_DATASET_NAME, "foodataset");
+ props.put(BIGQUERY_SYNC_DATASET_LOCATION, "US");
+ props.put(BIGQUERY_SYNC_TABLE_NAME, "footable");
+ props.put(BIGQUERY_SYNC_SOURCE_URI, "gs://test-bucket/dwh/table_name/dt=*");
+ props.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, "gs://test-bucket/dwh/table_name/");
+ props.put(BIGQUERY_SYNC_SYNC_BASE_PATH, "gs://test-bucket/dwh/table_name");
+ props.put(BIGQUERY_SYNC_PARTITION_FIELDS, "a,b");
+ props.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, true);
+ props.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, true);
+ BigQuerySyncConfig cfg = BigQuerySyncConfig.fromProps(props);
+
+ assertEquals(syncConfig.projectId, cfg.projectId);
+ assertEquals(syncConfig.datasetName, cfg.datasetName);
+ assertEquals(syncConfig.datasetLocation, cfg.datasetLocation);
+ assertEquals(syncConfig.tableName, cfg.tableName);
+ assertEquals(syncConfig.sourceUri, cfg.sourceUri);
+ assertEquals(syncConfig.sourceUriPrefix, cfg.sourceUriPrefix);
+ assertEquals(syncConfig.basePath, cfg.basePath);
+ assertEquals(syncConfig.partitionFields, cfg.partitionFields);
+ assertEquals(syncConfig.useFileListingFromMetadata, cfg.useFileListingFromMetadata);
+ assertEquals(syncConfig.assumeDatePartitioning, cfg.assumeDatePartitioning);
+ }
+}
diff --git a/hudi-gcp/src/test/resources/log4j-surefire-quiet.properties b/hudi-gcp/src/test/resources/log4j-surefire-quiet.properties
new file mode 100644
index 000000000..78d6cfe84
--- /dev/null
+++ b/hudi-gcp/src/test/resources/log4j-surefire-quiet.properties
@@ -0,0 +1,29 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=ERROR, CONSOLE
+log4j.logger.org.apache.hudi=ERROR
+
+# CONSOLE is set to be a ConsoleAppender.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+# CONSOLE uses PatternLayout.
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n
+log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
+log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
+log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
diff --git a/hudi-gcp/src/test/resources/log4j-surefire.properties b/hudi-gcp/src/test/resources/log4j-surefire.properties
new file mode 100644
index 000000000..7914f0a78
--- /dev/null
+++ b/hudi-gcp/src/test/resources/log4j-surefire.properties
@@ -0,0 +1,29 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=WARN, CONSOLE
+log4j.logger.org.apache.hudi=INFO
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+# A1 uses PatternLayout.
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
+log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
+log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileUtil.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileUtil.java
deleted file mode 100644
index 090fdd5dd..000000000
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileUtil.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sync.common.util;
-
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.engine.HoodieLocalEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
-
-public class ManifestFileUtil {
- private static final Logger LOG = LogManager.getLogger(ManifestFileUtil.class);
- private static final String MANIFEST_FOLDER_NAME = "manifest";
- private static final String MANIFEST_FILE_NAME = "latest-snapshot.csv";
- private static final String DELIMITER = "\n";
- private final SerializableConfiguration hadoopConf;
- private final String basePath;
- private final transient HoodieLocalEngineContext engineContext;
- private final HoodieTableMetaClient metaClient;
-
- private ManifestFileUtil(Configuration conf, String basePath) {
- this.hadoopConf = new SerializableConfiguration(conf);
- this.basePath = basePath;
- this.engineContext = new HoodieLocalEngineContext(conf);
- this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
- }
-
- public synchronized void writeManifestFile() {
- try {
- Path manifestFilePath = new Path(getManifestFolder(), MANIFEST_FILE_NAME);
- Option content = Option.of(fetchLatestBaseFilesForAllPartitions().collect(Collectors.joining(DELIMITER)).getBytes());
- FileIOUtils.createFileInPath(metaClient.getFs(), manifestFilePath, content);
- } catch (Exception e) {
- String msg = "Error writing manifest file";
- LOG.error(msg, e);
- throw new HoodieException(msg, e);
- }
- }
-
- public Stream fetchLatestBaseFilesForAllPartitions() {
- try {
- HoodieMetadataConfig metadataConfig = buildMetadataConfig(hadoopConf.get());
-
- List partitions = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath);
-
- return partitions.parallelStream().flatMap(p -> {
- HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf.get());
- HoodieMetadataFileSystemView fsView =
- new HoodieMetadataFileSystemView(engContext, metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metadataConfig);
- return fsView.getLatestBaseFiles(p).map(HoodieBaseFile::getFileName);
- });
- } catch (Exception e) {
- String msg = "Error checking path :" + basePath;
- LOG.error(msg, e);
- throw new HoodieException(msg, e);
- }
- }
-
- private static HoodieMetadataConfig buildMetadataConfig(Configuration conf) {
- return HoodieMetadataConfig.newBuilder()
- .enable(conf.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS))
- .build();
- }
-
- /**
- * @return Manifest File folder
- */
- public Path getManifestFolder() {
- return new Path(metaClient.getMetaPath(), MANIFEST_FOLDER_NAME);
- }
-
- /**
- * @return Manifest File Full Path
- */
- public Path getManifestFilePath() {
- return new Path(getManifestFolder(), MANIFEST_FILE_NAME);
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- /**
- * Builder for {@link ManifestFileUtil}.
- */
- public static class Builder {
-
- private Configuration conf;
- private String basePath;
-
- public Builder setConf(Configuration conf) {
- this.conf = conf;
- return this;
- }
-
- public Builder setBasePath(String basePath) {
- this.basePath = basePath;
- return this;
- }
-
- public ManifestFileUtil build() {
- ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init ManifestFileGenerator");
- ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init ManifestFileGenerator");
- return new ManifestFileUtil(conf, basePath);
- }
- }
-}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
new file mode 100644
index 000000000..3ac238c89
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/ManifestFileWriter.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common.util;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ManifestFileWriter {
+
+ public static final String MANIFEST_FOLDER_NAME = "manifest";
+ public static final String MANIFEST_FILE_NAME = "latest-snapshot.csv";
+ private static final Logger LOG = LogManager.getLogger(ManifestFileWriter.class);
+
+ private final HoodieTableMetaClient metaClient;
+ private final boolean useFileListingFromMetadata;
+ private final boolean assumeDatePartitioning;
+
+ private ManifestFileWriter(Configuration hadoopConf, String basePath, boolean useFileListingFromMetadata, boolean assumeDatePartitioning) {
+ this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
+ this.useFileListingFromMetadata = useFileListingFromMetadata;
+ this.assumeDatePartitioning = assumeDatePartitioning;
+ }
+
+ /**
+ * Write all the latest base file names to the manifest file.
+ */
+ public synchronized void writeManifestFile() {
+ try {
+ List<String> baseFiles = fetchLatestBaseFilesForAllPartitions(metaClient, useFileListingFromMetadata, assumeDatePartitioning)
+ .collect(Collectors.toList());
+ if (baseFiles.isEmpty()) {
+ LOG.warn("No base file to generate manifest file.");
+ return;
+ } else {
+ LOG.info("Writing base file names to manifest file: " + baseFiles.size());
+ }
+ final Path manifestFilePath = getManifestFilePath();
+ try (FSDataOutputStream outputStream = metaClient.getFs().create(manifestFilePath, true);
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
+ for (String f : baseFiles) {
+ writer.write(f);
+ writer.write("\n");
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Error in writing manifest file.", e);
+ }
+ }
+
+ public static Stream<String> fetchLatestBaseFilesForAllPartitions(HoodieTableMetaClient metaClient,
+ boolean useFileListingFromMetadata, boolean assumeDatePartitioning) {
+ try {
+ List<String> partitions = FSUtils.getAllPartitionPaths(new HoodieLocalEngineContext(metaClient.getHadoopConf()),
+ metaClient.getBasePath(), useFileListingFromMetadata, assumeDatePartitioning);
+ LOG.info("Retrieve all partitions: " + partitions.size());
+ return partitions.parallelStream().flatMap(p -> {
+ Configuration hadoopConf = metaClient.getHadoopConf();
+ HoodieLocalEngineContext engContext = new HoodieLocalEngineContext(hadoopConf);
+ HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(engContext, metaClient,
+ metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(),
+ HoodieMetadataConfig.newBuilder().enable(useFileListingFromMetadata).withAssumeDatePartitioning(assumeDatePartitioning).build());
+ return fsView.getLatestBaseFiles(p).map(HoodieBaseFile::getFileName);
+ });
+ } catch (Exception e) {
+ throw new HoodieException("Error in fetching latest base files.", e);
+ }
+ }
+
+ public Path getManifestFolder() {
+ return new Path(metaClient.getMetaPath(), MANIFEST_FOLDER_NAME);
+ }
+
+ public Path getManifestFilePath() {
+ return new Path(getManifestFolder(), MANIFEST_FILE_NAME);
+ }
+
+ public String getManifestSourceUri() {
+ return new Path(getManifestFolder(), "*").toUri().toString();
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link ManifestFileWriter}.
+ */
+ public static class Builder {
+
+ private Configuration conf;
+ private String basePath;
+ private boolean useFileListingFromMetadata;
+ private boolean assumeDatePartitioning;
+
+ public Builder setConf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public Builder setBasePath(String basePath) {
+ this.basePath = basePath;
+ return this;
+ }
+
+ public Builder setUseFileListingFromMetadata(boolean useFileListingFromMetadata) {
+ this.useFileListingFromMetadata = useFileListingFromMetadata;
+ return this;
+ }
+
+ public Builder setAssumeDatePartitioning(boolean assumeDatePartitioning) {
+ this.assumeDatePartitioning = assumeDatePartitioning;
+ return this;
+ }
+
+ public ManifestFileWriter build() {
+ ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init ManifestFileWriter");
+ ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init ManifestFileWriter");
+ return new ManifestFileWriter(conf, basePath, useFileListingFromMetadata, assumeDatePartitioning);
+ }
+ }
+}
diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileUtil.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileUtil.java
deleted file mode 100644
index f383cc996..000000000
--- a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileUtil.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sync.common.util;
-
-import org.apache.hudi.common.fs.FSUtils;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.IntStream;
-
-import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
-import org.apache.hudi.common.testutils.HoodieTestTable;
-
-public class TestManifestFileUtil extends HoodieCommonTestHarness {
-
- private static final List MULTI_LEVEL_PARTITIONS = Arrays.asList("2019/01", "2020/01", "2021/01");
- private static HoodieTestTable hoodieTestTable;
-
- @BeforeEach
- public void setUp() throws IOException {
- initMetaClient();
- hoodieTestTable = HoodieTestTable.of(metaClient);
- }
-
- @Test
- public void testMultiLevelPartitionedTable() throws Exception {
- // Generate 10 files under each partition
- createTestDataForPartitionedTable(10);
- ManifestFileUtil manifestFileUtil = ManifestFileUtil.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
- Assertions.assertEquals(30, manifestFileUtil.fetchLatestBaseFilesForAllPartitions().count());
- }
-
- @Test
- public void testCreateManifestFile() throws Exception {
- // Generate 10 files under each partition
- createTestDataForPartitionedTable(10);
- ManifestFileUtil manifestFileUtil = ManifestFileUtil.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
- manifestFileUtil.writeManifestFile();
- Assertions.assertTrue(FSUtils.getFileSize(metaClient.getFs(), manifestFileUtil.getManifestFilePath()) > 0);
- }
-
- public void createTestDataForPartitionedTable(int numOfFiles) throws Exception {
- String instant = "100";
- hoodieTestTable = hoodieTestTable.addCommit(instant);
- // Generate 10 files under each partition
- for (String partition : MULTI_LEVEL_PARTITIONS) {
- hoodieTestTable = hoodieTestTable.withPartitionMetaFiles(partition)
- .withBaseFilesInPartition(partition, IntStream.range(0, numOfFiles).toArray());
- }
- }
-}
diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java
new file mode 100644
index 000000000..47b60f723
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestManifestFileWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common.util;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.FileIOUtils;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
+import static org.apache.hudi.sync.common.util.ManifestFileWriter.fetchLatestBaseFilesForAllPartitions;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestManifestFileWriter extends HoodieCommonTestHarness {
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ initMetaClient();
+ }
+
+ @Test
+ public void testMultiLevelPartitionedTable() throws Exception {
+ // Generate 10 files under each partition
+ createTestDataForPartitionedTable(metaClient, 10);
+ ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
+ assertEquals(30, fetchLatestBaseFilesForAllPartitions(metaClient, false, false).count());
+ }
+
+ @Test
+ public void testCreateManifestFile() throws Exception {
+ // Generate 3 files under each partition
+ createTestDataForPartitionedTable(metaClient, 3);
+ ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
+ manifestFileWriter.writeManifestFile();
+ Path manifestFilePath = manifestFileWriter.getManifestFilePath();
+ try (InputStream is = metaClient.getFs().open(manifestFilePath)) {
+ assertEquals(9, FileIOUtils.readAsUTFStringLines(is).size(), "there should be 9 base files in total; 3 per partition.");
+ }
+ }
+
+ private static void createTestDataForPartitionedTable(HoodieTableMetaClient metaClient, int numFilesPerPartition) throws Exception {
+ final String instantTime = "100";
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient).addCommit(instantTime);
+ for (String partition : DEFAULT_PARTITION_PATHS) {
+ testTable.withPartitionMetaFiles(partition)
+ .withBaseFilesInPartition(partition, IntStream.range(0, numFilesPerPartition).toArray());
+ }
+ }
+
+ @Test
+ public void getManifestSourceUri() {
+ ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
+ String sourceUri = manifestFileWriter.getManifestSourceUri();
+ assertEquals(new Path(basePath, ".hoodie/manifest/*").toUri().toString(), sourceUri);
+ }
+}
diff --git a/packaging/hudi-gcp-bundle/pom.xml b/packaging/hudi-gcp-bundle/pom.xml
new file mode 100644
index 000000000..dbb44595e
--- /dev/null
+++ b/packaging/hudi-gcp-bundle/pom.xml
@@ -0,0 +1,178 @@
+
+
+
+
+ hudi
+ org.apache.hudi
+ 0.11.0-SNAPSHOT
+ ../../pom.xml
+
+ 4.0.0
+ hudi-gcp-bundle
+ jar
+
+
+ true
+ ${project.parent.basedir}
+
+
+
+
+ com.google.cloud
+ libraries-bom
+ 25.1.0
+ pom
+ import
+
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ maven-assembly-plugin
+
+
+
+ org.apache.hudi.gcp.bigquery.BigQuerySyncTool
+
+
+
+ jar-with-dependencies
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ ${maven-shade-plugin.version}
+
+
+ package
+
+ shade
+
+
+ ${shadeSources}
+ ${project.build.directory}/dependency-reduced-pom.xml
+
+
+
+
+
+ true
+
+
+ META-INF/LICENSE
+ target/classes/META-INF/LICENSE
+
+
+
+
+ org.apache.hudi:hudi-common
+ org.apache.hudi:hudi-hadoop-mr
+ org.apache.hudi:hudi-sync-common
+ org.apache.hudi:hudi-gcp
+
+ com.google.cloud:google-cloud-bigquery
+ com.beust:jcommander
+
+
+ false
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+ META-INF/services/javax.*
+
+
+
+ ${project.artifactId}-${project.version}
+
+
+
+
+
+
+
+ src/main/resources
+
+
+ src/test/resources
+
+
+
+
+
+
+
+ org.apache.hudi
+ hudi-common
+ ${project.version}
+
+
+
+ org.apache.hudi
+ hudi-hadoop-mr-bundle
+ ${project.version}
+
+
+
+ org.apache.hudi
+ hudi-sync-common
+ ${project.version}
+
+
+
+ org.apache.hudi
+ hudi-gcp
+ ${project.version}
+
+
+
+ com.google.cloud
+ google-cloud-bigquery
+
+
+
+
+ org.apache.parquet
+ parquet-avro
+ ${parquet.version}
+ compile
+
+
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+ compile
+
+
+
+
diff --git a/packaging/hudi-gcp-bundle/src/main/java/org/apache/hudi/gcp/bigquery/bundle/Main.java b/packaging/hudi-gcp-bundle/src/main/java/org/apache/hudi/gcp/bigquery/bundle/Main.java
new file mode 100644
index 000000000..75324f64f
--- /dev/null
+++ b/packaging/hudi-gcp-bundle/src/main/java/org/apache/hudi/gcp/bigquery/bundle/Main.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.gcp.bigquery.bundle;
+
+import org.apache.hudi.common.util.ReflectionUtils;
+
+/**
+ * A simple main class to dump all classes loaded in current classpath.
+ *
+ * This is a workaround for generating sources and javadoc jars for packaging modules. The maven plugins for generating
+ * javadoc and sources plugins do not generate corresponding jars if there are no source files.
+ *
+ * This class does not have anything to do with Hudi but is there to keep mvn javadocs/source plugin happy.
+ */
+public class Main {
+
+ public static void main(String[] args) {
+ ReflectionUtils.getTopLevelClassesInClasspath(Main.class).forEach(System.out::println);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 6212b177b..169408ff9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,7 @@
hudi-cli
hudi-client
hudi-aws
+ hudi-gcp
hudi-hadoop-mr
hudi-spark-datasource
hudi-timeline-service
@@ -47,6 +48,7 @@
packaging/hudi-hadoop-mr-bundle
packaging/hudi-datahub-sync-bundle
packaging/hudi-hive-sync-bundle
+ packaging/hudi-gcp-bundle
packaging/hudi-spark-bundle
packaging/hudi-presto-bundle
packaging/hudi-utilities-bundle