Merge pull request #10 from vinothchandar/master
Adding hoodie-utilities module
This commit is contained in:
@@ -17,12 +17,11 @@
|
|||||||
package com.uber.hoodie.cli
|
package com.uber.hoodie.cli
|
||||||
|
|
||||||
import com.uber.hoodie.avro.HoodieAvroWriteSupport
|
import com.uber.hoodie.avro.HoodieAvroWriteSupport
|
||||||
import com.uber.hoodie.common.BloomFilter
|
import com.uber.hoodie.common.{BloomFilter, HoodieJsonPayload}
|
||||||
import com.uber.hoodie.common.model.HoodieRecord
|
import com.uber.hoodie.common.model.HoodieRecord
|
||||||
import com.uber.hoodie.common.util.ParquetUtils
|
import com.uber.hoodie.common.util.ParquetUtils
|
||||||
import com.uber.hoodie.config.{HoodieIndexConfig, HoodieStorageConfig}
|
import com.uber.hoodie.config.{HoodieIndexConfig, HoodieStorageConfig}
|
||||||
import com.uber.hoodie.io.storage.{HoodieParquetConfig, HoodieParquetWriter}
|
import com.uber.hoodie.io.storage.{HoodieParquetConfig, HoodieParquetWriter}
|
||||||
import com.uber.hoodie.common.GenericHoodiePayload
|
|
||||||
import org.apache.avro.Schema
|
import org.apache.avro.Schema
|
||||||
import org.apache.avro.generic.IndexedRecord
|
import org.apache.avro.generic.IndexedRecord
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
@@ -44,7 +43,7 @@ object SparkHelpers {
|
|||||||
val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble)
|
val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble)
|
||||||
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
|
val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
|
||||||
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf)
|
val parquetConfig: HoodieParquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_PAGE_SIZE_BYTES.toInt, HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES.toInt, fs.getConf)
|
||||||
val writer = new HoodieParquetWriter[GenericHoodiePayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema)
|
val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](commitTime, destinationFile, parquetConfig, schema)
|
||||||
for (rec <- sourceRecords) {
|
for (rec <- sourceRecords) {
|
||||||
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
|
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
|
||||||
if (!keysToSkip.contains(key)) {
|
if (!keysToSkip.contains(key)) {
|
||||||
|
|||||||
@@ -18,9 +18,13 @@ package com.uber.hoodie.common;
|
|||||||
|
|
||||||
import com.uber.hoodie.avro.MercifulJsonConverter;
|
import com.uber.hoodie.avro.MercifulJsonConverter;
|
||||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||||
|
import com.uber.hoodie.exception.HoodieException;
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
import org.apache.avro.Schema;
|
||||||
import org.apache.avro.generic.IndexedRecord;
|
import org.apache.avro.generic.IndexedRecord;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.codehaus.jackson.JsonNode;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
@@ -30,16 +34,16 @@ import java.util.zip.Deflater;
|
|||||||
import java.util.zip.DeflaterOutputStream;
|
import java.util.zip.DeflaterOutputStream;
|
||||||
import java.util.zip.InflaterInputStream;
|
import java.util.zip.InflaterInputStream;
|
||||||
|
|
||||||
public class GenericHoodiePayload implements HoodieRecordPayload<GenericHoodiePayload> {
|
public class HoodieJsonPayload implements HoodieRecordPayload<HoodieJsonPayload> {
|
||||||
private byte[] jsonDataCompressed;
|
private byte[] jsonDataCompressed;
|
||||||
private int dataSize;
|
private int dataSize;
|
||||||
|
|
||||||
public GenericHoodiePayload(String json) throws IOException {
|
public HoodieJsonPayload(String json) throws IOException {
|
||||||
this.jsonDataCompressed = compressData(json);
|
this.jsonDataCompressed = compressData(json);
|
||||||
this.dataSize = json.length();
|
this.dataSize = json.length();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override public GenericHoodiePayload preCombine(GenericHoodiePayload another) {
|
@Override public HoodieJsonPayload preCombine(HoodieJsonPayload another) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -85,4 +89,19 @@ public class GenericHoodiePayload implements HoodieRecordPayload<GenericHoodiePa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getFieldFromJsonOrFail(String field) throws IOException {
|
||||||
|
JsonNode node = new ObjectMapper().readTree(getJsonData());
|
||||||
|
if(!node.has(field)) {
|
||||||
|
throw new HoodieException("Field :" + field + " not found in payload => " + node.toString());
|
||||||
|
}
|
||||||
|
return node.get(field).getTextValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRowKey(String keyColumnField) throws IOException {
|
||||||
|
return getFieldFromJsonOrFail(keyColumnField);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPartitionPath(String partitionPathField) throws IOException {
|
||||||
|
return getFieldFromJsonOrFail(partitionPathField);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
211
hoodie-utilities/pom.xml
Normal file
211
hoodie-utilities/pom.xml
Normal file
@@ -0,0 +1,211 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!--
|
||||||
|
~ Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
~
|
||||||
|
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
~ you may not use this file except in compliance with the License.
|
||||||
|
~ You may obtain a copy of the License at
|
||||||
|
~
|
||||||
|
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
~
|
||||||
|
~ Unless required by applicable law or agreed to in writing, software
|
||||||
|
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
~ See the License for the specific language governing permissions and
|
||||||
|
~ limitations under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>hoodie</artifactId>
|
||||||
|
<groupId>com.uber.hoodie</groupId>
|
||||||
|
<version>0.2.5-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>hoodie-utilities</artifactId>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.codehaus.mojo</groupId>
|
||||||
|
<artifactId>cobertura-maven-plugin</artifactId>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<source>1.7</source>
|
||||||
|
<target>1.7</target>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-assembly-plugin</artifactId>
|
||||||
|
<version>2.4.1</version>
|
||||||
|
<configuration>
|
||||||
|
<descriptors>
|
||||||
|
<descriptor>src/assembly/src.xml</descriptor>
|
||||||
|
</descriptors>
|
||||||
|
<archive>
|
||||||
|
<manifest>
|
||||||
|
<mainClass>com.uber.hoodie.utilities.HoodieDeltaStreamer</mainClass>
|
||||||
|
</manifest>
|
||||||
|
</archive>
|
||||||
|
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>make-assembly</id>
|
||||||
|
<!-- bind to the packaging phase -->
|
||||||
|
<phase>package</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>single</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
|
||||||
|
</plugins>
|
||||||
|
|
||||||
|
<resources>
|
||||||
|
<resource>
|
||||||
|
<directory>src/main/resources</directory>
|
||||||
|
</resource>
|
||||||
|
<resource>
|
||||||
|
<directory>src/test/resources</directory>
|
||||||
|
</resource>
|
||||||
|
</resources>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.uber.hoodie</groupId>
|
||||||
|
<artifactId>hoodie-common</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.uber.hoodie</groupId>
|
||||||
|
<artifactId>hoodie-common</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.uber.hoodie</groupId>
|
||||||
|
<artifactId>hoodie-hive</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>javax.servlet</groupId>
|
||||||
|
<artifactId>servlet-api</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.uber.hoodie</groupId>
|
||||||
|
<artifactId>hoodie-client</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.uber.hoodie</groupId>
|
||||||
|
<artifactId>hoodie-client</artifactId>
|
||||||
|
<version>${project.version}</version>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hive</groupId>
|
||||||
|
<artifactId>hive-jdbc</artifactId>
|
||||||
|
<version>${hive.version}-cdh${cdh.version}</version>
|
||||||
|
<classifier>standalone</classifier>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-api</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>javax.servlet</groupId>
|
||||||
|
<artifactId>servlet-api</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>commons-dbcp</groupId>
|
||||||
|
<artifactId>commons-dbcp</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.httpcomponents</groupId>
|
||||||
|
<artifactId>httpcore</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>log4j</groupId>
|
||||||
|
<artifactId>log4j</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-mapreduce-client-common</artifactId>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>javax.servlet</groupId>
|
||||||
|
<artifactId>servlet-api</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-client</artifactId>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>javax.servlet</groupId>
|
||||||
|
<artifactId>servlet-api</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.spark</groupId>
|
||||||
|
<artifactId>spark-core_2.10</artifactId>
|
||||||
|
<scope>provided</scope>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>javax.servlet</groupId>
|
||||||
|
<artifactId>servlet-api</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Used for SQL templating -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.antlr</groupId>
|
||||||
|
<artifactId>stringtemplate</artifactId>
|
||||||
|
<version>4.0.2</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.beust</groupId>
|
||||||
|
<artifactId>jcommander</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.mockito</groupId>
|
||||||
|
<artifactId>mockito-all</artifactId>
|
||||||
|
<version>1.10.19</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
||||||
50
hoodie-utilities/src/assembly/src.xml
Normal file
50
hoodie-utilities/src/assembly/src.xml
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
<!--
|
||||||
|
~ Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
~
|
||||||
|
~ Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
~ you may not use this file except in compliance with the License.
|
||||||
|
~ You may obtain a copy of the License at
|
||||||
|
~
|
||||||
|
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
~
|
||||||
|
~ Unless required by applicable law or agreed to in writing, software
|
||||||
|
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
~ See the License for the specific language governing permissions and
|
||||||
|
~ limitations under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
|
||||||
|
<id>bin</id>
|
||||||
|
<formats>
|
||||||
|
<format>jar</format>
|
||||||
|
</formats>
|
||||||
|
|
||||||
|
<includeBaseDirectory>false</includeBaseDirectory>
|
||||||
|
<dependencySets>
|
||||||
|
<dependencySet>
|
||||||
|
<outputDirectory>/</outputDirectory>
|
||||||
|
<unpack>true</unpack>
|
||||||
|
<scope>runtime</scope>
|
||||||
|
<excludes>
|
||||||
|
<exclude>junit:junit</exclude>
|
||||||
|
<exclude>com.google.code.findbugs:*</exclude>
|
||||||
|
<exclude>org.apache.hadoop:*</exclude>
|
||||||
|
<exclude>org.apache.hbase:*</exclude>
|
||||||
|
<!--<exclude>log4j:*</exclude>-->
|
||||||
|
<!--<exclude>org.slf4j:*</exclude>-->
|
||||||
|
<!--<exclude>commons-dbcp:*</exclude>-->
|
||||||
|
<!--<exclude>org.apache.httpcomponents:*</exclude>-->
|
||||||
|
</excludes>
|
||||||
|
<unpackOptions>
|
||||||
|
<!--<excludes>-->
|
||||||
|
<!--<exclude>-->
|
||||||
|
<!--**/slf4j/**-->
|
||||||
|
<!--</exclude>-->
|
||||||
|
<!--</excludes>-->
|
||||||
|
</unpackOptions>
|
||||||
|
</dependencySet>
|
||||||
|
</dependencySets>
|
||||||
|
</assembly>
|
||||||
@@ -0,0 +1,337 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.utilities;
|
||||||
|
|
||||||
|
import com.beust.jcommander.JCommander;
|
||||||
|
import com.beust.jcommander.Parameter;
|
||||||
|
import com.uber.hoodie.common.model.HoodieTableMetadata;
|
||||||
|
import com.uber.hoodie.exception.HoodieException;
|
||||||
|
import com.uber.hoodie.utilities.exception.HoodieIncrementalPullException;
|
||||||
|
import com.uber.hoodie.utilities.exception.HoodieIncrementalPullSQLException;
|
||||||
|
|
||||||
|
import org.apache.commons.dbcp.BasicDataSource;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.stringtemplate.v4.ST;
|
||||||
|
|
||||||
|
import javax.sql.DataSource;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Scanner;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility to pull data after a given commit, based on the supplied HiveQL & save the delta as another hive temporary table.
|
||||||
|
*
|
||||||
|
* Current Limitations:
|
||||||
|
*
|
||||||
|
* - Only the source table can be incrementally pulled (usually the largest table)
|
||||||
|
* - The incrementally pulled table can't be referenced more than once.
|
||||||
|
*/
|
||||||
|
public class HiveIncrementalPuller {
|
||||||
|
|
||||||
|
private static Logger log = LogManager.getLogger(HiveIncrementalPuller.class);
|
||||||
|
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
|
||||||
|
|
||||||
|
public static class Config implements Serializable {
|
||||||
|
@Parameter(names = {"--hiveUrl"}) public String hiveJDBCUrl =
|
||||||
|
"jdbc:hive2://localhost:10014/;transportMode=http;httpPath=hs2";
|
||||||
|
@Parameter(names = {"--hiveUser"}) public String hiveUsername = "hive";
|
||||||
|
@Parameter(names = {"--hivePass"}) public String hivePassword = "";
|
||||||
|
@Parameter(names = {"--queue"}) public String yarnQueueName = "hadoop-queue";
|
||||||
|
@Parameter(names = {"--tmp"}) public String hoodieTmpDir = "/app/hoodie/intermediate";
|
||||||
|
@Parameter(names = {"--extractSQLFile"}, required = true) public String incrementalSQLFile;
|
||||||
|
@Parameter(names = {"--sourceDb"}, required = true) public String sourceDb;
|
||||||
|
@Parameter(names = {"--sourceTable"}, required = true) public String sourceTable;
|
||||||
|
@Parameter(names = {"--targetDb"}) public String targetDb;
|
||||||
|
@Parameter(names = {"--targetTable"}, required = true) public String targetTable;
|
||||||
|
@Parameter(names = {"--tmpdb"}) public String tmpDb = "tmp";
|
||||||
|
@Parameter(names = {"--fromCommitTime"}) public String fromCommitTime;
|
||||||
|
@Parameter(names = {"--maxCommits"}) public int maxCommits = 3;
|
||||||
|
@Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false;
|
||||||
|
@Parameter(names = {"--storageFormat"}) public String tempTableStorageFormat = "PARQUET";
|
||||||
|
}
|
||||||
|
|
||||||
|
static {
|
||||||
|
try {
|
||||||
|
Class.forName(driverName);
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new IllegalStateException("Could not find " + driverName + " in classpath. ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Connection connection;
|
||||||
|
protected final Config config;
|
||||||
|
private final ST incrementalPullSQLtemplate;
|
||||||
|
|
||||||
|
public HiveIncrementalPuller(Config config) throws IOException {
|
||||||
|
this.config = config;
|
||||||
|
validateConfig(config);
|
||||||
|
String templateContent = IOUtils.toString(this.getClass().getResourceAsStream("IncrementalPull.sqltemplate"));
|
||||||
|
incrementalPullSQLtemplate = new ST(templateContent);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateConfig(Config config) {
|
||||||
|
if(config.maxCommits == -1) {
|
||||||
|
config.maxCommits = Integer.MAX_VALUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void saveDelta() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
Statement stmt = null;
|
||||||
|
try {
|
||||||
|
if (config.fromCommitTime == null) {
|
||||||
|
config.fromCommitTime = inferCommitTime(fs);
|
||||||
|
log.info("FromCommitTime inferred as " + config.fromCommitTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("FromCommitTime - " + config.fromCommitTime);
|
||||||
|
String sourceTableLocation = getTableLocation(config.sourceDb, config.sourceTable);
|
||||||
|
String lastCommitTime = getLastCommitTimePulled(fs, sourceTableLocation);
|
||||||
|
if (lastCommitTime == null) {
|
||||||
|
log.info("Nothing to pull. However we will continue to create a empty table");
|
||||||
|
lastCommitTime = config.fromCommitTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
Connection conn = getConnection();
|
||||||
|
stmt = conn.createStatement();
|
||||||
|
// drop the temp table if exists
|
||||||
|
String tempDbTable = config.tmpDb + "." + config.targetTable + "__" + config.sourceTable;
|
||||||
|
String tempDbTablePath = config.hoodieTmpDir + "/" + config.targetTable + "__" + config.sourceTable + "/" + lastCommitTime;
|
||||||
|
executeStatement("drop table " + tempDbTable, stmt);
|
||||||
|
deleteHDFSPath(fs, tempDbTablePath);
|
||||||
|
if (!ensureTempPathExists(fs, lastCommitTime)) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Could not create target path at " + new Path(config.hoodieTmpDir,
|
||||||
|
config.targetTable + "/" + lastCommitTime));
|
||||||
|
}
|
||||||
|
|
||||||
|
initHiveBeelineProperties(stmt);
|
||||||
|
executeIncrementalSQL(tempDbTable, tempDbTablePath, stmt);
|
||||||
|
log.info("Finished HoodieReader execution");
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Exception when executing SQL", e);
|
||||||
|
throw new IOException("Could not scan " + config.sourceTable + " incrementally", e);
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
if (stmt != null)
|
||||||
|
stmt.close();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Could not close the resultset opened ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeIncrementalSQL(String tempDbTable, String tempDbTablePath, Statement stmt)
|
||||||
|
throws FileNotFoundException, SQLException {
|
||||||
|
incrementalPullSQLtemplate.add("tempDbTable", tempDbTable);
|
||||||
|
incrementalPullSQLtemplate.add("tempDbTablePath", tempDbTablePath);
|
||||||
|
|
||||||
|
String storedAsClause = getStoredAsClause();
|
||||||
|
|
||||||
|
incrementalPullSQLtemplate.add("storedAsClause", storedAsClause);
|
||||||
|
String incrementalSQL =
|
||||||
|
new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next();
|
||||||
|
if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
|
||||||
|
log.info("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable
|
||||||
|
+ ", which means its pulling from a different table. Fencing this from happening.");
|
||||||
|
throw new HoodieIncrementalPullSQLException(
|
||||||
|
"Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable);
|
||||||
|
}
|
||||||
|
if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) {
|
||||||
|
log.info("Incremental SQL : " + incrementalSQL
|
||||||
|
+ " does not contain `_hoodie_commit_time` > '%s'. Please add this clause for incremental to work properly.");
|
||||||
|
throw new HoodieIncrementalPullSQLException(
|
||||||
|
"Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which means its not pulling incrementally");
|
||||||
|
}
|
||||||
|
|
||||||
|
incrementalPullSQLtemplate
|
||||||
|
.add("incrementalSQL", String.format(incrementalSQL, config.fromCommitTime));
|
||||||
|
String sql = incrementalPullSQLtemplate.render();
|
||||||
|
// Check if the SQL is pulling from the right database
|
||||||
|
executeStatement(sql, stmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getStoredAsClause() {
|
||||||
|
if(config.tempTableStorageFormat.equalsIgnoreCase("json")) {
|
||||||
|
// Special case for json
|
||||||
|
// default json serde does not support having same key even if its under multiple depths
|
||||||
|
return "ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE";
|
||||||
|
}
|
||||||
|
return "STORED AS " + config.tempTableStorageFormat;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initHiveBeelineProperties(Statement stmt) throws SQLException {
|
||||||
|
log.info("Setting up Hive JDBC Session with properties");
|
||||||
|
// set the queue
|
||||||
|
executeStatement("set mapred.job.queue.name=" + config.yarnQueueName, stmt);
|
||||||
|
// Set the inputformat to HoodieCombineHiveInputFormat
|
||||||
|
executeStatement("set hive.input.format=com.uber.hoodie.hadoop.hive.HoodieCombineHiveInputFormat", stmt);
|
||||||
|
// Allow queries without partition predicate
|
||||||
|
executeStatement("set hive.strict.checks.large.query=false", stmt);
|
||||||
|
// Dont gather stats for the table created
|
||||||
|
executeStatement("set hive.stats.autogather=false", stmt);
|
||||||
|
// Set the hoodie modie
|
||||||
|
executeStatement("set hoodie." + config.sourceTable + ".consume.mode=INCREMENTAL", stmt);
|
||||||
|
// Set the from commit time
|
||||||
|
executeStatement("set hoodie." + config.sourceTable + ".consume.start.timestamp="
|
||||||
|
+ config.fromCommitTime, stmt);
|
||||||
|
// Set number of commits to pull
|
||||||
|
executeStatement("set hoodie." + config.sourceTable + ".consume.max.commits=" + String
|
||||||
|
.valueOf(config.maxCommits), stmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean deleteHDFSPath(FileSystem fs, String path) throws IOException {
|
||||||
|
log.info("Deleting path " + path);
|
||||||
|
return fs.delete(new Path(path), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeStatement(String sql, Statement stmt) throws SQLException {
|
||||||
|
log.info("Executing: " + sql);
|
||||||
|
stmt.execute(sql);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String inferCommitTime(FileSystem fs) throws SQLException, IOException {
|
||||||
|
log.info("FromCommitTime not specified. Trying to infer it from Hoodie dataset "
|
||||||
|
+ config.targetDb + "." + config.targetTable);
|
||||||
|
String targetDataLocation = getTableLocation(config.targetDb, config.targetTable);
|
||||||
|
return scanForCommitTime(fs, targetDataLocation);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getTableLocation(String db, String table) throws SQLException {
|
||||||
|
ResultSet resultSet = null;
|
||||||
|
Statement stmt = null;
|
||||||
|
try {
|
||||||
|
Connection conn = getConnection();
|
||||||
|
stmt = conn.createStatement();
|
||||||
|
resultSet = stmt.executeQuery("describe formatted `" + db + "." + table + "`");
|
||||||
|
while (resultSet.next()) {
|
||||||
|
if (resultSet.getString(1).trim().equals("Location:")) {
|
||||||
|
log.info("Inferred table location for " + db + "." + table + " as " + resultSet
|
||||||
|
.getString(2));
|
||||||
|
return resultSet.getString(2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (SQLException e) {
|
||||||
|
throw new HoodieIncrementalPullException(
|
||||||
|
"Failed to get data location for table " + db + "." + table, e);
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
if (stmt != null)
|
||||||
|
stmt.close();
|
||||||
|
if (resultSet != null)
|
||||||
|
resultSet.close();
|
||||||
|
} catch (SQLException e) {
|
||||||
|
log.error("Could not close the resultset opened ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String scanForCommitTime(FileSystem fs, String targetDataPath) throws IOException {
|
||||||
|
if(targetDataPath == null) {
|
||||||
|
throw new IllegalArgumentException("Please specify either --fromCommitTime or --targetDataPath");
|
||||||
|
}
|
||||||
|
if(!fs.exists(new Path(targetDataPath)) || !fs.exists(new Path(targetDataPath + "/.hoodie"))) {
|
||||||
|
return "0";
|
||||||
|
}
|
||||||
|
HoodieTableMetadata metadata = new HoodieTableMetadata(fs, targetDataPath);
|
||||||
|
String lastCommit = metadata.getAllCommits().lastCommit();
|
||||||
|
return lastCommit == null ? "0" : lastCommit;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime)
|
||||||
|
throws IOException {
|
||||||
|
Path targetBaseDirPath = new Path(config.hoodieTmpDir, config.targetTable + "__" + config.sourceTable);
|
||||||
|
if(!fs.exists(targetBaseDirPath)) {
|
||||||
|
log.info("Creating " + targetBaseDirPath + " with permission drwxrwxrwx");
|
||||||
|
boolean result = FileSystem.mkdirs(fs, targetBaseDirPath,
|
||||||
|
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
|
||||||
|
if (!result) {
|
||||||
|
throw new HoodieException(
|
||||||
|
"Could not create " + targetBaseDirPath + " with the required permissions");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Path targetPath = new Path(targetBaseDirPath, lastCommitTime);
|
||||||
|
if(fs.exists(targetPath)) {
|
||||||
|
boolean result = fs.delete(targetPath, true);
|
||||||
|
if (!result) {
|
||||||
|
throw new HoodieException(
|
||||||
|
"Could not delete existing " + targetPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.info("Creating " + targetPath + " with permission drwxrwxrwx");
|
||||||
|
return FileSystem.mkdirs(fs, targetBaseDirPath,
|
||||||
|
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) throws IOException {
|
||||||
|
HoodieTableMetadata metadata = new HoodieTableMetadata(fs, sourceTableLocation);
|
||||||
|
List<String> commitsToSync =
|
||||||
|
metadata.getAllCommits().findCommitsAfter(config.fromCommitTime, config.maxCommits);
|
||||||
|
if (commitsToSync.isEmpty()) {
|
||||||
|
log.warn("Nothing to sync. All commits in " + config.sourceTable + " are " + metadata
|
||||||
|
.getAllCommits().getCommitList() + " and from commit time is " + config.fromCommitTime);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
log.info("Syncing commits " + commitsToSync);
|
||||||
|
return commitsToSync.get(commitsToSync.size() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Connection getConnection() throws SQLException {
|
||||||
|
if (connection == null) {
|
||||||
|
DataSource ds = getDatasource();
|
||||||
|
log.info("Getting Hive Connection from Datasource " + ds);
|
||||||
|
this.connection = ds.getConnection();
|
||||||
|
}
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
private DataSource getDatasource() {
|
||||||
|
BasicDataSource ds = new BasicDataSource();
|
||||||
|
ds.setDriverClassName(driverName);
|
||||||
|
ds.setUrl(config.hiveJDBCUrl);
|
||||||
|
ds.setUsername(config.hiveUsername);
|
||||||
|
ds.setPassword(config.hivePassword);
|
||||||
|
return ds;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws IOException {
|
||||||
|
final Config cfg = new Config();
|
||||||
|
JCommander cmd = new JCommander(cfg, args);
|
||||||
|
if (cfg.help || args.length == 0) {
|
||||||
|
cmd.usage();
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
new HiveIncrementalPuller(cfg).saveDelta();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,230 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.utilities;
|
||||||
|
|
||||||
|
import com.google.common.io.Files;
|
||||||
|
|
||||||
|
import com.beust.jcommander.JCommander;
|
||||||
|
import com.beust.jcommander.Parameter;
|
||||||
|
import com.uber.hoodie.HoodieWriteClient;
|
||||||
|
import com.uber.hoodie.common.HoodieJsonPayload;
|
||||||
|
import com.uber.hoodie.common.model.HoodieCommits;
|
||||||
|
import com.uber.hoodie.common.model.HoodieKey;
|
||||||
|
import com.uber.hoodie.common.model.HoodieRecord;
|
||||||
|
import com.uber.hoodie.common.model.HoodieTableMetadata;
|
||||||
|
import com.uber.hoodie.common.util.FSUtils;
|
||||||
|
import com.uber.hoodie.config.HoodieIndexConfig;
|
||||||
|
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||||
|
import com.uber.hoodie.index.HoodieIndex;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.Function;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.nio.charset.Charset;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target dataset.
|
||||||
|
* <p/>
|
||||||
|
* Does not maintain any state, queries at runtime to see how far behind the target dataset is from
|
||||||
|
* the source dataset. This can be overriden to force sync from a timestamp.
|
||||||
|
*/
|
||||||
|
public class HoodieDeltaStreamer implements Serializable {
|
||||||
|
private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);
|
||||||
|
private final Config cfg;
|
||||||
|
|
||||||
|
public HoodieDeltaStreamer(Config cfg) throws IOException {
|
||||||
|
this.cfg = cfg;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sync() throws Exception {
|
||||||
|
JavaSparkContext sc = getSparkContext(cfg);
|
||||||
|
FileSystem fs = FSUtils.getFs();
|
||||||
|
HoodieTableMetadata targetHoodieMetadata =
|
||||||
|
new HoodieTableMetadata(fs, cfg.targetPath, cfg.targetTableName);
|
||||||
|
String lastCommitPulled = findLastCommitPulled(fs, cfg.dataPath);
|
||||||
|
log.info("Last commit pulled on the source dataset is " + lastCommitPulled);
|
||||||
|
if (!targetHoodieMetadata.getAllCommits().isEmpty() && HoodieCommits
|
||||||
|
.isCommit1After(targetHoodieMetadata.getAllCommits().lastCommit(), lastCommitPulled)) {
|
||||||
|
// this should never be the case
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Last commit pulled from source table " + lastCommitPulled
|
||||||
|
+ " is before the last commit in the target table " + targetHoodieMetadata
|
||||||
|
.getAllCommits().lastCommit());
|
||||||
|
}
|
||||||
|
if (!cfg.override && targetHoodieMetadata.getAllCommits().contains(lastCommitPulled)) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Target Table already has the commit " + lastCommitPulled
|
||||||
|
+ ". Not overriding as cfg.override is false");
|
||||||
|
}
|
||||||
|
syncTill(lastCommitPulled, targetHoodieMetadata, sc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String findLastCommitPulled(FileSystem fs, String dataPath) throws IOException {
|
||||||
|
FileStatus[] commitTimePaths = fs.listStatus(new Path(dataPath));
|
||||||
|
List<String> commitTimes = new ArrayList<>(commitTimePaths.length);
|
||||||
|
for (FileStatus commitTimePath : commitTimePaths) {
|
||||||
|
String[] splits = commitTimePath.getPath().toString().split("/");
|
||||||
|
commitTimes.add(splits[splits.length - 1]);
|
||||||
|
}
|
||||||
|
Collections.sort(commitTimes);
|
||||||
|
Collections.reverse(commitTimes);
|
||||||
|
log.info("Retrieved commit times " + commitTimes);
|
||||||
|
return commitTimes.get(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void syncTill(String lastCommitPulled, HoodieTableMetadata target,
|
||||||
|
JavaSparkContext sc) throws Exception {
|
||||||
|
// Step 1 : Scan incrementally and get the input records as a RDD of source format
|
||||||
|
String dataPath = cfg.dataPath + "/" + lastCommitPulled;
|
||||||
|
log.info("Using data path " + dataPath);
|
||||||
|
JavaRDD<String> rdd = sc.textFile(dataPath);
|
||||||
|
|
||||||
|
// Step 2 : Create the hoodie records
|
||||||
|
JavaRDD<HoodieRecord<HoodieJsonPayload>> records =
|
||||||
|
rdd.map(new Function<String, HoodieRecord<HoodieJsonPayload>>() {
|
||||||
|
@Override
|
||||||
|
public HoodieRecord<HoodieJsonPayload> call(String json)
|
||||||
|
throws Exception {
|
||||||
|
HoodieJsonPayload payload = new HoodieJsonPayload(json);
|
||||||
|
HoodieKey key = new HoodieKey(payload.getRowKey(cfg.keyColumnField),
|
||||||
|
payload.getPartitionPath(cfg.partitionPathField));
|
||||||
|
return new HoodieRecord<>(key, payload);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Step 3: Use Hoodie Client to upsert/bulk load the records into target hoodie dataset
|
||||||
|
HoodieWriteConfig hoodieCfg = getHoodieClientConfig(target);
|
||||||
|
HoodieWriteClient<HoodieJsonPayload> client = new HoodieWriteClient<>(sc, hoodieCfg);
|
||||||
|
log.info("Rollback started " + lastCommitPulled);
|
||||||
|
client.rollback(lastCommitPulled);
|
||||||
|
|
||||||
|
client.startCommitWithTime(lastCommitPulled);
|
||||||
|
log.info("Starting commit " + lastCommitPulled);
|
||||||
|
if (cfg.upsert) {
|
||||||
|
log.info("Upserting records");
|
||||||
|
client.upsert(records, lastCommitPulled);
|
||||||
|
} else {
|
||||||
|
log.info("Inserting records");
|
||||||
|
// insert the records.
|
||||||
|
client.insert(records, lastCommitPulled);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO - revisit this - can we clean this up.
|
||||||
|
// determine if this write should be committed.
|
||||||
|
// final Accumulator<Integer> errorCount = sc.intAccumulator(0);
|
||||||
|
// final Accumulator<Integer> totalCount = sc.intAccumulator(0);
|
||||||
|
// statuses.foreach(new VoidFunction<WriteStatus>() {
|
||||||
|
// @Override public void call(WriteStatus status) throws Exception {
|
||||||
|
// if (status.hasGlobalError()) {
|
||||||
|
// log.error(status.getGlobalError());
|
||||||
|
// errorCount.add(1);
|
||||||
|
// }
|
||||||
|
// if (status.hasErrors()) {
|
||||||
|
// log.info(status);
|
||||||
|
// for (Map.Entry<HoodieKey, Throwable> keyErrEntry : status.getErrors()
|
||||||
|
// .entrySet()) {
|
||||||
|
// log.error(String.format("\t %s error %s", keyErrEntry.getKey(),
|
||||||
|
// keyErrEntry.getValue().getMessage()), keyErrEntry.getValue());
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// errorCount.add(status.getErrors().size());
|
||||||
|
// totalCount.add(status.getWrittenRecords().size());
|
||||||
|
// }
|
||||||
|
// })
|
||||||
|
}
|
||||||
|
|
||||||
|
private HoodieWriteConfig getHoodieClientConfig(HoodieTableMetadata metadata)
|
||||||
|
throws Exception {
|
||||||
|
final String schemaStr = Files.toString(new File(cfg.schemaFile), Charset.forName("UTF-8"));
|
||||||
|
return HoodieWriteConfig.newBuilder().withPath(metadata.getBasePath())
|
||||||
|
.withSchema(schemaStr)
|
||||||
|
.withParallelism(cfg.groupByParallelism, cfg.groupByParallelism)
|
||||||
|
.forTable(metadata.getTableName()).withIndexConfig(
|
||||||
|
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private JavaSparkContext getSparkContext(Config cfg) {
|
||||||
|
SparkConf sparkConf = new SparkConf().setAppName("hoodie-delta-streamer-" + cfg.targetTableName);
|
||||||
|
sparkConf.setMaster(cfg.sparkMaster);
|
||||||
|
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||||
|
sparkConf.set("spark.driver.maxResultSize", "2g");
|
||||||
|
|
||||||
|
if (cfg.sparkMaster.startsWith("yarn")) {
|
||||||
|
sparkConf.set("spark.eventLog.overwrite", "true");
|
||||||
|
sparkConf.set("spark.eventLog.enabled", "true");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure hadoop conf
|
||||||
|
sparkConf.set("spark.hadoop.mapred.output.compress", "true");
|
||||||
|
sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
|
||||||
|
sparkConf.set("spark.hadoop.mapred.output.compression.codec",
|
||||||
|
"org.apache.hadoop.io.compress.GzipCodec");
|
||||||
|
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
|
||||||
|
|
||||||
|
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
|
||||||
|
return new JavaSparkContext(sparkConf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Config implements Serializable {
|
||||||
|
@Parameter(names = {"--dataPath"})
|
||||||
|
public String dataPath;
|
||||||
|
@Parameter(names = {"--parallelism"})
|
||||||
|
public int groupByParallelism = 10000;
|
||||||
|
@Parameter(names = {"--upsert"})
|
||||||
|
public boolean upsert = false;
|
||||||
|
@Parameter(names = {"--master"})
|
||||||
|
public String sparkMaster = "yarn-client";
|
||||||
|
@Parameter(names = {"--targetPath"}, required = true)
|
||||||
|
public String targetPath;
|
||||||
|
@Parameter(names = {"--targetTable"})
|
||||||
|
public String targetTableName;
|
||||||
|
@Parameter(names = {"--keyColumn"})
|
||||||
|
public String keyColumnField = "uuid";
|
||||||
|
@Parameter(names = {"--partitionPathField"})
|
||||||
|
public String partitionPathField = "request_at";
|
||||||
|
@Parameter(names = {"--schemaFile"})
|
||||||
|
public String schemaFile;
|
||||||
|
@Parameter(names = {"--override"})
|
||||||
|
public boolean override = false;
|
||||||
|
@Parameter(names = {"--help", "-h"}, help = true)
|
||||||
|
public Boolean help = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
final Config cfg = new Config();
|
||||||
|
JCommander cmd = new JCommander(cfg, args);
|
||||||
|
if (cfg.help || args.length == 0) {
|
||||||
|
cmd.usage();
|
||||||
|
System.exit(1);
|
||||||
|
}
|
||||||
|
new HoodieDeltaStreamer(cfg).sync();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,159 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.utilities;
|
||||||
|
|
||||||
|
import com.beust.jcommander.JCommander;
|
||||||
|
import com.beust.jcommander.Parameter;
|
||||||
|
|
||||||
|
import com.uber.hoodie.common.model.HoodieCommits;
|
||||||
|
import com.uber.hoodie.common.model.HoodieTableMetadata;
|
||||||
|
import com.uber.hoodie.common.util.FSUtils;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
|
import org.apache.spark.api.java.function.VoidFunction;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hoodie snapshot copy job which copies latest files from all partitions to another place, for snapshot backup.
|
||||||
|
*/
|
||||||
|
public class HoodieSnapshotCopier implements Serializable {
|
||||||
|
private static Logger logger = LogManager.getLogger(HoodieSnapshotCopier.class);
|
||||||
|
|
||||||
|
static class Config implements Serializable {
|
||||||
|
@Parameter(names = {"--base-path", "-bp"}, description = "Hoodie table base path", required = true)
|
||||||
|
String basePath = null;
|
||||||
|
|
||||||
|
@Parameter(names = {"--output-path", "-op"}, description = "The snapshot output path", required = true)
|
||||||
|
String outputPath = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir) throws IOException {
|
||||||
|
FileSystem fs = FSUtils.getFs();
|
||||||
|
final HoodieTableMetadata tableMetadata = new HoodieTableMetadata(fs, baseDir);
|
||||||
|
|
||||||
|
// Get the latest commit
|
||||||
|
final String latestCommit = tableMetadata.getAllCommits().lastCommit();
|
||||||
|
logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommit));
|
||||||
|
|
||||||
|
List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir);
|
||||||
|
if (partitions.size() > 0) {
|
||||||
|
logger.info(String.format("The job needs to copy %d partitions.", partitions.size()));
|
||||||
|
|
||||||
|
// Make sure the output directory is empty
|
||||||
|
Path outputPath = new Path(outputDir);
|
||||||
|
if (fs.exists(outputPath)) {
|
||||||
|
logger.warn(String.format("The output path %s already exists, deleting", outputPath));
|
||||||
|
fs.delete(new Path(outputDir), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
jsc.parallelize(partitions, partitions.size()).flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
|
||||||
|
@Override
|
||||||
|
public Iterable<Tuple2<String, String>> call(String partition) throws Exception {
|
||||||
|
// Only take latest version files <= latestCommit.
|
||||||
|
FileSystem fs = FSUtils.getFs();
|
||||||
|
List<Tuple2<String, String>> filePaths = new ArrayList<>();
|
||||||
|
for (FileStatus fileStatus : tableMetadata.getLatestVersionInPartition(fs, partition, latestCommit)) {
|
||||||
|
filePaths.add(new Tuple2<>(partition, fileStatus.getPath().toString()));
|
||||||
|
}
|
||||||
|
return filePaths;
|
||||||
|
}
|
||||||
|
}).foreach(new VoidFunction<Tuple2<String, String>>() {
|
||||||
|
@Override
|
||||||
|
public void call(Tuple2<String, String> tuple) throws Exception {
|
||||||
|
String partition = tuple._1();
|
||||||
|
Path sourceFilePath = new Path(tuple._2());
|
||||||
|
Path toPartitionPath = new Path(outputDir, partition);
|
||||||
|
FileSystem fs = FSUtils.getFs();
|
||||||
|
|
||||||
|
if (!fs.exists(toPartitionPath)) {
|
||||||
|
fs.mkdirs(toPartitionPath);
|
||||||
|
}
|
||||||
|
FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()),
|
||||||
|
false, fs.getConf());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Also copy the .commit files
|
||||||
|
logger.info(String.format("Copying .commit files which are no-late-than %s.", latestCommit));
|
||||||
|
FileStatus[] commitFilesToCopy = fs.listStatus(
|
||||||
|
new Path(baseDir + "/" + HoodieTableMetadata.METAFOLDER_NAME), new PathFilter() {
|
||||||
|
@Override
|
||||||
|
public boolean accept(Path commitFilePath) {
|
||||||
|
if (commitFilePath.getName().equals(HoodieTableMetadata.HOODIE_PROPERTIES_FILE)) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
String commitTime = FSUtils.getCommitFromCommitFile(commitFilePath.getName());
|
||||||
|
return HoodieCommits.isCommit1BeforeOrOn(commitTime, latestCommit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
for (FileStatus commitStatus : commitFilesToCopy) {
|
||||||
|
Path targetFilePath =
|
||||||
|
new Path(outputDir + "/" + HoodieTableMetadata.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
|
||||||
|
if (! fs.exists(targetFilePath.getParent())) {
|
||||||
|
fs.mkdirs(targetFilePath.getParent());
|
||||||
|
}
|
||||||
|
if (fs.exists(targetFilePath)) {
|
||||||
|
logger.error(String.format("The target output commit file (%s) already exists.", targetFilePath));
|
||||||
|
}
|
||||||
|
FileUtil.copy(fs, commitStatus.getPath(), fs, targetFilePath, false, fs.getConf());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.info("The job has 0 partition to copy.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the _SUCCESS tag
|
||||||
|
Path successTagPath = new Path(outputDir + "/_SUCCESS");
|
||||||
|
if (!fs.exists(successTagPath)) {
|
||||||
|
logger.info(String.format("Creating _SUCCESS under %s.", outputDir));
|
||||||
|
fs.createNewFile(successTagPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws IOException {
|
||||||
|
// Take input configs
|
||||||
|
final Config cfg = new Config();
|
||||||
|
new JCommander(cfg, args);
|
||||||
|
logger.info(String.format("Snapshot hoodie table from %s to %s", cfg.basePath, cfg.outputPath));
|
||||||
|
|
||||||
|
// Create a spark job to do the snapshot copy
|
||||||
|
SparkConf sparkConf = new SparkConf().setAppName("Hoodie-snapshot-copier");
|
||||||
|
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||||
|
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
|
||||||
|
logger.info("Initializing spark job.");
|
||||||
|
|
||||||
|
// Copy
|
||||||
|
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
|
||||||
|
copier.snapshot(jsc, cfg.basePath, cfg.outputPath);
|
||||||
|
|
||||||
|
// Stop the job
|
||||||
|
jsc.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.utilities.exception;
|
||||||
|
|
||||||
|
import com.uber.hoodie.exception.HoodieException;
|
||||||
|
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
public class HoodieIncrementalPullException extends HoodieException {
|
||||||
|
public HoodieIncrementalPullException(String msg, SQLException e) {
|
||||||
|
super(msg, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HoodieIncrementalPullException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,29 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.utilities.exception;
|
||||||
|
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
public class HoodieIncrementalPullSQLException extends HoodieIncrementalPullException {
|
||||||
|
public HoodieIncrementalPullSQLException(String msg, SQLException e) {
|
||||||
|
super(msg, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HoodieIncrementalPullSQLException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
CREATE TABLE <tempDbTable>
|
||||||
|
<storedAsClause>
|
||||||
|
LOCATION '<tempDbTablePath>'
|
||||||
|
AS
|
||||||
|
<incrementalSQL>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@@ -0,0 +1,148 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.uber.hoodie.utilities;
|
||||||
|
|
||||||
|
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||||
|
import com.uber.hoodie.common.util.FSUtils;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class TestHoodieSnapshotCopier {
|
||||||
|
private String rootPath = null;
|
||||||
|
private String basePath = null;
|
||||||
|
private String outputPath = null;
|
||||||
|
private FileSystem fs = null;
|
||||||
|
private JavaSparkContext jsc = null;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() throws IOException {
|
||||||
|
// Prepare directories
|
||||||
|
TemporaryFolder folder = new TemporaryFolder();
|
||||||
|
folder.create();
|
||||||
|
rootPath = folder.getRoot().getAbsolutePath();
|
||||||
|
basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
|
||||||
|
HoodieTestUtils.initializeHoodieDirectory(basePath);
|
||||||
|
outputPath = rootPath + "/output";
|
||||||
|
fs = FSUtils.getFs();
|
||||||
|
// Start a local Spark job
|
||||||
|
SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]");
|
||||||
|
jsc = new JavaSparkContext(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptySnapshotCopy() throws IOException {
|
||||||
|
// There is no real data (only .hoodie directory)
|
||||||
|
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
|
||||||
|
assertFalse(fs.exists(new Path(outputPath)));
|
||||||
|
|
||||||
|
// Do the snapshot
|
||||||
|
HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
|
||||||
|
copier.snapshot(jsc, basePath, outputPath);
|
||||||
|
|
||||||
|
// Nothing changed except _SUCCESS
|
||||||
|
assertEquals(fs.listStatus(new Path(basePath)).length, 1);
|
||||||
|
assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO - uncomment this after fixing test failures
|
||||||
|
// @Test
|
||||||
|
// public void testSnapshotCopy() throws Exception {
|
||||||
|
// // Generate some commits and corresponding parquets
|
||||||
|
// String commitTime1 = "20160501010101";
|
||||||
|
// String commitTime2 = "20160502020601";
|
||||||
|
// String commitTime3 = "20160506030611";
|
||||||
|
// new File(basePath + "/.hoodie").mkdirs();
|
||||||
|
// new File(basePath + "/.hoodie/hoodie.properties").createNewFile();
|
||||||
|
// // Only first two have commit files
|
||||||
|
// new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
|
||||||
|
// new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
|
||||||
|
// new File(basePath + "/.hoodie/" + commitTime3 + ".inflight").createNewFile();
|
||||||
|
//
|
||||||
|
// // Some parquet files
|
||||||
|
// new File(basePath + "/2016/05/01/").mkdirs();
|
||||||
|
// new File(basePath + "/2016/05/02/").mkdirs();
|
||||||
|
// new File(basePath + "/2016/05/06/").mkdirs();
|
||||||
|
//
|
||||||
|
// // Make commit1
|
||||||
|
// File file11 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, 1, "id11"));
|
||||||
|
// file11.createNewFile();
|
||||||
|
// File file12 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, 1, "id12"));
|
||||||
|
// file12.createNewFile();
|
||||||
|
// File file13 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, 1, "id13"));
|
||||||
|
// file13.createNewFile();
|
||||||
|
//
|
||||||
|
// // Make commit2
|
||||||
|
// File file21 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, 1, "id21"));
|
||||||
|
// file21.createNewFile();
|
||||||
|
// File file22 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, 1, "id22"));
|
||||||
|
// file22.createNewFile();
|
||||||
|
// File file23 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, 1, "id23"));
|
||||||
|
// file23.createNewFile();
|
||||||
|
//
|
||||||
|
// // Make commit3
|
||||||
|
// File file31 = new File(basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, 1, "id31"));
|
||||||
|
// file31.createNewFile();
|
||||||
|
// File file32 = new File(basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, 1, "id32"));
|
||||||
|
// file32.createNewFile();
|
||||||
|
// File file33 = new File(basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, 1, "id33"));
|
||||||
|
// file33.createNewFile();
|
||||||
|
//
|
||||||
|
// // Do a snapshot copy
|
||||||
|
// HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
|
||||||
|
// copier.snapshot(jsc, basePath, outputPath);
|
||||||
|
//
|
||||||
|
// // Check results
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file12.getName())));
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file13.getName())));
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file21.getName())));
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/02/" + file22.getName())));
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/2016/05/06/" + file23.getName())));
|
||||||
|
// assertFalse(fs.exists(new Path(outputPath + "/2016/05/01/" + file31.getName())));
|
||||||
|
// assertFalse(fs.exists(new Path(outputPath + "/2016/05/02/" + file32.getName())));
|
||||||
|
// assertFalse(fs.exists(new Path(outputPath + "/2016/05/06/" + file33.getName())));
|
||||||
|
//
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime1 + ".commit")));
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime2 + ".commit")));
|
||||||
|
// assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".commit")));
|
||||||
|
// assertFalse(fs.exists(new Path(outputPath + "/.hoodie/" + commitTime3 + ".inflight")));
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/.hoodie/hoodie.properties")));
|
||||||
|
//
|
||||||
|
// assertTrue(fs.exists(new Path(outputPath + "/_SUCCESS")));
|
||||||
|
// }
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanup() {
|
||||||
|
if (rootPath != null) {
|
||||||
|
new File(rootPath).delete();
|
||||||
|
}
|
||||||
|
if (jsc != null) {
|
||||||
|
jsc.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user