1
0

Use hadoopConf in HoodieTableMetaClient and related tests

This commit is contained in:
Jian Xu
2018-03-08 17:21:11 -08:00
committed by vinoth chandar
parent 73534d467f
commit 7f079632a6
20 changed files with 57 additions and 50 deletions

View File

@@ -85,7 +85,7 @@ public class HoodieClientExample {
FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration()); FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration());
if (!fs.exists(path)) { if (!fs.exists(path)) {
HoodieTableMetaClient HoodieTableMetaClient
.initTableType(fs, tablePath, HoodieTableType.valueOf(tableType), tableName, .initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), tableName,
HoodieAvroPayload.class.getName()); HoodieAvroPayload.class.getName());
} }

View File

@@ -105,7 +105,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
folder.create(); folder.create();
basePath = folder.getRoot().getAbsolutePath(); basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath.toString(), jsc.hadoopConfiguration()); fs = FSUtils.getFs(basePath.toString(), jsc.hadoopConfiguration());
HoodieTestUtils.init(fs, basePath); HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
dataGen = new HoodieTestDataGenerator(); dataGen = new HoodieTestDataGenerator();
} }
@@ -1258,7 +1258,7 @@ public class TestHoodieClientOnCopyOnWriteStorage implements Serializable {
.retainFileVersions(1).build()).build(); .retainFileVersions(1).build()).build();
HoodieTableMetaClient metaClient = HoodieTestUtils HoodieTableMetaClient metaClient = HoodieTestUtils
.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ); .initTableType(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
// Make 3 files, one base file and 2 log files associated with base file // Make 3 files, one base file and 2 log files associated with base file
String file1P0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000"); String file1P0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000");

View File

@@ -103,9 +103,8 @@ public class TestMultiFS implements Serializable {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
// Initialize table and filesystem // Initialize table and filesystem
FileSystem hdfs = FSUtils.getFs(dfsBasePath, jsc.hadoopConfiguration());
HoodieTableMetaClient HoodieTableMetaClient
.initTableType(hdfs, dfsBasePath, HoodieTableType.valueOf(tableType), tableName, .initTableType(jsc.hadoopConfiguration(), dfsBasePath, HoodieTableType.valueOf(tableType), tableName,
HoodieAvroPayload.class.getName()); HoodieAvroPayload.class.getName());
//Create write client to write some records in //Create write client to write some records in
@@ -133,9 +132,8 @@ public class TestMultiFS implements Serializable {
assertEquals("Should contain 100 records", readRecords.count(), records.size()); assertEquals("Should contain 100 records", readRecords.count(), records.size());
// Write to local // Write to local
FileSystem local = FSUtils.getFs(tablePath, jsc.hadoopConfiguration());
HoodieTableMetaClient HoodieTableMetaClient
.initTableType(local, tablePath, HoodieTableType.valueOf(tableType), tableName, .initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), tableName,
HoodieAvroPayload.class.getName()); HoodieAvroPayload.class.getName());
HoodieWriteConfig localConfig = HoodieWriteConfig.newBuilder().withPath(tablePath) HoodieWriteConfig localConfig = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)

View File

@@ -48,7 +48,7 @@ public class TestUpdateMapFunction {
TemporaryFolder folder = new TemporaryFolder(); TemporaryFolder folder = new TemporaryFolder();
folder.create(); folder.create();
this.basePath = folder.getRoot().getAbsolutePath(); this.basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.init(FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()), basePath); HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath);
} }
@Test @Test

View File

@@ -112,7 +112,7 @@ public class TestHbaseIndex {
basePath = folder.getRoot().getAbsolutePath(); basePath = folder.getRoot().getAbsolutePath();
// Initialize table // Initialize table
metaClient = HoodieTableMetaClient metaClient = HoodieTableMetaClient
.initTableType(utility.getTestFileSystem(), basePath, HoodieTableType.COPY_ON_WRITE, .initTableType(utility.getConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE,
tableName, HoodieTableConfig.DEFAULT_PAYLOAD_CLASS); tableName, HoodieTableConfig.DEFAULT_PAYLOAD_CLASS);
} }

View File

@@ -88,7 +88,7 @@ public class TestHoodieBloomIndex {
folder.create(); folder.create();
basePath = folder.getRoot().getAbsolutePath(); basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
HoodieTestUtils.init(fs, basePath); HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
// We have some records to be tagged (two different partitions) // We have some records to be tagged (two different partitions)
schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8");
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr)); schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));

View File

@@ -39,6 +39,7 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.junit.Before; import org.junit.Before;
@@ -49,14 +50,16 @@ public class TestHoodieCommitArchiveLog {
private String basePath; private String basePath;
private FileSystem fs; private FileSystem fs;
private Configuration hadoopConf;
@Before @Before
public void init() throws Exception { public void init() throws Exception {
TemporaryFolder folder = new TemporaryFolder(); TemporaryFolder folder = new TemporaryFolder();
folder.create(); folder.create();
basePath = folder.getRoot().getAbsolutePath(); basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
HoodieTestUtils.init(fs, basePath); fs = FSUtils.getFs(basePath, hadoopConf);
HoodieTestUtils.init(hadoopConf, basePath);
} }
@Test @Test
@@ -76,7 +79,7 @@ public class TestHoodieCommitArchiveLog {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4).build())
.forTable("test-trip-table").build(); .forTable("test-trip-table").build();
HoodieTestUtils.init(fs, basePath); HoodieTestUtils.init(hadoopConf, basePath);
HoodieTestDataGenerator.createCommitFile(basePath, "100"); HoodieTestDataGenerator.createCommitFile(basePath, "100");
HoodieTestDataGenerator.createCommitFile(basePath, "101"); HoodieTestDataGenerator.createCommitFile(basePath, "101");
HoodieTestDataGenerator.createCommitFile(basePath, "102"); HoodieTestDataGenerator.createCommitFile(basePath, "102");

View File

@@ -43,6 +43,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
@@ -58,6 +59,7 @@ public class TestHoodieCompactor {
private HoodieCompactor compactor; private HoodieCompactor compactor;
private transient HoodieTestDataGenerator dataGen = null; private transient HoodieTestDataGenerator dataGen = null;
private transient FileSystem fs; private transient FileSystem fs;
private Configuration hadoopConf;
@Before @Before
public void init() throws IOException { public void init() throws IOException {
@@ -68,8 +70,9 @@ public class TestHoodieCompactor {
TemporaryFolder folder = new TemporaryFolder(); TemporaryFolder folder = new TemporaryFolder();
folder.create(); folder.create();
basePath = folder.getRoot().getAbsolutePath(); basePath = folder.getRoot().getAbsolutePath();
fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ); fs = FSUtils.getFs(basePath, hadoopConf);
HoodieTestUtils.initTableType(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
dataGen = new HoodieTestDataGenerator(); dataGen = new HoodieTestDataGenerator();
compactor = new HoodieRealtimeTableCompactor(); compactor = new HoodieRealtimeTableCompactor();
@@ -102,7 +105,7 @@ public class TestHoodieCompactor {
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testCompactionOnCopyOnWriteFail() throws Exception { public void testCompactionOnCopyOnWriteFail() throws Exception {
HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.COPY_ON_WRITE); HoodieTestUtils.initTableType(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath); basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());

View File

@@ -76,7 +76,7 @@ public class TestCopyOnWriteTable {
TemporaryFolder folder = new TemporaryFolder(); TemporaryFolder folder = new TemporaryFolder();
folder.create(); folder.create();
this.basePath = folder.getRoot().getAbsolutePath(); this.basePath = folder.getRoot().getAbsolutePath();
HoodieTestUtils.init(FSUtils.getFs(basePath, jsc.hadoopConfiguration()), basePath); HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
} }
@Test @Test

View File

@@ -121,7 +121,7 @@ public class TestMergeOnReadTable {
jsc.hadoopConfiguration().addResource(dfs.getConf()); jsc.hadoopConfiguration().addResource(dfs.getConf());
dfs.mkdirs(new Path(basePath)); dfs.mkdirs(new Path(basePath));
HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
sqlContext = new SQLContext(jsc); // SQLContext stuff sqlContext = new SQLContext(jsc); // SQLContext stuff
} }
@@ -346,7 +346,7 @@ public class TestMergeOnReadTable {
public void testCOWToMORConvertedDatasetRollback() throws Exception { public void testCOWToMORConvertedDatasetRollback() throws Exception {
//Set TableType to COW //Set TableType to COW
HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.COPY_ON_WRITE); HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE);
HoodieWriteConfig cfg = getConfig(true); HoodieWriteConfig cfg = getConfig(true);
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
@@ -385,7 +385,7 @@ public class TestMergeOnReadTable {
assertNoWriteErrors(statuses); assertNoWriteErrors(statuses);
//Set TableType to MOR //Set TableType to MOR
HoodieTestUtils.initTableType(dfs, basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
//rollback a COW commit when TableType is MOR //rollback a COW commit when TableType is MOR
client.rollback(newCommitTime); client.rollback(newCommitTime);

View File

@@ -193,7 +193,7 @@ public class HoodieTableMetaClient implements Serializable {
/** /**
* Helper method to initialize a given path, as a given storage type and table name * Helper method to initialize a given path, as a given storage type and table name
*/ */
public static HoodieTableMetaClient initTableType(FileSystem fs, String basePath, public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
HoodieTableType tableType, String tableName, String payloadClassName) throws IOException { HoodieTableType tableType, String tableName, String payloadClassName) throws IOException {
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
@@ -201,7 +201,7 @@ public class HoodieTableMetaClient implements Serializable {
if (tableType == HoodieTableType.MERGE_ON_READ) { if (tableType == HoodieTableType.MERGE_ON_READ) {
properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, payloadClassName); properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, payloadClassName);
} }
return HoodieTableMetaClient.initializePathAsHoodieDataset(fs, basePath, properties); return HoodieTableMetaClient.initializePathAsHoodieDataset(hadoopConf, basePath, properties);
} }
/** /**
@@ -210,10 +210,11 @@ public class HoodieTableMetaClient implements Serializable {
* *
* @return Instance of HoodieTableMetaClient * @return Instance of HoodieTableMetaClient
*/ */
public static HoodieTableMetaClient initializePathAsHoodieDataset(FileSystem fs, public static HoodieTableMetaClient initializePathAsHoodieDataset(Configuration hadoopConf,
String basePath, Properties props) throws IOException { String basePath, Properties props) throws IOException {
log.info("Initializing " + basePath + " as hoodie dataset " + basePath); log.info("Initializing " + basePath + " as hoodie dataset " + basePath);
Path basePathDir = new Path(basePath); Path basePathDir = new Path(basePath);
final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
if (!fs.exists(basePathDir)) { if (!fs.exists(basePathDir)) {
fs.mkdirs(basePathDir); fs.mkdirs(basePathDir);
} }
@@ -239,7 +240,9 @@ public class HoodieTableMetaClient implements Serializable {
fs.mkdirs(temporaryFolder); fs.mkdirs(temporaryFolder);
} }
HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props); HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath); // We should not use fs.getConf as this might be different from the original configuration
// used to create the fs in unit tests
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() log.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType()
+ " from " + basePath); + " from " + basePath);
return metaClient; return metaClient;

View File

@@ -80,11 +80,12 @@ public class HoodieTestUtils {
return new Configuration(); return new Configuration();
} }
public static HoodieTableMetaClient init(FileSystem fs, String basePath) throws IOException { public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath)
return initTableType(fs, basePath, HoodieTableType.COPY_ON_WRITE); throws IOException {
return initTableType(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
} }
public static HoodieTableMetaClient initTableType(FileSystem fs, String basePath, public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath,
HoodieTableType tableType) HoodieTableType tableType)
throws IOException { throws IOException {
Properties properties = new Properties(); Properties properties = new Properties();
@@ -92,7 +93,7 @@ public class HoodieTestUtils {
properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name());
properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME,
HoodieAvroPayload.class.getName()); HoodieAvroPayload.class.getName());
return HoodieTableMetaClient.initializePathAsHoodieDataset(fs, basePath, properties); return HoodieTableMetaClient.initializePathAsHoodieDataset(hadoopConf, basePath, properties);
} }
public static HoodieTableMetaClient initOnTemp() throws IOException { public static HoodieTableMetaClient initOnTemp() throws IOException {
@@ -100,8 +101,7 @@ public class HoodieTestUtils {
TemporaryFolder folder = new TemporaryFolder(); TemporaryFolder folder = new TemporaryFolder();
folder.create(); folder.create();
String basePath = folder.getRoot().getAbsolutePath(); String basePath = folder.getRoot().getAbsolutePath();
return HoodieTestUtils return HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath);
.init(FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()), basePath);
} }
public static String makeNewCommitTime() { public static String makeNewCommitTime() {

View File

@@ -109,7 +109,7 @@ public class HoodieLogFormatTest {
assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath()))); assertTrue(fs.mkdirs(new Path(folder.getRoot().getPath())));
this.partitionPath = new Path(folder.getRoot().getPath()); this.partitionPath = new Path(folder.getRoot().getPath());
this.basePath = folder.getRoot().getParent(); this.basePath = folder.getRoot().getParent();
HoodieTestUtils.initTableType(fs, basePath, HoodieTableType.MERGE_ON_READ); HoodieTestUtils.initTableType(MiniClusterUtil.configuration, basePath, HoodieTableType.MERGE_ON_READ);
} }
@After @After

View File

@@ -39,9 +39,7 @@ public class InputFormatTestUtil {
public static File prepareDataset(TemporaryFolder basePath, int numberOfFiles, public static File prepareDataset(TemporaryFolder basePath, int numberOfFiles,
String commitNumber) throws IOException { String commitNumber) throws IOException {
basePath.create(); basePath.create();
HoodieTestUtils HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString());
.init(FSUtils.getFs(basePath.getRoot().toString(), HoodieTestUtils.getDefaultHadoopConf()),
basePath.getRoot().toString());
File partitionPath = basePath.newFolder("2016", "05", "01"); File partitionPath = basePath.newFolder("2016", "05", "01");
for (int i = 0; i < numberOfFiles; i++) { for (int i = 0; i < numberOfFiles; i++) {
File dataFile = File dataFile =
@@ -101,9 +99,7 @@ public class InputFormatTestUtil {
int numberOfFiles, int numberOfRecords, int numberOfFiles, int numberOfRecords,
String commitNumber) throws IOException { String commitNumber) throws IOException {
basePath.create(); basePath.create();
HoodieTestUtils HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.getRoot().toString());
.init(FSUtils.getFs(basePath.getRoot().toString(), HoodieTestUtils.getDefaultHadoopConf()),
basePath.getRoot().toString());
File partitionPath = basePath.newFolder("2016", "05", "01"); File partitionPath = basePath.newFolder("2016", "05", "01");
AvroParquetWriter parquetWriter; AvroParquetWriter parquetWriter;
for (int i = 0; i < numberOfFiles; i++) { for (int i = 0; i < numberOfFiles; i++) {

View File

@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -66,12 +67,13 @@ public class HoodieRealtimeRecordReaderTest {
private JobConf jobConf; private JobConf jobConf;
private FileSystem fs; private FileSystem fs;
private Configuration hadoopConf;
@Before @Before
public void setUp() { public void setUp() {
jobConf = new JobConf(); jobConf = new JobConf();
fs = FSUtils hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
.getFs(basePath.getRoot().getAbsolutePath(), HoodieTestUtils.getDefaultHadoopConf()); fs = FSUtils.getFs(basePath.getRoot().getAbsolutePath(), hadoopConf);
} }
@Rule @Rule
@@ -105,7 +107,7 @@ public class HoodieRealtimeRecordReaderTest {
// initial commit // initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils HoodieTestUtils
.initTableType(fs, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ); .initTableType(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
String commitTime = "100"; String commitTime = "100";
File partitionDir = InputFormatTestUtil File partitionDir = InputFormatTestUtil
.prepareParquetDataset(basePath, schema, 1, 100, commitTime); .prepareParquetDataset(basePath, schema, 1, 100, commitTime);
@@ -163,7 +165,7 @@ public class HoodieRealtimeRecordReaderTest {
// initial commit // initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema());
HoodieTestUtils HoodieTestUtils
.initTableType(fs, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ); .initTableType(hadoopConf, basePath.getRoot().getAbsolutePath(), HoodieTableType.MERGE_ON_READ);
String commitTime = "100"; String commitTime = "100";
int numberOfRecords = 100; int numberOfRecords = 100;
int numberOfLogRecords = numberOfRecords / 2; int numberOfLogRecords = numberOfRecords / 2;

View File

@@ -118,7 +118,7 @@ public class TestUtil {
static void clear() throws IOException { static void clear() throws IOException {
fileSystem.delete(new Path(hiveSyncConfig.basePath), true); fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
HoodieTableMetaClient HoodieTableMetaClient
.initTableType(fileSystem, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE, .initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
hiveSyncConfig.tableName, HoodieAvroPayload.class.getName()); hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(),
@@ -154,7 +154,7 @@ public class TestUtil {
Path path = new Path(hiveSyncConfig.basePath); Path path = new Path(hiveSyncConfig.basePath);
FileUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); FileUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient HoodieTableMetaClient
.initTableType(fileSystem, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE, .initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
hiveSyncConfig.tableName, HoodieAvroPayload.class.getName()); hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
boolean result = fileSystem.mkdirs(path); boolean result = fileSystem.mkdirs(path);
checkResult(result); checkResult(result);
@@ -170,7 +170,7 @@ public class TestUtil {
Path path = new Path(hiveSyncConfig.basePath); Path path = new Path(hiveSyncConfig.basePath);
FileUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); FileUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient HoodieTableMetaClient
.initTableType(fileSystem, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ, .initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
hiveSyncConfig.tableName, HoodieAvroPayload.class.getName()); hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
boolean result = fileSystem.mkdirs(path); boolean result = fileSystem.mkdirs(path);

View File

@@ -190,7 +190,8 @@ class DefaultSource extends RelationProvider
properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get); properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tblName.get);
properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType); properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, storageType);
properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived"); properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived");
HoodieTableMetaClient.initializePathAsHoodieDataset(fs, path.get, properties); HoodieTableMetaClient.initializePathAsHoodieDataset(
sqlContext.sparkContext.hadoopConfiguration, path.get, properties);
} }
// Create a HoodieWriteClient & issue the write. // Create a HoodieWriteClient & issue the write.

View File

@@ -232,7 +232,7 @@ public class HDFSParquetImporter implements Serializable {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName); properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType); properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
HoodieTableMetaClient.initializePathAsHoodieDataset(fs, cfg.targetPath, properties); HoodieTableMetaClient.initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties);
HoodieWriteClient client = createHoodieClient(jsc, cfg.targetPath, schemaStr, HoodieWriteClient client = createHoodieClient(jsc, cfg.targetPath, schemaStr,
cfg.parallelism); cfg.parallelism);

View File

@@ -202,8 +202,7 @@ public class HoodieDeltaStreamer implements Serializable {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(HoodieWriteConfig.TABLE_NAME, cfg.targetTableName); properties.put(HoodieWriteConfig.TABLE_NAME, cfg.targetTableName);
HoodieTableMetaClient HoodieTableMetaClient
.initializePathAsHoodieDataset( .initializePathAsHoodieDataset(jssc.hadoopConfiguration(), cfg.targetBasePath,
FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()), cfg.targetBasePath,
properties); properties);
} }
log.info("Checkpoint to resume from : " + resumeCheckpointStr); log.info("Checkpoint to resume from : " + resumeCheckpointStr);

View File

@@ -25,6 +25,7 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.FSUtils;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@@ -52,8 +53,9 @@ public class TestHoodieSnapshotCopier {
basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME; basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME;
outputPath = rootPath + "/output"; outputPath = rootPath + "/output";
fs = FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()); final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
HoodieTestUtils.init(fs, basePath); fs = FSUtils.getFs(basePath, hadoopConf);
HoodieTestUtils.init(hadoopConf, basePath);
// Start a local Spark job // Start a local Spark job
SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]"); SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]");
jsc = new JavaSparkContext(conf); jsc = new JavaSparkContext(conf);