From 5847f0c934a8ce2ead96c8adc8c0259dda6b1283 Mon Sep 17 00:00:00 2001 From: Vinoth Chandar Date: Wed, 27 Mar 2019 18:19:41 -0700 Subject: [PATCH] Fix HUDI-27 : Support num_cores > 1 for writing through spark - Users using spark.executor.cores > 1 used to fail due to "FileSystem closed" - This is due to HoodieWrapperFileSystem closing the wrapped filesystem obj - FileSystem.getInternal caching code races threads and closes the extra fs instance(s) - Bumped up num cores in tests to 8, speeds up tests by 3-4 mins --- .../hoodie/io/storage/HoodieWrapperFileSystem.java | 5 +++-- .../uber/hoodie/common/HoodieClientTestUtils.java | 13 +++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java index 5ada7c4cb..fce9b7c14 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java @@ -536,8 +536,9 @@ public class HoodieWrapperFileSystem extends FileSystem { } @Override - public void close() throws IOException { - fileSystem.close(); + public void close() { + // Don't close the underlying `fileSystem` object. This will end up closing it for every thread since it + // could be cached across jvm. We don't own that object anyway. 
} @Override diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java index 54ea125b1..2507dcafa 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java @@ -34,13 +34,18 @@ import com.uber.hoodie.config.HoodieStorageConfig; import com.uber.hoodie.exception.HoodieException; import com.uber.hoodie.io.storage.HoodieParquetConfig; import com.uber.hoodie.io.storage.HoodieParquetWriter; - import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; @@ -108,7 +113,7 @@ public class HoodieClientTestUtils { public static SparkConf getSparkConfForTest(String appName) { SparkConf sparkConf = new SparkConf().setAppName(appName) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .setMaster("local[1]"); + .setMaster("local[8]"); return HoodieReadClient.addHoodieSupport(sparkConf); }