Multi FS Support

- Revive PR 191, so FileSystem instances are created off the actual path
- Streamline all filesystem access through HoodieTableMetaClient
- Hadoop Conf from the Spark context is serialized & passed to executor code too
- Pick up env vars prefixed with HOODIE_ENV_ into the Configuration object (see the sketch below)
- Clean up usage of FSUtils.getFs, piggybacking off HoodieTableMetaClient.getFs
- Add s3a to supported schemes & support escaping "." in env vars
- Tests use HoodieTestUtils.getDefaultHadoopConf
Vinoth Chandar
2017-12-10 23:31:54 -08:00
committed by vinoth chandar
parent 44839b88c6
commit 0cd186c899
78 changed files with 851 additions and 535 deletions
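The env-var pickup described above works roughly as sketched below. This is a hypothetical illustration, not the code in this commit: it assumes the HOODIE_ENV_ prefix is simply stripped from the variable name and that "." is escaped with a token such as _DOT_, since "." cannot appear in environment variable names.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;

public class HoodieEnvConfSketch {
  // Assumed prefix and escape token, for illustration only.
  private static final String ENV_PREFIX = "HOODIE_ENV_";
  private static final String DOT_ESCAPE = "_DOT_";

  // Fold HOODIE_ENV_-prefixed environment variables into a Hadoop Configuration,
  // e.g. HOODIE_ENV_fs_DOT_s3a_DOT_impl -> fs.s3a.impl
  public static Configuration addHoodieEnvProps(Configuration conf) {
    for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
      if (entry.getKey().startsWith(ENV_PREFIX)) {
        String prop = entry.getKey().substring(ENV_PREFIX.length()).replace(DOT_ESCAPE, ".");
        conf.set(prop, entry.getValue());
      }
    }
    return conf;
  }
}

Relatedly, Hadoop's Configuration is not java.io.Serializable, which is why the HoodieSnapshotCopier hunk below wraps it in SerializableConfiguration before referencing it inside Spark closures.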


@@ -56,21 +56,18 @@ import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class HDFSParquetImporter implements Serializable {
private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
private final Config cfg;
private final transient FileSystem fs;
private transient FileSystem fs;
public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
public HDFSParquetImporter(
Config cfg) throws IOException {
this.cfg = cfg;
fs = FSUtils.getFs();
}
public static class FormatValidator implements IValueValidator<String> {
@@ -203,6 +200,7 @@ public class HDFSParquetImporter implements Serializable {
}
public int dataImport(JavaSparkContext jsc, int retry) throws Exception {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
int ret = -1;
try {
// Verify that targetPath is not present.
@@ -251,43 +249,36 @@ public class HDFSParquetImporter implements Serializable {
GenericRecord.class, job.getConfiguration())
// To reduce large number of tasks.
.coalesce(16 * cfg.parallelism)
.map(new Function<Tuple2<Void, GenericRecord>, HoodieRecord<HoodieJsonPayload>>() {
@Override
public HoodieRecord<HoodieJsonPayload> call(Tuple2<Void, GenericRecord> entry)
throws Exception {
GenericRecord genericRecord = entry._2();
Object partitionField = genericRecord.get(cfg.partitionKey);
if (partitionField == null) {
throw new HoodieIOException(
"partition key is missing. :" + cfg.partitionKey);
}
Object rowField = genericRecord.get(cfg.rowKey);
if (rowField == null) {
throw new HoodieIOException(
"row field is missing. :" + cfg.rowKey);
}
long ts = (long) ((Double) partitionField * 1000l);
String partitionPath = PARTITION_FORMATTER.format(new Date(ts));
return new HoodieRecord<HoodieJsonPayload>(
new HoodieKey((String) rowField, partitionPath),
new HoodieJsonPayload(genericRecord.toString()));
}
}
.map(entry -> {
GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
Object partitionField = genericRecord.get(cfg.partitionKey);
if (partitionField == null) {
throw new HoodieIOException(
"partition key is missing. :" + cfg.partitionKey);
}
Object rowField = genericRecord.get(cfg.rowKey);
if (rowField == null) {
throw new HoodieIOException(
"row field is missing. :" + cfg.rowKey);
}
long ts = (long) ((Double) partitionField * 1000l);
String partitionPath = PARTITION_FORMATTER.format(new Date(ts));
return new HoodieRecord<>(
new HoodieKey((String) rowField, partitionPath),
new HoodieJsonPayload(genericRecord.toString()));
}
);
// Get commit time.
String commitTime = client.startCommit();
JavaRDD<WriteStatus> writeResponse = client.bulkInsert(hoodieRecords, commitTime);
Accumulator<Integer> errors = jsc.accumulator(0);
writeResponse.foreach(new VoidFunction<WriteStatus>() {
@Override
public void call(WriteStatus writeStatus) throws Exception {
writeResponse.foreach(writeStatus -> {
if (writeStatus.hasErrors()) {
errors.add(1);
logger.error(String.format("Error processing records :writeStatus:%s",
writeStatus.getStat().toString()));
}
}
});
if (errors.value() == 0) {
logger.info(String


@@ -291,7 +291,7 @@ public class HiveIncrementalPuller {
if (!fs.exists(new Path(targetDataPath)) || !fs.exists(new Path(targetDataPath + "/.hoodie"))) {
return "0";
}
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, targetDataPath);
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), targetDataPath);
Optional<HoodieInstant>
lastCommit = metadata.getActiveTimeline().getCommitsTimeline()
@@ -331,7 +331,7 @@ public class HiveIncrementalPuller {
private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation)
throws IOException {
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, sourceTableLocation);
HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), sourceTableLocation);
List<String> commitsToSync = metadata.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants()
.findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants()


@@ -20,6 +20,7 @@ package com.uber.hoodie.utilities;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.uber.hoodie.common.SerializableConfiguration;
import com.uber.hoodie.common.model.HoodieDataFile;
import com.uber.hoodie.common.model.HoodiePartitionMetadata;
import com.uber.hoodie.common.table.HoodieTableConfig;
@@ -70,8 +71,10 @@ public class HoodieSnapshotCopier implements Serializable {
public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir,
final boolean shouldAssumeDatePartitioning) throws IOException {
FileSystem fs = FSUtils.getFs();
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir);
FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
final SerializableConfiguration serConf = new SerializableConfiguration(
jsc.hadoopConfiguration());
final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
final TableFileSystemView.ReadOptimizedView fsView = new HoodieTableFileSystemView(
tableMetadata,
tableMetadata.getActiveTimeline().getCommitsTimeline()
@@ -104,7 +107,7 @@ public class HoodieSnapshotCopier implements Serializable {
jsc.parallelize(partitions, partitions.size())
.flatMap(partition -> {
// Only take latest version files <= latestCommit.
FileSystem fs1 = FSUtils.getFs();
FileSystem fs1 = FSUtils.getFs(baseDir, serConf.get());
List<Tuple2<String, String>> filePaths = new ArrayList<>();
Stream<HoodieDataFile> dataFiles = fsView
.getLatestDataFilesBeforeOrOn(partition, latestCommitTimestamp);
@@ -123,13 +126,13 @@ public class HoodieSnapshotCopier implements Serializable {
String partition = tuple._1();
Path sourceFilePath = new Path(tuple._2());
Path toPartitionPath = new Path(outputDir, partition);
FileSystem fs1 = FSUtils.getFs();
FileSystem ifs = FSUtils.getFs(baseDir, serConf.get());
if (!fs1.exists(toPartitionPath)) {
fs1.mkdirs(toPartitionPath);
if (!ifs.exists(toPartitionPath)) {
ifs.mkdirs(toPartitionPath);
}
FileUtil.copy(fs1, sourceFilePath, fs1,
new Path(toPartitionPath, sourceFilePath.getName()), false, fs1.getConf());
FileUtil.copy(ifs, sourceFilePath, ifs,
new Path(toPartitionPath, sourceFilePath.getName()), false, ifs.getConf());
});
// Also copy the .commit files


@@ -115,10 +115,11 @@ public class HoodieDeltaStreamer implements Serializable {
public HoodieDeltaStreamer(Config cfg) throws IOException {
this.cfg = cfg;
this.fs = FSUtils.getFs();
this.jssc = getSparkContext();
this.fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs, cfg.targetBasePath);
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), cfg.targetBasePath);
this.commitTimelineOpt = Optional
.of(meta.getActiveTimeline().getCommitsTimeline()
.filterCompletedInstants());
@@ -129,8 +130,6 @@ public class HoodieDeltaStreamer implements Serializable {
//TODO(vc) Should these be passed from outside?
initSchemaProvider();
initKeyGenerator();
this.jssc = getSparkContext();
initSource();
}
@@ -203,7 +202,9 @@ public class HoodieDeltaStreamer implements Serializable {
Properties properties = new Properties();
properties.put(HoodieWriteConfig.TABLE_NAME, cfg.targetTableName);
HoodieTableMetaClient
.initializePathAsHoodieDataset(FSUtils.getFs(), cfg.targetBasePath, properties);
.initializePathAsHoodieDataset(
FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), cfg.targetBasePath,
properties);
}
log.info("Checkpoint to resume from : " + resumeCheckpointStr);


@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,7 +51,7 @@ public class FilebasedSchemaProvider extends SchemaProvider {
public FilebasedSchemaProvider(PropertiesConfiguration config) {
super(config);
this.fs = FSUtils.getFs();
this.fs = FSUtils.getFs(config.getBasePath(), new Configuration());
DataSourceUtils.checkRequiredProperties(config,
Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP, Config.TARGET_SCHEMA_FILE_PROP));


@@ -65,7 +65,7 @@ public class DFSSource extends Source {
public DFSSource(PropertiesConfiguration config, JavaSparkContext sparkContext,
SourceDataFormat dataFormat, SchemaProvider schemaProvider) {
super(config, sparkContext, dataFormat, schemaProvider);
this.fs = FSUtils.getFs();
this.fs = FSUtils.getFs(config.getBasePath(), sparkContext.hadoopConfiguration());
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
}


@@ -72,7 +72,7 @@ public class HiveIncrPullSource extends Source {
public HiveIncrPullSource(PropertiesConfiguration config, JavaSparkContext sparkContext,
SourceDataFormat dataFormat, SchemaProvider schemaProvider) {
super(config, sparkContext, dataFormat, schemaProvider);
this.fs = FSUtils.getFs();
this.fs = FSUtils.getFs(config.getBasePath(), sparkContext.hadoopConfiguration());
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
this.incrPullRootPath = config.getString(Config.ROOT_INPUT_PATH_PROP);
}


@@ -48,7 +48,6 @@ import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;
import scala.collection.immutable.Set;
@@ -134,16 +133,16 @@ public class KafkaSource extends Source {
public static <K, V> Map<K, V> toScalaMap(HashMap<K, V> m) {
return JavaConverters.mapAsScalaMapConverter(m).asScala().toMap(
Predef.<Tuple2<K, V>>conforms()
Predef.conforms()
);
}
public static Set<String> toScalaSet(HashSet<String> s) {
return JavaConverters.asScalaSetConverter(s).asScala().<String>toSet();
return JavaConverters.asScalaSetConverter(s).asScala().toSet();
}
public static <K, V> java.util.Map<K, V> toJavaMap(Map<K, V> m) {
return JavaConverters.<K, V>mapAsJavaMapConverter(m).asJava();
return JavaConverters.mapAsJavaMapConverter(m).asJava();
}
}


@@ -24,6 +24,7 @@ import com.uber.hoodie.HoodieReadClient;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.minicluster.HdfsTestService;
import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.table.HoodieTimeline;
import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
import com.uber.hoodie.common.util.FSUtils;
@@ -38,7 +39,6 @@ import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -174,7 +174,7 @@ public class TestHDFSParquetImporter implements Serializable {
ParquetWriter<GenericRecord> writer = AvroParquetWriter
.<GenericRecord>builder(srcFile)
.withSchema(HoodieTestDataGenerator.avroSchema)
.withConf(new Configuration())
.withConf(HoodieTestUtils.getDefaultHadoopConf())
.build();
for (GenericRecord record : records) {
writer.write(record);


@@ -44,17 +44,22 @@ public class TestHoodieSnapshotCopier {
@Before
public void init() throws IOException {
// Prepare directories
TemporaryFolder folder = new TemporaryFolder();
folder.create();
rootPath = folder.getRoot().getAbsolutePath();
basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
HoodieTestUtils.init(basePath);
outputPath = rootPath + "/output";
fs = FSUtils.getFs();
// Start a local Spark job
SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]");
jsc = new JavaSparkContext(conf);
try {
// Prepare directories
TemporaryFolder folder = new TemporaryFolder();
folder.create();
rootPath = "file://" + folder.getRoot().getAbsolutePath();
basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
outputPath = rootPath + "/output";
fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf());
HoodieTestUtils.init(basePath);
// Start a local Spark job
SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]");
jsc = new JavaSparkContext(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test