LogFile comparator must handle log file names without write token for backwards compatibility
This commit is contained in:
committed by
n3nash
parent
66893bfef2
commit
a0391b7c01
@@ -129,12 +129,12 @@ public class HoodieLogFile implements Serializable {
|
||||
*/
|
||||
public static class LogFileComparator implements Comparator<HoodieLogFile>, Serializable {
|
||||
|
||||
private transient Comparator<HoodieLogFile> writeTokenComparator;
|
||||
private transient Comparator<String> writeTokenComparator;
|
||||
|
||||
private Comparator<HoodieLogFile> getWriteTokenComparator() {
|
||||
private Comparator<String> getWriteTokenComparator() {
|
||||
if (null == writeTokenComparator) {
|
||||
// writeTokenComparator is not serializable. Hence, lazy loading
|
||||
writeTokenComparator = Comparator.nullsFirst(Comparator.comparing(HoodieLogFile::getLogWriteToken));
|
||||
writeTokenComparator = Comparator.nullsFirst(Comparator.naturalOrder());
|
||||
}
|
||||
return writeTokenComparator;
|
||||
}
|
||||
@@ -148,7 +148,7 @@ public class HoodieLogFile implements Serializable {
|
||||
|
||||
if (o1.getLogVersion() == o2.getLogVersion()) {
|
||||
// Compare by write token when base-commit and log-version is same
|
||||
return getWriteTokenComparator().compare(o1, o2);
|
||||
return getWriteTokenComparator().compare(o1.getLogWriteToken(), o2.getLogWriteToken());
|
||||
}
|
||||
|
||||
// compare by log-version when base-commit is same
|
||||
|
||||
@@ -19,6 +19,7 @@ package com.uber.hoodie.common.util;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.common.model.HoodieTestUtils;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.exception.HoodieException;
|
||||
@@ -30,6 +31,7 @@ import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.Assert;
|
||||
@@ -216,7 +218,47 @@ public class TestFSUtils {
|
||||
Assert.assertEquals(new Integer(1), FSUtils.getTaskPartitionIdFromLogPath(rlPath));
|
||||
Assert.assertEquals(new Integer(0), FSUtils.getStageIdFromLogPath(rlPath));
|
||||
Assert.assertEquals(new Integer(1), FSUtils.getTaskAttemptIdFromLogPath(rlPath));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Log File Comparisons when log files do not have write tokens.
|
||||
*/
|
||||
@Test
|
||||
public void testOldLogFilesComparison() {
|
||||
String log1Ver0 = makeOldLogFileName("file1", ".log", "1", 0);
|
||||
String log1Ver1 = makeOldLogFileName("file1", ".log", "1", 1);
|
||||
String log1base2 = makeOldLogFileName("file1", ".log", "2", 0);
|
||||
List<HoodieLogFile> logFiles =
|
||||
Arrays.asList(log1base2, log1Ver1, log1Ver0).stream()
|
||||
.map(f -> new HoodieLogFile(f)).collect(Collectors.toList());
|
||||
logFiles.sort(HoodieLogFile.getLogFileComparator());
|
||||
assertEquals(log1Ver0, logFiles.get(0).getFileName());
|
||||
assertEquals(log1Ver1, logFiles.get(1).getFileName());
|
||||
assertEquals(log1base2, logFiles.get(2).getFileName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test Log File Comparisons when log files do not have write tokens.
|
||||
*/
|
||||
@Test
|
||||
public void testLogFilesComparison() {
|
||||
String log1Ver0W0 = FSUtils.makeLogFileName("file1", ".log", "1", 0, "0-0-1");
|
||||
String log1Ver0W1 = FSUtils.makeLogFileName("file1", ".log", "1", 0, "1-1-1");
|
||||
String log1Ver1W0 = FSUtils.makeLogFileName("file1", ".log", "1", 1, "0-0-1");
|
||||
String log1Ver1W1 = FSUtils.makeLogFileName("file1", ".log", "1", 1, "1-1-1");
|
||||
String log1base2W0 = FSUtils.makeLogFileName("file1", ".log", "2", 0, "0-0-1");
|
||||
String log1base2W1 = FSUtils.makeLogFileName("file1", ".log", "2", 0, "1-1-1");
|
||||
|
||||
List<HoodieLogFile> logFiles =
|
||||
Arrays.asList(log1Ver1W1, log1base2W0, log1base2W1, log1Ver1W0, log1Ver0W1, log1Ver0W0).stream()
|
||||
.map(f -> new HoodieLogFile(f)).collect(Collectors.toList());
|
||||
logFiles.sort(HoodieLogFile.getLogFileComparator());
|
||||
assertEquals(log1Ver0W0, logFiles.get(0).getFileName());
|
||||
assertEquals(log1Ver0W1, logFiles.get(1).getFileName());
|
||||
assertEquals(log1Ver1W0, logFiles.get(2).getFileName());
|
||||
assertEquals(log1Ver1W1, logFiles.get(3).getFileName());
|
||||
assertEquals(log1base2W0, logFiles.get(4).getFileName());
|
||||
assertEquals(log1base2W1, logFiles.get(5).getFileName());
|
||||
}
|
||||
|
||||
public static String makeOldLogFileName(String fileId, String logFileExtension,
|
||||
|
||||
Reference in New Issue
Block a user