1
0

[HUDI-1040] Make Hudi support Spark 3 (#2208)

* Fix flaky MOR unit test

* Update Spark APIs to make it be compatible with both spark2 & spark3

* Refactor bulk insert v2 part to make Hudi be able to compile with Spark3

* Add spark3 profile to handle fasterxml & spark version

* Create hudi-spark-common module & refactor hudi-spark related modules

Co-authored-by: Wenning Ding <wenningd@amazon.com>
This commit is contained in:
wenningd
2020-12-09 15:52:23 -08:00
committed by GitHub
parent 3a91d26d62
commit fce1453fa6
79 changed files with 1040 additions and 172 deletions

View File

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hadoop.fs.FileSystem;
import java.util.List;
import java.util.stream.Collectors;
/**
 * List of helpers to aid construction of instant times for read and write operations using the datasource.
 */
@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
public class HoodieDataSourceHelpers {

  /**
   * Checks if the Hoodie table has new data since the given timestamp. This can be subsequently fed to an
   * incremental view read, to perform incremental processing.
   *
   * @param fs              filesystem hosting the table
   * @param basePath        base path of the Hoodie table
   * @param commitTimestamp instant time to compare against (exclusive)
   * @return true if at least one completed commit/compaction occurred after {@code commitTimestamp}
   */
  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
  public static boolean hasNewCommits(FileSystem fs, String basePath, String commitTimestamp) {
    // isEmpty() is the idiomatic emptiness check (vs. size() > 0).
    return !listCommitsSince(fs, basePath, commitTimestamp).isEmpty();
  }

  /**
   * Get a list of instant times that have occurred after the given instant timestamp.
   *
   * @param fs               filesystem hosting the table
   * @param basePath         base path of the Hoodie table
   * @param instantTimestamp instant time to list from (exclusive)
   * @return completed instant times, in timeline order
   */
  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
  public static List<String> listCommitsSince(FileSystem fs, String basePath, String instantTimestamp) {
    HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
    return timeline.findInstantsAfter(instantTimestamp, Integer.MAX_VALUE).getInstants()
        .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
  }

  /**
   * Returns the last successful write operation's instant time.
   *
   * <p>NOTE(review): assumes the table has at least one completed instant; {@code lastInstant().get()}
   * will throw on an empty timeline — confirm callers only invoke this on non-empty tables.
   */
  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
  public static String latestCommit(FileSystem fs, String basePath) {
    HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
    return timeline.lastInstant().get().getTimestamp();
  }

  /**
   * Obtain all the commits, compactions that have occurred on the timeline, whose instant times could be fed
   * into the datasource options.
   */
  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
  public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, String basePath) {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
    if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
      // MOR tables: delta commits and replace commits also represent completed writes.
      return metaClient.getActiveTimeline().getTimelineOfActions(
          CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
              HoodieActiveTimeline.DELTA_COMMIT_ACTION,
              HoodieActiveTimeline.REPLACE_COMMIT_ACTION)).filterCompletedInstants();
    } else {
      return metaClient.getCommitTimeline().filterCompletedInstants();
    }
  }
}

View File

@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import static org.apache.spark.sql.functions.callUDF;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConverters;
/**
 * Helper class to assist in preparing {@link Dataset<Row>}s for bulk insert with datasource implementation.
 */
public class HoodieDatasetBulkInsertHelper {

  private static final Logger LOG = LogManager.getLogger(HoodieDatasetBulkInsertHelper.class);
  private static final String RECORD_KEY_UDF_FN = "hudi_recordkey_gen_function";
  private static final String PARTITION_PATH_UDF_FN = "hudi_partition_gen_function";

  /**
   * Prepares input hoodie spark dataset for bulk insert. It does the following steps.
   *  1. Uses KeyGenerator to generate hoodie record keys and partition path.
   *  2. Add hoodie columns to input spark dataset.
   *  3. Reorders input dataset columns so that hoodie columns appear in the beginning.
   *  4. Sorts input dataset by hoodie partition path and record key
   *
   * @param sqlContext      SQL Context used to register the key/partition-path UDFs
   * @param config          Hoodie Write Config (key generator class, bulk insert parallelism)
   * @param rows            Spark Input dataset
   * @param structName      record struct name (part of the datasource contract; not used in this step)
   * @param recordNamespace record namespace (part of the datasource contract; not used in this step)
   * @return hoodie dataset which is ready for bulk insert.
   */
  public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlContext,
      HoodieWriteConfig config, Dataset<Row> rows, String structName, String recordNamespace) {
    List<Column> originalFields =
        Arrays.stream(rows.schema().fields()).map(f -> new Column(f.name())).collect(Collectors.toList());

    TypedProperties properties = new TypedProperties();
    properties.putAll(config.getProps());
    String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
    BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);

    sqlContext.udf().register(RECORD_KEY_UDF_FN, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
    sqlContext.udf().register(PARTITION_PATH_UDF_FN, (UDF1<Row, String>) keyGenerator::getPartitionPath, DataTypes.StringType);

    // Both UDFs receive the entire original row packed into one struct column;
    // build that Column expression once instead of twice (it is an immutable expression tree).
    Column originalFieldsStruct = functions.struct(
        JavaConverters.collectionAsScalaIterableConverter(originalFields).asScala().toSeq());

    final Dataset<Row> rowDatasetWithKeysAndPartitionPath = rows
        .withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, callUDF(RECORD_KEY_UDF_FN, originalFieldsStruct))
        .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, callUDF(PARTITION_PATH_UDF_FN, originalFieldsStruct));

    // Add other empty hoodie fields which will be populated before writing to parquet.
    Dataset<Row> rowDatasetWithHoodieColumns = rowDatasetWithKeysAndPartitionPath
        .withColumn(HoodieRecord.COMMIT_TIME_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType))
        .withColumn(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType))
        .withColumn(HoodieRecord.FILENAME_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType));

    // Move the hoodie meta columns to the front, keeping the original columns after them.
    List<Column> orderedFields = Stream.concat(HoodieRecord.HOODIE_META_COLUMNS.stream().map(Column::new),
        originalFields.stream()).collect(Collectors.toList());
    Dataset<Row> colOrderedDataset = rowDatasetWithHoodieColumns.select(
        JavaConverters.collectionAsScalaIterableConverter(orderedFields).asScala().toSeq());

    return colOrderedDataset
        .sort(functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD), functions.col(HoodieRecord.RECORD_KEY_METADATA_FIELD))
        .coalesce(config.getBulkInsertShuffleParallelism());
  }
}

View File

@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
 * Class to be used in quickstart guide for generating inserts and updates against a corpus.
 * Test data uses a toy Uber trips data model.
 */
public class QuickstartUtils {

  /**
   * Generates pseudo-random trip records and tracks every generated key, so updates and
   * deletes can target previously inserted records.
   * NOTE(review): uses a shared static Random and a mutable key map — not thread-safe.
   */
  public static class DataGenerator {

    private static final String DEFAULT_FIRST_PARTITION_PATH = "americas/united_states/san_francisco";
    private static final String DEFAULT_SECOND_PARTITION_PATH = "americas/brazil/sao_paulo";
    private static final String DEFAULT_THIRD_PARTITION_PATH = "asia/india/chennai";
    private static final String[] DEFAULT_PARTITION_PATHS =
        {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};

    // Avro schema for a single trip record: ts, uuid, rider, driver, coordinates, fare.
    static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
        + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
        + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
        + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
        + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
        + "{\"name\":\"fare\",\"type\": \"double\"}]}";
    static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

    // Fixed seed so generated data is reproducible across runs.
    private static Random rand = new Random(46474747);

    // Maps insertion index -> key of a previously generated record.
    private final Map<Integer, HoodieKey> existingKeys;
    private final String[] partitionPaths;
    private int numExistingKeys;

    public DataGenerator() {
      this(DEFAULT_PARTITION_PATHS, new HashMap<>());
    }

    public DataGenerator(String[] partitionPaths) {
      this(partitionPaths, new HashMap<>());
    }

    private DataGenerator(String[] partitionPaths, Map<Integer, HoodieKey> keyPartitionMap) {
      // Defensive copy: later mutation of the caller's array cannot affect this generator.
      this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
      this.existingKeys = keyPartitionMap;
    }

    // Returns a random 3-character string of digits ('0'-'9').
    private static String generateRandomString() {
      int leftLimit = 48; // ascii for 0
      int rightLimit = 57; // ascii for 9
      int stringLength = 3;
      StringBuilder buffer = new StringBuilder(stringLength);
      for (int i = 0; i < stringLength; i++) {
        int randomLimitedInt = leftLimit + (int) (rand.nextFloat() * (rightLimit - leftLimit + 1));
        buffer.append((char) randomLimitedInt);
      }
      return buffer.toString();
    }

    public int getNumExistingKeys() {
      return numExistingKeys;
    }

    /**
     * Builds one trip record with the given key/rider/driver/timestamp and random
     * coordinates and fare (fare scaled to [0, 100)).
     */
    public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
        long timestamp) {
      GenericRecord rec = new GenericData.Record(avroSchema);
      rec.put("uuid", rowKey);
      rec.put("ts", timestamp);
      rec.put("rider", riderName);
      rec.put("driver", driverName);
      rec.put("begin_lat", rand.nextDouble());
      rec.put("begin_lon", rand.nextDouble());
      rec.put("end_lat", rand.nextDouble());
      rec.put("end_lon", rand.nextDouble());
      rec.put("fare", rand.nextDouble() * 100);
      return rec;
    }

    /**
     * Generates a new avro record of the above schema format, retaining the key if optionally provided. The
     * riderDriverSuffix string is a random String to simulate updates by changing the rider driver fields for records
     * belonging to the same commit. It is purely used for demo purposes. In real world, the actual updates are assumed
     * to be provided based on the application requirements.
     */
    public static OverwriteWithLatestAvroPayload generateRandomValue(HoodieKey key, String riderDriverSuffix)
        throws IOException {
      // Timestamp is fixed at 0; only the rider/driver fields vary between "versions".
      GenericRecord rec =
          generateGenericRecord(key.getRecordKey(), "rider-" + riderDriverSuffix, "driver-" + riderDriverSuffix, 0);
      return new OverwriteWithLatestAvroPayload(Option.of(rec));
    }

    /**
     * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
     *
     * NOTE(review): the returned stream is lazy — existingKeys/numExistingKeys are only updated
     * as elements are consumed. Consume the stream fully (or use generateInserts) before relying
     * on getNumExistingKeys().
     */
    public Stream<HoodieRecord> generateInsertsStream(String randomString, Integer n) {
      int currSize = getNumExistingKeys();
      return IntStream.range(0, n).boxed().map(i -> {
        String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
        HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
        existingKeys.put(currSize + i, key);
        numExistingKeys++;
        try {
          return new HoodieRecord(key, generateRandomValue(key, randomString));
        } catch (IOException e) {
          throw new HoodieIOException(e.getMessage(), e);
        }
      });
    }

    /**
     * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
     */
    public List<HoodieRecord> generateInserts(Integer n) throws IOException {
      String randomString = generateRandomString();
      return generateInsertsStream(randomString, n).collect(Collectors.toList());
    }

    // Builds a single update record (new payload, same key) for an existing key.
    public HoodieRecord generateUpdateRecord(HoodieKey key, String randomString) throws IOException {
      return new HoodieRecord(key, generateRandomValue(key, randomString));
    }

    /**
     * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned
     * list
     *
     * NOTE(review): requires at least one existing key — rand.nextInt(numExistingKeys) throws
     * IllegalArgumentException when no inserts have been generated yet.
     *
     * @param n Number of updates (including dups)
     * @return list of hoodie record updates
     */
    public List<HoodieRecord> generateUpdates(Integer n) throws IOException {
      String randomString = generateRandomString();
      List<HoodieRecord> updates = new ArrayList<>();
      for (int i = 0; i < n; i++) {
        HoodieKey key = existingKeys.get(rand.nextInt(numExistingKeys));
        HoodieRecord record = generateUpdateRecord(key, randomString);
        updates.add(record);
      }
      return updates;
    }

    /**
     * Generates delete records for the passed in rows.
     *
     * NOTE(review): convertToString(String, String) always returns a present Option,
     * so the isPresent filter below never drops anything — confirm whether this was intended.
     *
     * @param rows List of {@link Row}s for which delete record need to be generated
     * @return list of hoodie records to delete
     */
    public List<String> generateDeletes(List<Row> rows) {
      return rows.stream().map(row ->
          convertToString(row.getAs("uuid"), row.getAs("partitionpath"))).filter(os -> os.isPresent()).map(os -> os.get())
          .collect(Collectors.toList());
    }

    // Clears the tracked keys. NOTE(review): numExistingKeys is not reset here — confirm intended.
    public void close() {
      existingKeys.clear();
    }
  }

  /**
   * Renders a record's payload as a JSON-like string starting at the "ts" field and
   * appending the partition path; returns empty if the payload bytes cannot be decoded.
   */
  private static Option<String> convertToString(HoodieRecord record) {
    try {
      String str = HoodieAvroUtils
          .bytesToAvro(((OverwriteWithLatestAvroPayload) record.getData()).recordBytes, DataGenerator.avroSchema)
          .toString();
      str = "{" + str.substring(str.indexOf("\"ts\":"));
      return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" + record.getPartitionPath() + "\"}"));
    } catch (IOException e) {
      // Best-effort rendering: callers filter out absent values.
      return Option.empty();
    }
  }

  // Builds the minimal JSON-like string used to identify one record for deletion.
  private static Option<String> convertToString(String uuid, String partitionPath) {
    StringBuffer stringBuffer = new StringBuffer();
    stringBuffer.append("{");
    stringBuffer.append("\"ts\": 0.0,");
    stringBuffer.append("\"uuid\": \"" + uuid + "\",");
    stringBuffer.append("\"partitionpath\": \"" + partitionPath + "\"");
    stringBuffer.append("}");
    return Option.of(stringBuffer.toString());
  }

  // Renders records as JSON-like strings, silently dropping any that fail to decode.
  public static List<String> convertToStringList(List<HoodieRecord> records) {
    return records.stream().map(hr -> convertToString(hr)).filter(os -> os.isPresent()).map(os -> os.get())
        .collect(Collectors.toList());
  }

  /**
   * Returns demo write configs keeping shuffle parallelism low (2) for local quickstart runs.
   */
  public static Map<String, String> getQuickstartWriteConfigs() {
    Map<String, String> demoConfigs = new HashMap<>();
    demoConfigs.put("hoodie.insert.shuffle.parallelism", "2");
    demoConfigs.put("hoodie.upsert.shuffle.parallelism", "2");
    return demoConfigs;
  }
}

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.async;
import org.apache.hudi.client.AbstractCompactor;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.client.HoodieSparkCompactor;
import org.apache.hudi.client.common.HoodieEngineContext;
/**
 * Async Compaction Service used by Structured Streaming. Here, async compaction is run in daemon mode to prevent
 * blocking shutting down the Spark application.
 */
public class SparkStreamingAsyncCompactService extends AsyncCompactService {

  private static final long serialVersionUID = 1L;

  /**
   * @param context engine context the compactions run under
   * @param client  write client whose compactions will be executed
   */
  public SparkStreamingAsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) {
    // Third argument presumably enables the daemon mode described in the class javadoc —
    // TODO(review): confirm against AsyncCompactService's constructor contract.
    super(context, client, true);
  }

  // Supplies the Spark-specific compactor implementation to the base service.
  @Override
  protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
    return new HoodieSparkCompactor(client);
  }
}

View File

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.bootstrap;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import java.io.IOException;
import java.util.List;
/**
* Spark Data frame based bootstrap input provider.
*/
public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {
private final transient SparkSession sparkSession;
public SparkParquetBootstrapDataProvider(TypedProperties props,
HoodieSparkEngineContext context) {
super(props, context);
this.sparkSession = SparkSession.builder().config(context.getJavaSparkContext().getConf()).getOrCreate();
}
@Override
public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles) {
String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
.toArray(String[]::new);
Dataset inputDataset = sparkSession.read().parquet(filePaths);
try {
KeyGenerator keyGenerator = DataSourceUtils.createKeyGenerator(props);
String structName = tableName + "_record";
String namespace = "hoodie." + tableName;
RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace);
return genericRecords.toJavaRDD().map(gr -> {
String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
gr, props.getString("hoodie.datasource.write.precombine.field"), false);
try {
return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
props.getString("hoodie.datasource.write.payload.class"));
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
});
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
}
}

View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.exception;
/**
 * Exception thrown by the DeltaStreamer utility for unrecoverable ingest errors.
 */
public class HoodieDeltaStreamerException extends HoodieException {

  // Exceptions are Serializable; declare an explicit version id to keep it stable.
  private static final long serialVersionUID = 1L;

  public HoodieDeltaStreamerException(String msg, Throwable e) {
    super(msg, e);
  }

  public HoodieDeltaStreamerException(String msg) {
    super(msg);
  }
}

View File

@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.payload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
/**
 * Provides support for seamlessly applying changes captured via Amazon Database Migration Service onto S3.
 *
 * Typically, we get the following pattern of full change records corresponding to DML against the
 * source database
 *
 * - Full load records with no `Op` field
 * - For inserts against the source table, records contain full after image with `Op=I`
 * - For updates against the source table, records contain full after image with `Op=U`
 * - For deletes against the source table, records contain full before image with `Op=D`
 *
 * This payload implementation will issue matching insert, delete, updates against the hudi table
 *
 */
public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {

  public static final String OP_FIELD = "Op";

  public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  public AWSDmsAvroPayload(Option<GenericRecord> record) {
    this(record.get(), 0); // natural order
  }

  /**
   * Handle a possible delete - check for "D" in Op column and return empty row if found.
   *
   * @param insertValue The new row that is being "inserted".
   * @return empty when the record is a delete marker, otherwise the record itself
   */
  private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertValue) throws IOException {
    boolean delete = false;
    if (insertValue instanceof GenericRecord) {
      GenericRecord record = (GenericRecord) insertValue;
      delete = record.get(OP_FIELD) != null && record.get(OP_FIELD).toString().equalsIgnoreCase("D");
    }
    return delete ? Option.empty() : Option.of(insertValue);
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    // Guard against an absent value instead of calling get() unchecked, which would
    // throw for an empty payload.
    Option<IndexedRecord> insertValue = super.getInsertValue(schema);
    return insertValue.isPresent() ? handleDeleteOperation(insertValue.get()) : Option.empty();
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema)
      throws IOException {
    // Overwrite-with-latest semantics: currentValue is intentionally ignored; only the
    // incoming value is inspected for the DMS delete marker.
    Option<IndexedRecord> insertValue = super.getInsertValue(schema);
    return insertValue.isPresent() ? handleDeleteOperation(insertValue.get()) : Option.empty();
  }
}

View File

@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hudi.DefaultSource

View File

@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
/**
* Hoodie Spark Datasource, for reading and writing hoodie tables
*
*/
class DefaultSource extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider
with DataSourceRegister
with StreamSinkProvider
with Serializable {
private val log = LogManager.getLogger(classOf[DefaultSource])
/**
 * Read-path entry point without a user-provided schema: delegates to the three-argument
 * overload with a null schema (the relation then works from the table's own schema).
 */
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
createRelation(sqlContext, parameters, null)
}
/**
 * Builds the read relation for a Hudi table, dispatching on the requested query type
 * (snapshot / read-optimized / incremental) and the table type (COW vs MOR).
 * Throws HoodieException when neither 'path' nor the read-paths option is given, or when
 * the query type is unrecognized.
 */
override def createRelation(sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType): BaseRelation = {
// Add default options for unspecified read options keys.
val parameters = translateViewTypesToQueryTypes(optParams)
val path = parameters.get("path")
val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)
// At least one source of paths is required.
if (path.isEmpty && readPathsStr.isEmpty) {
throw new HoodieException(s"'path' or '$READ_PATHS_OPT_KEY' or both must be specified.")
}
// Merge explicit read paths (comma-separated) with the primary path, then glob-expand.
val readPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())
val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
// Resolve the table base path from the globbed data paths.
val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
log.info("Obtained hudi table path: " + tablePath)
val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
log.info("Is bootstrapped table => " + isBootstrappedTable)
if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
if (isBootstrappedTable) {
// Snapshot query is not supported for Bootstrapped MOR tables
log.warn("Snapshot query is not supported for Bootstrapped Merge-on-Read tables." +
" Falling back to Read Optimized query.")
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
} else {
new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
}
} else {
// COW snapshot reads use the base-file-only view.
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
}
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
new IncrementalRelation(sqlContext, tablePath, optParams, schema)
} else {
throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
}
}
/**
* This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy
* relation here because Spark does not really make use of the relation returned, and just returns an empty
* dataset at [[org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run()]]. This saves us the cost
* of creating and returning a parquet relation here.
*
* TODO: Revisit to return a concrete relation here when we support CREATE TABLE AS for Hudi with DataSource API.
* That is the only case where Spark seems to actually need a relation to be returned here
* [[DataSource.writeAndRead()]]
*
* @param sqlContext Spark SQL Context
* @param mode Mode for saving the DataFrame at the destination
* @param optParams Parameters passed as part of the DataFrame write operation
* @param df Spark DataFrame to be written
* @return Spark Relation
*/
override def createRelation(sqlContext: SQLContext,
mode: SaveMode,
optParams: Map[String, String],
df: DataFrame): BaseRelation = {
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
if (parameters(OPERATION_OPT_KEY).equals(BOOTSTRAP_OPERATION_OPT_VAL)) {
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, parameters, df)
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
}
new HoodieEmptyRelation(sqlContext, df.schema)
}
override def createSink(sqlContext: SQLContext,
optParams: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): Sink = {
val parameters = HoodieWriterUtils.parametersWithWriteDefaults(optParams)
new HoodieStreamingSink(
sqlContext,
parameters,
partitionColumns,
outputMode)
}
override def shortName(): String = "hudi"
private def getBaseFileOnlyView(sqlContext: SQLContext,
optParams: Map[String, String],
schema: StructType,
extraReadPaths: Seq[String],
isBootstrappedTable: Boolean,
globPaths: Seq[Path],
metaClient: HoodieTableMetaClient): BaseRelation = {
log.warn("Loading Base File Only View.")
if (isBootstrappedTable) {
// For bootstrapped tables, use our custom Spark relation for querying
new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams)
} else {
// this is just effectively RO view only, where `path` can contain a mix of
// non-hoodie/hoodie path files. set the path filter up
sqlContext.sparkContext.hadoopConfiguration.setClass(
"mapreduce.input.pathFilter.class",
classOf[HoodieROTablePathFilter],
classOf[org.apache.hadoop.fs.PathFilter])
log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
// simply return as a regular parquet relation
DataSource.apply(
sparkSession = sqlContext.sparkSession,
paths = extraReadPaths,
userSpecifiedSchema = Option(schema),
className = "parquet",
options = optParams)
.resolveRelation()
}
}
}

View File

@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
/**
 * RDD that produces rows for bootstrapped Hudi tables. Each partition wraps a
 * [[HoodieBootstrapSplit]]: a data file plus, for metadata-bootstrapped files,
 * an optional skeleton file carrying the Hudi meta columns. When both files are
 * needed, the two row streams are zipped and merged column-wise.
 *
 * @param spark                active SparkSession (driver side only, hence @transient)
 * @param dataReadFunction     reader for the external data file (pruned to required data columns)
 * @param skeletonReadFunction reader for the skeleton file (pruned to required meta columns)
 * @param regularReadFunction  reader for regular (non-bootstrapped) hoodie files
 * @param dataSchema           required data-column schema (as passed by HoodieBootstrapRelation)
 * @param skeletonSchema       required meta-column schema
 * @param requiredColumns      output column order requested by the query
 * @param tableState           the splits this RDD reads
 */
class HoodieBootstrapRDD(@transient spark: SparkSession,
                         dataReadFunction: PartitionedFile => Iterator[Any],
                         skeletonReadFunction: PartitionedFile => Iterator[Any],
                         regularReadFunction: PartitionedFile => Iterator[Any],
                         dataSchema: StructType,
                         skeletonSchema: StructType,
                         requiredColumns: Array[String],
                         tableState: HoodieBootstrapTableState)
  extends RDD[InternalRow](spark.sparkContext, Nil) {

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]

    if (log.isDebugEnabled) {
      if (bootstrapPartition.split.skeletonFile.isDefined) {
        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
          + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
          + bootstrapPartition.split.skeletonFile.get.filePath)
      } else {
        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
          + bootstrapPartition.split.dataFile.filePath)
      }
    }

    var partitionedFileIterator: Iterator[InternalRow] = null

    if (bootstrapPartition.split.skeletonFile.isDefined) {
      // It is a bootstrap split. Check both skeleton and data files.
      if (dataSchema.isEmpty) {
        // No data column to fetch, hence fetch only from skeleton file
        partitionedFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
      } else if (skeletonSchema.isEmpty) {
        // No metadata column to fetch, hence fetch only from data file
        partitionedFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
      } else {
        // Fetch from both data and skeleton file, and merge
        val dataFileIterator = read(bootstrapPartition.split.dataFile, dataReadFunction)
        val skeletonFileIterator = read(bootstrapPartition.split.skeletonFile.get, skeletonReadFunction)
        partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
      }
    } else {
      partitionedFileIterator = read(bootstrapPartition.split.dataFile, regularReadFunction)
    }
    partitionedFileIterator
  }

  /**
   * Zips the skeleton and data row streams, merging each pair of rows into one output row.
   * NOTE: `hasNext` requires BOTH iterators to have a next element, so the streams must
   * stay in lockstep; a length mismatch silently truncates the output.
   */
  def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow])
  : Iterator[InternalRow] = {
    new Iterator[InternalRow] {
      override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext
      override def next(): InternalRow = {
        mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
      }
    }
  }

  /**
   * Merges one skeleton row and one data row into a single row laid out in
   * `requiredColumns` order. Rows are copied first because Spark readers may
   * reuse the underlying InternalRow instance.
   */
  def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = {
    val skeletonArr = skeletonRow.copy().toSeq(skeletonSchema)
    val dataArr = dataRow.copy().toSeq(dataSchema)
    // We need to return it in the order requested
    val mergedArr = requiredColumns.map(col => {
      if (skeletonSchema.fieldNames.contains(col)) {
        skeletonArr(skeletonSchema.fieldIndex(col))
      } else {
        dataArr(dataSchema.fieldIndex(col))
      }
    })
    logDebug("Merged data and skeleton values => " + mergedArr.mkString(","))
    InternalRow.fromSeq(mergedArr)
  }

  /**
   * Invokes the given file reader and normalizes its output: readers may yield either
   * plain InternalRows or ColumnarBatches, which are flattened into rows here.
   */
  def read(partitionedFile: PartitionedFile, readFileFunction: PartitionedFile => Iterator[Any])
  : Iterator[InternalRow] = {
    val fileIterator = readFileFunction(partitionedFile)
    import scala.collection.JavaConverters._
    fileIterator.flatMap(_ match {
      case r: InternalRow => Seq(r)
      case b: ColumnarBatch => b.rowIterator().asScala
    })
  }

  // One partition per split; both branches previously built the identical partition,
  // so only the debug logging differs here.
  override protected def getPartitions: Array[Partition] = {
    tableState.files.zipWithIndex.map(file => {
      if (file._1.skeletonFile.isDefined) {
        logDebug("Forming partition with => Index: " + file._2 + ", Files: " + file._1.dataFile.filePath
          + "," + file._1.skeletonFile.get.filePath)
      } else {
        logDebug("Forming partition with => Index: " + file._2 + ", File: " + file._1.dataFile.filePath)
      }
      HoodieBootstrapPartition(file._2, file._1)
    }).toArray
  }
}
case class HoodieBootstrapPartition(index: Int, split: HoodieBootstrapSplit) extends Partition

View File

@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.exception.HoodieException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
/**
* This is Spark relation that can be used for querying metadata/fully bootstrapped query hoodie tables, as well as
* non-bootstrapped tables. It implements PrunedFilteredScan interface in order to support column pruning and filter
* push-down. For metadata bootstrapped files, if we query columns from both metadata and actual data then it will
* perform a merge of both to return the result.
*
* Caveat: Filter push-down does not work when querying both metadata and actual data columns over metadata
* bootstrapped files, because then the metadata file and data file can return different number of rows causing errors
* merging.
*
* @param _sqlContext Spark SQL Context
* @param userSchema User specified schema in the datasource query
* @param globPaths Globbed paths obtained from the user provided path for querying
* @param metaClient Hoodie table meta client
* @param optParams DataSource options passed by the user
*/
class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
                              val userSchema: StructType,
                              val globPaths: Seq[Path],
                              val metaClient: HoodieTableMetaClient,
                              val optParams: Map[String, String]) extends BaseRelation
  with PrunedFilteredScan with Logging {

  // Hudi meta-column (_hoodie_*) schema present in skeleton files.
  val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
  // Both are populated lazily by inferFullSchema().
  var dataSchema: StructType = _
  var fullSchema: StructType = _

  val fileIndex: HoodieBootstrapFileIndex = buildFileIndex()

  override def sqlContext: SQLContext = _sqlContext

  // Rows are produced directly as InternalRow; Spark must not attempt Row conversion.
  override val needConversion: Boolean = false

  override def schema: StructType = inferFullSchema()

  /**
   * Builds the scan RDD with column pruning and (where safe) filter push-down.
   * Filters are pushed to a reader only when the other reader contributes no
   * columns — otherwise the two sides could return different row counts and
   * break the merge (see class-level caveat).
   */
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    logInfo("Starting scan..")

    // Compute splits: pair each latest base file with its bootstrap source file, if any.
    val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
      var skeletonFile: Option[PartitionedFile] = Option.empty
      var dataFile: PartitionedFile = null

      if (hoodieBaseFile.getBootstrapBaseFile.isPresent) {
        skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getBootstrapBaseFile.get().getPath, 0,
          hoodieBaseFile.getBootstrapBaseFile.get().getFileLen)
      } else {
        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)
      }
      HoodieBootstrapSplit(dataFile, skeletonFile)
    })
    val tableState = HoodieBootstrapTableState(bootstrapSplits)

    // Get required schemas for column pruning. NOTE: assumes every requiredColumn
    // exists in either dataSchema or skeletonSchema (Spark guarantees requiredColumns
    // come from this relation's schema).
    var requiredDataSchema = StructType(Seq())
    var requiredSkeletonSchema = StructType(Seq())
    requiredColumns.foreach(col => {
      var field = dataSchema.find(_.name == col)
      if (field.isDefined) {
        requiredDataSchema = requiredDataSchema.add(field.get)
      } else {
        field = skeletonSchema.find(_.name == col)
        requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
      }
    })

    // Prepare readers for reading data file and skeleton files
    val dataReadFunction = new ParquetFileFormat()
      .buildReaderWithPartitionValues(
        sparkSession = _sqlContext.sparkSession,
        dataSchema = dataSchema,
        partitionSchema = StructType(Seq.empty),
        requiredSchema = requiredDataSchema,
        filters = if (requiredSkeletonSchema.isEmpty) filters else Seq(),
        options = Map.empty,
        hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
      )

    val skeletonReadFunction = new ParquetFileFormat()
      .buildReaderWithPartitionValues(
        sparkSession = _sqlContext.sparkSession,
        dataSchema = skeletonSchema,
        partitionSchema = StructType(Seq.empty),
        requiredSchema = requiredSkeletonSchema,
        filters = if (requiredDataSchema.isEmpty) filters else Seq(),
        options = Map.empty,
        hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
      )

    // Reader for non-bootstrapped files, which hold meta and data columns together.
    val regularReadFunction = new ParquetFileFormat()
      .buildReaderWithPartitionValues(
        sparkSession = _sqlContext.sparkSession,
        dataSchema = fullSchema,
        partitionSchema = StructType(Seq.empty),
        requiredSchema = StructType(requiredSkeletonSchema.fields ++ requiredDataSchema.fields),
        filters = filters,
        options = Map.empty,
        hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf())

    val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction,
      regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState)
    rdd.asInstanceOf[RDD[Row]]
  }

  /**
   * Resolves the table's full schema (meta columns ++ data columns) once, caching it
   * along with `dataSchema` for later use by buildScan.
   */
  def inferFullSchema(): StructType = {
    if (fullSchema == null) {
      logInfo("Inferring schema..")
      val schemaResolver = new TableSchemaResolver(metaClient)
      val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
      dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
      fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
    }
    fullSchema
  }

  /**
   * Lists the globbed paths and resolves the latest base file per file group from the
   * completed commits timeline.
   *
   * @throws HoodieException when the provided paths match no files
   */
  def buildFileIndex(): HoodieBootstrapFileIndex = {
    logInfo("Building file index..")
    val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
    val fileStatuses = inMemoryFileIndex.allFiles()
    if (fileStatuses.isEmpty) {
      throw new HoodieException("No files found for reading in user provided path.")
    }

    val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline
      .filterCompletedInstants, fileStatuses.toArray)
    val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList

    if (log.isDebugEnabled) {
      // Fix: emit the header once rather than once per indexed file.
      logDebug("Printing indexed files:")
      latestFiles.foreach(file => {
        if (file.getBootstrapBaseFile.isPresent) {
          logDebug("Skeleton File: " + file.getPath + ", Data File: " + file.getBootstrapBaseFile.get().getPath)
        } else {
          logDebug("Regular Hoodie File: " + file.getPath)
        }
      })
    }

    HoodieBootstrapFileIndex(latestFiles)
  }
}
// Latest base files resolved for the queried paths.
case class HoodieBootstrapFileIndex(files: List[HoodieBaseFile])

// The set of splits handed to HoodieBootstrapRDD.
case class HoodieBootstrapTableState(files: List[HoodieBootstrapSplit])

// A data file plus, for metadata-bootstrapped file groups, the skeleton file holding the meta columns.
case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile])

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
/**
* This is a dummy Spark relation that can be used if needed to return a place holder relation that does not get used.
*
* @param sqlContext Spark SQL Context
* @param userSchema Users data schema
*/
class HoodieEmptyRelation(val sqlContext: SQLContext,
                          val userSchema: StructType) extends BaseRelation {

  // Placeholder relation: only the schema is ever consulted by Spark.
  override def schema: StructType = userSchema
}

View File

@@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.vectorized.ColumnarBatch
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try
case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition
/**
 * RDD implementing Hudi Merge-On-Read snapshot reads. Each partition carries a
 * HoodieMergeOnReadFileSplit (a base parquet file plus optional log file paths).
 * Depending on the split's merge type, rows are either read straight from the
 * base file, unioned with log records (skip-merge), or merged with log records
 * through the record payload (payload-combine).
 *
 * @param sc                       SparkContext (driver side only, hence @transient)
 * @param config                   Hadoop configuration, broadcast to executors
 * @param fullSchemaFileReader     base-file reader producing the full table schema
 *                                 (needed so base rows can be re-serialized to Avro for merging)
 * @param requiredSchemaFileReader base-file reader pruned to the required columns
 * @param tableState               schemas and file splits for this read
 */
class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                           @transient config: Configuration,
                           fullSchemaFileReader: PartitionedFile => Iterator[Any],
                           requiredSchemaFileReader: PartitionedFile => Iterator[Any],
                           tableState: HoodieMergeOnReadTableState)
  extends RDD[InternalRow](sc, Nil) {

  // Hadoop Configuration is not serializable on its own; ship it wrapped and
  // clone a per-task copy under a lock in getConfig.
  private val confBroadcast = sc.broadcast(new SerializableWritable(config))

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
    mergeParquetPartition.split match {
      // No log files: base file alone is the snapshot; read only required columns.
      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
        read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
      // Skip-merge: emit base rows followed by log-record inserts, no payload combine.
      case skipMergeSplit if skipMergeSplit.mergeType
        .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
        skipMergeFileIterator(
          skipMergeSplit,
          read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
          getConfig
        )
      // Payload-combine: base rows are read with the full schema so they can be
      // serialized back to Avro and combined with matching log records.
      case payloadCombineSplit if payloadCombineSplit.mergeType
        .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
        payloadCombineFileIterator(
          payloadCombineSplit,
          read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),
          getConfig
        )
      case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
        s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
        s"log paths: ${mergeParquetPartition.split.logPaths.toString}" +
        s"hoodie table path: ${mergeParquetPartition.split.tablePath}" +
        s"spark partition Index: ${mergeParquetPartition.index}" +
        s"merge type: ${mergeParquetPartition.split.mergeType}")
    }
  }

  // One Spark partition per realtime file split.
  override protected def getPartitions: Array[Partition] = {
    tableState
      .hoodieRealtimeFileSplits
      .zipWithIndex
      .map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray
  }

  // Returns a task-local Configuration copy; creation is serialized via a global lock.
  private def getConfig: Configuration = {
    val conf = confBroadcast.value.value
    HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized {
      new Configuration(conf)
    }
  }

  // Invokes the reader and flattens its output: readers may yield InternalRows
  // or ColumnarBatches.
  private def read(partitionedFile: PartitionedFile,
                   readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
    val fileIterator = readFileFunction(partitionedFile)
    val rows = fileIterator.flatMap(_ match {
      case r: InternalRow => Seq(r)
      case b: ColumnarBatch => b.rowIterator().asScala
    })
    rows
  }

  /**
   * Iterator that first drains the base file, then emits the insert values of the
   * log records — no key-based merging (duplicates are possible by design).
   *
   * NOTE: hasNext has the side effect of loading the next record into `recordToLoad`;
   * callers must alternate hasNext/next exactly (standard Iterator protocol).
   */
  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
                                    baseFileIterator: Iterator[InternalRow],
                                    config: Configuration): Iterator[InternalRow] =
    new Iterator[InternalRow] {
      private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
      private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
      // Positions of the required fields within the full table schema.
      private val requiredFieldPosition =
        tableState.requiredStructSchema
          .map(f => tableAvroSchema.getField(f.name).pos()).toList
      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
      private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
      private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
      private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala

      private var recordToLoad: InternalRow = _

      // tailrec: delete records in the log are skipped by recursing.
      @scala.annotation.tailrec
      override def hasNext: Boolean = {
        if (baseFileIterator.hasNext) {
          recordToLoad = baseFileIterator.next()
          true
        } else {
          if (logRecordsKeyIterator.hasNext) {
            val curAvrokey = logRecordsKeyIterator.next()
            val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema)
            if (!curAvroRecord.isPresent) {
              // delete record found, skipping
              this.hasNext
            } else {
              val requiredAvroRecord = AvroConversionUtils
                .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
              recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
              true
            }
          } else {
            false
          }
        }
      }

      override def next(): InternalRow = {
        recordToLoad
      }
    }

  /**
   * Iterator that merges base rows with log records by record key: base rows whose key
   * appears in the log are combined via the payload (and remembered in `keyToSkip`),
   * then any remaining log-only inserts are emitted. Deletes yield no row.
   *
   * NOTE: like skipMergeFileIterator, hasNext both advances the streams and stages the
   * next row in `recordToLoad`.
   */
  private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
                                         baseFileIterator: Iterator[InternalRow],
                                         config: Configuration): Iterator[InternalRow] =
    new Iterator[InternalRow] {
      private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
      private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
      private val requiredFieldPosition =
        tableState.requiredStructSchema
          .map(f => tableAvroSchema.getField(f.name).pos()).toList
      // Serializer: full-schema base row -> Avro, for combineAndGetUpdateValue.
      private val serializer = new AvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
      private val requiredDeserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
      private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
      private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
      // Keys already merged with a base row; skipped in the log-only phase.
      private val keyToSkip = mutable.Set.empty[String]

      private var recordToLoad: InternalRow = _

      @scala.annotation.tailrec
      override def hasNext: Boolean = {
        if (baseFileIterator.hasNext) {
          val curRow = baseFileIterator.next()
          // Record key is read positionally from the full-schema base row.
          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
          if (logRecords.containsKey(curKey)) {
            // duplicate key found, merging
            keyToSkip.add(curKey)
            val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
            if (!mergedAvroRecord.isPresent) {
              // deleted
              this.hasNext
            } else {
              // load merged record as InternalRow with required schema
              val requiredAvroRecord = AvroConversionUtils
                .buildAvroRecordBySchema(
                  mergedAvroRecord.get(),
                  requiredAvroSchema,
                  requiredFieldPosition,
                  recordBuilder
                )
              recordToLoad = unsafeProjection(requiredDeserializer
                .deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
              true
            }
          } else {
            // No merge needed, load current row with required schema
            recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow))
            true
          }
        } else {
          if (logRecordsKeyIterator.hasNext) {
            val curKey = logRecordsKeyIterator.next()
            if (keyToSkip.contains(curKey)) {
              this.hasNext
            } else {
              val insertAvroRecord =
                logRecords.get(curKey).getData.getInsertValue(tableAvroSchema)
              if (!insertAvroRecord.isPresent) {
                // stand alone delete record, skipping
                this.hasNext
              } else {
                val requiredAvroRecord = AvroConversionUtils
                  .buildAvroRecordBySchema(
                    insertAvroRecord.get(),
                    requiredAvroSchema,
                    requiredFieldPosition,
                    recordBuilder
                  )
                recordToLoad = unsafeProjection(requiredDeserializer
                  .deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
                true
              }
            }
          } else {
            false
          }
        }
      }

      override def next(): InternalRow = recordToLoad

      // Projects a full-schema base row down to the required columns, in order.
      private def createRowWithRequiredSchema(row: InternalRow): InternalRow = {
        val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
        val posIterator = requiredFieldPosition.iterator
        var curIndex = 0
        tableState.requiredStructSchema.foreach(
          f => {
            val curPos = posIterator.next()
            val curField = row.get(curPos, f.dataType)
            rowToReturn.update(curIndex, curField)
            curIndex = curIndex + 1
          }
        )
        rowToReturn
      }

      // Serializes the base row to Avro and lets the payload combine it with the log record.
      private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
        val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
        logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
      }
    }
}
private object HoodieMergeOnReadRDD {
  // Guards per-task `new Configuration(conf)` copies (see HoodieMergeOnReadRDD.getConfig).
  val CONFIG_INSTANTIATION_LOCK = new Object()

  /**
   * Builds a merged log-record scanner over the split's log files.
   * NOTE: calls `split.logPaths.get` — callers must only invoke this for splits that
   * actually have log files (compute() routes log-free splits elsewhere).
   *
   * @param split     the MOR file split providing table path, log paths and latest commit
   * @param logSchema Avro schema used to read log blocks
   * @param config    task-local Hadoop configuration (also supplies scanner tuning knobs)
   */
  def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
    val fs = FSUtils.getFs(split.tablePath, config)
    new HoodieMergedLogRecordScanner(
      fs,
      split.tablePath,
      split.logPaths.get.asJava,
      logSchema,
      split.latestCommit,
      split.maxCompactionMemoryInBytes,
      // Lazy block reading flag; any parse failure falls back to false.
      Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
        HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean).getOrElse(false),
      false,
      config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
        HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
      config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
        HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
  }
}

View File

@@ -0,0 +1,494 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import java.util
import java.util.Properties
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.HoodieWriteResult
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.internal.HoodieDataSourceInternalWriter
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.log4j.LogManager
import org.apache.spark.SPARK_VERSION
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
private[hudi] object HoodieSparkSqlWriter {
private val log = LogManager.getLogger(getClass)
private var tableExists: Boolean = false
private var asyncCompactionTriggerFnDefined: Boolean = false
def write(sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame,
hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
asyncCompactionTriggerFn: Option[Function1[SparkRDDWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
)
: (Boolean, common.util.Option[String], common.util.Option[String],
SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
val sparkContext = sqlContext.sparkContext
val path = parameters.get("path")
val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
if (path.isEmpty || tblNameOp.isEmpty) {
throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
}
val tblName = tblNameOp.get.trim
sparkContext.getConf.getOption("spark.serializer") match {
case Some(ser) if ser.equals("org.apache.spark.serializer.KryoSerializer") =>
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
val tableType = HoodieTableType.valueOf(parameters(TABLE_TYPE_OPT_KEY))
var operation = WriteOperationType.fromValue(parameters(OPERATION_OPT_KEY))
// It does not make sense to allow upsert() operation if INSERT_DROP_DUPS_OPT_KEY is true
// Auto-correct the operation to "insert" if OPERATION_OPT_KEY is set to "upsert" wrongly
// or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) .
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean &&
operation == WriteOperationType.UPSERT) {
log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
s"when $INSERT_DROP_DUPS_OPT_KEY is set to be true, " +
s"overriding the $OPERATION_OPT_KEY to be $INSERT_OPERATION_OPT_VAL")
operation = WriteOperationType.INSERT
}
// If the mode is Overwrite, can set operation to INSERT_OVERWRITE_TABLE.
// Then in DataSourceUtils.doWriteOperation will use client.insertOverwriteTable to overwrite
// the table. This will replace the old fs.delete(tablepath) mode.
if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
operation = WriteOperationType.INSERT_OVERWRITE_TABLE
}
val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(path.get)
val instantTime = HoodieActiveTimeline.createNewInstantTime()
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt)
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
// Handle various save modes
handleSaveModes(mode, basePath, tableConfig, tblName, operation, fs)
// Create the table if not present
if (!tableExists) {
val archiveLogFolder = parameters.getOrElse(
HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
tableType, tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
null.asInstanceOf[String])
tableConfig = tableMetaClient.getTableConfig
}
val commitActionType = DataSourceUtils.getCommitActionType(operation, tableConfig.getTableType)
// short-circuit if bulk_insert via row is enabled.
// scalastyle:off
if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean &&
operation == WriteOperationType.BULK_INSERT) {
if (!SPARK_VERSION.startsWith("2.")) {
throw new HoodieException("Bulk insert using row writer is not supported with Spark 3. To use row writer please switch to spark 2.")
}
val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
basePath, path, instantTime)
return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
}
// scalastyle:on
val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) =
if (operation != WriteOperationType.DELETE) {
// register classes & schemas
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
sparkContext.getConf.registerAvroSchemas(schema)
log.info(s"Registered avro schema : ${schema.toString(true)}")
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT);
val hoodieAllIncomingRecords = genericRecords.map(gr => {
val hoodieRecord = if (shouldCombine) {
val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
orderingVal, keyGenerator.getKey(gr),
parameters(PAYLOAD_CLASS_OPT_KEY))
} else {
DataSourceUtils.createHoodieRecord(gr, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
}
hoodieRecord
}).toJavaRDD()
// Create a HoodieWriteClient & issue the write.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
val hoodieRecords =
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
} else {
hoodieAllIncomingRecords
}
if (hoodieRecords.isEmpty()) {
log.info("new batch has no new records, skipping...")
(true, common.util.Option.empty())
}
client.startCommitWithTime(instantTime, commitActionType)
val writeResult = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
(writeResult, client)
} else {
val structName = s"${tblName}_record"
val nameSpace = s"hoodie.${tblName}"
sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
// Convert to RDD[HoodieKey]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace)
val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
if (!tableExists) {
throw new HoodieException(s"hoodie table at $basePath does not exist")
}
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
Schema.create(Schema.Type.NULL).toString, path.get, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
}
// Issue deletes
client.startCommitWithTime(instantTime, commitActionType)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
(writeStatuses, client)
}
// Check for errors and commit the write.
val (writeSuccessful, compactionInstant) =
commitAndPerformPostOperations(writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, commitActionType, operation))
(writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
}
}
/**
 * Bootstraps an existing dataset located at BOOTSTRAP_BASE_PATH_PROP into a Hudi table at `path`
 * without rewriting the data, then runs meta sync.
 *
 * BUGFIX: previously the `SaveMode.Ignore && tableExists` branch only *evaluated* `false`
 * (a discarded expression value, not a return) and execution fell through to bootstrap the
 * table anyway. The skip branch now short-circuits the whole method, as Ignore semantics require.
 *
 * @param sqlContext           active SQLContext
 * @param mode                 save mode; Ignore skips when the table already exists
 * @param parameters           write options; must contain 'path', table name and bootstrap base path
 * @param df                   incoming frame; only its schema is consulted here
 * @param hoodieTableConfigOpt pre-loaded table config, if the caller already has one
 * @param hoodieWriteClient    pre-built write client to reuse, if any
 * @return true when bootstrap and meta sync succeed; false when skipped or sync fails
 */
def bootstrap(sqlContext: SQLContext,
              mode: SaveMode,
              parameters: Map[String, String],
              df: DataFrame,
              hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
              hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {

  val sparkContext = sqlContext.sparkContext
  val path = parameters.getOrElse("path", throw new HoodieException("'path' must be set."))
  val tableName = parameters.getOrElse(HoodieWriteConfig.TABLE_NAME,
    throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}' must be set."))
  val tableType = parameters(TABLE_TYPE_OPT_KEY)
  val bootstrapBasePath = parameters.getOrElse(BOOTSTRAP_BASE_PATH_PROP,
    throw new HoodieException(s"'${BOOTSTRAP_BASE_PATH_PROP}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
      " operation"))
  val bootstrapIndexClass = parameters.getOrDefault(BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS)

  // Derive the avro schema for the write client; fall back to the null schema for empty frames.
  var schema: String = null
  if (df.schema.nonEmpty) {
    val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
    schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
  } else {
    schema = HoodieAvroUtils.getNullSchema.toString
  }

  val basePath = new Path(path)
  val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
  tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
  val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)

  if (mode == SaveMode.Ignore && tableExists) {
    // Ignore mode on an existing table: skip all writes and report "nothing done".
    log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
    false
  } else {
    // Handle various save modes
    handleSaveModes(mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)
    // Initialize the table on first contact.
    if (!tableExists) {
      val archiveLogFolder = parameters.getOrElse(
        HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
      HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path,
        HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
        null, bootstrapIndexClass, bootstrapBasePath)
    }

    val jsc = new JavaSparkContext(sqlContext.sparkContext)
    val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
      schema, path, tableName, mapAsJavaMap(parameters)))
    try {
      writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
    } finally {
      // Bootstrap clients are one-shot; always release resources.
      writeClient.close()
    }
    metaSync(parameters, basePath, jsc.hadoopConfiguration)
  }
}
/**
 * Bulk inserts the frame through the datasource row writer (org.apache.hudi.internal),
 * avoiding the Row -> Avro -> HoodieRecord conversion path, then runs meta sync if enabled.
 *
 * @return (meta-sync success flag, the instant time used for this write)
 */
def bulkInsertAsRow(sqlContext: SQLContext,
                    parameters: Map[String, String],
                    df: DataFrame,
                    tblName: String,
                    basePath: Path,
                    path: Option[String],
                    instantTime: String): (Boolean, common.util.Option[String]) = {
  val structName = s"${tblName}_record"
  val nameSpace = s"hoodie.${tblName}"
  val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName, mapAsJavaMap(parameters))
  // Attach hoodie meta columns / key fields before handing off to the internal writer.
  val preparedDf = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
  preparedDf.write.format("org.apache.hudi.internal")
    .option(HoodieDataSourceInternalWriter.INSTANT_TIME_OPT_KEY, instantTime)
    .options(parameters)
    .save()
  val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(_.toBoolean)
  val metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(_.toBoolean)
  val syncSuccess =
    if (hiveSyncEnabled || metaSyncEnabled) {
      metaSync(parameters, basePath, sqlContext.sparkContext.hadoopConfiguration)
    } else {
      true
    }
  (syncSuccess, common.util.Option.ofNullable(instantTime))
}
/** Copies every key/value pair of the given map into a fresh TypedProperties instance. */
def toProperties(params: Map[String, String]): TypedProperties = {
  val props = new TypedProperties()
  for ((key, value) <- params) {
    props.setProperty(key, value)
  }
  props
}
/**
 * Validates the requested SaveMode against the current table state and write operation,
 * throwing a HoodieException when the combination is illegal. No-op when valid.
 */
private def handleSaveModes(mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
                            operation: WriteOperationType, fs: FileSystem): Unit = {
  // Appending to an existing table requires the configured and requested table names to match.
  if (tableExists && mode == SaveMode.Append) {
    val existingTableName = tableConfig.getTableName
    if (!existingTableName.equals(tableName)) {
      throw new HoodieException(s"hoodie table with name $existingTableName already exists at $tablePath")
    }
  }
  if (operation == WriteOperationType.DELETE) {
    // Delete Operation only supports Append mode
    if (mode != SaveMode.Append) {
      throw new HoodieException(s"Append is the only save mode applicable for ${operation.toString} operation")
    }
  } else if (mode == SaveMode.ErrorIfExists && tableExists) {
    throw new HoodieException(s"hoodie table at $tablePath already exists.")
  }
}
/**
 * Syncs the table at `basePath` to the Hive metastore using the options in `parameters`.
 * Always returns true; failures surface as exceptions from HiveSyncTool.
 */
private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = {
  val syncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters)
  val conf: HiveConf = new HiveConf()
  // Inherit hadoop settings (e.g. fs defaults) from the filesystem in use.
  conf.addResource(fs.getConf)
  new HiveSyncTool(syncConfig, conf, fs).syncHoodieTable()
  true
}
/** Translates datasource write options into a populated HiveSyncConfig for HiveSyncTool. */
private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = {
  val config = new HiveSyncConfig()
  config.basePath = basePath.toString
  config.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY)
  config.usePreApacheInputFormat = parameters.get(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).exists(_.toBoolean)
  config.databaseName = parameters(HIVE_DATABASE_OPT_KEY)
  config.tableName = parameters(HIVE_TABLE_OPT_KEY)
  config.hiveUser = parameters(HIVE_USER_OPT_KEY)
  config.hivePass = parameters(HIVE_PASS_OPT_KEY)
  config.jdbcUrl = parameters(HIVE_URL_OPT_KEY)
  // Comma separated partition columns, trimmed, with empties dropped.
  config.partitionFields =
    ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
  config.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
  config.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
  config.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(_.toBoolean)
  config
}
/**
 * Runs every configured meta-sync tool (Hive sync plus any custom AbstractSyncTool classes)
 * against the table at `basePath`.
 *
 * For backward compatibility, enabling hive sync implicitly enables meta sync with HiveSyncTool.
 *
 * @return true only if every sync implementation reported success
 */
private def metaSync(parameters: Map[String, String],
                     basePath: Path,
                     hadoopConf: Configuration): Boolean = {
  val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(_.toBoolean)
  var metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(_.toBoolean)
  val syncClientToolClasses = scala.collection.mutable.Set[String]()
  parameters(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClientToolClasses += _)
  // for backward compatibility: hive sync implies meta sync via HiveSyncTool
  if (hiveSyncEnabled) {
    metaSyncEnabled = true
    syncClientToolClasses += classOf[HiveSyncTool].getName
  }
  var allSucceeded = true
  if (metaSyncEnabled) {
    val fs = basePath.getFileSystem(hadoopConf)
    for (impl <- syncClientToolClasses) {
      val success = impl.trim match {
        case "org.apache.hudi.hive.HiveSyncTool" =>
          log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
          syncHive(basePath, fs, parameters)
          true
        case _ =>
          // Custom tools are loaded reflectively with a (Properties, FileSystem) constructor.
          val properties = new Properties()
          properties.putAll(parameters)
          properties.put("basePath", basePath.toString)
          val syncTool = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
          syncTool.syncHoodieTable()
          true
      }
      allSucceeded = allSucceeded && success
    }
  }
  allSucceeded
}
/**
 * Group all table/action specific information into a case class.
 *
 * Bundles the identity of a single write attempt: the table's base path, the instant time
 * the write runs under, the commit action type (e.g. commit vs deltacommit) and the
 * write operation being performed — used when committing and reporting errors.
 */
case class TableInstantInfo(basePath: Path, instantTime: String, commitActionType: String, operation: WriteOperationType)
/**
 * Commits the write described by `tableInstantInfo` when no write status carries errors,
 * then performs post-commit work: optionally schedules async compaction and runs meta sync.
 *
 * @return (overall success = commit succeeded AND meta sync succeeded,
 *          scheduled compaction instant if async compaction was enabled, else empty)
 */
private def commitAndPerformPostOperations(writeResult: HoodieWriteResult,
                                           parameters: Map[String, String],
                                           client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
                                           tableConfig: HoodieTableConfig,
                                           jsc: JavaSparkContext,
                                           tableInstantInfo: TableInstantInfo
                                          ): (Boolean, common.util.Option[java.lang.String]) = {
  // A single errored WriteStatus aborts the commit for the whole batch.
  val errorCount = writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count()
  if (errorCount == 0) {
    log.info("No errors. Proceeding to commit the write.")
    // Pass through user-supplied metadata keys matching the configured prefix as commit metadata.
    val metaMap = parameters.filter(kv =>
      kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
    val commitSuccess =
      client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
        common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))),
        tableInstantInfo.commitActionType,
        writeResult.getPartitionToReplaceFileIds)

    if (commitSuccess) {
      log.info("Commit " + tableInstantInfo.instantTime + " successful!")
    }
    else {
      log.info("Commit " + tableInstantInfo.instantTime + " failed!")
    }

    val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
    // When async compaction is on, schedule one now and hand the instant back to the caller
    // (e.g. the streaming sink) so it can enqueue the compaction.
    val compactionInstant : common.util.Option[java.lang.String] =
      if (asyncCompactionEnabled) {
        client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
      } else {
        common.util.Option.empty()
      }

    log.info(s"Compaction Scheduled is $compactionInstant")
    val metaSyncSuccess = metaSync(parameters, tableInstantInfo.basePath, jsc.hadoopConfiguration())

    log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
    // Keep the client alive when async compaction needs it; otherwise release it here.
    if (!asyncCompactionEnabled) {
      client.close()
    }
    (commitSuccess && metaSyncSuccess, compactionInstant)
  } else {
    log.error(s"${tableInstantInfo.operation} failed with $errorCount errors :")
    if (log.isTraceEnabled) {
      log.trace("Printing out the top 100 errors")
      writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
        .take(100)
        .foreach(ws => {
          log.trace("Global error :", ws.getGlobalError)
          if (ws.getErrors.size() > 0) {
            ws.getErrors.foreach(kt =>
              log.trace(s"Error for key: ${kt._1}", kt._2))
          }
        })
    }
    // Note: the client is intentionally not closed here so the caller may retry.
    (false, common.util.Option.empty())
  }
}
/**
 * Async compaction is in effect only when a trigger callback was supplied, inline compaction
 * is off, the user enabled the async-compact option, and the table is MERGE_ON_READ.
 */
private def isAsyncCompactionEnabled(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]],
                                     tableConfig: HoodieTableConfig,
                                     parameters: Map[String, String], configuration: Configuration) : Boolean = {
  log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
  // Short-circuit keeps tableConfig from being dereferenced unless async compaction is requested.
  asyncCompactionTriggerFnDefined &&
    !client.getConfig.isInlineCompaction &&
    parameters.get(ASYNC_COMPACT_ENABLE_OPT_KEY).exists(_.toBoolean) &&
    tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
}
/**
 * Resolves the table config for an existing table, preferring a caller-provided one over
 * loading it from storage. Returns null when the table does not exist — callers rely on
 * that sentinel, so it is kept as-is.
 */
private def getHoodieTableConfig(sparkContext: SparkContext,
                                 tablePath: String,
                                 hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = {
  if (!tableExists) {
    null
  } else {
    hoodieTableConfigOpt.getOrElse(
      new HoodieTableMetaClient(sparkContext.hadoopConfiguration, tablePath).getTableConfig)
  }
}
}

View File

@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.client.utils.SparkRowDeserializer
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.JavaConverters._
/**
 * Spark-version-agnostic helpers used by the Hudi datasource; includes local copies of
 * SparkHadoopUtil glob helpers (private since Spark 3.0.0) and Row -> Avro conversion.
 */
object HoodieSparkUtils {

  /** Returns a StructType holding only the hoodie meta columns, each as a nullable string. */
  def getMetaSchema: StructType = {
    StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
      StructField(col, StringType, nullable = true)
    }))
  }

  /**
   * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
   * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
   */
  def isGlobPath(pattern: Path): Boolean = {
    // A path is a glob if it contains any glob meta character ({}[]*?\).
    pattern.toString.exists("{}[]*?\\".toSet.contains)
  }

  /**
   * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
   * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
   */
  def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
    // fs.globStatus returns null (not empty) when nothing matches; normalize to an empty Seq.
    Option(fs.globStatus(pattern)).map { statuses =>
      statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
    }.getOrElse(Seq.empty[Path])
  }

  /**
   * This method copied from [[org.apache.spark.deploy.SparkHadoopUtil]].
   * [[org.apache.spark.deploy.SparkHadoopUtil]] becomes private since Spark 3.0.0 and hence we had to copy it locally.
   */
  def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
    if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
  }

  /**
   * Checks to see whether input path contains a glob pattern and if yes, maps it to a list of absolute paths
   * which match the glob pattern. Otherwise, returns original path
   *
   * @param paths List of absolute or globbed paths
   * @param fs File system
   * @return list of absolute file paths
   */
  def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
    paths.flatMap(path => {
      val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
      val globPaths = globPathIfNecessary(fs, qualified)
      globPaths
    })
  }

  /** Builds an InMemoryFileIndex over the given (already-globbed) paths, with a shared status cache. */
  def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
    new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
  }

  /** Converts the DataFrame to avro GenericRecords, deriving the avro schema from df's schema. */
  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
    createRdd(df, avroSchema, structName, recordNamespace)
  }

  /**
   * Converts the DataFrame to an RDD of avro GenericRecords using the given avro schema.
   * Rows are deserialized from Spark's internal representation via a version-specific
   * deserializer, then converted to avro per partition.
   */
  def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
  : RDD[GenericRecord] = {
    // Use the Avro schema to derive the StructType which has the correct nullability information
    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
    val encoder = RowEncoder.apply(dataType).resolveAndBind()
    val deserializer = HoodieSparkUtils.createDeserializer(encoder)
    df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
      .mapPartitions { records =>
        if (records.isEmpty) Iterator.empty
        else {
          // Build the converter once per partition; it is not serializable across tasks.
          val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
          records.map { x => convertor(x).asInstanceOf[GenericRecord] }
        }
      }
  }

  /** Picks the Spark-2 or Spark-3 row deserializer based on the runtime Spark version. */
  def createDeserializer(encoder: ExpressionEncoder[Row]): SparkRowDeserializer = {
    // TODO remove Spark2RowDeserializer if Spark 2.x support is dropped
    if (SPARK_VERSION.startsWith("2.")) {
      new Spark2RowDeserializer(encoder)
    } else {
      new Spark3RowDeserializer(encoder)
    }
  }
}

View File

@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import java.lang
import java.util.function.Function
import org.apache.hudi.async.{AsyncCompactService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.CompactionUtils
import org.apache.hudi.exception.HoodieCorruptedDataException
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import scala.util.{Failure, Success, Try}
import scala.collection.JavaConversions._
/**
 * Structured Streaming Sink that writes each micro batch through HoodieSparkSqlWriter,
 * with configurable retries, optional "ignore failed batch" semantics, and async
 * compaction management for MOR tables.
 */
class HoodieStreamingSink(sqlContext: SQLContext,
                          options: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode)
  extends Sink
    with Serializable {
  // NOTE(review): latestBatchId is never read/written in this class body — looks vestigial; confirm.
  @volatile private var latestBatchId = -1L

  private val log = LogManager.getLogger(classOf[HoodieStreamingSink])

  // Retry/failure-handling knobs; all keys are required to be present (with defaults applied upstream).
  private val retryCnt = options(DataSourceWriteOptions.STREAMING_RETRY_CNT_OPT_KEY).toInt
  private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong
  private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY).toBoolean

  private var isAsyncCompactorServiceShutdownAbnormally = false

  // Map streaming output mode onto a batch SaveMode: Append stays Append, everything else overwrites.
  private val mode =
    if (outputMode == OutputMode.Append()) {
      SaveMode.Append
    } else {
      SaveMode.Overwrite
    }

  // Carried across batches so the write client / table config are reused instead of rebuilt.
  private var asyncCompactorService : AsyncCompactService = _
  private var writeClient : Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
  private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty

  /**
   * Writes one micro batch. Retries up to retryCnt times with exponential backoff; on
   * unrecoverable failure either skips the batch (ignoreFailedBatch) or exits the JVM
   * to avoid silent data loss.
   */
  override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized {
    if (isAsyncCompactorServiceShutdownAbnormally) {
      throw new IllegalStateException("Async Compactor shutdown unexpectedly")
    }

    retry(retryCnt, retryIntervalMs)(
      Try(
        HoodieSparkSqlWriter.write(
          sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor))
      ) match {
        case Success((true, commitOps, compactionInstantOps, client, tableConfig)) =>
          log.info(s"Micro batch id=$batchId succeeded"
            + (commitOps.isPresent match {
            case true => s" for commit=${commitOps.get()}"
            case _ => s" with no new commits"
          }))
          // Cache client/config for the next batch and enqueue any scheduled compaction.
          writeClient = Some(client)
          hoodieTableConfig = Some(tableConfig)
          if (compactionInstantOps.isPresent) {
            asyncCompactorService.enqueuePendingCompaction(
              new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
          }
          Success((true, commitOps, compactionInstantOps))
        case Failure(e) =>
          // clean up persist rdds in the write process
          data.sparkSession.sparkContext.getPersistentRDDs
            .foreach {
              case (id, rdd) =>
                try {
                  rdd.unpersist()
                } catch {
                  case t: Exception => log.warn("Got excepting trying to unpersist rdd", t)
                }
            }
          log.error(s"Micro batch id=$batchId threw following exception: ", e)
          if (ignoreFailedBatch) {
            log.info(s"Ignore the exception and move on streaming as per " +
              s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration")
            Success((true, None, None))
          } else {
            if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
            Failure(e)
          }
        case Success((false, commitOps, compactionInstantOps, client, tableConfig)) =>
          // Write completed but reported per-record errors; treat like a failure unless ignored.
          log.error(s"Micro batch id=$batchId ended up with errors"
            + (commitOps.isPresent match {
              case true =>  s" for commit=${commitOps.get()}"
              case _ => s""
            }))
          if (ignoreFailedBatch) {
            log.info(s"Ignore the errors and move on streaming as per " +
              s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration")
            Success((true, None, None))
          } else {
            if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
            Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
          }
      }
    ) match {
      case Failure(e) =>
        if (!ignoreFailedBatch) {
          log.error(s"Micro batch id=$batchId threw following expections," +
            s"aborting streaming app to avoid data loss: ", e)
          // spark sometimes hangs upon exceptions and keep on hold of the executors
          // this is to force exit upon errors / exceptions and release all executors
          // will require redeployment / supervise mode to restart the streaming
          reset(true)
          System.exit(1)
        }
      case Success(_) =>
        log.info(s"Micro batch id=$batchId succeeded")
    }
  }

  override def toString: String = s"HoodieStreamingSink[${options("path")}]"

  /** Evaluates fn up to n times, doubling the wait between attempts; resets state on final failure. */
  @annotation.tailrec
  private def retry[T](n: Int, waitInMillis: Long)(fn: => Try[T]): Try[T] = {
    fn match {
      case x: Success[T] =>
        x
      case _ if n > 1 =>
        Thread.sleep(waitInMillis)
        retry(n - 1, waitInMillis * 2)(fn)
      case f =>
        reset(false)
        f
    }
  }

  /**
   * Lazily starts the async compaction service for the given client (first batch only),
   * registers a shutdown hook, and enqueues any compactions already pending on the timeline.
   */
  protected def triggerAsyncCompactor(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
    if (null == asyncCompactorService) {
      log.info("Triggering Async compaction !!")
      asyncCompactorService = new SparkStreamingAsyncCompactService(new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
        client)
      asyncCompactorService.start(new Function[java.lang.Boolean, java.lang.Boolean] {
        override def apply(errored: lang.Boolean): lang.Boolean = {
          log.info(s"Async Compactor shutdown. Errored ? $errored")
          isAsyncCompactorServiceShutdownAbnormally = errored
          reset(false)
          log.info("Done resetting write client.")
          true
        }
      })

      // Add Shutdown Hook
      Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
        override def run(): Unit = reset(true)
      }))

      // First time, scan .hoodie folder and get all pending compactions
      val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration,
        client.getConfig.getBasePath)
      val pendingInstants :java.util.List[HoodieInstant] =
        CompactionUtils.getPendingCompactionInstantTimes(metaClient)
      pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h))
    }
  }

  /** Shuts down the compaction service and closes the cached write client, idempotently. */
  private def reset(force: Boolean) : Unit = this.synchronized {
    if (asyncCompactorService != null) {
      asyncCompactorService.shutdown(force)
      asyncCompactorService = null
    }

    if (writeClient.isDefined) {
      writeClient.get.close()
      writeClient = Option.empty
    }
  }
}

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.TypedProperties
import scala.collection.JavaConversions.mapAsJavaMap
import scala.collection.JavaConverters.mapAsScalaMapConverter
/**
* WriterUtils to assist in write path in Datasource and tests.
*/
/**
 * WriterUtils to assist in write path in Datasource and tests.
 */
object HoodieWriterUtils {

  /** Java-friendly wrapper around [[parametersWithWriteDefaults]] for callers holding java maps. */
  def javaParametersWithWriteDefaults(parameters: java.util.Map[String, String]): java.util.Map[String, String] = {
    mapAsJavaMap(parametersWithWriteDefaults(parameters.asScala.toMap))
  }

  /**
   * Add default options for unspecified write options keys.
   *
   * @param parameters user-supplied write options
   * @return defaults overlaid with the (storage-type-translated) user parameters
   */
  def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
    val writeDefaults = Map(
      OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
      TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL,
      PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
      PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
      RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
      PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
      KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
      COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
      INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
      STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
      STREAMING_RETRY_INTERVAL_MS_OPT_KEY -> DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL,
      STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL,
      META_SYNC_CLIENT_TOOL_CLASS -> DEFAULT_META_SYNC_CLIENT_TOOL_CLASS,
      HIVE_SYNC_ENABLED_OPT_KEY -> DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL,
      META_SYNC_ENABLED_OPT_KEY -> DEFAULT_META_SYNC_ENABLED_OPT_VAL,
      HIVE_DATABASE_OPT_KEY -> DEFAULT_HIVE_DATABASE_OPT_VAL,
      HIVE_TABLE_OPT_KEY -> DEFAULT_HIVE_TABLE_OPT_VAL,
      HIVE_BASE_FILE_FORMAT_OPT_KEY -> DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL,
      HIVE_USER_OPT_KEY -> DEFAULT_HIVE_USER_OPT_VAL,
      HIVE_PASS_OPT_KEY -> DEFAULT_HIVE_PASS_OPT_VAL,
      HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
      HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
      HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
      HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL,
      HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL,
      ASYNC_COMPACT_ENABLE_OPT_KEY -> DEFAULT_ASYNC_COMPACT_ENABLE_OPT_VAL,
      ENABLE_ROW_WRITER_OPT_KEY -> DEFAULT_ENABLE_ROW_WRITER_OPT_VAL)
    // User-provided values (with storage type translated to table type) take precedence.
    writeDefaults ++ translateStorageTypeToTableType(parameters)
  }

  /** Copies every key/value pair of the given map into a fresh TypedProperties instance. */
  def toProperties(params: Map[String, String]): TypedProperties = {
    val props = new TypedProperties()
    for ((key, value) <- params) {
      props.setProperty(key, value)
    }
    props
  }
}

View File

@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hadoop.fs.GlobPattern
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.table.HoodieSparkTable
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
* Relation, that implements the Hoodie incremental view.
*
* Implemented for Copy_on_write storage.
*
*/
/**
 * Relation implementing the Hoodie incremental view: exposes only the records written by
 * the completed commits in the range (begin instant, end instant].
 *
 * Implemented for Copy_on_write storage; merge-on-read tables are rejected in the
 * constructor below.
 */
class IncrementalRelation(val sqlContext: SQLContext,
                          val basePath: String,
                          val optParams: Map[String, String],
                          val userSchema: StructType) extends BaseRelation with TableScan {

  private val log = LogManager.getLogger(classOf[IncrementalRelation])

  // Hoodie metadata columns (e.g. commit time) that get prepended to the data schema.
  val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
  private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)

  // MOR tables not supported yet
  if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
    throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
  }
  // TODO : Figure out a valid HoodieWriteConfig
  private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(),
    new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
    metaClient)

  // Only completed commits participate in the incremental pull.
  private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
  if (commitTimeline.empty()) {
    throw new HoodieException("No instants to incrementally pull")
  }
  if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY)) {
    throw new HoodieException(s"Specify the begin instant time to pull from using " +
      s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
  }

  // When true, resolve the table schema as of the end instant of the range rather than
  // the latest table schema.
  val useEndInstantSchema = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY,
    DataSourceReadOptions.DEFAULT_INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_VAL).toBoolean

  private val lastInstant = commitTimeline.lastInstant().get()

  // Completed instants within the requested range; end instant defaults to the latest one.
  private val commitsToReturn = commitTimeline.findInstantsInRange(
    optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
    optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
    .getInstants.iterator().toList

  // use schema from a file produced in the end/latest instant
  val usedSchema: StructType = {
    log.info("Inferring schema..")
    val schemaResolver = new TableSchemaResolver(metaClient)
    val tableSchema = if (useEndInstantSchema) {
      // Fall back to the latest schema when the range resolves to no commits.
      if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchemaWithoutMetadataFields() else
        schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last)
    } else {
      schemaResolver.getTableAvroSchemaWithoutMetadataFields()
    }
    val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
    // Full read schema = Hoodie metadata columns followed by the data columns.
    StructType(skeletonSchema.fields ++ dataSchema.fields)
  }

  // Optional comma-separated push-down filter expressions supplied by the caller.
  private val filters = {
    if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
      val filterStr = optParams.getOrElse(
        DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY,
        DataSourceReadOptions.DEFAULT_PUSH_DOWN_FILTERS_OPT_VAL)
      filterStr.split(",").filter(!_.isEmpty)
    } else {
      Array[String]()
    }
  }

  override def schema: StructType = usedSchema

  override def buildScan(): RDD[Row] = {
    // Collect, per file id, the latest full path touched by each commit in the range,
    // separating metadata-bootstrap files from regular ones.
    val regularFileIdToFullPath = mutable.HashMap[String, String]()
    var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()

    for (commit <- commitsToReturn) {
      val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
        .get, classOf[HoodieCommitMetadata])
      if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
        metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
      } else {
        regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap
      }
    }

    if (metaBootstrapFileIdToFullPath.nonEmpty) {
      // filter out meta bootstrap files that have had more commits since metadata bootstrap
      metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
        .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
    }

    // Optional glob to restrict the incremental pull to a subset of partition paths/files.
    val pathGlobPattern = optParams.getOrElse(
      DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
      DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
    val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
      if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
        val globMatcher = new GlobPattern("*" + pathGlobPattern)
        (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
          metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
      } else {
        (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
      }
    }

    // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
    // will filter out all the files incorrectly.
    sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
    val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))

    if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
      sqlContext.sparkContext.emptyRDD[Row]
    } else {
      log.info("Additional Filters to be applied to incremental source are :" + filters)
      var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)

      // NOTE(review): these guards check the UNFILTERED maps; if the glob pattern filters
      // every path out, the reads below run with empty path lists — confirm intended.
      if (metaBootstrapFileIdToFullPath.nonEmpty) {
        // Bootstrap files must go through the Hudi datasource so skeleton + data get stitched.
        df = sqlContext.sparkSession.read
          .format("hudi")
          .schema(usedSchema)
          .option(DataSourceReadOptions.READ_PATHS_OPT_KEY, filteredMetaBootstrapFullPaths.mkString(","))
          .load()
      }

      if (regularFileIdToFullPath.nonEmpty)
      {
        // Regular base files are read as raw parquet, restricted to the commit-time range.
        df = df.union(sqlContext.read.options(sOpts)
          .schema(usedSchema)
          .parquet(filteredRegularFullPaths.toList: _*)
          .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
            commitsToReturn.head.getTimestamp))
          .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
            commitsToReturn.last.getTimestamp)))
      }

      filters.foldLeft(df)((e, f) => e.filter(f)).rdd
    }
  }
}

View File

@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
/**
 * One merge-on-read scan split: a base (parquet) file plus the delta log files that
 * must be merged into it.
 *
 * @param dataFile the base file to scan
 * @param logPaths paths of the delta log files for this file group, if any
 * @param latestCommit the last completed instant on the timeline at planning time
 * @param tablePath base path of the Hudi table
 * @param maxCompactionMemoryInBytes memory budget for the log merge, derived from the job conf
 * @param mergeType merge strategy from DataSourceReadOptions.REALTIME_MERGE_OPT_KEY
 */
case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile,
                                      logPaths: Option[List[String]],
                                      latestCommit: String,
                                      tablePath: String,
                                      maxCompactionMemoryInBytes: Long,
                                      mergeType: String)
/**
 * Immutable scan state handed to HoodieMergeOnReadRDD: the full and pruned table schemas
 * (as Spark StructTypes and as Avro schema JSON strings) plus the planned file splits.
 *
 * @param tableStructSchema full table schema as a Spark StructType
 * @param requiredStructSchema schema pruned to the columns requested by the scan
 * @param tableAvroSchema full table schema serialized as an Avro schema string
 * @param requiredAvroSchema pruned schema serialized as an Avro schema string
 * @param hoodieRealtimeFileSplits the file splits this scan will read
 */
case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
                                       requiredStructSchema: StructType,
                                       tableAvroSchema: String,
                                       requiredAvroSchema: String,
                                       hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit])
/**
 * Snapshot relation for a merge-on-read Hudi table: scans the latest base files under the
 * provided glob paths and merges their delta logs at read time via HoodieMergeOnReadRDD.
 */
class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
                                  val optParams: Map[String, String],
                                  val userSchema: StructType,
                                  val globPaths: Seq[Path],
                                  val metaClient: HoodieTableMetaClient)
  extends BaseRelation with PrunedFilteredScan with Logging {

  private val conf = sqlContext.sparkContext.hadoopConfiguration
  private val jobConf = new JobConf(conf)
  // use schema from latest metadata, if not present, read schema from the data file
  private val schemaUtil = new TableSchemaResolver(metaClient)
  private val tableAvroSchema = schemaUtil.getTableAvroSchema
  private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
  // How base and log records are combined (see DataSourceReadOptions.REALTIME_MERGE_OPT_KEY).
  private val mergeType = optParams.getOrElse(
    DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
    DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
  // File splits are planned once, at relation construction time.
  private val fileIndex = buildFileIndex()

  override def schema: StructType = tableStructSchema

  // The produced RDD already carries Spark-internal rows (see the asInstanceOf cast in
  // buildScan), so Spark must not apply its Row conversion.
  override def needConversion: Boolean = false

  /**
   * Builds the scan RDD: prunes the schema to `requiredColumns`, constructs two parquet
   * readers (full schema for splits that need log merging, pruned schema otherwise),
   * and wraps them in a HoodieMergeOnReadRDD over the planned file splits.
   */
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
    log.debug(s" buildScan filters = ${filters.mkString(",")}")

    // Keep only the requested columns that actually exist in the table schema.
    var requiredStructSchema = StructType(Seq())
    requiredColumns.foreach(col => {
      val field = tableStructSchema.find(_.name == col)
      if (field.isDefined) {
        requiredStructSchema = requiredStructSchema.add(field.get)
      }
    })
    val requiredAvroSchema = AvroConversionUtils
      .convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
    val hoodieTableState = HoodieMergeOnReadTableState(
      tableStructSchema,
      requiredStructSchema,
      tableAvroSchema.toString,
      requiredAvroSchema.toString,
      fileIndex
    )
    // Full-schema reader: no filter push-down, used where log records must be merged in.
    val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = sqlContext.sparkSession,
      dataSchema = tableStructSchema,
      partitionSchema = StructType(Nil),
      requiredSchema = tableStructSchema,
      filters = Seq(),
      options = optParams,
      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
    )
    // Pruned-schema reader: pushes the scan filters down to parquet.
    val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = sqlContext.sparkSession,
      dataSchema = tableStructSchema,
      partitionSchema = StructType(Nil),
      requiredSchema = requiredStructSchema,
      filters = filters,
      options = optParams,
      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
    )

    val rdd = new HoodieMergeOnReadRDD(
      sqlContext.sparkContext,
      jobConf,
      fullSchemaParquetReader,
      requiredSchemaParquetReader,
      hoodieTableState
    )
    rdd.asInstanceOf[RDD[Row]]
  }

  /**
   * Plans one HoodieMergeOnReadFileSplit per latest base file found under the glob paths,
   * attaching the delta log files grouped onto each base file.
   *
   * @throws HoodieException when the glob paths match no files at all
   */
  def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
    val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
    val fileStatuses = inMemoryFileIndex.allFiles()
    if (fileStatuses.isEmpty) {
      throw new HoodieException("No files found for reading in user provided path.")
    }

    // Filesystem view over completed commits only; pick the latest base file per file group.
    val fsView = new HoodieTableFileSystemView(metaClient,
      metaClient.getActiveTimeline.getCommitsTimeline
        .filterCompletedInstants, fileStatuses.toArray)
    val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
    val latestCommit = fsView.getLastInstant.get().getTimestamp
    val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
    val fileSplits = fileGroup.map(kv => {
      val baseFile = kv._1
      val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
      val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
      HoodieMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
        metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
    }).toList
    fileSplits
  }
}

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}
package object hudi {

  /**
   * Adds an `avro` method to DataFrameWriter that saves through the
   * "org.apache.hudi" datasource: `writer.avro(path)`.
   */
  implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
    def avro: String => Unit = writer.format("org.apache.hudi").save
  }

  /**
   * Adds an `avro` method to DataFrameReader that loads through the
   * "org.apache.hudi" datasource: `reader.avro(path)`.
   */
  implicit class AvroDataFrameReader(reader: DataFrameReader) {
    def avro: String => DataFrame = reader.format("org.apache.hudi").load
  }
}