From 2ea8b0c3f1eeb19f4dc1e9946331c8fd93e6daab Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Wed, 25 Sep 2019 06:20:56 -0700 Subject: [PATCH] [HUDI-279] Fix regression in Schema Evolution due to PR-755 --- .../hudi/client/embedded/EmbeddedTimelineService.java | 2 +- .../org/apache/hudi/func/TestUpdateMapFunction.java | 7 ++++--- .../apache/hudi/common/SerializableConfiguration.java | 8 ++++++-- .../hudi/common/table/HoodieTableMetaClient.java | 2 +- .../hudi/common/table/view/FileSystemViewManager.java | 10 +++++----- .../apache/hudi/utilities/HoodieSnapshotCopier.java | 4 ++-- 6 files changed, 19 insertions(+), 14 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 4c6089c8a..46247c17b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -68,7 +68,7 @@ public class EmbeddedTimelineService { } public void startServer() throws IOException { - server = new TimelineService(0, viewManager, hadoopConf.get()); + server = new TimelineService(0, viewManager, hadoopConf.newCopy()); serverPort = server.startService(); logger.info("Started embedded timeline server at " + hostAddr + ":" + serverPort); } diff --git a/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java b/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java index 74a908dfe..db986dea0 100644 --- a/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java +++ b/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hudi.HoodieClientTestHarness; import org.apache.hudi.WriteStatus; +import org.apache.hudi.common.SerializableConfiguration; import org.apache.hudi.common.TestRawTripPayload; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -118,9 +119,9 @@ public class TestUpdateMapFunction extends HoodieClientTestHarness { try { HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2, updateRecords.iterator(), fileId); - Configuration conf = new Configuration(); - AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema()); - List oldRecords = ParquetUtils.readAvroRecords(conf, + SerializableConfiguration conf = new SerializableConfiguration(new Configuration()); + AvroReadSupport.setAvroReadSchema(conf.get(), mergeHandle.getWriterSchema()); + List oldRecords = ParquetUtils.readAvroRecords(conf.get(), new Path(config2.getBasePath() + "/" + insertResult.getStat().getPath())); for (GenericRecord rec : oldRecords) { mergeHandle.write(rec); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/SerializableConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/SerializableConfiguration.java index 0d5dc6cfe..8f6f0bab9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/SerializableConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/SerializableConfiguration.java @@ -33,11 +33,15 @@ public class SerializableConfiguration implements Serializable { } public SerializableConfiguration(SerializableConfiguration configuration) { - this.configuration = configuration.get(); + this.configuration = configuration.newCopy(); + } + + public Configuration newCopy() { + return new Configuration(configuration); } public Configuration get() { - return new Configuration(configuration); + return configuration; } private void writeObject(ObjectOutputStream out) throws IOException { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 479db692e..e0c30be1a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -202,7 +202,7 @@ public class HoodieTableMetaClient implements Serializable { */ public HoodieWrapperFileSystem getFs() { if (fs == null) { - FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.get()); + FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy()); Preconditions.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem), "File System not expected to be that of HoodieWrapperFileSystem"); fs = new HoodieWrapperFileSystem(fileSystem, consistencyGuardConfig.isConsistencyCheckEnabled() diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index c82e33a46..38e3967c1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -111,7 +111,7 @@ public class FileSystemViewManager { */ private static RocksDbBasedFileSystemView createRocksDBBasedFileSystemView(SerializableConfiguration conf, FileSystemViewStorageConfig viewConf, String basePath) { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.get(), basePath, true); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true); HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); return new RocksDbBasedFileSystemView(metaClient, timeline, viewConf); } @@ -126,7 +126,7 @@ public class FileSystemViewManager { private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf, FileSystemViewStorageConfig viewConf, String basePath) { logger.info("Creating SpillableMap based view for basePath " + basePath); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.get(), basePath, true); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true); HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf); } @@ -142,7 +142,7 @@ public class FileSystemViewManager { private static HoodieTableFileSystemView createInMemoryFileSystemView(SerializableConfiguration conf, FileSystemViewStorageConfig viewConf, String basePath) { logger.info("Creating InMemory based view for basePath " + basePath); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.get(), basePath, true); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true); HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled()); } @@ -188,12 +188,12 @@ public class FileSystemViewManager { logger.info("Creating remote only table view"); return new FileSystemViewManager(conf, config, (basePath, viewConfig) -> createRemoteFileSystemView(conf, viewConfig, - new HoodieTableMetaClient(conf.get(), basePath))); + new HoodieTableMetaClient(conf.newCopy(), basePath))); case REMOTE_FIRST: logger.info("Creating remote first table view"); return new FileSystemViewManager(conf, config, (basePath, viewConfig) -> { RemoteHoodieTableFileSystemView remoteFileSystemView = - createRemoteFileSystemView(conf, viewConfig, new HoodieTableMetaClient(conf.get(), basePath)); + createRemoteFileSystemView(conf, viewConfig, new HoodieTableMetaClient(conf.newCopy(), basePath)); SyncableFileSystemView secondaryView = null; switch (viewConfig.getSecondaryStorageType()) { case MEMORY: diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java index 2a7054893..135095ffa 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java @@ -105,7 +105,7 @@ public class HoodieSnapshotCopier implements Serializable { jsc.parallelize(partitions, partitions.size()).flatMap(partition -> { // Only take latest version files <= latestCommit. - FileSystem fs1 = FSUtils.getFs(baseDir, serConf.get()); + FileSystem fs1 = FSUtils.getFs(baseDir, serConf.newCopy()); List> filePaths = new ArrayList<>(); Stream dataFiles = fsView.getLatestDataFilesBeforeOrOn(partition, latestCommitTimestamp); @@ -124,7 +124,7 @@ public class HoodieSnapshotCopier implements Serializable { String partition = tuple._1(); Path sourceFilePath = new Path(tuple._2()); Path toPartitionPath = new Path(outputDir, partition); - FileSystem ifs = FSUtils.getFs(baseDir, serConf.get()); + FileSystem ifs = FSUtils.getFs(baseDir, serConf.newCopy()); if (!ifs.exists(toPartitionPath)) { ifs.mkdirs(toPartitionPath);