[HUDI-1191] Add incremental meta client API to query partitions modified in a time window
This commit is contained in:
@@ -20,16 +20,14 @@ package org.apache.hudi.sync.common;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.TableSchemaResolver;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.table.timeline.TimelineUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
@@ -40,12 +38,10 @@ import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class AbstractSyncHoodieClient {
|
||||
private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
|
||||
protected final HoodieTableMetaClient metaClient;
|
||||
protected HoodieTimeline activeTimeline;
|
||||
protected final HoodieTableType tableType;
|
||||
protected final FileSystem fs;
|
||||
private String basePath;
|
||||
@@ -57,7 +53,6 @@ public abstract class AbstractSyncHoodieClient {
|
||||
this.basePath = basePath;
|
||||
this.assumeDatePartitioning = assumeDatePartitioning;
|
||||
this.fs = fs;
|
||||
this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
}
|
||||
|
||||
public abstract void createTable(String tableName, MessageType storageSchema,
|
||||
@@ -75,10 +70,6 @@ public abstract class AbstractSyncHoodieClient {
|
||||
|
||||
public abstract Map<String, String> getTableSchema(String tableName);
|
||||
|
||||
/**
 * Returns the timeline currently held in the {@code activeTimeline} field.
 *
 * @return the value of {@code activeTimeline}
 */
public HoodieTimeline getActiveTimeline() {
|
||||
return activeTimeline;
|
||||
}
|
||||
|
||||
/**
 * Returns the table type held in the {@code tableType} field.
 *
 * @return the value of {@code tableType}
 */
public HoodieTableType getTableType() {
|
||||
return tableType;
|
||||
}
|
||||
@@ -135,21 +126,14 @@ public abstract class AbstractSyncHoodieClient {
|
||||
}
|
||||
} else {
|
||||
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
|
||||
|
||||
HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
|
||||
return timelineToSync.getInstants().map(s -> {
|
||||
try {
|
||||
return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e);
|
||||
}
|
||||
}).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList());
|
||||
return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
|
||||
.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the schema from the log file on path.
|
||||
*/
|
||||
*/
|
||||
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||
private MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) throws Exception {
|
||||
MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path);
|
||||
|
||||
Reference in New Issue
Block a user