Adding a config to control whether date partitioning can be assumed
- false by default - CAUTION: If you have existing tables without partition metadata, you need to set this to "true"
commit dce35ff0d7
parent f9fd16069d
committed by vinoth chandar
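
Context for reviewers: the new property is read straight out of the Properties object backing HoodieWriteConfig (see the HoodieWriteConfig hunks below), so a minimal sketch of opting in for a legacy table looks like this; the surrounding builder wiring is an assumption, not part of this diff:

    // Sketch, assuming props is the Properties instance HoodieWriteConfig is built from.
    Properties props = new Properties();
    // Legacy table: partition folders have no HoodiePartitionMetadata file,
    // but follow the basePath/yyyy/MM/dd layout, so the assumption is safe.
    props.setProperty("hoodie.assume.date.partitioning", "true");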
@@ -476,7 +476,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
             + lastCommitRetained);
 
     Map<String, List<String>> latestFilesMap = jsc.parallelize(
-        FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()))
+        FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
         .mapToPair((PairFunction<String, String, List<String>>) partitionPath -> {
           // Scan all partitions files with this commit time
           logger.info("Collecting latest files in partition path " + partitionPath);
@@ -650,7 +650,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
     logger.info("Clean out all parquet files generated for commits: " + commits);
     final LongAccumulator numFilesDeletedCounter = jsc.sc().longAccumulator();
     List<HoodieRollbackStat> stats = jsc.parallelize(
-        FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath()))
+        FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning()))
         .map((Function<String, HoodieRollbackStat>) partitionPath -> {
           // Scan all partitions files with this commit time
           logger.info("Cleaning path " + partitionPath);
@@ -739,7 +739,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
         .getHoodieTable(new HoodieTableMetaClient(fs, config.getBasePath(), true), config);
 
     List<String> partitionsToClean =
-        FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath());
+        FSUtils.getAllPartitionPaths(fs, table.getMetaClient().getBasePath(), config.shouldAssumeDatePartitioning());
     // shuffle to distribute cleaning work across partitions evenly
     Collections.shuffle(partitionsToClean);
     logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config
@@ -51,6 +51,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
   private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
   private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
+  private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
+  private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
 
 
   private HoodieWriteConfig(Properties props) {
@@ -76,6 +78,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(HOODIE_AUTO_COMMIT_PROP));
   }
 
+  public Boolean shouldAssumeDatePartitioning() {
+    return Boolean.parseBoolean(props.getProperty(HOODIE_ASSUME_DATE_PARTITIONING_PROP));
+  }
+
   public int getInsertShuffleParallelism() {
     return Integer.parseInt(props.getProperty(INSERT_PARALLELISM));
   }
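
One subtlety of the getter above: Boolean.parseBoolean returns true only for a case-insensitive match of "true", so an unset or garbled value degrades to false, i.e. the safe metadata-based listing:

    Boolean.parseBoolean("true");  // true
    Boolean.parseBoolean("TRUE");  // true -- match is case-insensitive
    Boolean.parseBoolean(null);    // false -- unset property keeps the assumption off
    Boolean.parseBoolean("yes");   // false -- anything but "true" is false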
@@ -337,6 +343,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
         WRITE_STATUS_STORAGE_LEVEL, DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
     setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP),
         HOODIE_AUTO_COMMIT_PROP, DEFAULT_HOODIE_AUTO_COMMIT);
+    setDefaultOnCondition(props, !props.containsKey(HOODIE_ASSUME_DATE_PARTITIONING_PROP),
+        HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
 
 
     setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().build());
@@ -77,7 +77,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
     String compactionCommit = startCompactionCommit(hoodieTable);
     log.info("Compacting " + metaClient.getBasePath() + " with commit " + compactionCommit);
     List<String> partitionPaths =
-        FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath());
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
 
     log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
     List<CompactionOperation> operations =
@@ -403,7 +403,7 @@ public class TestHoodieClient implements Serializable {
     // Verify there are no errors
     assertNoWriteErrors(statuses);
 
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath());
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
    final TableFileSystemView view = table.getFileSystemView();
@@ -475,7 +475,7 @@ public class TestHoodieClient implements Serializable {
    statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
    // Verify there are no errors
    assertNoWriteErrors(statuses);
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath());
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(), getConfig().shouldAssumeDatePartitioning());
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig());
    final TableFileSystemView view1 = table.getFileSystemView();
@@ -139,7 +139,7 @@ public class FSUtils {
   * Obtain all the partition paths, that are present in this table, denoted by presence of {@link
   * com.uber.hoodie.common.model.HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE}
   */
-  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr)
+  public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr)
       throws IOException {
     List<String> partitions = new ArrayList<>();
     Path basePath = new Path(basePathStr);
@@ -155,6 +155,15 @@ public class FSUtils {
     return partitions;
   }
 
+  public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr, boolean assumeDatePartitioning)
+      throws IOException {
+    if (assumeDatePartitioning) {
+      return getAllFoldersThreeLevelsDown(fs, basePathStr);
+    } else {
+      return getAllFoldersWithPartitionMetaFile(fs, basePathStr);
+    }
+  }
+
   public static String getFileExtension(String fullName) {
     Preconditions.checkNotNull(fullName);
     String fileName = (new File(fullName)).getName();
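
For intuition, the fast path works because date partitioning pins every partition exactly three directory levels below the base path (yyyy/MM/dd), so discovery can glob instead of walking the tree for HOODIE_PARTITION_METAFILE markers. A sketch of the idea only; the real getAllFoldersThreeLevelsDown body is not part of this diff:

    // Sketch: every directory matching basePath/*/*/* is taken as a partition.
    FileStatus[] folders = fs.globStatus(new Path(basePathStr + "/*/*/*"));
    List<String> partitions = new ArrayList<>();
    for (FileStatus folder : folders) {
      if (!folder.isDirectory()) {
        continue; // partitions are directories; ignore stray files at this depth
      }
      Path path = folder.getPath();
      // Reassemble the relative yyyy/MM/dd partition path from the last three segments.
      partitions.add(path.getParent().getParent().getName() + "/"
          + path.getParent().getName() + "/" + path.getName());
    }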
@@ -65,9 +65,12 @@ public class HoodieSnapshotCopier implements Serializable {
 
     @Parameter(names = {"--output-path", "-op"}, description = "The snapshot output path", required = true)
     String outputPath = null;
+
+    @Parameter(names = {"--date-partitioned", "-dp"}, description = "Can we assume date partitioning?")
+    boolean shouldAssumeDatePartitioning = false;
   }
 
-  public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir) throws IOException {
+  public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir, final boolean shouldAssumeDatePartitioning) throws IOException {
     FileSystem fs = FSUtils.getFs();
     final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir);
     final TableFileSystemView fsView = new HoodieTableFileSystemView(tableMetadata,
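
A note on the new flag: because shouldAssumeDatePartitioning is a boolean field, JCommander treats --date-partitioned (or -dp) as a presence switch, so passing it sets the field to true while omitting it keeps the initializer value of false. A hypothetical command line, with the launcher and the remaining options elided since they are outside this diff:

    # sketch only; --output-path and --date-partitioned are the options shown above
    ... HoodieSnapshotCopier --output-path /backups/trips --date-partitioned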
@@ -82,7 +85,7 @@ public class HoodieSnapshotCopier implements Serializable {
     final String latestCommitTimestamp = latestCommit.get().getTimestamp();
     logger.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.", latestCommitTimestamp));
 
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir);
+    List<String> partitions = FSUtils.getAllPartitionPaths(fs, baseDir, shouldAssumeDatePartitioning);
     if (partitions.size() > 0) {
       logger.info(String.format("The job needs to copy %d partitions.", partitions.size()));
 
@@ -172,7 +175,7 @@ public class HoodieSnapshotCopier implements Serializable {
 
     // Copy
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, cfg.basePath, cfg.outputPath);
+    copier.snapshot(jsc, cfg.basePath, cfg.outputPath, cfg.shouldAssumeDatePartitioning);
 
     // Stop the job
     jsc.stop();
@@ -63,7 +63,7 @@ public class TestHoodieSnapshotCopier {
 
     // Do the snapshot
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, basePath, outputPath);
+    copier.snapshot(jsc, basePath, outputPath, true);
 
     // Nothing changed; we just bail out
     assertEquals(fs.listStatus(new Path(basePath)).length, 1);
@@ -117,7 +117,7 @@ public class TestHoodieSnapshotCopier {
 
     // Do a snapshot copy
     HoodieSnapshotCopier copier = new HoodieSnapshotCopier();
-    copier.snapshot(jsc, basePath, outputPath);
+    copier.snapshot(jsc, basePath, outputPath, false);
 
     // Check results
     assertTrue(fs.exists(new Path(outputPath + "/2016/05/01/" + file11.getName())));