HUDI-1827 : Add ORC support in Bootstrap Op (#3457)
Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
@@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.bootstrap;
|
||||
|
||||
import org.apache.hudi.DataSourceUtils;
|
||||
import org.apache.hudi.HoodieSparkUtils;
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.avro.model.HoodieFileStatus;
|
||||
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
import org.apache.hudi.common.bootstrap.FileStatusUtils;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.keygen.KeyGenerator;
|
||||
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.rdd.RDD;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public abstract class SparkFullBootstrapDataProviderBase extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {
|
||||
|
||||
private final transient SparkSession sparkSession;
|
||||
|
||||
public SparkFullBootstrapDataProviderBase(TypedProperties props,
|
||||
HoodieSparkEngineContext context) {
|
||||
super(props, context);
|
||||
this.sparkSession = SparkSession.builder().config(context.getJavaSparkContext().getConf()).getOrCreate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
|
||||
List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles) {
|
||||
String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
|
||||
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
|
||||
.toArray(String[]::new);
|
||||
|
||||
Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);
|
||||
try {
|
||||
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
|
||||
String structName = tableName + "_record";
|
||||
String namespace = "hoodie." + tableName;
|
||||
RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
|
||||
Option.empty());
|
||||
return genericRecords.toJavaRDD().map(gr -> {
|
||||
String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
|
||||
gr, props.getString("hoodie.datasource.write.precombine.field"), false);
|
||||
try {
|
||||
return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
|
||||
props.getString("hoodie.datasource.write.payload.class"));
|
||||
} catch (IOException ioe) {
|
||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||
}
|
||||
});
|
||||
} catch (IOException ioe) {
|
||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract String getFormat();
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.bootstrap;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
|
||||
/**
|
||||
* Spark Data frame based bootstrap input provider.
|
||||
*/
|
||||
public class SparkOrcBootstrapDataProvider extends SparkFullBootstrapDataProviderBase {
|
||||
|
||||
public SparkOrcBootstrapDataProvider(TypedProperties props,
|
||||
HoodieSparkEngineContext context) {
|
||||
super(props, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getFormat() {
|
||||
return "orc";
|
||||
}
|
||||
}
|
||||
@@ -18,69 +18,21 @@
|
||||
|
||||
package org.apache.hudi.bootstrap;
|
||||
|
||||
import org.apache.hudi.DataSourceUtils;
|
||||
import org.apache.hudi.HoodieSparkUtils;
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.avro.model.HoodieFileStatus;
|
||||
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
import org.apache.hudi.common.bootstrap.FileStatusUtils;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.keygen.KeyGenerator;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.rdd.RDD;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Spark Data frame based bootstrap input provider.
|
||||
*/
|
||||
public class SparkParquetBootstrapDataProvider extends FullRecordBootstrapDataProvider<JavaRDD<HoodieRecord>> {
|
||||
|
||||
private final transient SparkSession sparkSession;
|
||||
public class SparkParquetBootstrapDataProvider extends SparkFullBootstrapDataProviderBase {
|
||||
|
||||
public SparkParquetBootstrapDataProvider(TypedProperties props,
|
||||
HoodieSparkEngineContext context) {
|
||||
super(props, context);
|
||||
this.sparkSession = SparkSession.builder().config(context.getJavaSparkContext().getConf()).getOrCreate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JavaRDD<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
|
||||
List<Pair<String, List<HoodieFileStatus>>> partitionPathsWithFiles) {
|
||||
String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue)
|
||||
.flatMap(f -> f.stream().map(fs -> FileStatusUtils.toPath(fs.getPath()).toString()))
|
||||
.toArray(String[]::new);
|
||||
|
||||
Dataset inputDataset = sparkSession.read().parquet(filePaths);
|
||||
try {
|
||||
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
|
||||
String structName = tableName + "_record";
|
||||
String namespace = "hoodie." + tableName;
|
||||
RDD<GenericRecord> genericRecords = HoodieSparkUtils.createRdd(inputDataset, structName, namespace, false,
|
||||
Option.empty());
|
||||
return genericRecords.toJavaRDD().map(gr -> {
|
||||
String orderingVal = HoodieAvroUtils.getNestedFieldValAsString(
|
||||
gr, props.getString("hoodie.datasource.write.precombine.field"), false);
|
||||
try {
|
||||
return DataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
|
||||
props.getString("hoodie.datasource.write.payload.class"));
|
||||
} catch (IOException ioe) {
|
||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||
}
|
||||
});
|
||||
} catch (IOException ioe) {
|
||||
throw new HoodieIOException(ioe.getMessage(), ioe);
|
||||
}
|
||||
protected String getFormat() {
|
||||
return "parquet";
|
||||
}
|
||||
}
|
||||
@@ -377,6 +377,7 @@ object HoodieSparkSqlWriter {
|
||||
val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
|
||||
val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
|
||||
val populateMetaFields = parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()).toBoolean
|
||||
val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
|
||||
|
||||
HoodieTableMetaClient.withPropertyBuilder()
|
||||
.setTableType(HoodieTableType.valueOf(tableType))
|
||||
@@ -386,6 +387,7 @@ object HoodieSparkSqlWriter {
|
||||
.setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
|
||||
.setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
|
||||
.setBootstrapIndexClass(bootstrapIndexClass)
|
||||
.setBaseFileFormat(baseFileFormat)
|
||||
.setBootstrapBasePath(bootstrapBasePath)
|
||||
.setPartitionFields(partitionColumns)
|
||||
.setPopulateMetaFields(populateMetaFields)
|
||||
|
||||
Reference in New Issue
Block a user