
[HUDI-217] Provide a unified resource management class to standardize the resource allocation and release for hudi client test cases

yanghua
2019-08-22 19:29:42 +08:00
committed by vinoth chandar
parent 64df98fc4a
commit 6f2b166005
24 changed files with 2227 additions and 2333 deletions


@@ -35,7 +35,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 * Abstract class taking care of holding common member variables (FileSystem, SparkContext, HoodieConfigs)
 * Also, manages embedded timeline-server if enabled.
 */
-public abstract class AbstractHoodieClient implements Serializable {
+public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {

private static final Logger logger = LogManager.getLogger(AbstractHoodieClient.class);
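
Making AbstractHoodieClient AutoCloseable is what lets the reworked tests below hold clients in try-with-resources blocks instead of relying on manual close() calls. A minimal sketch of the resulting pattern, assuming a test where jsc and basePath are already set up (the config values here are illustrative, not taken from this commit):

HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath) // basePath provided by the test harness
    .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
    .build();
try (HoodieWriteClient client = new HoodieWriteClient(jsc, cfg)) {
  String commitTime = client.startCommit();
  // ... write records and assert on the results ...
} // close() runs automatically, even if an assertion above throws

This guarantees client-held resources such as the embedded timeline server are released after every test, which the manual-close versions could leak when an assertion failed mid-test.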


@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.util.FSUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The test harness for resource initialization and cleanup.
*/
public abstract class HoodieClientTestHarness implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(HoodieClientTestHarness.class);
protected transient JavaSparkContext jsc = null;
protected transient SQLContext sqlContext;
protected transient FileSystem fs;
protected String basePath = null;
protected TemporaryFolder folder = null;
protected transient HoodieTestDataGenerator dataGen = null;
protected transient ExecutorService executorService;
//dfs
protected String dfsBasePath;
protected transient HdfsTestService hdfsTestService;
protected transient MiniDFSCluster dfsCluster;
protected transient DistributedFileSystem dfs;
protected void initSparkContexts(String appName) {
// Initialize a local spark env
jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
jsc.setLogLevel("ERROR");
//SQLContext stuff
sqlContext = new SQLContext(jsc);
}
protected void initSparkContexts() {
initSparkContexts("TestHoodieClient");
}
protected void cleanupSparkContexts() {
if (sqlContext != null) {
logger.info("Clearing sql context cache of spark-session used in previous test-case");
sqlContext.clearCache();
sqlContext = null;
}
if (jsc != null) {
logger.info("Closing spark context used in previous test-case");
jsc.close();
jsc.stop();
jsc = null;
}
}
protected void initTempFolderAndPath() throws IOException {
folder = new TemporaryFolder();
folder.create();
basePath = folder.getRoot().getAbsolutePath();
}
protected void cleanupTempFolderAndPath() throws IOException {
if (basePath != null) {
new File(basePath).delete();
}
if (folder != null) {
logger.info("Explicitly removing workspace used in previously run test-case");
folder.delete();
}
}
protected void initFileSystem() {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
if (jsc == null) {
throw new IllegalStateException("The Spark context has not been initialized.");
}
fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
if (fs instanceof LocalFileSystem) {
LocalFileSystem lfs = (LocalFileSystem) fs;
// With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
// This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
// So, for the tests, we enforce checksum verification to circumvent the problem
lfs.setVerifyChecksum(true);
}
}
protected void initFileSystemWithDefaultConfiguration() {
fs = FSUtils.getFs(basePath, new Configuration());
if (fs instanceof LocalFileSystem) {
LocalFileSystem lfs = (LocalFileSystem) fs;
// With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
// This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
// So, for the tests, we enforce checksum verification to circumvent the problem
lfs.setVerifyChecksum(true);
}
}
protected void cleanupFileSystem() throws IOException {
if (fs != null) {
logger.warn("Closing file-system instance used in previous test-run");
fs.close();
}
}
protected void initTableType() throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
if (jsc == null) {
throw new IllegalStateException("The Spark context has not been initialized.");
}
HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, getTableType());
}
protected void cleanupTableType() {
}
protected void initTestDataGenerator() throws IOException {
dataGen = new HoodieTestDataGenerator();
}
protected void cleanupTestDataGenerator() throws IOException {
dataGen = null;
}
protected HoodieTableType getTableType() {
return HoodieTableType.COPY_ON_WRITE;
}
protected void initDFS() throws IOException {
FileSystem.closeAll();
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);
// Create a temp folder as the base path
dfs = dfsCluster.getFileSystem();
dfsBasePath = dfs.getWorkingDirectory().toString();
dfs.mkdirs(new Path(dfsBasePath));
}
protected void cleanupDFS() throws IOException {
if (hdfsTestService != null) {
hdfsTestService.stop();
dfsCluster.shutdown();
}
// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
// same JVM
FileSystem.closeAll();
}
protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
executorService = Executors.newFixedThreadPool(threadNum);
}
protected void cleanupExecutorService() {
if (this.executorService != null) {
this.executorService.shutdownNow();
this.executorService = null;
}
}
}
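
The harness deliberately leaves lifecycle wiring to subclasses: each test class composes exactly the init*/cleanup* pairs it needs in its own @Before/@After, as the rewritten tests below do. Ordering matters, since initFileSystem() and initTableType() throw IllegalStateException when basePath or jsc is missing. A minimal hypothetical subclass (class and test names are illustrative, not from this commit):

public class TestSomeHoodieFeature extends HoodieClientTestHarness {

  @Before
  public void setUp() throws Exception {
    initSparkContexts("TestSomeHoodieFeature"); // must precede initFileSystem()/initTableType(), which need jsc
    initTempFolderAndPath();                    // must precede them too, since they need basePath
    initFileSystem();
    initTableType();
    initTestDataGenerator();
  }

  @After
  public void tearDown() throws Exception {
    cleanupTestDataGenerator();
    cleanupFileSystem();
    cleanupSparkContexts();
    cleanupTempFolderAndPath();
  }

  @Test
  public void testSomething() {
    // jsc, sqlContext, fs, basePath and dataGen are all initialized here
  }
}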


@@ -59,7 +59,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;

/**
@@ -71,7 +73,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
return getConfigBuilder(autoCommit).build();
}

-protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
+private HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withAutoCommit(autoCommit).withAssumeDatePartitioning(true).withCompactionConfig(
HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024).withInlineCompaction(false)

@@ -84,16 +86,27 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
.build());
}

-@Override
-public void tearDown() throws IOException {
-super.tearDown();
+@Before
+public void setUp() throws Exception {
+initTempFolderAndPath();
+initTestDataGenerator();
+initSparkContexts();
+initTableType();
+}
+
+@After
+public void tearDown() throws Exception {
+cleanupTableType();
+cleanupSparkContexts();
+cleanupTestDataGenerator();
+cleanupTempFolderAndPath();
}

@Test
public void testRollbackForInflightCompaction() throws Exception {
// Rollback inflight compaction
HoodieWriteConfig cfg = getConfig(false);
-HoodieWriteClient client = getHoodieWriteClient(cfg, true);
+try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
String firstInstantTime = "001";
String secondInstantTime = "004";

@@ -141,6 +154,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
metaClient.getFs().getFileStatus(new Path(metaClient.getMetaPath(), pendingCompactionInstant.getFileName()));
assertTrue(fstatus.getLen() > 0);
}
+}

private Path getInstantPath(HoodieTableMetaClient metaClient, String timestamp, String action, State state) {
HoodieInstant instant = new HoodieInstant(state, action, timestamp);

@@ -151,8 +165,6 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testRollbackInflightIngestionWithPendingCompaction() throws Exception {
// Rollback inflight ingestion when there is pending compaction
HoodieWriteConfig cfg = getConfig(false);
-HoodieWriteClient client = getHoodieWriteClient(cfg, true);
String firstInstantTime = "001";
String secondInstantTime = "004";
String compactionInstantTime = "005";

@@ -161,6 +173,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
int numRecs = 2000;

+try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
records = runNextDeltaCommits(client, Arrays.asList(firstInstantTime, secondInstantTime),
records, cfg, true, new ArrayList<>());

@@ -198,12 +212,13 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
assertTrue("Pending Compaction instant has expected instant time",
pendingCompactionInstant.getTimestamp().equals(compactionInstantTime));
}
+}

@Test
public void testInflightCompaction() throws Exception {
// There is inflight compaction. Subsequent compaction run must work correctly
HoodieWriteConfig cfg = getConfig(true);
-HoodieWriteClient client = getHoodieWriteClient(cfg, true);
+try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
String firstInstantTime = "001";
String secondInstantTime = "004";

@@ -230,6 +245,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
// execute inflight compaction
executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
}
+}

@Test
public void testScheduleIngestionBeforePendingCompaction() throws Exception {

@@ -359,7 +375,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
public void testCompactionAfterTwoDeltaCommits() throws Exception {
// No Delta Commits after compaction request
HoodieWriteConfig cfg = getConfig(true);
-HoodieWriteClient client = getHoodieWriteClient(cfg, true);
+try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
String firstInstantTime = "001";
String secondInstantTime = "004";

@@ -374,12 +390,13 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false);
}
+}

@Test
public void testInterleavedCompaction() throws Exception {
//Case: Two delta commits before and after compaction schedule
HoodieWriteConfig cfg = getConfig(true);
-HoodieWriteClient client = getHoodieWriteClient(cfg, true);
+try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
String firstInstantTime = "001";
String secondInstantTime = "004";

@@ -401,6 +418,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
records, cfg, false, Arrays.asList(compactionInstantTime));
executeCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, true);
}
+}

/**
* HELPER METHODS FOR TESTING


@@ -72,7 +72,9 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.util.AccumulatorV2;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import scala.collection.Iterator;

@@ -84,9 +86,22 @@ public class TestCleaner extends TestHoodieClientBase {
private static final int BIG_BATCH_INSERT_SIZE = 500;
private static Logger logger = LogManager.getLogger(TestHoodieClientBase.class);

-@Override
-public void tearDown() throws IOException {
-super.tearDown();
+@Before
+public void setUp() throws Exception {
+initTempFolderAndPath();
+initSparkContexts();
+initTestDataGenerator();
+initFileSystem();
+initTableType();
+}
+
+@After
+public void tearDown() throws Exception {
+cleanupTableType();
+cleanupSparkContexts();
+cleanupTestDataGenerator();
+cleanupFileSystem();
+cleanupTempFolderAndPath();
}

/**
@@ -194,7 +209,7 @@ public class TestCleaner extends TestHoodieClientBase {
.withFinalizeWriteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
-HoodieWriteClient client = getHoodieWriteClient(cfg);
+try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);

@@ -289,7 +304,8 @@ public class TestCleaner extends TestHoodieClientBase {
List<String> commitedVersions = new ArrayList<>(fileIdToVersions.get(fileId));
for (int i = 0; i < dataFiles.size(); i++) {
assertEquals("File " + fileId + " does not have latest versions on commits" + commitedVersions,
-Iterables.get(dataFiles, i).getCommitTime(), commitedVersions.get(commitedVersions.size() - 1 - i));
+Iterables.get(dataFiles, i).getCommitTime(),
+commitedVersions.get(commitedVersions.size() - 1 - i));
}
}
}

@@ -299,6 +315,7 @@ public class TestCleaner extends TestHoodieClientBase {
}
}
}
+}

/**
* Test Clean-By-Versions using insert/upsert API


@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
-import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hudi.common.HoodieTestDataGenerator;

@@ -43,6 +42,8 @@ import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;

/**
@@ -50,9 +51,22 @@ import org.junit.Test;
*/
public class TestClientRollback extends TestHoodieClientBase {

-@Override
-public void tearDown() throws IOException {
-super.tearDown();
+@Before
+public void setUp() throws Exception {
+initTempFolderAndPath();
+initTestDataGenerator();
+initSparkContexts();
+initFileSystem();
+initTableType();
+}
+
+@After
+public void tearDown() throws Exception {
+cleanupTableType();
+cleanupSparkContexts();
+cleanupTestDataGenerator();
+cleanupFileSystem();
+cleanupTempFolderAndPath();
}

/**
@@ -63,7 +77,7 @@ public class TestClientRollback extends TestHoodieClientBase {
HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(
HoodieCompactionConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1)
.build()).build();
-HoodieWriteClient client = getHoodieWriteClient(cfg);
+try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);

/**
@@ -167,6 +181,7 @@ public class TestClientRollback extends TestHoodieClientBase {
}).collect(Collectors.toList());
assertEquals("The data files for commit 004 should be rolled back", 0, dataFiles.size());
}
+}

/**
* Test Cases for effects of rollbacking completed/inflight commits

@@ -204,7 +219,7 @@ public class TestClientRollback extends TestHoodieClientBase {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
-HoodieWriteClient client = getHoodieWriteClient(config, false);
+try (HoodieWriteClient client = getHoodieWriteClient(config, false);) {

// Rollback commit 1 (this should fail, since commit2 is still around)
try {

@@ -256,6 +271,7 @@ public class TestClientRollback extends TestHoodieClientBase {
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime1, file12)
|| HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13));
}
+}

/**
* Test auto-rollback of commits which are in flight
@@ -294,7 +310,7 @@ public class TestClientRollback extends TestHoodieClientBase {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withIndexConfig(
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
-getHoodieWriteClient(config, false);
+try (HoodieWriteClient client = getHoodieWriteClient(config, false);) {

// Check results, nothing changed
assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1));

@@ -309,9 +325,11 @@ public class TestClientRollback extends TestHoodieClientBase {
assertTrue(HoodieTestUtils.doesDataFileExist(basePath, "2016/05/01", commitTime1, file11)
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime1, file12)
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13));
+}

// Turn auto rollback on
-getHoodieWriteClient(config, true).startCommit();
+try (HoodieWriteClient client = getHoodieWriteClient(config, true)) {
+client.startCommit();
assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1));
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime2));
assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime3));

@@ -325,4 +343,5 @@ public class TestClientRollback extends TestHoodieClientBase {
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/02", commitTime1, file12)
&& HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13));
}
+}
}


@@ -39,6 +39,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

@@ -49,15 +50,19 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
private CompactionAdminClient client;

@Before
-public void init() throws IOException {
-super.init();
+public void setUp() throws Exception {
+initTempFolderAndPath();
+initSparkContexts();
metaClient = HoodieTestUtils.initTableType(HoodieTestUtils.getDefaultHadoopConf(), basePath, MERGE_ON_READ);
client = new CompactionAdminClient(jsc, basePath);
}

-@Override
-public void tearDown() throws IOException {
-super.tearDown();
+@After
+public void tearDown() throws Exception {
+client.close();
+metaClient = null;
+cleanupSparkContexts();
+cleanupTempFolderAndPath();
}

@Test
@@ -114,7 +119,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
public void testRepairCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
CompactionTestUtils
-.setupAndValidateCompactionOperations(metaClient,false, numEntriesPerInstant, numEntriesPerInstant,
+.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant, numEntriesPerInstant,
numEntriesPerInstant, numEntriesPerInstant);
// THere are delta-commits after compaction instant
validateRepair("000", "001", numEntriesPerInstant, 2 * numEntriesPerInstant);

@@ -172,8 +177,8 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
/**
 * Enssure compaction plan is valid
-*
 * @param compactionInstant Compaction Instant
-* @throws Exception
 */
private void ensureValidCompactionPlan(String compactionInstant) throws Exception {
metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);

@@ -282,8 +287,7 @@ public class TestCompactionAdminClient extends TestHoodieClientBase {
newFsView.getLatestFileSlicesBeforeOrOn(HoodieTestUtils.DEFAULT_PARTITION_PATHS[0], compactionInstant, true)
.filter(fs -> fs.getBaseInstantTime().equals(compactionInstant)).forEach(fs -> {
Assert.assertFalse("No Data file must be present", fs.getDataFile().isPresent());
-Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0);
-});
+Assert.assertTrue("No Log Files", fs.getLogFiles().count() == 0); });

// Ensure same number of log-files before and after renaming per fileId
Map<String, Long> fileIdToCountsAfterRenaming =


@@ -21,36 +21,27 @@ package org.apache.hudi;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.util.ConsistencyGuard;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
-import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FailSafeConsistencyGuard;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;

-public class TestConsistencyGuard {
+public class TestConsistencyGuard extends HoodieClientTestHarness {

-private String basePath;
-protected transient FileSystem fs;

@Before
public void setup() throws IOException {
-TemporaryFolder testFolder = new TemporaryFolder();
-testFolder.create();
-basePath = testFolder.getRoot().getAbsolutePath();
-fs = FSUtils.getFs(basePath, new Configuration());
-if (fs instanceof LocalFileSystem) {
-LocalFileSystem lfs = (LocalFileSystem) fs;
-// With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
-// This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
-// So, for the tests, we enforce checksum verification to circumvent the problem
-lfs.setVerifyChecksum(true);
-}
+initTempFolderAndPath();
+initFileSystemWithDefaultConfiguration();
}

+@After
+public void tearDown() throws Exception {
+cleanupFileSystem();
+cleanupTempFolderAndPath();
+}

@Test


@@ -22,9 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.io.File;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;

@@ -32,7 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.HoodieClientTestUtils;

@@ -40,7 +37,6 @@ import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;

@@ -49,7 +45,6 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
-import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;

@@ -61,27 +56,15 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.rules.TemporaryFolder;

/**
 * Base Class providing setup/cleanup and utility methods for testing Hoodie Client facing tests
 */
-public class TestHoodieClientBase implements Serializable {
+public class TestHoodieClientBase extends HoodieClientTestHarness {

protected static Logger logger = LogManager.getLogger(TestHoodieClientBase.class);

-protected transient JavaSparkContext jsc = null;
-protected transient SQLContext sqlContext;
-protected transient FileSystem fs;
-protected String basePath = null;
-protected TemporaryFolder folder = null;
-protected transient HoodieTestDataGenerator dataGen = null;

private HoodieWriteClient writeClient;
private HoodieReadClient readClient;

@@ -120,61 +103,6 @@ public class TestHoodieClientBase implements Serializable {
}
}

-@Before
-public void init() throws IOException {
-// Initialize a local spark env
-jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieClient"));
-jsc.setLogLevel("ERROR");
-
-//SQLContext stuff
-sqlContext = new SQLContext(jsc);
-
-folder = new TemporaryFolder();
-folder.create();
-basePath = folder.getRoot().getAbsolutePath();
-fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
-if (fs instanceof LocalFileSystem) {
-LocalFileSystem lfs = (LocalFileSystem) fs;
-// With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
-// This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
-// So, for the tests, we enforce checksum verification to circumvent the problem
-lfs.setVerifyChecksum(true);
-}
-HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, getTableType());
-dataGen = new HoodieTestDataGenerator();
-}
-
-@After
-/**
- * Properly release resources at end of each test
- */
-public void tearDown() throws IOException {
-closeWriteClient();
-closeReadClient();
-if (null != sqlContext) {
-logger.info("Clearing sql context cache of spark-session used in previous test-case");
-sqlContext.clearCache();
-}
-if (null != jsc) {
-logger.info("Closing spark context used in previous test-case");
-jsc.close();
-}
-// Create a temp folder as the base path
-if (null != folder) {
-logger.info("Explicitly removing workspace used in previously run test-case");
-folder.delete();
-}
-if (null != fs) {
-logger.warn("Closing file-system instance used in previous test-run");
-fs.close();
-}
-}

/**
 * Get Default HoodieWriteConfig for tests
 *

@@ -469,16 +397,6 @@ public class TestHoodieClientBase implements Serializable {
return result;
}

-@After
-public void clean() {
-if (basePath != null) {
-new File(basePath).delete();
-}
-if (jsc != null) {
-jsc.stop();
-}
-}

/**
 * Get Cleaner state corresponding to a partition path
 *

@@ -537,7 +455,4 @@ public class TestHoodieClientBase implements Serializable {
R apply(T1 v1, T2 v2, T3 v3) throws IOException;
}

-protected HoodieTableType getTableType() {
-return HoodieTableType.COPY_ON_WRITE;
-}
}


@@ -27,7 +27,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.FileInputStream;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

@@ -66,15 +65,30 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;

@SuppressWarnings("unchecked")
public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {

-@Override
-public void tearDown() throws IOException {
-super.tearDown();
+@Before
+public void setUp() throws Exception {
+initTempFolderAndPath();
+initSparkContexts();
+initTestDataGenerator();
+initFileSystem();
+initTableType();
+}
+
+@After
+public void tearDown() throws Exception {
+cleanupTableType();
+cleanupTestDataGenerator();
+cleanupSparkContexts();
+cleanupFileSystem();
+cleanupTempFolderAndPath();
}

/**
@@ -137,13 +151,14 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
boolean isPrepped) throws Exception {
// Set autoCommit false
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
-HoodieWriteClient client = getHoodieWriteClient(cfg);
+try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
String prevCommitTime = "000";
String newCommitTime = "001";
int numRecords = 200;
JavaRDD<WriteStatus> result =
-insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, writeFn, isPrepped, false, numRecords);
+insertFirstBatch(cfg, client, newCommitTime, prevCommitTime, numRecords, writeFn, isPrepped, false,
+numRecords);

assertFalse("If Autocommit is false, then commit should not be made automatically",
HoodieTestUtils.doesCommitExist(basePath, newCommitTime));

@@ -151,6 +166,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertTrue("After explicit commit, commit file should be created",
HoodieTestUtils.doesCommitExist(basePath, newCommitTime));
}
+}

/**
 * Test De-duplication behavior for HoodieWriteClient insert API

@@ -215,8 +231,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertNodupesWithinPartition(dedupedRecs);

// Perform write-action and check
-HoodieWriteClient client = getHoodieWriteClient(
-getConfigBuilder().combineInput(true, true).build(), false);
+try (HoodieWriteClient client = getHoodieWriteClient(
+getConfigBuilder().combineInput(true, true).build(), false);) {
client.startCommitWithTime(newCommitTime);
List<WriteStatus> statuses = writeFn.apply(client, records, newCommitTime).collect();
assertNoWriteErrors(statuses);

@@ -225,6 +241,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
statuses.stream().map(WriteStatus::getWrittenRecords)
.flatMap(Collection::stream).collect(Collectors.toList()));
}
+}

/**
 * Build a test Hoodie WriteClient with dummy index to configure isGlobal flag

@@ -534,7 +551,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
public void testCommitWritesRelativePaths() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
-HoodieWriteClient client = getHoodieWriteClient(cfg);
+try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);

@@ -572,6 +589,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
assertTrue(commitPathNames.contains(pathName));
}
}
+}

/**
 * Test to ensure commit metadata points to valid files


@@ -20,7 +20,6 @@ package org.apache.hudi;
import static org.junit.Assert.assertTrue;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

@@ -29,7 +28,9 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;

@SuppressWarnings("unchecked")
@@ -38,9 +39,22 @@ import org.junit.Test;
 */
public class TestHoodieReadClient extends TestHoodieClientBase {

-@Override
-public void tearDown() throws IOException {
-super.tearDown();
+@Before
+public void setUp() throws Exception {
+initTempFolderAndPath();
+initTestDataGenerator();
+initSparkContexts();
+initFileSystem();
+initTableType();
+}
+
+@After
+public void tearDown() throws Exception {
+cleanupTableType();
+cleanupTestDataGenerator();
+cleanupSparkContexts();
+cleanupFileSystem();
+cleanupTempFolderAndPath();
}

/**
@@ -88,12 +102,12 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
 */
private void testReadFilterExist(HoodieWriteConfig config,
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
-HoodieWriteClient writeClient = getHoodieWriteClient(config);
+try (HoodieWriteClient writeClient = getHoodieWriteClient(config);
+HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());) {
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
-HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
JavaRDD<HoodieRecord> filteredRDD = readClient.filterExists(recordsRDD);

// Should not find any files

@@ -105,12 +119,14 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);

-readClient = getHoodieReadClient(config.getBasePath());
-filteredRDD = readClient.filterExists(recordsRDD);
+try (HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());) {
+filteredRDD = anotherReadClient.filterExists(recordsRDD);
List<HoodieRecord> result = filteredRDD.collect();
// Check results
Assert.assertEquals(25, result.size());
}
+}
+}

/**
 * Test tagLocation API after insert()
@@ -165,7 +181,7 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> updateFn,
boolean isPrepped)
throws Exception {
-HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
+try (HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig);) {
//Write 1 (only inserts)
String newCommitTime = "001";
String initCommitTime = "000";

@@ -204,4 +220,5 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
taggedRecords = readClient.tagLocation(recordRDD).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
}
+}
}


@@ -20,15 +20,10 @@ package org.apache.hudi;
import static org.junit.Assert.assertEquals;
-import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;

@@ -42,47 +37,37 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;

-public class TestMultiFS implements Serializable {
+public class TestMultiFS extends HoodieClientTestHarness {

-private static String dfsBasePath;
-private static HdfsTestService hdfsTestService;
-private static MiniDFSCluster dfsCluster;
-private static DistributedFileSystem dfs;
private static Logger logger = LogManager.getLogger(TestMultiFS.class);
-private static JavaSparkContext jsc;
-private static SQLContext sqlContext;
private String tablePath = "file:///tmp/hoodie/sample-table";
protected String tableName = "hoodie_rt";
private HoodieWriteClient hdfsWriteClient;
private String tableType = HoodieTableType.COPY_ON_WRITE.name();

-@BeforeClass
-public static void initClass() throws Exception {
-hdfsTestService = new HdfsTestService();
-dfsCluster = hdfsTestService.start(true);
-
-// Create a temp folder as the base path
-dfs = dfsCluster.getFileSystem();
-dfsBasePath = dfs.getWorkingDirectory().toString();
-dfs.mkdirs(new Path(dfsBasePath));
-
-SparkConf sparkConf = new SparkConf().setAppName("hoodie-client-example");
-sparkConf.setMaster("local[1]");
-sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-sparkConf.set("spark.kryoserializer.buffer.max", "512m");
-jsc = new JavaSparkContext(sparkConf);
-sqlContext = new SQLContext(jsc);
+@Before
+public void setUp() throws Exception {
+initSparkContexts();
+jsc.getConf().setAppName("hoodie-client-example");
+jsc.getConf().setMaster("local[1]");
+jsc.getConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+jsc.getConf().set("spark.kryoserializer.buffer.max", "512m");
+initDFS();
+initTestDataGenerator();
+}
+
+@After
+public void tearDown() throws Exception {
+cleanupSparkContexts();
+cleanupDFS();
+cleanupTestDataGenerator();
}

private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) throws Exception {

@@ -93,30 +78,6 @@ public class TestMultiFS implements Serializable {
return hdfsWriteClient;
}

-@After
-public void teardown() {
-if (null != hdfsWriteClient) {
-hdfsWriteClient.close();
-hdfsWriteClient = null;
-}
-}
-
-@AfterClass
-public static void cleanupClass() throws Exception {
-if (jsc != null) {
-jsc.stop();
-}
-
-if (hdfsTestService != null) {
-hdfsTestService.stop();
-dfsCluster.shutdown();
-}
-// Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-// same JVM
-FileSystem.closeAll();
-}

protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)

@@ -126,10 +87,6 @@ public class TestMultiFS implements Serializable {
@Test
public void readLocalWriteHDFS() throws Exception {
-// Generator of some records to be loaded in.
-HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
-
// Initialize table and filesystem
HoodieTableMetaClient
.initTableType(jsc.hadoopConfiguration(), dfsBasePath, HoodieTableType.valueOf(tableType), tableName,

@@ -137,7 +94,10 @@ public class TestMultiFS implements Serializable {
//Create write client to write some records in
HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath);
-HoodieWriteClient hdfsWriteClient = getHoodieWriteClient(cfg);
+HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath);
+try (HoodieWriteClient hdfsWriteClient = getHoodieWriteClient(cfg);
+HoodieWriteClient localWriteClient = getHoodieWriteClient(localConfig);) {

// Write generated data to hdfs (only inserts)
String readCommitTime = hdfsWriteClient.startCommit();

@@ -157,8 +117,6 @@ public class TestMultiFS implements Serializable {
HoodieTableMetaClient
.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), tableName,
HoodieAvroPayload.class.getName());
-HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath);
-HoodieWriteClient localWriteClient = getHoodieWriteClient(localConfig);

String writeCommitTime = localWriteClient.startCommit();
logger.info("Starting write commit " + writeCommitTime);

@@ -171,10 +129,9 @@ public class TestMultiFS implements Serializable {
fs = FSUtils.getFs(tablePath, HoodieTestUtils.getDefaultHadoopConf());
metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath);
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-Dataset<Row> localReadRecords = HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime);
+Dataset<Row> localReadRecords = HoodieClientTestUtils
+.readCommit(tablePath, sqlContext, timeline, writeCommitTime);
assertEquals("Should contain 100 records", localReadRecords.count(), localRecords.size());
+}
-hdfsWriteClient.close();
-localWriteClient.close();
}
}


@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -33,28 +34,28 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.func.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import scala.Tuple2;
-public class TestBoundedInMemoryExecutor {
+public class TestBoundedInMemoryExecutor extends HoodieClientTestHarness {
-  private final HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
   private final String commitTime = HoodieActiveTimeline.createNewCommitTime();
-  private SparkBoundedInMemoryExecutor<HoodieRecord,
-      Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
+  @Before
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+  }
   @After
-  public void afterTest() {
-    if (this.executor != null) {
-      this.executor.shutdownNow();
-      this.executor = null;
-    }
+  public void tearDown() throws Exception {
+    cleanupTestDataGenerator();
   }
   @Test
   public void testExecutor() throws Exception {
-    final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, 100);
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(commitTime, 100);
     HoodieWriteConfig hoodieWriteConfig = mock(HoodieWriteConfig.class);
     when(hoodieWriteConfig.getWriteBufferLimitBytes()).thenReturn(1024);
@@ -78,6 +79,9 @@ public class TestBoundedInMemoryExecutor {
       }
     };
+    SparkBoundedInMemoryExecutor<HoodieRecord,
+        Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
+    try {
       executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig,
           hoodieRecords.iterator(), consumer, getTransformFunction(HoodieTestDataGenerator.avroSchema));
       int result = executor.execute();
@@ -85,5 +89,10 @@ public class TestBoundedInMemoryExecutor {
       Assert.assertEquals(result, 100);
       // There should be no remaining records in the buffer
       Assert.assertFalse(executor.isRemaining());
+    } finally {
+      if (executor != null) {
+        executor.shutdownNow();
+      }
+    }
   }
 }
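Here the executor also moves from a class field, shut down in @After, to a local variable guarded by try/finally, so construction and shutdown sit next to each other in the one test that uses them. The same shape with a plain JDK ExecutorService, as a sketch:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GuardedShutdownDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService executor = null;
    try {
      executor = Executors.newSingleThreadExecutor();
      Integer result = executor.submit(() -> 21 + 21).get();
      System.out.println("result = " + result);
    } finally {
      // Runs on success and failure alike, so no worker threads leak
      // from one test into the next.
      if (executor != null) {
        executor.shutdownNow();
      }
    }
  }
}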

View File

@@ -28,14 +28,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -55,23 +54,20 @@ import org.junit.Before;
 import org.junit.Test;
 import scala.Tuple2;
-public class TestBoundedInMemoryQueue {
+public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
-  private final HoodieTestDataGenerator hoodieTestDataGenerator = new HoodieTestDataGenerator();
   private final String commitTime = HoodieActiveTimeline.createNewCommitTime();
-  private ExecutorService executorService = null;
   @Before
-  public void beforeTest() {
-    this.executorService = Executors.newFixedThreadPool(2);
+  public void setUp() throws Exception {
+    initTestDataGenerator();
+    initExecutorServiceWithFixedThreadPool(2);
   }
   @After
-  public void afterTest() {
-    if (this.executorService != null) {
-      this.executorService.shutdownNow();
-      this.executorService = null;
-    }
+  public void tearDown() throws Exception {
+    cleanupTestDataGenerator();
+    cleanupExecutorService();
   }
   // Test to ensure that we are reading all records from queue iterator in the same order
@@ -80,7 +76,7 @@ public class TestBoundedInMemoryQueue {
   @Test(timeout = 60000)
   public void testRecordReading() throws Exception {
     final int numRecords = 128;
-    final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords);
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(commitTime, numRecords);
     final BoundedInMemoryQueue<HoodieRecord, HoodieInsertValueGenResult<HoodieRecord>> queue =
         new BoundedInMemoryQueue(FileIOUtils.KB, getTransformFunction(HoodieTestDataGenerator.avroSchema));
     // Produce
@@ -128,7 +124,7 @@ public class TestBoundedInMemoryQueue {
     Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
     for (int i = 0; i < numProducers; i++) {
-      List<HoodieRecord> pRecs = hoodieTestDataGenerator.generateInserts(commitTime, numRecords);
+      List<HoodieRecord> pRecs = dataGen.generateInserts(commitTime, numRecords);
       int j = 0;
       for (HoodieRecord r : pRecs) {
         Assert.assertTrue(!keyToProducerAndIndexMap.containsKey(r.getRecordKey()));
@@ -211,7 +207,7 @@ public class TestBoundedInMemoryQueue {
   @Test(timeout = 60000)
   public void testMemoryLimitForBuffering() throws Exception {
     final int numRecords = 128;
-    final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords);
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(commitTime, numRecords);
     // maximum number of records to keep in memory.
     final int recordLimit = 5;
     final SizeEstimator<HoodieInsertValueGenResult<HoodieRecord>> sizeEstimator =
@@ -262,7 +258,7 @@ public class TestBoundedInMemoryQueue {
   @Test(timeout = 60000)
   public void testException() throws Exception {
     final int numRecords = 256;
-    final List<HoodieRecord> hoodieRecords = hoodieTestDataGenerator.generateInserts(commitTime, numRecords);
+    final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(commitTime, numRecords);
     final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> sizeEstimator =
         new DefaultSizeEstimator<>();
     // queue memory limit
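The initExecutorServiceWithFixedThreadPool/cleanupExecutorService pair used above suggests the harness owns a shared executorService field with symmetric lifecycle helpers. A sketch of what such helpers could look like (the bodies here are an assumption for illustration, not the committed implementation):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of a harness exposing paired init/cleanup helpers for an executor.
abstract class ExecutorHarnessSketch {
  protected transient ExecutorService executorService;

  protected void initExecutorServiceWithFixedThreadPool(int threads) {
    executorService = Executors.newFixedThreadPool(threads);
  }

  protected void cleanupExecutorService() {
    if (executorService != null) {
      executorService.shutdownNow();
      executorService = null;
    }
  }
}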

View File

@@ -20,8 +20,6 @@ package org.apache.hudi.func;
 import static org.junit.Assert.fail;
-import java.io.File;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,8 +27,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.WriteStatus;
-import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -45,37 +43,24 @@ import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.HoodieCopyOnWriteTable;
 import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-public class TestUpdateMapFunction implements Serializable {
-  private String basePath = null;
-  private transient JavaSparkContext jsc = null;
+public class TestUpdateMapFunction extends HoodieClientTestHarness {
   @Before
-  public void init() throws Exception {
-    // Create a temp folder as the base path
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    this.basePath = folder.getRoot().getAbsolutePath();
+  public void setUp() throws Exception {
+    initTempFolderAndPath();
     HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath);
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestUpdateMapFunction"));
+    initSparkContexts("TestUpdateMapFunction");
   }
   @After
-  public void clean() {
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-    if (jsc != null) {
-      jsc.stop();
-    }
+  public void tearDown() throws Exception {
+    cleanupTempFolderAndPath();
+    cleanupSparkContexts();
   }
   @Test
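After this conversion every test class follows one skeleton: a @Before method that calls the harness init* helpers, and an @After method that calls the matching cleanup* helpers. Roughly, using only helper names visible in this diff (the class name and test body are placeholders):

import org.apache.hudi.HoodieClientTestHarness;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ExampleHarnessTest extends HoodieClientTestHarness {

  @Before
  public void setUp() throws Exception {
    initSparkContexts("ExampleHarnessTest");
    initTempFolderAndPath();
  }

  @After
  public void tearDown() throws Exception {
    // Release in reverse order of acquisition.
    cleanupTempFolderAndPath();
    cleanupSparkContexts();
  }

  @Test
  public void testSomething() {
    // Exercise code against the jsc and basePath the harness provides.
  }
}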

View File

@@ -18,12 +18,10 @@
 package org.apache.hudi.index;
-import java.io.File;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieHBaseIndexConfig;
@@ -33,55 +31,41 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator;
 import org.apache.hudi.index.hbase.HBaseIndex;
 import org.apache.hudi.index.hbase.HBaseIndexQPSResourceAllocator;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-public class TestHBaseQPSResourceAllocator {
-  private static JavaSparkContext jsc = null;
+public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
   private static String tableName = "test_table";
-  private String basePath = null;
-  private static HBaseTestingUtility utility;
-  private static Configuration hbaseConfig;
+  private HBaseTestingUtility utility;
+  private Configuration hbaseConfig;
   private static String QPS_TEST_SUFFIX_PATH = "qps_test_suffix";
-  @AfterClass
-  public static void clean() {
-    if (jsc != null) {
-      jsc.stop();
-    }
-  }
-  @BeforeClass
-  public static void init() throws Exception {
+  @Before
+  public void setUp() throws Exception {
     utility = new HBaseTestingUtility();
     utility.startMiniCluster();
     hbaseConfig = utility.getConnection().getConfiguration();
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestQPSResourceAllocator"));
+    initSparkContexts("TestQPSResourceAllocator");
+    initTempFolderAndPath();
+    basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
+    // Initialize table
+    initTableType();
   }
   @After
-  public void clear() {
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-  }
-  @Before
-  public void before() throws Exception {
-    // Create a temp folder as the base path
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
-    // Initialize table
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
+  public void tearDown() throws Exception {
+    cleanupSparkContexts();
+    cleanupTempFolderAndPath();
+    cleanupTableType();
+    if (utility != null) {
+      utility.shutdownMiniCluster();
+    }
   }
   @Test
   public void testsDefaultQPSResourceAllocator() {
     HoodieWriteConfig config = getConfig(Option.empty());
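Note the lifecycle change in this file: the HBase mini-cluster and the Spark context move from static @BeforeClass/@AfterClass methods, shared by every test in the class, to per-test @Before/@After methods. That trades some startup time for isolation, since no cluster state survives from one test method to the next. Schematically, in JUnit 4 terms:

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

public class LifecycleComparison {

  // Old shape: one shared fixture for the whole class; fast, but state
  // can leak between test methods.
  @BeforeClass
  public static void startShared() { /* start once per class */ }

  @AfterClass
  public static void stopShared() { /* stop once per class */ }

  // New shape: a fresh fixture per test; slower, but hermetic.
  @Before
  public void setUp() { /* start before each test */ }

  @After
  public void tearDown() { /* stop after each test */ }
}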

View File

@@ -24,11 +24,9 @@ import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.atMost;
 import static org.mockito.Mockito.times;
-import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -37,9 +35,9 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.HoodieWriteClient;
 import org.apache.hudi.WriteStatus;
-import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTestUtils;
@@ -56,7 +54,6 @@ import org.apache.hudi.index.hbase.HBaseIndex.HbasePutBatchSizeCalculator;
 import org.apache.hudi.index.hbase.HBaseIndexQPSResourceAllocator;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -64,7 +61,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runners.MethodSorters;
 import org.mockito.Mockito;
 import scala.Tuple2;
@@ -75,14 +71,11 @@ import scala.Tuple2;
  * MethodSorters.NAME_ASCENDING to make sure the tests run in order. Please alter the order of tests running carefully.
  */
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class TestHbaseIndex {
-  private static JavaSparkContext jsc = null;
+public class TestHbaseIndex extends HoodieClientTestHarness {
   private static HBaseTestingUtility utility;
   private static Configuration hbaseConfig;
   private static String tableName = "test_table";
-  private String basePath = null;
-  private transient FileSystem fs;
   private HoodieWriteClient writeClient;
   public TestHbaseIndex() throws Exception {
@@ -90,9 +83,6 @@ public class TestHbaseIndex {
   @AfterClass
   public static void clean() throws Exception {
-    if (jsc != null) {
-      jsc.stop();
-    }
     if (utility != null) {
       utility.shutdownMiniCluster();
     }
@@ -100,37 +90,36 @@ public class TestHbaseIndex {
   @BeforeClass
   public static void init() throws Exception {
     // Initialize HbaseMiniCluster
     utility = new HBaseTestingUtility();
     utility.startMiniCluster();
     hbaseConfig = utility.getConnection().getConfiguration();
     utility.createTable(TableName.valueOf(tableName), Bytes.toBytes("_s"));
+  }
+  @Before
+  public void setUp() throws Exception {
     // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHbaseIndex"));
+    initSparkContexts("TestHbaseIndex");
     jsc.hadoopConfiguration().addResource(utility.getConfiguration());
+    // Create a temp folder as the base path
+    initTempFolderAndPath();
+    // Initialize table
+    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
+    initTestDataGenerator();
   }
   @After
-  public void clear() throws Exception {
+  public void tearDown() throws Exception {
     if (null != writeClient) {
       writeClient.close();
       writeClient = null;
     }
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-  }
-  @Before
-  public void before() throws Exception {
-    // Create a temp folder as the base path
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath();
-    // Initialize table
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
+    cleanupSparkContexts();
+    cleanupTempFolderAndPath();
+    cleanupTestDataGenerator();
   }
   private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -145,14 +134,13 @@ public class TestHbaseIndex {
   public void testSimpleTagLocationAndUpdate() throws Exception {
     String newCommitTime = "001";
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {
       writeClient.startCommit();
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -181,13 +169,12 @@ public class TestHbaseIndex {
       assertTrue(javaRDD.filter(
           record -> (record.getCurrentLocation() != null && record.getCurrentLocation().getInstantTime()
               .equals(newCommitTime))).distinct().count() == 200);
+    }
   }
   @Test
   public void testTagLocationAndDuplicateUpdate() throws Exception {
     String newCommitTime = "001";
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -220,8 +207,6 @@ public class TestHbaseIndex {
   @Test
   public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
@@ -264,8 +249,6 @@ public class TestHbaseIndex {
   @Test
   public void testTotalGetsBatching() throws Exception {
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
@@ -301,8 +284,6 @@ public class TestHbaseIndex {
   @Test
   public void testTotalPutsBatching() throws Exception {
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
     HoodieWriteClient writeClient = getWriteClient(config);
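tearDown above closes the write client before tearing down the Spark context and temp folder it depends on; throughout the commit, cleanups run in roughly reverse order of initialization. A generic way to enforce that ordering, shown only as a sketch (the harness itself uses explicit cleanup* calls instead):

import java.util.ArrayDeque;
import java.util.Deque;

public class LifoCleanup {

  private final Deque<AutoCloseable> resources = new ArrayDeque<>();

  <T extends AutoCloseable> T register(T resource) {
    resources.push(resource); // the most recently opened resource sits on top
    return resource;
  }

  void closeAll() throws Exception {
    // Pop order is last-in-first-out, so dependents close before dependencies.
    while (!resources.isEmpty()) {
      resources.pop().close();
    }
  }

  public static void main(String[] args) throws Exception {
    LifoCleanup cleanup = new LifoCleanup();
    cleanup.register(() -> System.out.println("closing A (opened first)"));
    cleanup.register(() -> System.out.println("closing B (opened second)"));
    cleanup.closeAll(); // prints B first, then A
  }
}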

View File

@@ -20,44 +20,30 @@ package org.apache.hudi.index;
 import static org.junit.Assert.assertTrue;
-import java.io.File;
-import java.io.IOException;
-import org.apache.hudi.common.HoodieClientTestUtils;
-import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.config.HoodieHBaseIndexConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
 import org.apache.hudi.index.hbase.HBaseIndex;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-public class TestHoodieIndex {
-  private transient JavaSparkContext jsc = null;
-  private String basePath = null;
+public class TestHoodieIndex extends HoodieClientTestHarness {
   @Before
-  public void init() throws IOException {
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieIndex"));
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath();
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieIndex");
+    initTempFolderAndPath();
+    initTableType();
   }
   @After
-  public void clean() {
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-    if (jsc != null) {
-      jsc.stop();
-    }
+  public void tearDown() throws Exception {
+    cleanupSparkContexts();
+    cleanupTempFolderAndPath();
+    cleanupTableType();
   }
   @Test

View File

@@ -37,8 +37,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.common.BloomFilter;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.TestRawTripPayload;
@@ -57,21 +57,16 @@ import org.apache.hudi.io.HoodieKeyLookupHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import scala.Tuple2;
 @RunWith(Parameterized.class)
-public class TestHoodieBloomIndex {
-  private JavaSparkContext jsc = null;
-  private String basePath = null;
-  private transient FileSystem fs;
+public class TestHoodieBloomIndex extends HoodieClientTestHarness {
   private String schemaStr;
   private Schema schema;
@@ -93,14 +88,9 @@ public class TestHoodieBloomIndex {
   }
   @Before
-  public void init() throws IOException {
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieBloomIndex"));
-    // Create a temp folder as the base path
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath();
-    fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieBloomIndex");
+    initTempFolderAndPath();
     HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
     // We have some records to be tagged (two different partitions)
     schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
@@ -108,13 +98,9 @@ public class TestHoodieBloomIndex {
   }
   @After
-  public void clean() {
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-    if (jsc != null) {
-      jsc.stop();
-    }
+  public void tearDown() throws Exception {
+    cleanupSparkContexts();
+    cleanupTempFolderAndPath();
   }
   private HoodieWriteConfig makeConfig() {

View File

@@ -34,7 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.model.HoodieKey;
@@ -49,18 +49,13 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import scala.Tuple2;
-public class TestHoodieGlobalBloomIndex {
-  private JavaSparkContext jsc = null;
-  private String basePath = null;
-  private transient FileSystem fs;
+public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
   private String schemaStr;
   private Schema schema;
@@ -68,14 +63,9 @@ public class TestHoodieGlobalBloomIndex {
   }
   @Before
-  public void init() throws IOException {
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieGlobalBloomIndex"));
-    // Create a temp folder as the base path
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath();
-    fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieGlobalBloomIndex");
+    initTempFolderAndPath();
     HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
     // We have some records to be tagged (two different partitions)
     schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
@@ -83,13 +73,9 @@ public class TestHoodieGlobalBloomIndex {
   }
   @After
-  public void clean() {
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-    if (jsc != null) {
-      jsc.stop();
-    }
+  public void tearDown() throws Exception {
+    cleanupSparkContexts();
+    cleanupTempFolderAndPath();
   }
   @Test

View File

@@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import com.google.common.collect.Sets;
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -32,14 +31,10 @@ import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
-import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -52,59 +47,20 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-public class TestHoodieCommitArchiveLog {
-  //NOTE : Be careful in using DFS (FileSystem.class) vs LocalFs(RawLocalFileSystem.class)
-  //The implementation and gurantees of many API's differ, for example check rename(src,dst)
-  // We need to use DFS here instead of LocalFs since the FsDataInputStream.getWrappedStream() returns a
-  // FsDataInputStream instead of a InputStream and thus throws java.lang.ClassCastException:
-  // org.apache.hadoop.fs.FSDataInputStream cannot be cast to org.apache.hadoop.fs.FSInputStream
-  private static MiniDFSCluster dfsCluster;
-  private static DistributedFileSystem dfs;
-  private static HdfsTestService hdfsTestService;
-  private String basePath;
+public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
   private Configuration hadoopConf;
-  private JavaSparkContext jsc = null;
-  @AfterClass
-  public static void cleanUp() throws Exception {
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-    if (hdfsTestService != null) {
-      hdfsTestService.stop();
-      dfsCluster.shutdown();
-    }
-  }
-  @BeforeClass
-  public static void setUpDFS() throws IOException {
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-    if (hdfsTestService == null) {
-      hdfsTestService = new HdfsTestService();
-      dfsCluster = hdfsTestService.start(true);
-      // Create a temp folder as the base path
-      dfs = dfsCluster.getFileSystem();
-    }
-  }
   @Before
   public void init() throws Exception {
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieCommitArchiveLog"));
-    basePath = folder.getRoot().getAbsolutePath();
+    initDFS();
+    initTempFolderAndPath();
+    initSparkContexts("TestHoodieCommitArchiveLog");
     hadoopConf = dfs.getConf();
     jsc.hadoopConfiguration().addResource(dfs.getConf());
     dfs.mkdirs(new Path(basePath));
@@ -112,13 +68,11 @@ public class TestHoodieCommitArchiveLog {
   }
   @After
-  public void clean() {
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-    if (jsc != null) {
-      jsc.stop();
-    }
+  public void clean() throws IOException {
+    cleanupDFS();
+    cleanupTempFolderAndPath();
+    cleanupSparkContexts();
   }
   @Test
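initDFS() and cleanupDFS() fold the MiniDFSCluster bootstrapping that previously lived in this class into the harness. Reconstructed only from the calls visible in the removed code, the helpers would look roughly like this (a sketch; the committed bodies may differ):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.common.minicluster.HdfsTestService;

abstract class DfsHarnessSketch {
  protected transient HdfsTestService hdfsTestService;
  protected transient MiniDFSCluster dfsCluster;
  protected transient DistributedFileSystem dfs;

  protected void initDFS() throws Exception {
    // closeAll() clears FileSystem.Cache, which matters when DFS and
    // LocalFS are used in the same JVM.
    FileSystem.closeAll();
    hdfsTestService = new HdfsTestService();
    dfsCluster = hdfsTestService.start(true);
    dfs = dfsCluster.getFileSystem();
  }

  protected void cleanupDFS() throws Exception {
    if (hdfsTestService != null) {
      hdfsTestService.stop();
      dfsCluster.shutdown();
    }
    FileSystem.closeAll();
  }
}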

View File

@@ -21,15 +21,12 @@ package org.apache.hudi.io;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import java.io.File;
-import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.HoodieWriteClient;
 import org.apache.hudi.WriteStatus;
-import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -46,56 +43,36 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
-import org.apache.hudi.io.compact.HoodieCompactor;
-import org.apache.hudi.io.compact.HoodieRealtimeTableCompactor;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-public class TestHoodieCompactor {
-  private transient JavaSparkContext jsc = null;
-  private String basePath = null;
-  private HoodieCompactor compactor;
-  private transient HoodieTestDataGenerator dataGen = null;
-  private transient FileSystem fs;
-  private Configuration hadoopConf;
+public class TestHoodieCompactor extends HoodieClientTestHarness {
   private HoodieWriteClient writeClient;
+  private Configuration hadoopConf;
   @Before
-  public void init() throws IOException {
+  public void setUp() throws Exception {
     // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieCompactor"));
+    initSparkContexts("TestHoodieCompactor");
     // Create a temp folder as the base path
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath();
+    initTempFolderAndPath();
     hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
     fs = FSUtils.getFs(basePath, hadoopConf);
     HoodieTestUtils.initTableType(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
-    dataGen = new HoodieTestDataGenerator();
-    compactor = new HoodieRealtimeTableCompactor();
+    initTestDataGenerator();
   }
   @After
-  public void clean() {
-    if (null != writeClient) {
-      writeClient.close();
-      writeClient = null;
-    }
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-    if (jsc != null) {
-      jsc.stop();
-    }
+  public void tearDown() throws Exception {
+    cleanupFileSystem();
+    cleanupTestDataGenerator();
+    cleanupTempFolderAndPath();
+    cleanupSparkContexts();
   }
   private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -137,7 +114,7 @@ public class TestHoodieCompactor {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieWriteConfig config = getConfig();
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    HoodieWriteClient writeClient = getWriteClient(config);
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
@@ -149,12 +126,13 @@ public class TestHoodieCompactor {
       table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
       assertTrue("If there is nothing to compact, result will be empty", result.isEmpty());
     }
+  }
   @Test
   public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
-    HoodieWriteClient writeClient = getWriteClient(config);
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
@@ -206,6 +184,12 @@ public class TestHoodieCompactor {
           .count() > 0);
     }
   }
+  }
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
   // TODO - after modifying HoodieReadClient to support realtime tables - add more tests to make
   // sure the data read is the updated data (compaction correctness)
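The new getTableType() override implies initTableType() is a template method: the harness initializes the table for whatever type an overridable getter returns, presumably defaulting to COPY_ON_WRITE, so a MERGE_ON_READ test only overrides the getter. A sketch of that shape, assembled from calls that appear elsewhere in this diff (the default and the wiring are assumptions):

import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;

abstract class TableTypeHookSketch {
  protected String basePath;

  // Subclasses override this hook to test other table types.
  protected HoodieTableType getTableType() {
    return HoodieTableType.COPY_ON_WRITE;
  }

  protected void initTableType() throws Exception {
    HoodieTestUtils.initTableType(HoodieTestUtils.getDefaultHadoopConf(), basePath, getTableType());
  }
}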

View File

@@ -22,17 +22,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
-import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.HoodieClientTestHarness;
 import org.apache.hudi.HoodieWriteClient;
 import org.apache.hudi.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
@@ -45,56 +43,34 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 @SuppressWarnings("unchecked")
-public class TestHoodieMergeHandle {
-  protected transient JavaSparkContext jsc = null;
-  protected transient SQLContext sqlContext;
-  protected transient FileSystem fs;
-  protected String basePath = null;
-  protected transient HoodieTestDataGenerator dataGen = null;
+public class TestHoodieMergeHandle extends HoodieClientTestHarness {
   private HoodieWriteClient writeClient;
   @Before
-  public void init() throws IOException {
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeHandle"));
-    //SQLContext stuff
-    sqlContext = new SQLContext(jsc);
-    // Create a temp folder as the base path
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath();
-    fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
-    dataGen = new HoodieTestDataGenerator();
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieMergeHandle");
+    initTempFolderAndPath();
+    initFileSystem();
+    initTableType();
+    initTestDataGenerator();
   }
   @After
-  public void clean() {
-    if (null != writeClient) {
-      writeClient.close();
-      writeClient = null;
-    }
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-    if (jsc != null) {
-      jsc.stop();
-    }
+  public void tearDown() throws Exception {
+    cleanupTableType();
+    cleanupFileSystem();
+    cleanupTestDataGenerator();
+    cleanupTempFolderAndPath();
+    cleanupSparkContexts();
   }
   private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -113,7 +89,7 @@ public class TestHoodieMergeHandle {
     // Build a write config with bulkinsertparallelism set
     HoodieWriteConfig cfg = getConfigBuilder().build();
-    HoodieWriteClient client = getWriteClient(cfg);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {
       FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
       /**
@@ -141,7 +117,8 @@ public class TestHoodieMergeHandle {
       // verify that there is a commit
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
       HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-      assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
+      assertEquals("Expecting a single commit.", 1,
+          timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
       Assert.assertEquals("Latest commit should be 001", newCommitTime, timeline.lastInstant().get().getTimestamp());
       assertEquals("Must contain 44 records",
           records.size(),
@@ -259,12 +236,13 @@ public class TestHoodieMergeHandle {
       // number of records with row_key id2
       assertEquals(21, record2Count);
     }
+  }
   @Test
   public void testHoodieMergeHandleWriteStatMetrics() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfigBuilder().build();
-    HoodieWriteClient writeClient = getWriteClient(config);
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
@@ -277,14 +255,14 @@ public class TestHoodieMergeHandle {
           .filter(status -> status.getStat().getPrevCommit() != HoodieWriteStat.NULL_COMMIT).count() > 0);
       // Num writes should be equal to the number of records inserted
       Assert.assertEquals((long) statuses.stream()
-          .map(status -> status.getStat().getNumWrites()).reduce((a,b) -> a + b).get(), 100);
+          .map(status -> status.getStat().getNumWrites()).reduce((a, b) -> a + b).get(), 100);
       // Num update writes should be equal to the number of records updated
       Assert.assertEquals((long) statuses.stream()
-          .map(status -> status.getStat().getNumUpdateWrites()).reduce((a,b) -> a + b).get(), 0);
+          .map(status -> status.getStat().getNumUpdateWrites()).reduce((a, b) -> a + b).get(), 0);
       // Num update writes should be equal to the number of insert records converted to updates as part of small file
       // handling
       Assert.assertEquals((long) statuses.stream()
-          .map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100);
+          .map(status -> status.getStat().getNumInserts()).reduce((a, b) -> a + b).get(), 100);
       // Update all the 100 records
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
@@ -302,15 +280,14 @@ public class TestHoodieMergeHandle {
           .filter(status -> status.getStat().getPrevCommit() == HoodieWriteStat.NULL_COMMIT).count(), 0);
       // Num writes should be equal to the number of records inserted
       Assert.assertEquals((long) statuses.stream()
-          .map(status -> status.getStat().getNumWrites()).reduce((a,b) -> a + b).get(), 100);
+          .map(status -> status.getStat().getNumWrites()).reduce((a, b) -> a + b).get(), 100);
       // Num update writes should be equal to the number of records updated
       Assert.assertEquals((long) statuses.stream()
-          .map(status -> status.getStat().getNumUpdateWrites()).reduce((a,b) -> a + b).get(), 100);
+          .map(status -> status.getStat().getNumUpdateWrites()).reduce((a, b) -> a + b).get(), 100);
       // Num update writes should be equal to the number of insert records converted to updates as part of small file
       // handling
       Assert.assertEquals((long) statuses.stream()
-          .map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 0);
+          .map(status -> status.getStat().getNumInserts()).reduce((a, b) -> a + b).get(), 0);
       newCommitTime = "102";
       writeClient.startCommitWithTime(newCommitTime);
@@ -325,14 +302,14 @@ public class TestHoodieMergeHandle {
           .filter(status -> status.getStat().getPrevCommit() == HoodieWriteStat.NULL_COMMIT).count(), 0);
       // Num writes should be equal to the total number of records written
       Assert.assertEquals((long) statuses.stream()
-          .map(status -> status.getStat().getNumWrites()).reduce((a,b) -> a + b).get(), 200);
+          .map(status -> status.getStat().getNumWrites()).reduce((a, b) -> a + b).get(), 200);
       // Num update writes should be equal to the number of records updated (including inserts converted as updates)
       Assert.assertEquals((long) statuses.stream()
-          .map(status -> status.getStat().getNumUpdateWrites()).reduce((a,b) -> a + b).get(), 100);
+          .map(status -> status.getStat().getNumUpdateWrites()).reduce((a, b) -> a + b).get(), 100);
       // Num update writes should be equal to the number of insert records converted to updates as part of small file
       // handling
       Assert.assertEquals((long) statuses.stream()
-          .map(status -> status.getStat().getNumInserts()).reduce((a,b) -> a + b).get(), 100);
+          .map(status -> status.getStat().getNumInserts()).reduce((a, b) -> a + b).get(), 100);
       // Verify all records have location set
       statuses.forEach(writeStatus -> {
         writeStatus.getWrittenRecords().forEach(r -> {
@@ -341,6 +318,7 @@ public class TestHoodieMergeHandle {
         });
       });
     }
+  }
   private Dataset<Row> getRecords() {
     // Check the entire dataset has 8 records still

View File

@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.HoodieClientTestHarness;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.BloomFilter;
import org.apache.hudi.common.HoodieClientTestUtils;
@@ -58,33 +59,30 @@ import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.TaskContext;
-import org.apache.spark.api.java.JavaSparkContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import scala.Tuple2;

-public class TestCopyOnWriteTable {
+public class TestCopyOnWriteTable extends HoodieClientTestHarness {

  protected static Logger log = LogManager.getLogger(TestCopyOnWriteTable.class);
-  private String basePath = null;
-  private transient JavaSparkContext jsc = null;

  @Before
-  public void init() throws Exception {
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestCopyOnWriteTable"));
-
-    // Create a temp folder as the base path
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    this.basePath = folder.getRoot().getAbsolutePath();
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
+  public void setUp() throws Exception {
+    initSparkContexts("TestCopyOnWriteTable");
+    initTempFolderAndPath();
+    initTableType();
+    initTestDataGenerator();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cleanupSparkContexts();
+    cleanupTempFolderAndPath();
+    cleanupTableType();
+    cleanupTestDataGenerator();
  }

  @Test
@@ -439,7 +437,7 @@ public class TestCopyOnWriteTable {
    final String testPartitionPath = "2016/09/26";
    // Inserts + Updates .. Check updates go together & inserts subsplit, after expanding
    // smallest file
-    UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024,testPartitionPath,
+    UpsertPartitioner partitioner = getUpsertPartitioner(1000 * 1024, 400, 100, 800 * 1024, testPartitionPath,
        false);

    List<HoodieCopyOnWriteTable.InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
@@ -479,12 +477,11 @@ public class TestCopyOnWriteTable {
    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
    String commitTime = "000";
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
    // Perform inserts of 100 records to test CreateHandle and BufferedExecutor
-    final List<HoodieRecord> inserts = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 100);
+    final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(commitTime, 100);
    final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
      return table.handleInsert(commitTime, UUID.randomUUID().toString(), inserts.iterator());
-    }).map(x -> (List<WriteStatus>)HoodieClientTestUtils.collectStatuses(x)).collect();
+    }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();

    WriteStatus writeStatus = ws.get(0).get(0);
    String fileId = writeStatus.getFileId();
@@ -492,11 +489,11 @@ public class TestCopyOnWriteTable {
    final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc);
    final List<HoodieRecord> updates =
-        dataGenerator.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords());
+        dataGen.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords());

    jsc.parallelize(Arrays.asList(1)).map(x -> {
      return table2.handleUpdate("001", fileId, updates.iterator());
-    }).map(x -> (List<WriteStatus>)HoodieClientTestUtils.collectStatuses(x)).collect();
+    }).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
  }

  @After
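The change above is the pattern this commit applies across the client tests: drop the hand-rolled SparkContext and temp-folder plumbing, extend HoodieClientTestHarness, and mirror every init* call in @Before with its cleanup* counterpart in @After. A minimal sketch (the subclass name is hypothetical; the lifecycle helpers are the ones this commit introduces):

import org.apache.hudi.HoodieClientTestHarness;
import org.junit.After;
import org.junit.Before;

// Hypothetical subclass showing the unified resource lifecycle.
public class TestSomeHoodieTable extends HoodieClientTestHarness {

  @Before
  public void setUp() throws Exception {
    initSparkContexts("TestSomeHoodieTable"); // provides jsc and sqlContext
    initTempFolderAndPath();                  // provides folder and basePath
    initTableType();                          // table metadata under basePath
    initTestDataGenerator();                  // provides dataGen
  }

  @After
  public void tearDown() throws Exception {
    cleanupSparkContexts();
    cleanupTempFolderAndPath();
    cleanupTableType();
    cleanupTestDataGenerator();
  }
}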

View File

@@ -34,10 +34,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.HoodieClientTestHarness;
import org.apache.hudi.HoodieReadClient;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
@@ -45,7 +43,6 @@ import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieMergeOnReadTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
-import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDataFile;
@@ -73,85 +70,33 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SQLContext;
import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

-public class TestMergeOnReadTable {
+public class TestMergeOnReadTable extends HoodieClientTestHarness {

-  protected String basePath = null;
-  //NOTE : Be careful in using DFS (FileSystem.class) vs LocalFs(RawLocalFileSystem.class)
-  //The implementation and gurantees of many API's differ, for example check rename(src,dst)
-  private static MiniDFSCluster dfsCluster;
-  private static DistributedFileSystem dfs;
-  private static HdfsTestService hdfsTestService;
-  private transient JavaSparkContext jsc = null;
-  private transient SQLContext sqlContext;
  private HoodieWriteClient writeClient;

-  @AfterClass
-  public static void cleanUp() throws Exception {
-    if (hdfsTestService != null) {
-      hdfsTestService.stop();
-      dfsCluster.shutdown();
-      dfsCluster = null;
-      dfs = null;
-      hdfsTestService = null;
-    }
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-  }
-
-  @BeforeClass
-  public static void setUpDFS() throws IOException {
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
-    if (hdfsTestService == null) {
-      hdfsTestService = new HdfsTestService();
-      dfsCluster = hdfsTestService.start(true);
-      // Create a temp folder as the base path
-      dfs = dfsCluster.getFileSystem();
-    }
-  }
-
  @Before
  public void init() throws IOException {
-    // Initialize a local spark env
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeOnReadTable"));
-
-    // Create a temp folder as the base path
-    TemporaryFolder folder = new TemporaryFolder();
-    folder.create();
-    basePath = folder.getRoot().getAbsolutePath();
+    initDFS();
+    initSparkContexts("TestHoodieMergeOnReadTable");
    jsc.hadoopConfiguration().addResource(dfs.getConf());
+    initTempFolderAndPath();
    dfs.mkdirs(new Path(basePath));
    HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, HoodieTableType.MERGE_ON_READ);
+    initTestDataGenerator();
-    sqlContext = new SQLContext(jsc); // SQLContext stuff
  }

  @After
-  public void clean() {
-    if (null != writeClient) {
-      writeClient.close();
-      writeClient = null;
-    }
-    if (basePath != null) {
-      new File(basePath).delete();
-    }
-    if (jsc != null) {
-      jsc.stop();
-    }
+  public void clean() throws IOException {
+    cleanupDFS();
+    cleanupTempFolderAndPath();
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
  }
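Because AbstractHoodieClient now implements AutoCloseable, the tests below also swap long-lived client variables for try-with-resources blocks, so close() runs even when an assertion throws mid-test. A hedged sketch of the converted shape (getWriteClient and getConfig are the helpers already defined in this class):

  // Sketch only: client lifetime is scoped to the block, replacing the manual
  // writeClient.close() / null bookkeeping the old @After method performed.
  try (HoodieWriteClient client = getWriteClient(getConfig(true))) {
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);
    // ... write records, commit, assert ...
  }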
  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -165,7 +110,7 @@ public class TestMergeOnReadTable {
  @Test
  public void testSimpleInsertAndUpdate() throws Exception {
    HoodieWriteConfig cfg = getConfig(true);
-    HoodieWriteClient client = getWriteClient(cfg);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {

    /**
@@ -173,7 +118,6 @@ public class TestMergeOnReadTable {
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -237,22 +181,23 @@ public class TestMergeOnReadTable {
    // verify that there is a commit
    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true);
    HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
-    assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
+    assertEquals("Expecting a single commit.", 1,
+        timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
    String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
    assertTrue(HoodieTimeline.compareTimestamps("000", latestCompactionCommitTime, HoodieTimeline.LESSER));

    assertEquals("Must contain 200 records", 200,
        HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, "000").count());
  }
+  }

  // Check if record level metadata is aggregated properly at the end of write.
  @Test
  public void testMetadataAggregateFromWriteStatus() throws Exception {
    HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
-    HoodieWriteClient client = getWriteClient(cfg);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {
    String newCommitTime = "001";
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -268,11 +213,12 @@ public class TestMergeOnReadTable {
    assertEquals(String.valueOf(2 * records.size()),
        allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000"));
  }
+  }

  @Test
  public void testSimpleInsertUpdateAndDelete() throws Exception {
    HoodieWriteConfig cfg = getConfig(true);
-    HoodieWriteClient client = getWriteClient(cfg);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {

    /**
@@ -280,7 +226,6 @@ public class TestMergeOnReadTable {
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -349,6 +294,7 @@ public class TestMergeOnReadTable {
    //Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
    assertEquals("Must contain 0 records", 0, recordsRead.size());
  }
+  }

  @Test
  public void testCOWToMORConvertedDatasetRollback() throws Exception {
@@ -357,7 +303,7 @@ public class TestMergeOnReadTable {
    HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE);
    HoodieWriteConfig cfg = getConfig(true);
-    HoodieWriteClient client = getWriteClient(cfg);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {

    /**
@@ -365,7 +311,6 @@ public class TestMergeOnReadTable {
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -411,12 +356,13 @@ public class TestMergeOnReadTable {
      }
    }).findAny().isPresent());
  }
+  }
  @Test
  public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
    HoodieWriteConfig cfg = getConfig(false);
-    HoodieWriteClient client = getWriteClient(cfg);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {

    // Test delta commit rollback
    /**
@@ -425,7 +371,6 @@ public class TestMergeOnReadTable {
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -460,8 +405,8 @@ public class TestMergeOnReadTable {
     */
    final String commitTime1 = "002";
    // WriteClient with custom config (disable small file handling)
-    client = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
-    client.startCommitWithTime(commitTime1);
+    try (HoodieWriteClient secondClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());) {
+      secondClient.startCommitWithTime(commitTime1);

    List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
    copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
@@ -471,12 +416,12 @@ public class TestMergeOnReadTable {
    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
    assertEquals(recordsRead.size(), 200);

-    statuses = client.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
+    statuses = secondClient.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);

    // Test failed delta commit rollback
-    client.rollback(commitTime1);
+    secondClient.rollback(commitTime1);
    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    // After rollback, there should be no parquet file with the failed commit time
    Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName()
@@ -484,32 +429,32 @@ public class TestMergeOnReadTable {
    dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
    recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
    assertEquals(recordsRead.size(), 200);
+    }

    /**
     * Write 3 (inserts + updates - testing successful delta commit)
     */
    final String commitTime2 = "002";
-    client = getWriteClient(cfg);
-    client.startCommitWithTime(commitTime2);
+    try (HoodieWriteClient thirdClient = getWriteClient(cfg);) {
+      thirdClient.startCommitWithTime(commitTime2);

-    copyOfRecords = new ArrayList<>(records);
+    List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
    copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
    copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));

-    dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+    List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
-    recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
+    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
    assertEquals(recordsRead.size(), 200);

    writeRecords = jsc.parallelize(copyOfRecords, 1);
-    writeStatusJavaRDD = client.upsert(writeRecords, commitTime2);
-    client.commit(commitTime2, writeStatusJavaRDD);
+    writeStatusJavaRDD = thirdClient.upsert(writeRecords, commitTime2);
+    thirdClient.commit(commitTime2, writeStatusJavaRDD);
    statuses = writeStatusJavaRDD.collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);

    // Test successful delta commit rollback
-    client.rollback(commitTime2);
+    thirdClient.rollback(commitTime2);
    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    // After rollback, there should be no parquet file with the failed commit time
    Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName()
@@ -528,21 +473,21 @@ public class TestMergeOnReadTable {
     * Write 4 (updates)
     */
    newCommitTime = "003";
-    client.startCommitWithTime(newCommitTime);
+    thirdClient.startCommitWithTime(newCommitTime);

    records = dataGen.generateUpdates(newCommitTime, records);
-    writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
-    client.commit(newCommitTime, writeStatusJavaRDD);
+    writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
+    thirdClient.commit(newCommitTime, writeStatusJavaRDD);
    statuses = writeStatusJavaRDD.collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);

    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());

-    String compactionInstantTime = client.scheduleCompaction(Option.empty()).get().toString();
-    JavaRDD<WriteStatus> ws = client.compact(compactionInstantTime);
-    client.commitCompaction(compactionInstantTime, ws, Option.empty());
+    String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
+    JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
+    thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());

    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
@@ -550,7 +495,8 @@ public class TestMergeOnReadTable {
    roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
    List<HoodieDataFile> dataFiles2 = roView.getLatestDataFiles().collect(Collectors.toList());

-    final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get()
+    final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant()
+        .get()
        .getTimestamp();

    assertTrue(roView.getLatestDataFiles().filter(file -> {
@@ -561,7 +507,7 @@ public class TestMergeOnReadTable {
      }
    }).findAny().isPresent());

-    client.rollback(compactedCommitTime);
+    thirdClient.rollback(compactedCommitTime);
    allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
@@ -576,12 +522,14 @@ public class TestMergeOnReadTable {
      }
    }).findAny().isPresent());
  }
+    }
+  }
  @Test
  public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
    HoodieWriteConfig cfg = getConfig(false);
-    final HoodieWriteClient client = getWriteClient(cfg);
+    try (final HoodieWriteClient client = getWriteClient(cfg);) {
    List<String> allCommits = new ArrayList<>();
    /**
     * Write 1 (only inserts)
@@ -590,7 +538,6 @@ public class TestMergeOnReadTable {
    allCommits.add(newCommitTime);
    client.startCommitWithTime(newCommitTime);

-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -643,7 +590,6 @@ public class TestMergeOnReadTable {
    nClient.commit(newCommitTime, writeStatusJavaRDD);
    copyOfRecords.clear();

-    // Schedule a compaction
    /**
     * Write 3 (inserts + updates)
@@ -698,7 +644,8 @@ public class TestMergeOnReadTable {
    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
    roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);

-    final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get()
+    final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant()
+        .get()
        .getTimestamp();

    assertTrue(roView.getLatestDataFiles().filter(file -> {
@@ -740,6 +687,7 @@ public class TestMergeOnReadTable {
        .toList());
    assertTrue(fileGroups.isEmpty());
  }
+  }

  protected HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff() {
    return HoodieWriteConfig.newBuilder().withPath(basePath)
@@ -756,7 +704,7 @@ public class TestMergeOnReadTable {
  @Test
  public void testUpsertPartitioner() throws Exception {
    HoodieWriteConfig cfg = getConfig(true);
-    HoodieWriteClient client = getWriteClient(cfg);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {

    /**
     * Write 1 (only inserts, written as parquet file)
@@ -764,7 +712,6 @@ public class TestMergeOnReadTable {
    String newCommitTime = "001";
    client.startCommitWithTime(newCommitTime);

-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -832,13 +779,13 @@ public class TestMergeOnReadTable {
    //Wrote 20 records in 2 batches
    assertEquals("Must contain 40 records", 40, recordsRead.size());
  }
+  }

  @Test
  public void testLogFileCountsAfterCompaction() throws Exception {
    // insert 100 records
    HoodieWriteConfig config = getConfig(true);
-    HoodieWriteClient writeClient = getWriteClient(config);
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {

    String newCommitTime = "100";
    writeClient.startCommitWithTime(newCommitTime);
@@ -856,7 +803,7 @@ public class TestMergeOnReadTable {
    List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
    JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);

-    HoodieReadClient readClient = new HoodieReadClient(jsc, config);
+    try (HoodieReadClient readClient = new HoodieReadClient(jsc, config);) {
    updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();

    // Write them to corresponding avro logfiles
@@ -868,7 +815,7 @@ public class TestMergeOnReadTable {
    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
-    ((SyncableFileSystemView)(table.getRTFileSystemView())).reset();
+    ((SyncableFileSystemView) (table.getRTFileSystemView())).reset();

    for (String partitionPath : dataGen.getPartitionPaths()) {
      List<FileSlice> groupedLogFiles = table.getRTFileSystemView().getLatestFileSlices(partitionPath)
@@ -906,49 +853,8 @@ public class TestMergeOnReadTable {
          .filter(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath))
          .count() > 0);
    }
-    writeClient.close();
  }
+  }
-  @Test
-  public void testMetadataValuesAfterInsertUpsertAndCompaction() throws Exception {
-    // insert 100 records
-    HoodieWriteConfig config = getConfig(false);
-    HoodieWriteClient writeClient = getWriteClient(config);
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
-    String newCommitTime = "100";
-    writeClient.startCommitWithTime(newCommitTime);
-    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
-    JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
-    JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime);
-    writeClient.commit(newCommitTime, statuses);
-    // total time taken for creating files should be greater than 0
-    long totalCreateTime = statuses.map(writeStatus -> writeStatus.getStat().getRuntimeStats().getTotalCreateTime())
-        .reduce((a, b) -> a + b).intValue();
-    Assert.assertTrue(totalCreateTime > 0);
-    // Update all the 100 records
-    newCommitTime = "101";
-    writeClient.startCommitWithTime(newCommitTime);
-    List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
-    JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
-    statuses = writeClient.upsert(updatedRecordsRDD, newCommitTime);
-    writeClient.commit(newCommitTime, statuses);
-    // total time taken for upsert all records should be greater than 0
-    long totalUpsertTime = statuses.map(writeStatus -> writeStatus.getStat().getRuntimeStats().getTotalUpsertTime())
-        .reduce((a, b) -> a + b).intValue();
-    Assert.assertTrue(totalUpsertTime > 0);
-    // Do a compaction
-    String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
-    statuses = writeClient.compact(compactionInstantTime);
-    writeClient.commitCompaction(compactionInstantTime, statuses, Option.empty());
-    // total time taken for scanning log files should be greater than 0
-    long timeTakenForScanner = statuses.map(writeStatus -> writeStatus.getStat().getRuntimeStats().getTotalScanTime())
-        .reduce((a, b) -> a + b).longValue();
-    Assert.assertTrue(timeTakenForScanner > 0);
-  }
  }
  @Test
@@ -956,8 +862,7 @@ public class TestMergeOnReadTable {
    // insert 100 records
    // Setting IndexType to be InMemory to simulate Global Index nature
    HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    HoodieWriteClient writeClient = getWriteClient(config);
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {

    String newCommitTime = "100";
    writeClient.startCommitWithTime(newCommitTime);
@@ -989,14 +894,14 @@ public class TestMergeOnReadTable {
    Assert.assertEquals(statuses.count(), numLogFiles);
    writeClient.commitCompaction(commitTime, statuses, Option.empty());
  }
+  }

  @Test
  public void testInsertsGeneratedIntoLogFilesRollback() throws Exception {
    // insert 100 records
    // Setting IndexType to be InMemory to simulate Global Index nature
    HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    HoodieWriteClient writeClient = getWriteClient(config);
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {

    String newCommitTime = "100";
    writeClient.startCommitWithTime(newCommitTime);
@@ -1041,7 +946,8 @@ public class TestMergeOnReadTable {
    TemporaryFolder folder = new TemporaryFolder();
    folder.create();
    File file = folder.newFile();
-    metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), fileName), new Path(file.getAbsolutePath()));
+    metaClient.getFs()
+        .copyToLocalFile(new Path(metaClient.getMetaPath(), fileName), new Path(file.getAbsolutePath()));

    writeClient.rollback(newCommitTime);
    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
@@ -1064,14 +970,14 @@ public class TestMergeOnReadTable {
    // Rollback again to pretend the first rollback failed partially. This should not error our
    writeClient.rollback(newCommitTime);
  }
+  }

  @Test
  public void testInsertsGeneratedIntoLogFilesRollbackAfterCompaction() throws Exception {
    // insert 100 records
    // Setting IndexType to be InMemory to simulate Global Index nature
    HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build();
-    HoodieWriteClient writeClient = getWriteClient(config);
-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    try (HoodieWriteClient writeClient = getWriteClient(config);) {

    String newCommitTime = "100";
    writeClient.startCommitWithTime(newCommitTime);
@@ -1109,7 +1015,7 @@ public class TestMergeOnReadTable {
    writeClient.rollback(newCommitTime);
    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
    tableRTFileSystemView = table.getRTFileSystemView();
-    ((SyncableFileSystemView)tableRTFileSystemView).reset();
+    ((SyncableFileSystemView) tableRTFileSystemView).reset();
    for (String partitionPath : dataGen.getPartitionPaths()) {
      Assert.assertTrue(tableRTFileSystemView.getLatestFileSlices(partitionPath).filter(fileSlice ->
          fileSlice.getDataFile().isPresent()).count() == 0);
@@ -1117,6 +1023,7 @@ public class TestMergeOnReadTable {
          fileSlice.getLogFiles().count() > 0).count() > 0);
    }
  }
+  }

  /**
   * Test to ensure rolling stats are correctly written to metadata file
@@ -1125,7 +1032,7 @@ public class TestMergeOnReadTable {
  public void testRollingStatsInMetadata() throws Exception {
    HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
-    HoodieWriteClient client = getWriteClient(cfg);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
@@ -1139,7 +1046,6 @@ public class TestMergeOnReadTable {
    String commitTime = "001";
    client.startCommitWithTime(commitTime);

-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> records = dataGen.generateInserts(commitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -1207,6 +1113,7 @@ public class TestMergeOnReadTable {
    Assert.assertEquals(inserts, 200);
    Assert.assertEquals(upserts, 0);
  }
+  }

  /**
   * Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them
@@ -1215,7 +1122,7 @@ public class TestMergeOnReadTable {
  public void testRollingStatsWithSmallFileHandling() throws Exception {
    HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
-    HoodieWriteClient client = getWriteClient(cfg);
+    try (HoodieWriteClient client = getWriteClient(cfg);) {
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    Map<String, Long> fileIdToInsertsMap = new HashMap<>();
    Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
@@ -1224,7 +1131,6 @@ public class TestMergeOnReadTable {
    String commitTime = "000";
    client.startCommitWithTime(commitTime);

-    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    List<HoodieRecord> records = dataGen.generateInserts(commitTime, 200);
    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
@@ -1330,7 +1236,7 @@ public class TestMergeOnReadTable {
    Assert.assertEquals(inserts, 600);
    Assert.assertEquals(upserts, 600);
+    }
  }

  private HoodieWriteConfig getConfig(Boolean autoCommit) {