[HUDI-3536] Add hudi-datahub-sync implementation (#5155)
This commit is contained in:
150
hudi-sync/hudi-datahub-sync/pom.xml
Normal file
150
hudi-sync/hudi-datahub-sync/pom.xml
Normal file
@@ -0,0 +1,150 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one
  ~ or more contributor license agreements.  See the NOTICE file
  ~ distributed with this work for additional information
  ~ regarding copyright ownership.  The ASF licenses this file
  ~ to you under the Apache License, Version 2.0 (the
  ~ "License"); you may not use this file except in compliance
  ~ with the License.  You may obtain a copy of the License at
  ~
  ~      http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing,
  ~ software distributed under the License is distributed on an
  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  ~ KIND, either express or implied.  See the License for the
  ~ specific language governing permissions and limitations
  ~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>hudi</artifactId>
    <groupId>org.apache.hudi</groupId>
    <version>0.11.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <modelVersion>4.0.0</modelVersion>

  <artifactId>hudi-datahub-sync</artifactId>
  <packaging>jar</packaging>

  <properties>
    <datahub.version>0.8.31</datahub.version>
    <httpasync.version>4.1.5</httpasync.version>
  </properties>

  <dependencies>
    <!-- DataHub REST emitter client -->
    <dependency>
      <groupId>io.acryl</groupId>
      <artifactId>datahub-client</artifactId>
      <version>${datahub.version}</version>
    </dependency>

    <!-- HTTP transport used by the DataHub client.
         NOTE(review): httpcore-nio uses ${http.version}, which is not declared in this
        pom — presumably inherited from the parent pom; verify it resolves there. -->
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>fluent-hc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpasyncclient</artifactId>
      <version>${httpasync.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore-nio</artifactId>
      <version>${http.version}</version>
    </dependency>

    <!-- Logging -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </dependency>

    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
    </dependency>

    <!-- Hoodie -->
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-common</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-sync-common</artifactId>
      <version>${project.version}</version>
    </dependency>

    <!-- Test dependencies (JUnit 5 + vintage engine for legacy JUnit 4 tests) -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.junit.vintage</groupId>
      <artifactId>junit-vintage-engine</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-params</artifactId>
      <scope>test</scope>
    </dependency>

  </dependencies>

  <build>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
    </resources>
    <plugins>
      <!-- License-header check -->
      <plugin>
        <groupId>org.apache.rat</groupId>
        <artifactId>apache-rat-plugin</artifactId>
      </plugin>
      <!-- Publish a test-jar so downstream modules can reuse test utilities -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
        <executions>
          <execution>
            <goals>
              <goal>test-jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <!-- Code-coverage instrumentation -->
      <plugin>
        <groupId>org.jacoco</groupId>
        <artifactId>jacoco-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

</project>
|
||||
@@ -0,0 +1,234 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sync.datahub;
|
||||
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
|
||||
import org.apache.hudi.sync.common.HoodieSyncException;
|
||||
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
|
||||
|
||||
import com.linkedin.common.urn.DatasetUrn;
|
||||
import com.linkedin.data.template.SetMode;
|
||||
import com.linkedin.data.template.StringMap;
|
||||
import com.linkedin.dataset.DatasetProperties;
|
||||
import com.linkedin.schema.ArrayType;
|
||||
import com.linkedin.schema.BooleanType;
|
||||
import com.linkedin.schema.BytesType;
|
||||
import com.linkedin.schema.EnumType;
|
||||
import com.linkedin.schema.FixedType;
|
||||
import com.linkedin.schema.MapType;
|
||||
import com.linkedin.schema.NullType;
|
||||
import com.linkedin.schema.NumberType;
|
||||
import com.linkedin.schema.OtherSchema;
|
||||
import com.linkedin.schema.RecordType;
|
||||
import com.linkedin.schema.SchemaField;
|
||||
import com.linkedin.schema.SchemaFieldArray;
|
||||
import com.linkedin.schema.SchemaFieldDataType;
|
||||
import com.linkedin.schema.SchemaMetadata;
|
||||
import com.linkedin.schema.StringType;
|
||||
import com.linkedin.schema.UnionType;
|
||||
import datahub.client.rest.RestEmitter;
|
||||
import datahub.event.MetadataChangeProposalWrapper;
|
||||
import org.apache.avro.AvroTypeException;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DataHubSyncClient extends AbstractSyncHoodieClient {
|
||||
|
||||
private final HoodieTimeline activeTimeline;
|
||||
private final DataHubSyncConfig syncConfig;
|
||||
private final Configuration hadoopConf;
|
||||
private final DatasetUrn datasetUrn;
|
||||
|
||||
public DataHubSyncClient(DataHubSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
|
||||
super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, false, fs);
|
||||
this.syncConfig = syncConfig;
|
||||
this.hadoopConf = hadoopConf;
|
||||
this.datasetUrn = syncConfig.datasetIdentifier.getDatasetUrn();
|
||||
this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createTable(String tableName,
|
||||
MessageType storageSchema,
|
||||
String inputFormatClass,
|
||||
String outputFormatClass,
|
||||
String serdeClass,
|
||||
Map<String, String> serdeProperties,
|
||||
Map<String, String> tableProperties) {
|
||||
throw new UnsupportedOperationException("Not supported: `createTable`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean doesTableExist(String tableName) {
|
||||
return tableExists(tableName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tableExists(String tableName) {
|
||||
throw new UnsupportedOperationException("Not supported: `tableExists`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<String> getLastCommitTimeSynced(String tableName) {
|
||||
throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateLastCommitTimeSynced(String tableName) {
|
||||
updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, activeTimeline.lastInstant().get().getTimestamp()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<String> getLastReplicatedTime(String tableName) {
|
||||
throw new UnsupportedOperationException("Not supported: `getLastReplicatedTime`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
|
||||
throw new UnsupportedOperationException("Not supported: `updateLastReplicatedTimeStamp`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteLastReplicatedTimeStamp(String tableName) {
|
||||
throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
|
||||
throw new UnsupportedOperationException("Not supported: `addPartitionsToTable`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
|
||||
throw new UnsupportedOperationException("Not supported: `updatePartitionsToTable`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
|
||||
throw new UnsupportedOperationException("Not supported: `dropPartitionsToTable`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
|
||||
MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
|
||||
.entityType("dataset")
|
||||
.entityUrn(datasetUrn)
|
||||
.upsert()
|
||||
.aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
|
||||
.build();
|
||||
|
||||
try (RestEmitter emitter = syncConfig.getRestEmitter()) {
|
||||
emitter.emit(propertiesChangeProposal, null).get();
|
||||
} catch (Exception e) {
|
||||
throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": " + tableProperties, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void updateTableDefinition(String tableName) {
|
||||
Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient);
|
||||
List<SchemaField> fields = avroSchema.getFields().stream().map(f -> new SchemaField()
|
||||
.setFieldPath(f.name())
|
||||
.setType(toSchemaFieldDataType(f.schema().getType()))
|
||||
.setDescription(f.doc(), SetMode.IGNORE_NULL)
|
||||
.setNativeDataType(f.schema().getType().getName())).collect(Collectors.toList());
|
||||
|
||||
final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
|
||||
platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
|
||||
MetadataChangeProposalWrapper schemaChangeProposal = MetadataChangeProposalWrapper.builder()
|
||||
.entityType("dataset")
|
||||
.entityUrn(datasetUrn)
|
||||
.upsert()
|
||||
.aspect(new SchemaMetadata()
|
||||
.setSchemaName(tableName)
|
||||
.setVersion(0)
|
||||
.setHash("")
|
||||
.setPlatform(datasetUrn.getPlatformEntity())
|
||||
.setPlatformSchema(platformSchema)
|
||||
.setFields(new SchemaFieldArray(fields)))
|
||||
.build();
|
||||
|
||||
try (RestEmitter emitter = syncConfig.getRestEmitter()) {
|
||||
emitter.emit(schemaChangeProposal, null).get();
|
||||
} catch (Exception e) {
|
||||
throw new HoodieDataHubSyncException("Fail to change schema for Dataset " + datasetUrn, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getTableSchema(String tableName) {
|
||||
throw new UnsupportedOperationException("Not supported: `getTableSchema`");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// no op;
|
||||
}
|
||||
|
||||
static Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
|
||||
try {
|
||||
return new TableSchemaResolver(metaClient).getTableAvroSchema(true);
|
||||
} catch (Exception e) {
|
||||
throw new HoodieSyncException("Failed to read avro schema", e);
|
||||
}
|
||||
}
|
||||
|
||||
static SchemaFieldDataType toSchemaFieldDataType(Schema.Type type) {
|
||||
switch (type) {
|
||||
case BOOLEAN:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BooleanType()));
|
||||
case INT:
|
||||
case LONG:
|
||||
case FLOAT:
|
||||
case DOUBLE:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NumberType()));
|
||||
case MAP:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new MapType()));
|
||||
case ENUM:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new EnumType()));
|
||||
case NULL:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NullType()));
|
||||
case ARRAY:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new ArrayType()));
|
||||
case BYTES:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BytesType()));
|
||||
case FIXED:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new FixedType()));
|
||||
case UNION:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new UnionType()));
|
||||
case RECORD:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType()));
|
||||
case STRING:
|
||||
return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()));
|
||||
default:
|
||||
throw new AvroTypeException("Unexpected type: " + type.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sync.datahub;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.sync.common.AbstractSyncTool;
|
||||
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
|
||||
|
||||
import com.beust.jcommander.JCommander;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
/**
|
||||
* To sync with DataHub via REST APIs.
|
||||
*
|
||||
* @Experimental
|
||||
* @see <a href="https://datahubproject.io/">https://datahubproject.io/</a>
|
||||
*/
|
||||
public class DataHubSyncTool extends AbstractSyncTool {
|
||||
|
||||
private final DataHubSyncConfig config;
|
||||
|
||||
public DataHubSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
|
||||
this(new DataHubSyncConfig(props), conf, fs);
|
||||
}
|
||||
|
||||
public DataHubSyncTool(DataHubSyncConfig config, Configuration conf, FileSystem fs) {
|
||||
super(config.getProps(), conf, fs);
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sync to a DataHub Dataset.
|
||||
*
|
||||
* @implNote DataHub sync is an experimental feature, which overwrites the DataHub Dataset's schema
|
||||
* and last commit time sync'ed upon every invocation.
|
||||
*/
|
||||
@Override
|
||||
public void syncHoodieTable() {
|
||||
try (DataHubSyncClient syncClient = new DataHubSyncClient(config, conf, fs)) {
|
||||
syncClient.updateTableDefinition(config.tableName);
|
||||
syncClient.updateLastCommitTimeSynced(config.tableName);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
final DataHubSyncConfig cfg = new DataHubSyncConfig();
|
||||
JCommander cmd = new JCommander(cfg, null, args);
|
||||
if (cfg.help || args.length == 0) {
|
||||
cmd.usage();
|
||||
System.exit(1);
|
||||
}
|
||||
FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
|
||||
new DataHubSyncTool(cfg, fs.getConf(), fs).syncHoodieTable();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sync.datahub;
|
||||
|
||||
import org.apache.hudi.sync.common.HoodieSyncException;
|
||||
|
||||
/**
 * Exception thrown when syncing a Hudi table's metadata to DataHub fails.
 */
public class HoodieDataHubSyncException extends HoodieSyncException {

  /**
   * @param message description of the sync failure
   */
  public HoodieDataHubSyncException(String message) {
    super(message);
  }

  /**
   * @param message description of the sync failure
   * @param t       underlying cause, preserved for the stack trace
   */
  public HoodieDataHubSyncException(String message, Throwable t) {
    super(message, t);
  }

}
|
||||
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sync.datahub.config;
|
||||
|
||||
import datahub.client.rest.RestEmitter;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
 * To supply a {@link RestEmitter} to sync with DataHub.
 * <p>
 * Implement this to have full control of the {@link RestEmitter}'s creation
 * (e.g. custom auth, endpoints, or client settings). The implementing class is
 * loaded reflectively via the emitter supplier class config, so it must have a
 * no-argument constructor.
 */
public interface DataHubEmitterSupplier extends Supplier<RestEmitter> {
}
|
||||
@@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sync.datahub.config;
|
||||
|
||||
import org.apache.hudi.common.config.ConfigProperty;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.sync.common.HoodieSyncConfig;
|
||||
|
||||
import com.beust.jcommander.Parameter;
|
||||
import datahub.client.rest.RestEmitter;
|
||||
|
||||
public class DataHubSyncConfig extends HoodieSyncConfig {
|
||||
|
||||
public static final ConfigProperty<String> META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS = ConfigProperty
|
||||
.key("hoodie.meta.sync.datahub.dataset.identifier.class")
|
||||
.defaultValue(HoodieDataHubDatasetIdentifier.class.getName())
|
||||
.withDocumentation("Pluggable class to help provide info to identify a DataHub Dataset.");
|
||||
|
||||
public static final ConfigProperty<String> META_SYNC_DATAHUB_EMITTER_SERVER = ConfigProperty
|
||||
.key("hoodie.meta.sync.datahub.emitter.server")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("Server URL of the DataHub instance.");
|
||||
|
||||
public static final ConfigProperty<String> META_SYNC_DATAHUB_EMITTER_TOKEN = ConfigProperty
|
||||
.key("hoodie.meta.sync.datahub.emitter.token")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("Auth token to connect to the DataHub instance.");
|
||||
|
||||
public static final ConfigProperty<String> META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS = ConfigProperty
|
||||
.key("hoodie.meta.sync.datahub.emitter.supplier.class")
|
||||
.noDefaultValue()
|
||||
.withDocumentation("Pluggable class to supply a DataHub REST emitter to connect to the DataHub instance. This overwrites other emitter configs.");
|
||||
|
||||
@Parameter(names = {"--identifier-class"}, description = "Pluggable class to help provide info to identify a DataHub Dataset.")
|
||||
public String identifierClass;
|
||||
|
||||
@Parameter(names = {"--emitter-server"}, description = "Server URL of the DataHub instance.")
|
||||
public String emitterServer;
|
||||
|
||||
@Parameter(names = {"--emitter-token"}, description = "Auth token to connect to the DataHub instance.")
|
||||
public String emitterToken;
|
||||
|
||||
@Parameter(names = {"--emitter-supplier-class"}, description = "Pluggable class to supply a DataHub REST emitter to connect to the DataHub instance. This overwrites other emitter configs.")
|
||||
public String emitterSupplierClass;
|
||||
|
||||
@Parameter(names = {"--help", "-h"}, help = true)
|
||||
public Boolean help = false;
|
||||
|
||||
public final HoodieDataHubDatasetIdentifier datasetIdentifier;
|
||||
|
||||
public DataHubSyncConfig() {
|
||||
this(new TypedProperties());
|
||||
}
|
||||
|
||||
public DataHubSyncConfig(TypedProperties props) {
|
||||
super(props);
|
||||
identifierClass = getStringOrDefault(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS);
|
||||
emitterServer = getStringOrDefault(META_SYNC_DATAHUB_EMITTER_SERVER, null);
|
||||
emitterToken = getStringOrDefault(META_SYNC_DATAHUB_EMITTER_TOKEN, null);
|
||||
emitterSupplierClass = getStringOrDefault(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS, null);
|
||||
|
||||
datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils
|
||||
.loadClass(identifierClass, new Class<?>[] {TypedProperties.class}, props);
|
||||
}
|
||||
|
||||
public RestEmitter getRestEmitter() {
|
||||
if (emitterSupplierClass != null) {
|
||||
return ((DataHubEmitterSupplier) ReflectionUtils.loadClass(emitterSupplierClass)).get();
|
||||
} else if (emitterServer != null) {
|
||||
return RestEmitter.create(b -> b.server(emitterServer).token(emitterToken));
|
||||
} else {
|
||||
return RestEmitter.createWithDefaults();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sync.datahub.config;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
|
||||
import com.linkedin.common.FabricType;
|
||||
import com.linkedin.common.urn.DataPlatformUrn;
|
||||
import com.linkedin.common.urn.DatasetUrn;
|
||||
|
||||
/**
|
||||
* Construct and provide the default {@link DatasetUrn} to identify the Dataset on DataHub.
|
||||
* <p>
|
||||
* Extend this to customize the way of constructing {@link DatasetUrn}.
|
||||
*/
|
||||
public class HoodieDataHubDatasetIdentifier {
|
||||
|
||||
public static final String DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME = "hudi";
|
||||
|
||||
protected final TypedProperties props;
|
||||
|
||||
public HoodieDataHubDatasetIdentifier(TypedProperties props) {
|
||||
this.props = props;
|
||||
}
|
||||
|
||||
public DatasetUrn getDatasetUrn() {
|
||||
DataPlatformUrn dataPlatformUrn = new DataPlatformUrn(DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME);
|
||||
DataHubSyncConfig config = new DataHubSyncConfig(props);
|
||||
return new DatasetUrn(dataPlatformUrn, String.format("%s.%s", config.databaseName, config.tableName), FabricType.DEV);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.sync.datahub.config;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
|
||||
import com.linkedin.common.FabricType;
|
||||
import com.linkedin.common.urn.DatasetUrn;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
class TestDataHubSyncConfig {
|
||||
|
||||
@Test
|
||||
void testInstantiationWithProps() {
|
||||
TypedProperties props = new TypedProperties();
|
||||
props.setProperty(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS.key(), DummyIdentifier.class.getName());
|
||||
DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
|
||||
DatasetUrn datasetUrn = syncConfig.datasetIdentifier.getDatasetUrn();
|
||||
assertEquals("foo", datasetUrn.getPlatformEntity().getPlatformNameEntity());
|
||||
assertEquals("project.database.table", datasetUrn.getDatasetNameEntity());
|
||||
assertEquals(FabricType.PROD, datasetUrn.getOriginEntity());
|
||||
}
|
||||
|
||||
public static class DummyIdentifier extends HoodieDataHubDatasetIdentifier {
|
||||
|
||||
public DummyIdentifier(TypedProperties props) {
|
||||
super(props);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatasetUrn getDatasetUrn() {
|
||||
try {
|
||||
return DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:foo,project.database.table,PROD)");
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
###
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
###
|
||||
log4j.rootLogger=ERROR, CONSOLE
|
||||
log4j.logger.org.apache.hudi=ERROR
|
||||
|
||||
# CONSOLE is set to be a ConsoleAppender.
|
||||
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
|
||||
# CONSOLE uses PatternLayout.
|
||||
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n
|
||||
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
|
||||
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
|
||||
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
|
||||
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
|
||||
@@ -0,0 +1,29 @@
|
||||
###
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
###
|
||||
log4j.rootLogger=WARN, CONSOLE
|
||||
log4j.logger.org.apache.hudi=INFO
|
||||
|
||||
# A1 is set to be a ConsoleAppender.
|
||||
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
|
||||
# A1 uses PatternLayout.
|
||||
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
|
||||
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
|
||||
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
|
||||
log4j.appender.CONSOLE.filter.a.LevelMin=WARN
|
||||
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
|
||||
@@ -31,8 +31,9 @@
|
||||
</properties>
|
||||
|
||||
<modules>
|
||||
<module>hudi-sync-common</module>
|
||||
<module>hudi-hive-sync</module>
|
||||
<module>hudi-dla-sync</module>
|
||||
<module>hudi-datahub-sync</module>
|
||||
<module>hudi-dla-sync</module>
|
||||
<module>hudi-hive-sync</module>
|
||||
<module>hudi-sync-common</module>
|
||||
</modules>
|
||||
</project>
|
||||
|
||||
148
packaging/hudi-datahub-sync-bundle/pom.xml
Normal file
148
packaging/hudi-datahub-sync-bundle/pom.xml
Normal file
@@ -0,0 +1,148 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. The ASF licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing,
|
||||
~ software distributed under the License is distributed on an
|
||||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
~ KIND, either express or implied. See the License for the
|
||||
~ specific language governing permissions and limitations
|
||||
~ under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>hudi</artifactId>
|
||||
<groupId>org.apache.hudi</groupId>
|
||||
<version>0.11.0-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>hudi-datahub-sync-bundle</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<properties>
|
||||
<checkstyle.skip>true</checkstyle.skip>
|
||||
<main.basedir>${project.parent.basedir}</main.basedir>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.rat</groupId>
|
||||
<artifactId>apache-rat-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>${maven-shade-plugin.version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<createSourcesJar>${shadeSources}</createSourcesJar>
|
||||
<dependencyReducedPomLocation>${project.build.directory}/dependency-reduced-pom.xml
|
||||
</dependencyReducedPomLocation>
|
||||
<transformers>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer">
|
||||
</transformer>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
|
||||
<addHeader>true</addHeader>
|
||||
</transformer>
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
|
||||
<resource>META-INF/LICENSE</resource>
|
||||
<file>target/classes/META-INF/LICENSE</file>
|
||||
</transformer>
|
||||
</transformers>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>org.apache.hudi:hudi-common</include>
|
||||
<include>org.apache.hudi:hudi-hadoop-mr</include>
|
||||
<include>org.apache.hudi:hudi-sync-common</include>
|
||||
<include>org.apache.hudi:hudi-datahub-sync</include>
|
||||
|
||||
<include>io.acryl:datahub-client</include>
|
||||
<include>com.beust:jcommander</include>
|
||||
<include>org.apache.httpcomponents:fluent-hc</include>
|
||||
<include>org.apache.httpcomponents:httpcore</include>
|
||||
<include>org.apache.httpcomponents:httpclient</include>
|
||||
<include>org.apache.httpcomponents:httpasyncclient</include>
|
||||
<include>org.apache.httpcomponents:httpcore-nio</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
<createDependencyReducedPom>false</createDependencyReducedPom>
|
||||
<filters>
|
||||
<filter>
|
||||
<artifact>*:*</artifact>
|
||||
<excludes>
|
||||
<exclude>META-INF/*.SF</exclude>
|
||||
<exclude>META-INF/*.DSA</exclude>
|
||||
<exclude>META-INF/*.RSA</exclude>
|
||||
<exclude>META-INF/services/javax.*</exclude>
|
||||
</excludes>
|
||||
</filter>
|
||||
</filters>
|
||||
<finalName>${project.artifactId}-${project.version}</finalName>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
<resources>
|
||||
<resource>
|
||||
<directory>src/main/resources</directory>
|
||||
</resource>
|
||||
<resource>
|
||||
<directory>src/test/resources</directory>
|
||||
</resource>
|
||||
</resources>
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
<!-- Hoodie -->
|
||||
<dependency>
|
||||
<groupId>org.apache.hudi</groupId>
|
||||
<artifactId>hudi-common</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hudi</groupId>
|
||||
<artifactId>hudi-hadoop-mr-bundle</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hudi</groupId>
|
||||
<artifactId>hudi-datahub-sync</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Parquet -->
|
||||
<dependency>
|
||||
<groupId>org.apache.parquet</groupId>
|
||||
<artifactId>parquet-avro</artifactId>
|
||||
<version>${parquet.version}</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Avro -->
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
<version>${avro.version}</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.datahub.bundle;
|
||||
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* A simple main class to dump all classes loaded in current classpath.
|
||||
*
|
||||
* This is a workaround for generating sources and javadoc jars for packaging modules. The maven plugins for generating
|
||||
* javadoc and sources plugins do not generate corresponding jars if there are no source files.
|
||||
*
|
||||
* This class does not have anything to do with Hudi but is there to keep mvn javadocs/source plugin happy.
|
||||
*/
|
||||
public class Main {
|
||||
|
||||
public static void main(String[] args) {
|
||||
ReflectionUtils.getTopLevelClassesInClasspath(Main.class).forEach(System.out::println);
|
||||
}
|
||||
}
|
||||
1
pom.xml
1
pom.xml
@@ -45,6 +45,7 @@
|
||||
<module>hudi-utilities</module>
|
||||
<module>hudi-sync</module>
|
||||
<module>packaging/hudi-hadoop-mr-bundle</module>
|
||||
<module>packaging/hudi-datahub-sync-bundle</module>
|
||||
<module>packaging/hudi-hive-sync-bundle</module>
|
||||
<module>packaging/hudi-spark-bundle</module>
|
||||
<module>packaging/hudi-presto-bundle</module>
|
||||
|
||||
Reference in New Issue
Block a user