1
0

[HUDI-2451] On windows client with hdfs server for wrong file separator (#3687)

Co-authored-by: yao.zhou <yao.zhou@linkflowtech.com>
This commit is contained in:
Carl-Zhou-CN
2021-09-26 21:51:27 +08:00
committed by GitHub
parent bc4966ea73
commit aa546554ff
12 changed files with 27 additions and 36 deletions

View File

@@ -181,7 +181,7 @@ public class ExportCommand implements CommandMarker {
     final HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     final HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
     for (HoodieInstant instant : instants) {
-      String localPath = localFolder + File.separator + instant.getFileName();
+      String localPath = localFolder + Path.SEPARATOR + instant.getFileName();
       byte[] data = null;
       switch (instant.getAction()) {

View File

@@ -53,7 +53,6 @@ import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
-import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Files;
@@ -174,7 +173,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
     // write to path '2015/03/16'.
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
-    partitionPath = tablePath + File.separator + HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH;
+    partitionPath = tablePath + Path.SEPARATOR + HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH;
     Files.createDirectories(Paths.get(partitionPath));
     HoodieLogFormat.Writer writer = null;

View File

@@ -142,7 +142,7 @@ public class TestTableCommand extends CLIFunctionalTestHarness {
     assertTrue(cr.isSuccess());
     assertEquals("Metadata for table " + tableName + " loaded", cr.getResult().toString());
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
-    assertEquals(metaPath + File.separator + "archive", client.getArchivePath());
+    assertEquals(metaPath + Path.SEPARATOR + "archive", client.getArchivePath());
     assertEquals(tablePath, client.getBasePath());
     assertEquals(metaPath, client.getMetaPath());
     assertEquals(HoodieTableType.MERGE_ON_READ, client.getTableType());
@@ -181,7 +181,7 @@ public class TestTableCommand extends CLIFunctionalTestHarness {
   private void testRefreshCommand(String command) throws IOException {
     // clean table matedata
     FileSystem fs = FileSystem.get(hadoopConf());
-    fs.delete(new Path(tablePath + File.separator + HoodieTableMetaClient.METAFOLDER_NAME), true);
+    fs.delete(new Path(tablePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME), true);
     // Create table
     assertTrue(prepareTable());

View File

@@ -18,6 +18,7 @@
 package org.apache.hudi.cli.integ;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.commands.TableCommand;
@@ -32,7 +33,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
-import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
@@ -59,8 +59,8 @@ public class ITTestBootstrapCommand extends AbstractShellIntegrationTest {
   public void init() {
     String srcName = "source";
     tableName = "test-table";
-    sourcePath = basePath + File.separator + srcName;
-    tablePath = basePath + File.separator + tableName;
+    sourcePath = basePath + Path.SEPARATOR + srcName;
+    tablePath = basePath + Path.SEPARATOR + tableName;
     // generate test data
     partitions = Arrays.asList("2018", "2019", "2020");
@@ -68,7 +68,7 @@ public class ITTestBootstrapCommand extends AbstractShellIntegrationTest {
     for (int i = 0; i < partitions.size(); i++) {
       Dataset<Row> df = TestBootstrap.generateTestRawTripDataset(timestamp,
           i * NUM_OF_RECORDS, i * NUM_OF_RECORDS + NUM_OF_RECORDS, null, jsc, sqlContext);
-      df.write().parquet(sourcePath + File.separator + PARTITION_FIELD + "=" + partitions.get(i));
+      df.write().parquet(sourcePath + Path.SEPARATOR + PARTITION_FIELD + "=" + partitions.get(i));
     }
   }

View File

@@ -40,7 +40,6 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -70,7 +69,7 @@ public class ITTestHDFSParquetImportCommand extends AbstractShellIntegrationTest
   @BeforeEach
   public void init() throws IOException, ParseException {
     tableName = "test_table";
-    tablePath = basePath + File.separator + tableName;
+    tablePath = basePath + Path.SEPARATOR + tableName;
     sourcePath = new Path(basePath, "source");
     targetPath = new Path(tablePath);
     schemaFile = new Path(basePath, "file.schema").toString();
@@ -101,7 +100,7 @@ public class ITTestHDFSParquetImportCommand extends AbstractShellIntegrationTest
         () -> assertEquals("Table imported to hoodie format", cr.getResult().toString()));
     // Check hudi table exist
-    String metaPath = targetPath + File.separator + HoodieTableMetaClient.METAFOLDER_NAME;
+    String metaPath = targetPath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
     assertTrue(Files.exists(Paths.get(metaPath)), "Hoodie table not exist.");
     // Load meta data
// Load meta data // Load meta data

View File

@@ -18,6 +18,7 @@
 package org.apache.hudi.cli.integ;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.commands.TableCommand;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
@@ -32,7 +33,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.shell.core.CommandResult;
-import java.io.File;
 import java.io.IOException;
 import static org.junit.jupiter.api.Assertions.assertAll;
@@ -53,7 +53,7 @@ public class ITTestSavepointsCommand extends AbstractShellIntegrationTest {
   @BeforeEach
   public void init() throws IOException {
     String tableName = "test_table";
-    tablePath = basePath + File.separator + tableName;
+    tablePath = basePath + Path.SEPARATOR + tableName;
     // Create table and connect
     new TableCommand().createTable(

View File

@@ -29,7 +29,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
@@ -53,7 +52,7 @@ public class HeartbeatUtils {
     boolean deleted = false;
     try {
       String heartbeatFolderPath = HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
-      deleted = fs.delete(new Path(heartbeatFolderPath + File.separator + instantTime), false);
+      deleted = fs.delete(new Path(heartbeatFolderPath + Path.SEPARATOR + instantTime), false);
       if (!deleted) {
         LOG.error("Failed to delete heartbeat for instant " + instantTime);
       }

View File

@@ -29,7 +29,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import javax.annotation.concurrent.NotThreadSafe;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -207,7 +206,7 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
   }
   public static Long getLastHeartbeatTime(FileSystem fs, String basePath, String instantTime) throws IOException {
-    Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + File.separator + instantTime);
+    Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + instantTime);
     if (fs.exists(heartbeatFilePath)) {
       return fs.getFileStatus(heartbeatFilePath).getModificationTime();
     } else {
@@ -217,7 +216,7 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
   }
   public static Boolean heartbeatExists(FileSystem fs, String basePath, String instantTime) throws IOException {
-    Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + File.separator + instantTime);
+    Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + instantTime);
     if (fs.exists(heartbeatFilePath)) {
       return true;
     }
@@ -255,7 +254,7 @@ public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
     try {
       Long newHeartbeatTime = System.currentTimeMillis();
       OutputStream outputStream =
-          this.fs.create(new Path(heartbeatFolderPath + File.separator + instantTime), true);
+          this.fs.create(new Path(heartbeatFolderPath + Path.SEPARATOR + instantTime), true);
       outputStream.close();
       Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
       if (heartbeat.getLastHeartbeatTime() != null && isHeartbeatExpired(instantTime)) {

View File

@@ -49,7 +49,6 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -76,10 +75,10 @@ public class HoodieTableMetaClient implements Serializable {
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
   public static final String METAFOLDER_NAME = ".hoodie";
-  public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
-  public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
-  public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + File.separator + ".bootstrap";
-  public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + File.separator + ".heartbeat";
+  public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".temp";
+  public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".aux";
+  public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap";
+  public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat";
   public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH
       + Path.SEPARATOR + ".partitions";
   public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR
@@ -205,7 +204,7 @@ public class HoodieTableMetaClient implements Serializable {
    * @return Heartbeat folder path.
    */
   public static String getHeartbeatFolderPath(String basePath) {
-    return String.format("%s%s%s", basePath, File.separator, HEARTBEAT_FOLDER_NAME);
+    return String.format("%s%s%s", basePath, Path.SEPARATOR, HEARTBEAT_FOLDER_NAME);
   }
   /**
@@ -227,7 +226,7 @@ public class HoodieTableMetaClient implements Serializable {
    */
   public String getArchivePath() {
     String archiveFolder = tableConfig.getArchivelogFolder();
-    return getMetaPath() + "/" + archiveFolder;
+    return getMetaPath() + Path.SEPARATOR + archiveFolder;
   }
   /**

View File

@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
 import javax.annotation.Nullable;
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -89,7 +88,7 @@ public class FileIndex {
     }
     List<Map<String, String>> partitions = new ArrayList<>();
     for (String partitionPath : partitionPaths) {
-      String[] paths = partitionPath.split(File.separator);
+      String[] paths = partitionPath.split(Path.SEPARATOR);
       Map<String, String> partitionMapping = new LinkedHashMap<>();
       if (hivePartition) {
         Arrays.stream(paths).forEach(p -> {

View File

@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -83,7 +82,7 @@ public class FilePathUtils {
    * @param partitionKVs The partition key value mapping
    * @param hivePartition Whether the partition path is with Hive style,
    *                      e.g. {partition key} = {partition value}
-   * @param sepSuffix Whether to append the file separator as suffix
+   * @param sepSuffix Whether to append the path separator as suffix
    * @return an escaped, valid partition name
    */
   public static String generatePartitionPath(
@@ -97,7 +96,7 @@ public class FilePathUtils {
     int i = 0;
     for (Map.Entry<String, String> e : partitionKVs.entrySet()) {
       if (i > 0) {
-        suffixBuf.append(File.separator);
+        suffixBuf.append(Path.SEPARATOR);
       }
       if (hivePartition) {
         suffixBuf.append(escapePathName(e.getKey()));
@@ -107,7 +106,7 @@ public class FilePathUtils {
       i++;
     }
     if (sepSuffix) {
-      suffixBuf.append(File.separator);
+      suffixBuf.append(Path.SEPARATOR);
     }
     return suffixBuf.toString();
   }

View File

@@ -27,8 +27,6 @@ import org.apache.hudi.util.StreamerUtil;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import java.io.File;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
@@ -51,7 +49,7 @@ public class TestUtils {
   public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
     assertTrue(split.getLogPaths().isPresent());
     final String logPath = split.getLogPaths().get().get(0);
-    String[] paths = logPath.split(File.separator);
+    String[] paths = logPath.split(Path.SEPARATOR);
     return paths[paths.length - 2];
   }