[HUDI-880] Replace part of spark context by hadoop configuration in HoodieTable. (#1614)
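This change threads a plain Hadoop `Configuration` through `HoodieTable` and the timeline archiver instead of a `JavaSparkContext`: `HoodieTable.create(...)`, the `HoodieCopyOnWriteTable`/`HoodieMergeOnReadTable` constructors, and `HoodieTimelineArchiveLog.archiveIfRequired(...)` now take the Hadoop configuration, and callers that only hold a Spark context pass `jsc.hadoopConfiguration()`. A minimal sketch of the call-site change, assuming an existing `HoodieWriteConfig` and `JavaSparkContext` (the helper method below is illustrative and not part of this commit):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaSparkContext;

class HoodieTableCreateSketch {
  // Hypothetical helper: shows how HoodieTable call sites change after this commit.
  static HoodieTable<?> openTable(HoodieWriteConfig config, JavaSparkContext jsc) {
    // Before: HoodieTable.create(config, jsc)
    // After: only the Hadoop configuration is required.
    Configuration hadoopConf = jsc.hadoopConfiguration();
    return HoodieTable.create(config, hadoopConf);
  }
}
```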
@@ -92,7 +92,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
 
     // archive
     HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
-    archiveLog.archiveIfRequired(jsc);
+    archiveLog.archiveIfRequired(hadoopConf);
   }
 
   @AfterEach
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.fs.FSUtils;
@@ -43,6 +44,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
 
   protected final transient FileSystem fs;
   protected final transient JavaSparkContext jsc;
+  protected final transient Configuration hadoopConf;
   protected final HoodieWriteConfig config;
   protected final String basePath;
 
@@ -62,6 +64,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
                                Option<EmbeddedTimelineService> timelineServer) {
     this.fs = FSUtils.getFs(clientConfig.getBasePath(), jsc.hadoopConfiguration());
     this.jsc = jsc;
+    this.hadoopConf = jsc.hadoopConfiguration();
     this.basePath = clientConfig.getBasePath();
     this.config = clientConfig;
     this.timelineServer = timelineServer;
@@ -99,7 +99,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
 
     LOG.info("Committing " + instantTime);
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.create(config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
 
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -180,7 +180,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     // TODO : make sure we cannot rollback / archive last commit file
     try {
       // Create a Hoodie table which encapsulated the commits and files visible
-      HoodieTable table = HoodieTable.create(config, jsc);
+      HoodieTable table = HoodieTable.create(config, hadoopConf);
       // 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW other wise
       // there may be race conditions
       HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
@@ -231,7 +231,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
       setWriteSchemaForDeletes(metaClient);
     }
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
       writeContext = metrics.getCommitCtx();
     } else {
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
@@ -68,6 +69,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
   private HoodieTable hoodieTable;
   private transient Option<SQLContext> sqlContextOpt;
   private final transient JavaSparkContext jsc;
+  private final transient Configuration hadoopConf;
 
   /**
    * @param basePath path to Hoodie table
@@ -93,10 +95,11 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
   */
  public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
    this.jsc = jsc;
+   this.hadoopConf = jsc.hadoopConfiguration();
    final String basePath = clientConfig.getBasePath();
    // Create a Hoodie table which encapsulated the commits and files visible
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-   this.hoodieTable = HoodieTable.create(metaClient, clientConfig, jsc);
+   this.hoodieTable = HoodieTable.create(metaClient, clientConfig, hadoopConf);
    this.index = HoodieIndex.createIndex(clientConfig);
    this.sqlContextOpt = Option.empty();
  }
@@ -139,7 +139,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   */
  public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
    // Create a Hoodie table which encapsulated the commits and files visible
-   HoodieTable<T> table = HoodieTable.create(config, jsc);
+   HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
    Timer.Context indexTimer = metrics.getIndexCtx();
    JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, jsc, table);
    metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
@@ -336,7 +336,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    }
    // We cannot have unbounded commit files. Archive commits if we have to archive
    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, createMetaClient(true));
-   archiveLog.archiveIfRequired(jsc);
+   archiveLog.archiveIfRequired(hadoopConf);
    if (config.isAutoClean()) {
      // Call clean to cleanup if there is anything to cleanup after the commit,
      LOG.info("Auto cleaning is enabled. Running cleaner now");
@@ -356,7 +356,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   * @param comment - Comment for the savepoint
   */
  public void savepoint(String user, String comment) {
-   HoodieTable<T> table = HoodieTable.create(config, jsc);
+   HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
    if (table.getCompletedCommitsTimeline().empty()) {
      throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
    }
@@ -380,7 +380,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   * @param comment - Comment for the savepoint
   */
  public void savepoint(String instantTime, String user, String comment) {
-   HoodieTable<T> table = HoodieTable.create(config, jsc);
+   HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
    table.savepoint(jsc, instantTime, user, comment);
  }
 
@@ -392,7 +392,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   * @return true if the savepoint was deleted successfully
   */
  public void deleteSavepoint(String savepointTime) {
-   HoodieTable<T> table = HoodieTable.create(config, jsc);
+   HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
    SavepointHelpers.deleteSavepoint(table, savepointTime);
  }
 
@@ -407,7 +407,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   * @return true if the savepoint was restored to successfully
   */
  public void restoreToSavepoint(String savepointTime) {
-   HoodieTable<T> table = HoodieTable.create(config, jsc);
+   HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
    SavepointHelpers.validateSavepointPresence(table, savepointTime);
    restoreToInstant(savepointTime);
    SavepointHelpers.validateSavepointRestore(table, savepointTime);
@@ -424,7 +424,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
    final Timer.Context context = this.metrics.getRollbackCtx();
    try {
-     HoodieTable<T> table = HoodieTable.create(config, jsc);
+     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
      Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
          .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
          .findFirst());
@@ -455,7 +455,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
    Timer.Context context = metrics.getRollbackCtx();
    try {
-     HoodieTable<T> table = HoodieTable.create(config, jsc);
+     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
      HoodieRestoreMetadata restoreMetadata = table.restore(jsc, restoreInstantTime, instantTime);
      if (context != null) {
        final long durationInMs = metrics.getDurationInMs(context.stop());
@@ -488,7 +488,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
  public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
    LOG.info("Cleaner started");
    final Timer.Context context = metrics.getCleanCtx();
-   HoodieCleanMetadata metadata = HoodieTable.create(config, jsc).clean(jsc, cleanInstantTime);
+   HoodieCleanMetadata metadata = HoodieTable.create(config, hadoopConf).clean(jsc, cleanInstantTime);
    if (context != null) {
      long durationMs = metrics.getDurationInMs(context.stop());
      metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -540,7 +540,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
        HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
        "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
            + latestPending + ", Ingesting at " + instantTime));
-   HoodieTable<T> table = HoodieTable.create(metaClient, config, jsc);
+   HoodieTable<T> table = HoodieTable.create(metaClient, config, hadoopConf);
    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
    String commitActionType = table.getMetaClient().getCommitActionType();
    activeTimeline.createNewInstant(new HoodieInstant(State.REQUESTED, commitActionType, instantTime));
@@ -564,7 +564,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   */
  public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
    LOG.info("Scheduling compaction at instant time :" + instantTime);
-   Option<HoodieCompactionPlan> plan = HoodieTable.create(config, jsc)
+   Option<HoodieCompactionPlan> plan = HoodieTable.create(config, hadoopConf)
        .scheduleCompaction(jsc, instantTime, extraMetadata);
    return plan.isPresent();
  }
@@ -588,7 +588,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   */
  public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses,
                               Option<Map<String, String>> extraMetadata) throws IOException {
-   HoodieTable<T> table = HoodieTable.create(config, jsc);
+   HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
    HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata(
        table, compactionInstantTime, writeStatuses, config.getSchema());
    extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
@@ -634,7 +634,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   * Cleanup all pending commits.
   */
  private void rollbackPendingCommits() {
-   HoodieTable<T> table = HoodieTable.create(config, jsc);
+   HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
    List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
        .collect(Collectors.toList());
@@ -650,7 +650,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   * @return RDD of Write Status
   */
  private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
-   HoodieTable<T> table = HoodieTable.create(config, jsc);
+   HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -86,8 +87,8 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
 
   private static final Logger LOG = LogManager.getLogger(HoodieCopyOnWriteTable.class);
 
-  public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
-    super(config, jsc, metaClient);
+  public HoodieCopyOnWriteTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
+    super(config, hadoopConf, metaClient);
   }
 
   @Override
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -70,8 +71,8 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
 
   private static final Logger LOG = LogManager.getLogger(HoodieMergeOnReadTable.class);
 
-  HoodieMergeOnReadTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
-    super(config, jsc, metaClient);
+  HoodieMergeOnReadTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
+    super(config, hadoopConf, metaClient);
   }
 
   @Override
@@ -89,10 +89,10 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
 
   protected final SparkTaskContextSupplier sparkTaskContextSupplier = new SparkTaskContextSupplier();
 
-  protected HoodieTable(HoodieWriteConfig config, JavaSparkContext jsc, HoodieTableMetaClient metaClient) {
+  protected HoodieTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
     this.config = config;
-    this.hadoopConfiguration = new SerializableConfiguration(jsc.hadoopConfiguration());
-    this.viewManager = FileSystemViewManager.createViewManager(new SerializableConfiguration(jsc.hadoopConfiguration()),
+    this.hadoopConfiguration = new SerializableConfiguration(hadoopConf);
+    this.viewManager = FileSystemViewManager.createViewManager(new SerializableConfiguration(hadoopConf),
         config.getViewStorageConfig());
     this.metaClient = metaClient;
     this.index = HoodieIndex.createIndex(config);
@@ -105,25 +105,25 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
     return viewManager;
   }
 
-  public static <T extends HoodieRecordPayload> HoodieTable<T> create(HoodieWriteConfig config, JavaSparkContext jsc) {
+  public static <T extends HoodieRecordPayload> HoodieTable<T> create(HoodieWriteConfig config, Configuration hadoopConf) {
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
-        jsc.hadoopConfiguration(),
+        hadoopConf,
         config.getBasePath(),
         true,
         config.getConsistencyGuardConfig(),
         Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
     );
-    return HoodieTable.create(metaClient, config, jsc);
+    return HoodieTable.create(metaClient, config, hadoopConf);
   }
 
   public static <T extends HoodieRecordPayload> HoodieTable<T> create(HoodieTableMetaClient metaClient,
                                                                       HoodieWriteConfig config,
-                                                                      JavaSparkContext jsc) {
+                                                                      Configuration hadoopConf) {
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
-        return new HoodieCopyOnWriteTable<>(config, jsc, metaClient);
+        return new HoodieCopyOnWriteTable<>(config, hadoopConf, metaClient);
       case MERGE_ON_READ:
-        return new HoodieMergeOnReadTable<>(config, jsc, metaClient);
+        return new HoodieMergeOnReadTable<>(config, hadoopConf, metaClient);
       default:
        throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
    }
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -53,7 +54,6 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -112,9 +112,9 @@ public class HoodieTimelineArchiveLog {
   /**
    * Check if commits need to be archived. If yes, archive commits.
    */
-  public boolean archiveIfRequired(final JavaSparkContext jsc) throws IOException {
+  public boolean archiveIfRequired(final Configuration hadoopConf) throws IOException {
     try {
-      List<HoodieInstant> instantsToArchive = getInstantsToArchive(jsc).collect(Collectors.toList());
+      List<HoodieInstant> instantsToArchive = getInstantsToArchive(hadoopConf).collect(Collectors.toList());
 
       boolean success = true;
       if (!instantsToArchive.isEmpty()) {
@@ -133,13 +133,13 @@ public class HoodieTimelineArchiveLog {
     }
   }
 
-  private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) {
+  private Stream<HoodieInstant> getInstantsToArchive(Configuration hadoopConf) {
 
     // TODO : rename to max/minInstantsToKeep
     int maxCommitsToKeep = config.getMaxCommitsToKeep();
     int minCommitsToKeep = config.getMinCommitsToKeep();
 
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 
     // GroupBy each action and limit each action timeline to maxCommitsToKeep
     // TODO: Handle ROLLBACK_ACTION in future
@@ -182,7 +182,7 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
     String actionType = table.getMetaClient().getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.create(config, jsc);
+    HoodieTable<T> table = HoodieTable.create(config, jsc.hadoopConfiguration());
 
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -89,7 +89,7 @@ public class HoodieMergeOnReadTableCompactor implements HoodieCompactor {
     }
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     // Compacting is very similar to applying updates to existing file
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc, metaClient);
+    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc.hadoopConfiguration(), metaClient);
     List<CompactionOperation> operations = compactionPlan.getOperations().stream()
         .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
     LOG.info("Compactor compacting " + operations + " files");
@@ -99,7 +99,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       List<String> partitionPaths =
           FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc);
+      HoodieTable table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
       final BaseFileOnlyView view1 = table.getBaseFileOnlyView();
 
       List<HoodieBaseFile> dataFiles = partitionPaths.stream().flatMap(s -> {
@@ -124,7 +124,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       assertNoWriteErrors(statuses);
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.create(metaClient, getConfig(), jsc);
+      table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
       final BaseFileOnlyView view2 = table.getBaseFileOnlyView();
 
       dataFiles = partitionPaths.stream().flatMap(s -> view2.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("004"))).collect(Collectors.toList());
@@ -140,7 +140,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       client.restoreToSavepoint(savepoint.getTimestamp());
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      table = HoodieTable.create(metaClient, getConfig(), jsc);
+      table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
       final BaseFileOnlyView view3 = table.getBaseFileOnlyView();
       dataFiles = partitionPaths.stream().flatMap(s -> view3.getAllBaseFiles(s).filter(f -> f.getCommitTime().equals("002"))).collect(Collectors.toList());
       assertEquals(3, dataFiles.size(), "The data files for commit 002 be available");
@@ -156,7 +156,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
   }
 
   protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
-    HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
     ((SyncableFileSystemView) (table.getSliceView())).reset();
     return table;
   }
@@ -250,7 +250,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
       final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
       List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
       final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf);
       JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
       return taggedRecords.collect();
     };
@@ -271,7 +271,7 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
       final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
       List<HoodieKey> records = keyGenFunction.apply(numRecords);
       final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-      HoodieTable table = HoodieTable.create(metaClient, writeConfig, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, writeConfig, hadoopConf);
       JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
           .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
       JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
@@ -809,7 +809,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-      HoodieTable table = HoodieTable.create(metaClient, cfg, jsc);
+      HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
 
       String instantTime = "000";
       client.startCommitWithTime(instantTime);
@@ -67,7 +67,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
   public void testSchemaEvolutionOnUpdate() throws Exception {
     // Create a bunch of records with a old version of schema
     final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt");
-    final HoodieTable<?> table = HoodieTable.create(config, jsc);
+    final HoodieTable<?> table = HoodieTable.create(config, hadoopConf);
 
     final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
       String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -102,7 +102,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
     final WriteStatus insertResult = statuses.get(0);
     String fileId = insertResult.getFileId();
 
-    final HoodieTable table2 = HoodieTable.create(config2, jsc);
+    final HoodieTable table2 = HoodieTable.create(config2, hadoopConf);
     assertEquals(1, jsc.parallelize(Arrays.asList(1)).map(x -> {
       // New content with values for the newly added field
       String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
@@ -50,6 +50,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
   private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
 
   protected transient JavaSparkContext jsc = null;
+  protected transient Configuration hadoopConf = null;
   protected transient SQLContext sqlContext;
   protected transient FileSystem fs;
   protected transient HoodieTestDataGenerator dataGen = null;
@@ -103,6 +104,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     // Initialize a local spark env
     jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
     jsc.setLogLevel("ERROR");
+    hadoopConf = jsc.hadoopConfiguration();
 
     // SQLContext stuff
     sqlContext = new SQLContext(jsc);
@@ -143,7 +143,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
       JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -163,7 +163,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
       writeClient.commit(newCommitTime, writeStatues);
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
       assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -184,7 +184,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
     writeClient.startCommitWithTime(newCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
     JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -202,7 +202,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     writeClient.commit(newCommitTime, writeStatues);
     // Now tagLocation for these records, hbaseIndex should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
     assertEquals(10, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
     assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -228,7 +228,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
 
     // commit this upsert
     writeClient.commit(newCommitTime, writeStatues);
-    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should tag them
    JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
    assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 200);
@@ -243,7 +243,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Rollback the last commit
     writeClient.rollback(newCommitTime);
 
-    hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
     // back commit
     javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -272,7 +272,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
     // Insert 250 records
     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -297,7 +297,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
     // Insert 200 records
     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -409,7 +409,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
       JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -429,7 +429,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
       writeClient.commit(newCommitTime, writeStatues);
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertEquals(200, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
       assertEquals(200, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -449,7 +449,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
       JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
@@ -463,7 +463,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
 
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertEquals(10, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
       assertEquals(10, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
@@ -500,7 +500,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
 
     assertTrue(index.canIndexLogFiles());
     assertThrows(UnsupportedOperationException.class, () -> {
-      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+      HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
       index.fetchRecordLocation(jsc.parallelize(new ArrayList<HoodieKey>(), 1), jsc, hoodieTable);
     }, "HbaseIndex supports fetchRecordLocation");
   }
@@ -153,7 +153,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {

 List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12");
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
 // Still 0, as no valid commit
 assertEquals(0, filesList.size());

@@ -163,7 +163,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 Files.createFile(hoodieDir.resolve("20160401010101.commit"));
 Files.createFile(hoodieDir.resolve("20150312101010.commit"));

-table = HoodieTable.create(metaClient, config, jsc);
+table = HoodieTable.create(metaClient, config, hadoopConf);
 filesList = index.loadInvolvedFiles(partitions, jsc, table);
 assertEquals(4, filesList.size());

@@ -279,7 +279,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 // Also create the metadata and config
 HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);

 // Let's tag
 HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);

@@ -318,7 +318,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 // Also create the metadata and config
 HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);

 // Let's tag
 HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);

@@ -339,7 +339,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {

 // We do the tag again
 metaClient = HoodieTableMetaClient.reload(metaClient);
-table = HoodieTable.create(metaClient, config, jsc);
+table = HoodieTable.create(metaClient, config, hadoopConf);

 taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);

@@ -389,7 +389,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 // Also create the metadata and config
 HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);

 // Let's tag
 HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);

@@ -411,7 +411,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {

 // We do the tag again
 metaClient = HoodieTableMetaClient.reload(metaClient);
-table = HoodieTable.create(metaClient, config, jsc);
+table = HoodieTable.create(metaClient, config, hadoopConf);
 taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);

 // Check results

@@ -461,7 +461,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
 HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);

 HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
 JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);

@@ -131,7 +131,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
 // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
 List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 // partitions will NOT be respected by this loadInvolvedFiles(...) call
 List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
 // Still 0, as no valid commit

@@ -142,7 +142,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
 Files.createFile(hoodieDir.resolve("20160401010101.commit"));
 Files.createFile(hoodieDir.resolve("20150312101010.commit"));

-table = HoodieTable.create(metaClient, config, jsc);
+table = HoodieTable.create(metaClient, config, hadoopConf);
 filesList = index.loadInvolvedFiles(partitions, jsc, table);
 assertEquals(4, filesList.size());

@@ -261,7 +261,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {

 // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);

 // Add some commits
 Files.createDirectories(Paths.get(basePath, ".hoodie"));

@@ -346,7 +346,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
 .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false);

 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);

 // Add some commits
 Files.createDirectories(Paths.get(basePath, ".hoodie"));

@@ -80,7 +80,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
 .withParallelism(2, 2).forTable("test-trip-table").build();
 metaClient = HoodieTableMetaClient.reload(metaClient);
 HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);
-boolean result = archiveLog.archiveIfRequired(jsc);
+boolean result = archiveLog.archiveIfRequired(hadoopConf);
 assertTrue(result);
 }

@@ -159,7 +159,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
 metaClient = HoodieTableMetaClient.reload(metaClient);
 HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient);

-assertTrue(archiveLog.archiveIfRequired(jsc));
+assertTrue(archiveLog.archiveIfRequired(hadoopConf));

 // reload the timeline and remove the remaining commits
 timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();

@@ -248,7 +248,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {

 HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
 assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match");
-boolean result = archiveLog.archiveIfRequired(jsc);
+boolean result = archiveLog.archiveIfRequired(hadoopConf);
 assertTrue(result);
 timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
 assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5");

@@ -291,7 +291,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {

 HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
 assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
-boolean result = archiveLog.archiveIfRequired(jsc);
+boolean result = archiveLog.archiveIfRequired(hadoopConf);
 assertTrue(result);
 timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
 assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe");

@@ -318,7 +318,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {

 HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
 assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
-boolean result = archiveLog.archiveIfRequired(jsc);
+boolean result = archiveLog.archiveIfRequired(hadoopConf);
 assertTrue(result);
 timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
 assertEquals(5, timeline.countInstants(),

@@ -354,7 +354,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {

 HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
 assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match");
-boolean result = archiveLog.archiveIfRequired(jsc);
+boolean result = archiveLog.archiveIfRequired(hadoopConf);
 assertTrue(result);
 timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
 assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")),

@@ -401,7 +401,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
 HoodieTestDataGenerator.createCommitFile(basePath, "5", dfs.getConf());
 HoodieInstant instant5 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "5");

-boolean result = archiveLog.archiveIfRequired(jsc);
+boolean result = archiveLog.archiveIfRequired(hadoopConf);
 assertTrue(result);

 HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();

@@ -44,7 +44,7 @@ public class TestHoodieStorageWriterFactory extends TestHoodieClientBase {
 final String instantTime = "100";
 final Path parquetPath = new Path(basePath + "/partition/path/f1_1-0-1_000.parquet");
 final HoodieWriteConfig cfg = getConfig();
-HoodieTable table = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
 SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
 HoodieStorageWriter<IndexedRecord> parquetWriter = HoodieStorageWriterFactory.getStorageWriter(instantTime,
 parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier);

@@ -125,7 +125,7 @@ public class TestCleaner extends TestHoodieClientBase {
 HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
 assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit.");
 // Should have 100 records in table (check using Index), all in locations marked at commit
-HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), jsc);
+HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), hadoopConf);

 assertFalse(table.getCompletedCommitsTimeline().empty());
 if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) {

@@ -211,7 +211,7 @@ public class TestCleaner extends TestHoodieClientBase {

 Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, getConfig(), jsc);
+HoodieTable table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
 for (String partitionPath : dataGen.getPartitionPaths()) {
 TableFileSystemView fsView = table.getFileSystemView();
 Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst().map(fg -> {

@@ -248,7 +248,7 @@ public class TestCleaner extends TestHoodieClientBase {
 assertNoWriteErrors(statuses);

 metaClient = HoodieTableMetaClient.reload(metaClient);
-table = HoodieTable.create(metaClient, getConfig(), jsc);
+table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
 HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();

 TableFileSystemView fsView = table.getFileSystemView();

@@ -381,7 +381,7 @@ public class TestCleaner extends TestHoodieClientBase {
 assertNoWriteErrors(statuses);

 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table1 = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable table1 = HoodieTable.create(metaClient, cfg, hadoopConf);
 HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
 // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
 // commit

@@ -894,7 +894,7 @@ public class TestCleaner extends TestHoodieClientBase {

 HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 table.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED,
 HoodieTimeline.COMMIT_ACTION, "000"));
 table.getActiveTimeline().transitionRequestedToInflight(

@@ -1040,7 +1040,7 @@ public class TestCleaner extends TestHoodieClientBase {
 if (j == i && j <= maxNumFileIdsForCompaction) {
 expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 FileSlice slice =
 table.getSliceView().getLatestFileSlices(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
 .filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();

@@ -1082,7 +1082,7 @@ public class TestCleaner extends TestHoodieClientBase {

 // Test for safety
 final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
-final HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc);
+final HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);

 expFileIdToPendingCompaction.forEach((fileId, value) -> {
 String baseInstantForCompaction = fileIdToLatestInstantBeforeCompaction.get(fileId);

@@ -153,7 +153,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 client.compact(compactionCommitTime);

 FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
 HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
 Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
 assertTrue(dataFilesToRead.findAny().isPresent());

@@ -308,7 +308,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertNoWriteErrors(statuses);

 HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

 Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
 assertTrue(deltaCommit.isPresent());

@@ -417,7 +417,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 client.rollback(newCommitTime);

 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
 FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
 HoodieTableFileSystemView roView =
 new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);

@@ -449,7 +449,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertNoWriteErrors(statuses);

 HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

 Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
 assertTrue(deltaCommit.isPresent());

@@ -530,7 +530,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 .filter(file -> file.getPath().getName().contains(commitTime2)).count());

 metaClient = HoodieTableMetaClient.reload(metaClient);
-hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
 roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
 dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
 recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);

@@ -596,7 +596,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertNoWriteErrors(statuses);

 HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

 Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
 assertTrue(deltaCommit.isPresent());

@@ -759,7 +759,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertNoWriteErrors(statuses);

 HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

 Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
 assertTrue(deltaCommit.isPresent());

@@ -849,7 +849,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {

 // Verify that all data file has one log file
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
 ((SyncableFileSystemView) (table.getSliceView())).reset();

@@ -873,7 +873,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {

 // Verify that recently written compacted data file has no log file
 metaClient = HoodieTableMetaClient.reload(metaClient);
-table = HoodieTable.create(metaClient, config, jsc);
+table = HoodieTable.create(metaClient, config, hadoopConf);
 HoodieActiveTimeline timeline = metaClient.getActiveTimeline();

 assertTrue(HoodieTimeline

@@ -907,7 +907,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 writeClient.commit(newCommitTime, statuses);

 HoodieTable table =
-HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
+HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf);
 SliceView tableRTFileSystemView = table.getSliceView();

 long numLogFiles = 0;

@@ -981,7 +981,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 writeClient.rollback(newCommitTime);

 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 SliceView tableRTFileSystemView = table.getSliceView();

 long numLogFiles = 0;

@@ -1017,7 +1017,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 statuses.collect();

 HoodieTable table =
-HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
+HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf);
 SliceView tableRTFileSystemView = table.getSliceView();

 long numLogFiles = 0;

@@ -1038,7 +1038,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
 // Trigger a rollback of compaction
 writeClient.rollback(newCommitTime);
-table = HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, jsc);
+table = HoodieTable.create(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf);
 tableRTFileSystemView = table.getSliceView();
 ((SyncableFileSystemView) tableRTFileSystemView).reset();
 Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();

@@ -1059,7 +1059,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
 try (HoodieWriteClient client = getWriteClient(cfg);) {
 HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-HoodieTable table = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);

 // Create a commit without rolling stats in metadata to test backwards compatibility
 HoodieActiveTimeline activeTimeline = table.getActiveTimeline();

@@ -1080,7 +1080,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertTrue(client.commit(instantTime, statuses), "Commit should succeed");

 // Read from commit file
-table = HoodieTable.create(cfg, jsc);
+table = HoodieTable.create(cfg, hadoopConf);
 HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
 table.getActiveTimeline().getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
 HoodieCommitMetadata.class);

@@ -1104,7 +1104,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertTrue(client.commit(instantTime, statuses), "Commit should succeed");

 // Read from commit file
-table = HoodieTable.create(cfg, jsc);
+table = HoodieTable.create(cfg, hadoopConf);
 metadata = HoodieCommitMetadata.fromBytes(
 table.getActiveTimeline()
 .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),

@@ -1128,7 +1128,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 client.rollback(instantTime);

 // Read from commit file
-table = HoodieTable.create(cfg, jsc);
+table = HoodieTable.create(cfg, hadoopConf);
 metadata = HoodieCommitMetadata.fromBytes(
 table.getActiveTimeline()
 .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),

@@ -1171,7 +1171,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertTrue(client.commit(instantTime, statuses), "Commit should succeed");

 // Read from commit file
-HoodieTable table = HoodieTable.create(cfg, jsc);
+HoodieTable table = HoodieTable.create(cfg, hadoopConf);
 HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
 table.getActiveTimeline()
 .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),

@@ -1200,7 +1200,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertTrue(client.commit(instantTime, statuses), "Commit should succeed");

 // Read from commit file
-table = HoodieTable.create(cfg, jsc);
+table = HoodieTable.create(cfg, hadoopConf);
 metadata = HoodieCommitMetadata.fromBytes(
 table.getActiveTimeline()
 .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),

@@ -1231,7 +1231,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 client.commitCompaction(instantTime, statuses, Option.empty());

 // Read from commit file
-table = HoodieTable.create(cfg, jsc);
+table = HoodieTable.create(cfg, hadoopConf);
 metadata = HoodieCommitMetadata.fromBytes(
 table.getActiveTimeline()
 .getInstantDetails(table.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(),

@@ -1259,7 +1259,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertTrue(client.commit(instantTime, statuses), "Commit should succeed");

 // Read from commit file
-table = HoodieTable.create(cfg, jsc);
+table = HoodieTable.create(cfg, hadoopConf);
 metadata = HoodieCommitMetadata.fromBytes(
 table.getActiveTimeline()
 .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),

@@ -1305,7 +1305,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertNoWriteErrors(statuses);

 HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, jsc);
+HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, hadoopConf);

 Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
 assertTrue(deltaCommit.isPresent());

@@ -1398,7 +1398,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
 assertNoWriteErrors(statuses);

 metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);

 Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
 assertTrue(deltaCommit.isPresent());

@@ -98,7 +98,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
 String instantTime = HoodieTestUtils.makeNewCommitTime();
 HoodieWriteConfig config = makeHoodieClientConfig();
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);

 Pair<Path, String> newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> {
 HoodieRecord record = mock(HoodieRecord.class);

@@ -134,7 +134,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
 metaClient = HoodieTableMetaClient.reload(metaClient);

 String partitionPath = "2016/01/31";
-HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
+HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);

 // Get some records belong to the same partition (2016/01/31)
 String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","

@@ -276,7 +276,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
 String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
 metaClient = HoodieTableMetaClient.reload(metaClient);

-HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
+HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);

 // Get some records belong to the same partition (2016/01/31)
 String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","

@@ -314,7 +314,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
 HoodieWriteConfig config = makeHoodieClientConfig();
 String instantTime = HoodieTestUtils.makeNewCommitTime();
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
+HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);

 // Case 1:
 // 10 records for partition 1, 1 record for partition 2.

@@ -369,7 +369,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
 .limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build();
 String instantTime = HoodieTestUtils.makeNewCommitTime();
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
+HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);

 List<HoodieRecord> records = new ArrayList<>();
 // Approx 1150 records are written for block size of 64KB

@@ -403,7 +403,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
 HoodieWriteConfig config = makeHoodieClientConfigBuilder()
 .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
 metaClient = HoodieTableMetaClient.reload(metaClient);
-final HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
+final HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);
 String instantTime = "000";
 // Perform inserts of 100 records to test CreateHandle and BufferedExecutor
 final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);

@@ -77,7 +77,7 @@ public class TestUpsertPartitioner extends HoodieClientTestHarness {
 HoodieClientTestUtils.fakeCommitFile(basePath, "001");
 HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, jsc);
+HoodieCopyOnWriteTable table = (HoodieCopyOnWriteTable) HoodieTable.create(metaClient, config, hadoopConf);

 HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
 List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", numInserts);

@@ -119,7 +119,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase {

 // Reload and rollback inflight compaction
 metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
-HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, jsc);
+HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
 // hoodieTable.rollback(jsc,
 // new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false);

@@ -104,7 +104,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 @Test
 public void testCompactionOnCopyOnWriteFail() throws Exception {
 metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
-HoodieTable<?> table = HoodieTable.create(metaClient, getConfig(), jsc);
+HoodieTable<?> table = HoodieTable.create(metaClient, getConfig(), hadoopConf);
 String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
 assertThrows(HoodieNotSupportedException.class, () -> {
 table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());

@@ -116,7 +116,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 public void testCompactionEmpty() throws Exception {
 HoodieWriteConfig config = getConfig();
 metaClient = HoodieTableMetaClient.reload(metaClient);
-HoodieTable table = HoodieTable.create(metaClient, config, jsc);
+HoodieTable table = HoodieTable.create(metaClient, config, hadoopConf);
 try (HoodieWriteClient writeClient = getWriteClient(config);) {

 String newCommitTime = writeClient.startCommit();

@@ -143,7 +143,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 writeClient.insert(recordsRDD, newCommitTime).collect();

 // Update all the 100 records
-HoodieTable table = HoodieTable.create(config, jsc);
+HoodieTable table = HoodieTable.create(config, hadoopConf);
 newCommitTime = "101";
 writeClient.startCommitWithTime(newCommitTime);

@@ -157,7 +157,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);

 // Verify that all data file has one log file
-table = HoodieTable.create(config, jsc);
+table = HoodieTable.create(config, hadoopConf);
 for (String partitionPath : dataGen.getPartitionPaths()) {
 List<FileSlice> groupedLogFiles =
 table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());

@@ -168,7 +168,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 HoodieTestUtils.createDeltaCommitFiles(basePath, newCommitTime);

 // Do a compaction
-table = HoodieTable.create(config, jsc);
+table = HoodieTable.create(config, hadoopConf);
 String compactionInstantTime = "102";
 table.scheduleCompaction(jsc, compactionInstantTime, Option.empty());
 table.getMetaClient().reloadActiveTimeline();

@@ -55,7 +55,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
 }
 // TODO : Figure out a valid HoodieWriteConfig
 private val hoodieTable = HoodieTable.create(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(),
-sqlContext.sparkContext)
+sqlContext.sparkContext.hadoopConfiguration)
 val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
 if (commitTimeline.empty()) {
 throw new HoodieException("No instants to incrementally pull")