[HUDI-279] Fix regression in Schema Evolution due to PR-755
This commit is contained in:
committed by
Balaji Varadarajan
parent
bf05f95413
commit
2ea8b0c3f1
@@ -68,7 +68,7 @@ public class EmbeddedTimelineService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void startServer() throws IOException {
|
public void startServer() throws IOException {
|
||||||
server = new TimelineService(0, viewManager, hadoopConf.get());
|
server = new TimelineService(0, viewManager, hadoopConf.newCopy());
|
||||||
serverPort = server.startService();
|
serverPort = server.startService();
|
||||||
logger.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
|
logger.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hudi.HoodieClientTestHarness;
|
import org.apache.hudi.HoodieClientTestHarness;
|
||||||
import org.apache.hudi.WriteStatus;
|
import org.apache.hudi.WriteStatus;
|
||||||
|
import org.apache.hudi.common.SerializableConfiguration;
|
||||||
import org.apache.hudi.common.TestRawTripPayload;
|
import org.apache.hudi.common.TestRawTripPayload;
|
||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
@@ -118,9 +119,9 @@ public class TestUpdateMapFunction extends HoodieClientTestHarness {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2, updateRecords.iterator(), fileId);
|
HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2, updateRecords.iterator(), fileId);
|
||||||
Configuration conf = new Configuration();
|
SerializableConfiguration conf = new SerializableConfiguration(new Configuration());
|
||||||
AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema());
|
AvroReadSupport.setAvroReadSchema(conf.get(), mergeHandle.getWriterSchema());
|
||||||
List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
|
List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf.get(),
|
||||||
new Path(config2.getBasePath() + "/" + insertResult.getStat().getPath()));
|
new Path(config2.getBasePath() + "/" + insertResult.getStat().getPath()));
|
||||||
for (GenericRecord rec : oldRecords) {
|
for (GenericRecord rec : oldRecords) {
|
||||||
mergeHandle.write(rec);
|
mergeHandle.write(rec);
|
||||||
|
|||||||
@@ -33,11 +33,15 @@ public class SerializableConfiguration implements Serializable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public SerializableConfiguration(SerializableConfiguration configuration) {
|
public SerializableConfiguration(SerializableConfiguration configuration) {
|
||||||
this.configuration = configuration.get();
|
this.configuration = configuration.newCopy();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Configuration newCopy() {
|
||||||
|
return new Configuration(configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Configuration get() {
|
public Configuration get() {
|
||||||
return new Configuration(configuration);
|
return configuration;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeObject(ObjectOutputStream out) throws IOException {
|
private void writeObject(ObjectOutputStream out) throws IOException {
|
||||||
|
|||||||
@@ -202,7 +202,7 @@ public class HoodieTableMetaClient implements Serializable {
|
|||||||
*/
|
*/
|
||||||
public HoodieWrapperFileSystem getFs() {
|
public HoodieWrapperFileSystem getFs() {
|
||||||
if (fs == null) {
|
if (fs == null) {
|
||||||
FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.get());
|
FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy());
|
||||||
Preconditions.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
|
Preconditions.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
|
||||||
"File System not expected to be that of HoodieWrapperFileSystem");
|
"File System not expected to be that of HoodieWrapperFileSystem");
|
||||||
fs = new HoodieWrapperFileSystem(fileSystem, consistencyGuardConfig.isConsistencyCheckEnabled()
|
fs = new HoodieWrapperFileSystem(fileSystem, consistencyGuardConfig.isConsistencyCheckEnabled()
|
||||||
|
|||||||
@@ -111,7 +111,7 @@ public class FileSystemViewManager {
|
|||||||
*/
|
*/
|
||||||
private static RocksDbBasedFileSystemView createRocksDBBasedFileSystemView(SerializableConfiguration conf,
|
private static RocksDbBasedFileSystemView createRocksDBBasedFileSystemView(SerializableConfiguration conf,
|
||||||
FileSystemViewStorageConfig viewConf, String basePath) {
|
FileSystemViewStorageConfig viewConf, String basePath) {
|
||||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.get(), basePath, true);
|
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true);
|
||||||
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
||||||
return new RocksDbBasedFileSystemView(metaClient, timeline, viewConf);
|
return new RocksDbBasedFileSystemView(metaClient, timeline, viewConf);
|
||||||
}
|
}
|
||||||
@@ -126,7 +126,7 @@ public class FileSystemViewManager {
|
|||||||
private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf,
|
private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf,
|
||||||
FileSystemViewStorageConfig viewConf, String basePath) {
|
FileSystemViewStorageConfig viewConf, String basePath) {
|
||||||
logger.info("Creating SpillableMap based view for basePath " + basePath);
|
logger.info("Creating SpillableMap based view for basePath " + basePath);
|
||||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.get(), basePath, true);
|
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true);
|
||||||
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
||||||
return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf);
|
return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf);
|
||||||
}
|
}
|
||||||
@@ -142,7 +142,7 @@ public class FileSystemViewManager {
|
|||||||
private static HoodieTableFileSystemView createInMemoryFileSystemView(SerializableConfiguration conf,
|
private static HoodieTableFileSystemView createInMemoryFileSystemView(SerializableConfiguration conf,
|
||||||
FileSystemViewStorageConfig viewConf, String basePath) {
|
FileSystemViewStorageConfig viewConf, String basePath) {
|
||||||
logger.info("Creating InMemory based view for basePath " + basePath);
|
logger.info("Creating InMemory based view for basePath " + basePath);
|
||||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.get(), basePath, true);
|
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true);
|
||||||
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
|
||||||
return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
|
return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
|
||||||
}
|
}
|
||||||
@@ -188,12 +188,12 @@ public class FileSystemViewManager {
|
|||||||
logger.info("Creating remote only table view");
|
logger.info("Creating remote only table view");
|
||||||
return new FileSystemViewManager(conf, config,
|
return new FileSystemViewManager(conf, config,
|
||||||
(basePath, viewConfig) -> createRemoteFileSystemView(conf, viewConfig,
|
(basePath, viewConfig) -> createRemoteFileSystemView(conf, viewConfig,
|
||||||
new HoodieTableMetaClient(conf.get(), basePath)));
|
new HoodieTableMetaClient(conf.newCopy(), basePath)));
|
||||||
case REMOTE_FIRST:
|
case REMOTE_FIRST:
|
||||||
logger.info("Creating remote first table view");
|
logger.info("Creating remote first table view");
|
||||||
return new FileSystemViewManager(conf, config, (basePath, viewConfig) -> {
|
return new FileSystemViewManager(conf, config, (basePath, viewConfig) -> {
|
||||||
RemoteHoodieTableFileSystemView remoteFileSystemView =
|
RemoteHoodieTableFileSystemView remoteFileSystemView =
|
||||||
createRemoteFileSystemView(conf, viewConfig, new HoodieTableMetaClient(conf.get(), basePath));
|
createRemoteFileSystemView(conf, viewConfig, new HoodieTableMetaClient(conf.newCopy(), basePath));
|
||||||
SyncableFileSystemView secondaryView = null;
|
SyncableFileSystemView secondaryView = null;
|
||||||
switch (viewConfig.getSecondaryStorageType()) {
|
switch (viewConfig.getSecondaryStorageType()) {
|
||||||
case MEMORY:
|
case MEMORY:
|
||||||
|
|||||||
@@ -105,7 +105,7 @@ public class HoodieSnapshotCopier implements Serializable {
|
|||||||
|
|
||||||
jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
|
jsc.parallelize(partitions, partitions.size()).flatMap(partition -> {
|
||||||
// Only take latest version files <= latestCommit.
|
// Only take latest version files <= latestCommit.
|
||||||
FileSystem fs1 = FSUtils.getFs(baseDir, serConf.get());
|
FileSystem fs1 = FSUtils.getFs(baseDir, serConf.newCopy());
|
||||||
List<Tuple2<String, String>> filePaths = new ArrayList<>();
|
List<Tuple2<String, String>> filePaths = new ArrayList<>();
|
||||||
Stream<HoodieDataFile> dataFiles = fsView.getLatestDataFilesBeforeOrOn(partition,
|
Stream<HoodieDataFile> dataFiles = fsView.getLatestDataFilesBeforeOrOn(partition,
|
||||||
latestCommitTimestamp);
|
latestCommitTimestamp);
|
||||||
@@ -124,7 +124,7 @@ public class HoodieSnapshotCopier implements Serializable {
|
|||||||
String partition = tuple._1();
|
String partition = tuple._1();
|
||||||
Path sourceFilePath = new Path(tuple._2());
|
Path sourceFilePath = new Path(tuple._2());
|
||||||
Path toPartitionPath = new Path(outputDir, partition);
|
Path toPartitionPath = new Path(outputDir, partition);
|
||||||
FileSystem ifs = FSUtils.getFs(baseDir, serConf.get());
|
FileSystem ifs = FSUtils.getFs(baseDir, serConf.newCopy());
|
||||||
|
|
||||||
if (!ifs.exists(toPartitionPath)) {
|
if (!ifs.exists(toPartitionPath)) {
|
||||||
ifs.mkdirs(toPartitionPath);
|
ifs.mkdirs(toPartitionPath);
|
||||||
|
|||||||
Reference in New Issue
Block a user