
[HUDI-652] Decouple HoodieReadClient and AbstractHoodieClient to break the inheritance chain (#1372)

* Removed timeline server support
* Removed try-with-resources
vinoyang
2020-03-07 01:59:35 +08:00
committed by GitHub
parent 3d3781810c
commit ee5b32f5d4
5 changed files with 81 additions and 87 deletions
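Why the try-with-resources removals below: HoodieReadClient previously inherited AbstractHoodieClient, and it is that parent which owns the JavaSparkContext and the optional embedded timeline service and is closable, which is what made try-with-resources legal around the read client. After this commit the read client is a plain Serializable class with no resources of its own, so every try-with-resources block around it has to be unwound. A minimal before/after sketch (the closable-parent detail is inferred from the removed usage, not shown in this diff):

    // Before: closable via the parent, hence the try-with-resources callers below.
    public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient { ... }

    // After: nothing to close; the class only carries the table view, index and SparkContext.
    public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable { ... }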

File: HoodieReadClient.java

@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -46,6 +45,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.StructType;
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -56,7 +56,7 @@ import scala.Tuple2;
 /**
  * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
  */
-public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
+public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializable {
   private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);
@@ -65,9 +65,9 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
    * basepath pointing to the table. Until, then just always assume a BloomIndex
    */
   private final transient HoodieIndex<T> index;
-  private final HoodieTimeline commitTimeline;
   private HoodieTable hoodieTable;
   private transient Option<SQLContext> sqlContextOpt;
+  private final transient JavaSparkContext jsc;
   /**
    * @param basePath path to Hoodie table
@@ -108,12 +108,11 @@ public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoo
    */
   public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
       Option<EmbeddedTimelineService> timelineService) {
-    super(jsc, clientConfig, timelineService);
+    this.jsc = jsc;
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
     this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, jsc);
-    this.commitTimeline = metaClient.getCommitTimeline().filterCompletedInstants();
    this.index = HoodieIndex.createIndex(clientConfig, jsc);
    this.sqlContextOpt = Option.empty();
  }
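With the inheritance chain broken, the client is constructed and used directly; there is no close() to call. A usage sketch against the constructor above (base path and Spark settings are hypothetical):

    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setAppName("hoodie-read").setMaster("local[2]"));
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath("/tmp/hoodie_table").build();
    // No try-with-resources needed any more: the object is just a serializable facade.
    HoodieReadClient<HoodieRecordPayload> readClient = new HoodieReadClient<>(jsc, config);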

File: TestHoodieReadClient.java

@@ -96,8 +96,8 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
    */
   private void testReadFilterExist(HoodieWriteConfig config,
       Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
-    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);
-        HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());) {
+    try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
+      HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
       String newCommitTime = writeClient.startCommit();
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
       JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
@@ -113,37 +113,36 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
       // Verify there are no errors
       assertNoWriteErrors(statuses);
-      try (HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());) {
+      HoodieReadClient anotherReadClient = getHoodieReadClient(config.getBasePath());
       filteredRDD = anotherReadClient.filterExists(recordsRDD);
       List<HoodieRecord> result = filteredRDD.collect();
       // Check results
       assertEquals(25, result.size());
       // check path exists for written keys
       JavaPairRDD<HoodieKey, Option<String>> keyToPathPair =
           anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));
       JavaRDD<HoodieKey> keysWithPaths = keyToPathPair.filter(keyPath -> keyPath._2.isPresent())
           .map(keyPath -> keyPath._1);
       assertEquals(75, keysWithPaths.count());
       // verify rows match inserted records
       Dataset<Row> rows = anotherReadClient.readROView(keysWithPaths, 1);
       assertEquals(75, rows.count());
       JavaRDD<HoodieKey> keysWithoutPaths = keyToPathPair.filter(keyPath -> !keyPath._2.isPresent())
           .map(keyPath -> keyPath._1);
       try {
         anotherReadClient.readROView(keysWithoutPaths, 1);
       } catch (Exception e) {
         // data frame reader throws exception for empty records. ignore the error.
         assertEquals(e.getClass(), AnalysisException.class);
       }
       // Actual tests of getPendingCompactions method are in TestAsyncCompaction
       // This is just testing empty list
       assertEquals(0, anotherReadClient.getPendingCompactions().size());
-      }
     }
   }
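Note the resulting test shape: HoodieWriteClient still extends AbstractHoodieClient and stays inside try-with-resources, while both read clients become plain local variables. As a reminder of the read-side API exercised above (counts taken from the test's assertions):

    // filterExists keeps only records whose keys are not yet in the table (25 of 100 here);
    // checkExists maps each key to the file path it was written to, when present (75 here).
    JavaRDD<HoodieRecord> onlyNew = anotherReadClient.filterExists(recordsRDD);
    JavaPairRDD<HoodieKey, Option<String>> keyToPath =
        anotherReadClient.checkExists(recordsRDD.map(r -> r.getKey()));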

File: TestMergeOnReadTable.java

@@ -759,54 +759,54 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       List<HoodieRecord> updatedRecords = dataGen.generateUpdates(newCommitTime, records);
       JavaRDD<HoodieRecord> updatedRecordsRDD = jsc.parallelize(updatedRecords, 1);
-      try (HoodieReadClient readClient = new HoodieReadClient(jsc, config);) {
-        updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
+      HoodieReadClient readClient = new HoodieReadClient(jsc, config);
+      updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();

       // Write them to corresponding avro logfiles
       HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
           HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);

       // Verify that all data file has one log file
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
       // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
       ((SyncableFileSystemView) (table.getSliceView())).reset();

       for (String partitionPath : dataGen.getPartitionPaths()) {
         List<FileSlice> groupedLogFiles =
             table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
         for (FileSlice fileSlice : groupedLogFiles) {
           assertEquals("There should be 1 log file written for every data file", 1, fileSlice.getLogFiles().count());
         }
       }

       // Mark 2nd delta-instant as completed
       metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT,
           HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime));
       metaClient.getActiveTimeline().saveAsComplete(
           new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());

       // Do a compaction
       String compactionInstantTime = writeClient.scheduleCompaction(Option.empty()).get().toString();
       JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);

       // Verify that recently written compacted data file has no log file
       metaClient = HoodieTableMetaClient.reload(metaClient);
       table = HoodieTable.getHoodieTable(metaClient, config, jsc);
       HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
       assertTrue("Compaction commit should be > than last insert", HoodieTimeline
           .compareTimestamps(timeline.lastInstant().get().getTimestamp(), newCommitTime, HoodieTimeline.GREATER));

       for (String partitionPath : dataGen.getPartitionPaths()) {
         List<FileSlice> groupedLogFiles =
             table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
         for (FileSlice slice : groupedLogFiles) {
           assertEquals("After compaction there should be no log files visible on a full view", 0, slice.getLogFiles().count());
         }
         List<WriteStatus> writeStatuses = result.collect();
         assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
       }
-      }
     }
   }
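In this test the read client exists only to call tagLocation, which probes the index and stamps records that already exist in the table with their current file location before the log files are written. A small sketch of that contract:

    JavaRDD<HoodieRecord> tagged = readClient.tagLocation(updatedRecordsRDD);
    // Updates to previously committed records come back with a known location.
    long alreadyStored = tagged.filter(r -> r.isCurrentLocationKnown()).count();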

File: TestAsyncCompaction.java

@@ -92,9 +92,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testRollbackForInflightCompaction() throws Exception {
     // Rollback inflight compaction
     HoodieWriteConfig cfg = getConfig(false);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -155,9 +154,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
     int numRecs = 2000;
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       List<HoodieRecord> records = dataGen.generateInserts(firstInstantTime, numRecs);
       records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
           new ArrayList<>());
@@ -197,9 +195,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testInflightCompaction() throws Exception {
     // There is inflight compaction. Subsequent compaction run must work correctly
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -351,9 +348,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testCompactionAfterTwoDeltaCommits() throws Exception {
     // No Delta Commits after compaction request
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";
@@ -373,9 +369,8 @@ public class TestAsyncCompaction extends TestHoodieClientBase {
   public void testInterleavedCompaction() throws Exception {
     // Case: Two delta commits before and after compaction schedule
     HoodieWriteConfig cfg = getConfig(true);
-    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);
-        HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());) {
+    try (HoodieWriteClient client = getHoodieWriteClient(cfg, true);) {
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       String firstInstantTime = "001";
       String secondInstantTime = "004";
       String compactionInstantTime = "005";

File: DataSourceUtils.java

@@ -223,7 +223,8 @@ public class DataSourceUtils {
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
       HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
-    try (HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService)) {
+    try {
+      HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
       return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
     } catch (TableNotFoundException e) {
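dropDuplicates now builds the read client inside a bare try: with timeline-server ownership gone from HoodieReadClient there is nothing to close, and only the TableNotFoundException handling (truncated above) remains. A usage sketch (variable names hypothetical):

    // Keep only the incoming records whose keys are not already stored in the table.
    JavaRDD<HoodieRecord> freshOnly = DataSourceUtils.dropDuplicates(
        jssc, incomingHoodieRecords, writeConfig, Option.empty());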