1
0

[MINOR] Remove redundant plus operator (#1097)

This commit is contained in:
lamber-ken
2019-12-12 05:42:05 +08:00
committed by leesf
parent 3790b75e05
commit ba514cfea0
46 changed files with 106 additions and 107 deletions

View File

@@ -472,8 +472,7 @@ public class CompactionCommand implements CommandMarker {
if (result.get()) { if (result.get()) {
System.out.println("All renames successfully completed to " + operation + " done !!"); System.out.println("All renames successfully completed to " + operation + " done !!");
} else { } else {
System.out System.out.println("Some renames failed. DataSet could be in inconsistent-state. Try running compaction repair");
.println("Some renames failed. DataSet could be in inconsistent-state. " + "Try running compaction repair");
} }
List<Comparable[]> rows = new ArrayList<>(); List<Comparable[]> rows = new ArrayList<>();

View File

@@ -53,7 +53,7 @@ public class RepairsCommand implements CommandMarker {
} }
@CliCommand(value = "repair deduplicate", @CliCommand(value = "repair deduplicate",
help = "De-duplicate a partition path contains duplicates & produce " + "repaired files to replace with") help = "De-duplicate a partition path contains duplicates & produce repaired files to replace with")
public String deduplicate( public String deduplicate(
@CliOption(key = {"duplicatedPartitionPath"}, help = "Partition Path containing the duplicates", @CliOption(key = {"duplicatedPartitionPath"}, help = "Partition Path containing the duplicates",
mandatory = true) final String duplicatedPartitionPath, mandatory = true) final String duplicatedPartitionPath,

View File

@@ -333,7 +333,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
} }
} else { } else {
throw new CompactionValidationException( throw new CompactionValidationException(
"Unable to find any committed instant. Compaction Operation may " + "be pointing to stale file-slices"); "Unable to find any committed instant. Compaction Operation may be pointing to stale file-slices");
} }
} catch (CompactionValidationException | IllegalArgumentException e) { } catch (CompactionValidationException | IllegalArgumentException e) {
return new ValidationOpResult(operation, false, Option.of(e)); return new ValidationOpResult(operation, false, Option.of(e));

View File

@@ -1017,7 +1017,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> { metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> {
Preconditions.checkArgument( Preconditions.checkArgument(
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER), HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER),
"Latest pending compaction instant time must be earlier " + "than this instant time. Latest Compaction :" "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
+ latestPending + ", Ingesting at " + instantTime); + latestPending + ", Ingesting at " + instantTime);
}); });
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
@@ -1049,7 +1049,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight -> { metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight -> {
Preconditions.checkArgument( Preconditions.checkArgument(
HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER), HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), instantTime, HoodieTimeline.GREATER),
"Earliest write inflight instant time must be later " + "than compaction time. Earliest :" + earliestInflight "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
+ ", Compaction scheduled at " + instantTime); + ", Compaction scheduled at " + instantTime);
}); });
// Committed and pending compaction instants should have strictly lower timestamps // Committed and pending compaction instants should have strictly lower timestamps

View File

@@ -43,8 +43,8 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
// Turn on inline compaction - after fw delta commits a inline compaction will be run // Turn on inline compaction - after fw delta commits a inline compaction will be run
public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline"; public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
// Run a compaction every N delta commits // Run a compaction every N delta commits
public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max" + ".delta.commits"; public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = "hoodie.compact.inline.max.delta.commits";
public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = "hoodie.cleaner.fileversions" + ".retained"; public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = "hoodie.cleaner.fileversions.retained";
public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained"; public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
public static final String CLEANER_INCREMENTAL_MODE = "hoodie.cleaner.incremental.mode"; public static final String CLEANER_INCREMENTAL_MODE = "hoodie.cleaner.incremental.mode";
public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits"; public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
@@ -58,18 +58,18 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
* Configs related to specific table types. * Configs related to specific table types.
*/ */
// Number of inserts, that will be put each partition/bucket for writing // Number of inserts, that will be put each partition/bucket for writing
public static final String COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE = "hoodie.copyonwrite.insert" + ".split.size"; public static final String COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE = "hoodie.copyonwrite.insert.split.size";
// The rationale to pick the insert parallelism is the following. Writing out 100MB files, // The rationale to pick the insert parallelism is the following. Writing out 100MB files,
// with atleast 1kb records, means 100K records per file. we just overprovision to 500K // with atleast 1kb records, means 100K records per file. we just overprovision to 500K
public static final String DEFAULT_COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE = String.valueOf(500000); public static final String DEFAULT_COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE = String.valueOf(500000);
// Config to control whether we control insert split sizes automatically based on average // Config to control whether we control insert split sizes automatically based on average
// record sizes // record sizes
public static final String COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS = "hoodie.copyonwrite.insert" + ".auto.split"; public static final String COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS = "hoodie.copyonwrite.insert.auto.split";
// its off by default // its off by default
public static final String DEFAULT_COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS = String.valueOf(true); public static final String DEFAULT_COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS = String.valueOf(true);
// This value is used as a guessimate for the record size, if we can't determine this from // This value is used as a guessimate for the record size, if we can't determine this from
// previous commits // previous commits
public static final String COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE = "hoodie.copyonwrite" + ".record.size.estimate"; public static final String COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE = "hoodie.copyonwrite.record.size.estimate";
// Used to determine how much more can be packed into a small file, before it exceeds the size // Used to determine how much more can be packed into a small file, before it exceeds the size
// limit. // limit.
public static final String DEFAULT_COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE = String.valueOf(1024); public static final String DEFAULT_COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE = String.valueOf(1024);
@@ -88,10 +88,10 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
// used to choose a trade off between IO vs Memory when performing compaction process // used to choose a trade off between IO vs Memory when performing compaction process
// Depending on outputfile_size and memory provided, choose true to avoid OOM for large file // Depending on outputfile_size and memory provided, choose true to avoid OOM for large file
// size + small memory // size + small memory
public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "hoodie.compaction.lazy" + ".block.read"; public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = "hoodie.compaction.lazy.block.read";
public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "false"; public static final String DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED = "false";
// used to choose whether to enable reverse log reading (reverse log traversal) // used to choose whether to enable reverse log reading (reverse log traversal)
public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = "hoodie.compaction" + ".reverse.log.read"; public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = "hoodie.compaction.reverse.log.read";
public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false"; public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false";
private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(); private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
private static final String DEFAULT_AUTO_CLEAN = "true"; private static final String DEFAULT_AUTO_CLEAN = "true";
@@ -104,7 +104,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20"; private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10); private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10);
public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP =
"hoodie.compaction.daybased.target" + ".partitions"; "hoodie.compaction.daybased.target.partitions";
// 500GB of target IO per compaction (both read and write) // 500GB of target IO per compaction (both read and write)
public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = String.valueOf(10); public static final String DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = String.valueOf(10);

View File

@@ -45,7 +45,7 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String BLOOM_INDEX_PARALLELISM_PROP = "hoodie.bloom.index.parallelism"; public static final String BLOOM_INDEX_PARALLELISM_PROP = "hoodie.bloom.index.parallelism";
// Disable explicit bloom index parallelism setting by default - hoodie auto computes // Disable explicit bloom index parallelism setting by default - hoodie auto computes
public static final String DEFAULT_BLOOM_INDEX_PARALLELISM = "0"; public static final String DEFAULT_BLOOM_INDEX_PARALLELISM = "0";
public static final String BLOOM_INDEX_PRUNE_BY_RANGES_PROP = "hoodie.bloom.index.prune.by" + ".ranges"; public static final String BLOOM_INDEX_PRUNE_BY_RANGES_PROP = "hoodie.bloom.index.prune.by.ranges";
public static final String DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES = "true"; public static final String DEFAULT_BLOOM_INDEX_PRUNE_BY_RANGES = "true";
public static final String BLOOM_INDEX_USE_CACHING_PROP = "hoodie.bloom.index.use.caching"; public static final String BLOOM_INDEX_USE_CACHING_PROP = "hoodie.bloom.index.use.caching";
public static final String DEFAULT_BLOOM_INDEX_USE_CACHING = "true"; public static final String DEFAULT_BLOOM_INDEX_USE_CACHING = "true";
@@ -69,7 +69,7 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
public static final String DEFAULT_HBASE_BATCH_SIZE = "100"; public static final String DEFAULT_HBASE_BATCH_SIZE = "100";
public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage" + ".level"; public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage.level";
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
private HoodieIndexConfig(Properties props) { private HoodieIndexConfig(Properties props) {

View File

@@ -69,7 +69,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit"; private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true"; private static final String DEFAULT_HOODIE_AUTO_COMMIT = "true";
private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date" + ".partitioning"; private static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";
private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false"; private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class"; private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();

View File

@@ -67,7 +67,7 @@ public class OperationResult<T> implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "OperationResult{" + "operation=" + operation + ", executed=" + executed + ", success=" + success return "OperationResult{operation=" + operation + ", executed=" + executed + ", success=" + success
+ ", exception=" + exception + '}'; + ", exception=" + exception + '}';
} }
} }

View File

@@ -207,7 +207,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
long totalRecords = recordsPerPartition.values().stream().mapToLong(Long::longValue).sum(); long totalRecords = recordsPerPartition.values().stream().mapToLong(Long::longValue).sum();
int parallelism = (int) (totalComparisons / MAX_ITEMS_PER_SHUFFLE_PARTITION + 1); int parallelism = (int) (totalComparisons / MAX_ITEMS_PER_SHUFFLE_PARTITION + 1);
LOG.info(String.format( LOG.info(String.format(
"TotalRecords %d, TotalFiles %d, TotalAffectedPartitions %d, TotalComparisons %d, " + "SafeParallelism %d", "TotalRecords %d, TotalFiles %d, TotalAffectedPartitions %d, TotalComparisons %d, SafeParallelism %d",
totalRecords, totalFiles, recordsPerPartition.size(), totalComparisons, parallelism)); totalRecords, totalFiles, recordsPerPartition.size(), totalComparisons, parallelism));
return parallelism; return parallelism;
} }
@@ -226,8 +226,8 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
// take the max // take the max
int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism()); int indexParallelism = Math.max(inputParallelism, config.getBloomIndexParallelism());
int joinParallelism = Math.max(totalSubPartitions, indexParallelism); int joinParallelism = Math.max(totalSubPartitions, indexParallelism);
LOG.info("InputParallelism: ${" + inputParallelism + "}, " + "IndexParallelism: ${" LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: ${"
+ config.getBloomIndexParallelism() + "}, " + "TotalSubParts: ${" + totalSubPartitions + "}, " + config.getBloomIndexParallelism() + "}, TotalSubParts: ${" + totalSubPartitions + "}, "
+ "Join Parallelism set to : " + joinParallelism); + "Join Parallelism set to : " + joinParallelism);
return joinParallelism; return joinParallelism;
} }

View File

@@ -62,7 +62,7 @@ class KeyRangeNode implements Comparable<KeyRangeNode>, Serializable {
@Override @Override
public String toString() { public String toString() {
return "KeyRangeNode{" + "minRecordKey='" + minRecordKey + '\'' + ", maxRecordKey='" + maxRecordKey + '\'' return "KeyRangeNode{minRecordKey='" + minRecordKey + '\'' + ", maxRecordKey='" + maxRecordKey + '\''
+ ", fileNameList=" + fileNameList + ", rightSubTreeMax='" + rightSubTreeMax + '\'' + ", leftSubTreeMax='" + ", fileNameList=" + fileNameList + ", rightSubTreeMax='" + rightSubTreeMax + '\'' + ", leftSubTreeMax='"
+ leftSubTreeMax + '\'' + ", rightSubTreeMin='" + rightSubTreeMin + '\'' + ", leftSubTreeMin='" + leftSubTreeMin + leftSubTreeMax + '\'' + ", rightSubTreeMin='" + rightSubTreeMin + '\'' + ", leftSubTreeMin='" + leftSubTreeMin
+ '\'' + '}'; + '\'' + '}';

View File

@@ -218,7 +218,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
compactionPlan.getOperations().stream().noneMatch( compactionPlan.getOperations().stream().noneMatch(
op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))), op -> fgIdsInPendingCompactions.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
"Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. " "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
+ "Please fix your strategy implementation." + "FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactions
+ ", Selected workload :" + compactionPlan); + ", Selected workload :" + compactionPlan);
if (compactionPlan.getOperations().isEmpty()) { if (compactionPlan.getOperations().isEmpty()) {
LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath()); LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());

View File

@@ -574,7 +574,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
assignUpdates(profile); assignUpdates(profile);
assignInserts(profile); assignInserts(profile);
LOG.info("Total Buckets :" + totalBuckets + ", " + "buckets info => " + bucketInfoMap + ", \n" LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+ "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n" + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n"
+ "UpdateLocations mapped to buckets =>" + updateLocationToBucket); + "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
} }

View File

@@ -220,7 +220,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends Hoodi
switch (instantToRollback.getAction()) { switch (instantToRollback.getAction()) {
case HoodieTimeline.COMMIT_ACTION: case HoodieTimeline.COMMIT_ACTION:
LOG.info( LOG.info(
"Rolling back commit action. There are higher delta commits. So only rolling back this " + "instant"); "Rolling back commit action. There are higher delta commits. So only rolling back this instant");
partitionRollbackRequests.add( partitionRollbackRequests.add(
RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback)); RollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath, instantToRollback));
break; break;

View File

@@ -74,11 +74,11 @@ public class HoodieTestDataGenerator {
public static final String[] DEFAULT_PARTITION_PATHS = public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static final int DEFAULT_PARTITION_DEPTH = 3; public static final int DEFAULT_PARTITION_DEPTH = 3;
public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"}," + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"}," + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"}," + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}"; + "{\"name\":\"fare\",\"type\": \"double\"}]}";
public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double"; public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double";

View File

@@ -290,7 +290,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
try { try {
bloomIndex.tagLocation(recordRDD, jsc, table); bloomIndex.tagLocation(recordRDD, jsc, table);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
fail("EmptyRDD should not result in IllegalArgumentException: Positive number of slices " + "required"); fail("EmptyRDD should not result in IllegalArgumentException: Positive number of slices required");
} }
} }
@@ -300,11 +300,11 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
String rowKey1 = UUID.randomUUID().toString(); String rowKey1 = UUID.randomUUID().toString();
String rowKey2 = UUID.randomUUID().toString(); String rowKey2 = UUID.randomUUID().toString();
String rowKey3 = UUID.randomUUID().toString(); String rowKey3 = UUID.randomUUID().toString();
String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\"," + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\"," + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\"," + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
// place same row key under a different partition. // place same row key under a different partition.
String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\"," + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1); TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
HoodieRecord record1 = HoodieRecord record1 =
new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);

View File

@@ -341,7 +341,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
assertTrue(result); assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals( assertEquals(
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only " + "archive 100)", 5, "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)", 5,
timeline.countInstants()); timeline.countInstants());
assertTrue("Archived commits should always be safe", assertTrue("Archived commits should always be safe",
timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101"))); timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101")));
@@ -380,7 +380,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
assertFalse("Instants before oldest pending compaction can be removed", assertFalse("Instants before oldest pending compaction can be removed",
timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100"))); timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")));
assertEquals("Since we have a pending compaction at 101, we should never archive any commit " assertEquals("Since we have a pending compaction at 101, we should never archive any commit "
+ "after 101 (we only " + "archive 100)", 7, timeline.countInstants()); + "after 101 (we only archive 100)", 7, timeline.countInstants());
assertTrue("Requested Compaction must still be present", assertTrue("Requested Compaction must still be present",
timeline.containsInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "101"))); timeline.containsInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "101")));
assertTrue("Instants greater than oldest pending compaction must be present", assertTrue("Instants greater than oldest pending compaction must be present",

View File

@@ -135,7 +135,7 @@ public class CompactionOperation implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "CompactionOperation{" + "baseInstantTime='" + baseInstantTime + '\'' + ", dataFileCommitTime=" return "CompactionOperation{baseInstantTime='" + baseInstantTime + '\'' + ", dataFileCommitTime="
+ dataFileCommitTime + ", deltaFileNames=" + deltaFileNames + ", dataFileName=" + dataFileName + ", id='" + id + dataFileCommitTime + ", deltaFileNames=" + deltaFileNames + ", dataFileName=" + dataFileName + ", id='" + id
+ '\'' + ", metrics=" + metrics + '}'; + '\'' + ", metrics=" + metrics + '}';
} }

View File

@@ -341,7 +341,7 @@ public class HoodieCommitMetadata implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "HoodieCommitMetadata{" + "partitionToWriteStats=" + partitionToWriteStats + ", compacted=" + compacted return "HoodieCommitMetadata{partitionToWriteStats=" + partitionToWriteStats + ", compacted=" + compacted
+ ", extraMetadataMap=" + extraMetadataMap + '}'; + ", extraMetadataMap=" + extraMetadataMap + '}';
} }
} }

View File

@@ -98,6 +98,6 @@ public class HoodieDataFile implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "HoodieDataFile{" + "fullPath=" + fullPath + ", fileLen=" + fileLen + '}'; return "HoodieDataFile{fullPath=" + fullPath + ", fileLen=" + fileLen + '}';
} }
} }

View File

@@ -62,6 +62,6 @@ public class HoodieFileGroupId implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "HoodieFileGroupId{" + "partitionPath='" + partitionPath + '\'' + ", fileId='" + fileId + '\'' + '}'; return "HoodieFileGroupId{partitionPath='" + partitionPath + '\'' + ", fileId='" + fileId + '\'' + '}';
} }
} }

View File

@@ -179,6 +179,6 @@ public class HoodieLogFile implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "HoodieLogFile{" + "pathStr='" + pathStr + '\'' + ", fileLen=" + fileLen + '}'; return "HoodieLogFile{pathStr='" + pathStr + '\'' + ", fileLen=" + fileLen + '}';
} }
} }

View File

@@ -101,7 +101,7 @@ public class HoodiePartitionMetadata {
fs.rename(tmpMetaPath, metaPath); fs.rename(tmpMetaPath, metaPath);
} }
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Error trying to save partition metadata (this is okay, as long as " + "atleast 1 of these succced), " LOG.warn("Error trying to save partition metadata (this is okay, as long as atleast 1 of these succced), "
+ partitionPath, ioe); + partitionPath, ioe);
} finally { } finally {
if (!metafileExists) { if (!metafileExists) {

View File

@@ -320,7 +320,7 @@ public class HoodieWriteStat implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "HoodieWriteStat{" + "fileId='" + fileId + '\'' + ", path='" + path + '\'' + ", prevCommit='" + prevCommit return "HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path + '\'' + ", prevCommit='" + prevCommit
+ '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + ", numUpdateWrites=" + numUpdateWrites + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + ", numUpdateWrites=" + numUpdateWrites
+ ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + totalWriteErrors + ", tempPath='" + tempPath + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + totalWriteErrors + ", tempPath='" + tempPath
+ '\'' + ", partitionPath='" + partitionPath + '\'' + ", totalLogRecords=" + totalLogRecords + '\'' + ", partitionPath='" + partitionPath + '\'' + ", totalLogRecords=" + totalLogRecords

View File

@@ -140,6 +140,6 @@ public final class Option<T> implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "Option{" + "val=" + val + '}'; return "Option{val=" + val + '}';
} }
} }

View File

@@ -129,7 +129,7 @@ public class ParquetUtils {
footerVals.put(footerName, metadata.get(footerName)); footerVals.put(footerName, metadata.get(footerName));
} else if (required) { } else if (required) {
throw new MetadataNotFoundException( throw new MetadataNotFoundException(
"Could not find index in Parquet footer. " + "Looked for key " + footerName + " in " + parquetFilePath); "Could not find index in Parquet footer. Looked for key " + footerName + " in " + parquetFilePath);
} }
} }
return footerVals; return footerVals;

View File

@@ -65,7 +65,7 @@ public class SpillableMapUtils {
long crcOfReadValue = generateChecksum(value); long crcOfReadValue = generateChecksum(value);
if (crc != crcOfReadValue) { if (crc != crcOfReadValue) {
throw new HoodieCorruptedDataException( throw new HoodieCorruptedDataException(
"checksum of payload written to external disk does not match, " + "data may be corrupted"); "checksum of payload written to external disk does not match, data may be corrupted");
} }
return new FileEntry(crc, keySize, valueSize, key, value, timestamp); return new FileEntry(crc, keySize, valueSize, key, value, timestamp);
} }

View File

@@ -64,7 +64,7 @@ public class TimelineDiffHelper {
if (!lostPendingCompactions.isEmpty()) { if (!lostPendingCompactions.isEmpty()) {
// If a compaction is unscheduled, fall back to complete refresh of fs view since some log files could have been // If a compaction is unscheduled, fall back to complete refresh of fs view since some log files could have been
// moved. Its unsafe to incrementally sync in that case. // moved. Its unsafe to incrementally sync in that case.
LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?)." + "They are :" LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :"
+ lostPendingCompactions); + lostPendingCompactions);
return TimelineDiffResult.UNSAFE_SYNC_RESULT; return TimelineDiffResult.UNSAFE_SYNC_RESULT;
} }
@@ -132,7 +132,7 @@ public class TimelineDiffHelper {
@Override @Override
public String toString() { public String toString() {
return "TimelineDiffResult{" + "newlySeenInstants=" + newlySeenInstants + ", finishedCompactionInstants=" return "TimelineDiffResult{newlySeenInstants=" + newlySeenInstants + ", finishedCompactionInstants="
+ finishedCompactionInstants + ", canSyncIncrementally=" + canSyncIncrementally + '}'; + finishedCompactionInstants + ", canSyncIncrementally=" + canSyncIncrementally + '}';
} }
} }

View File

@@ -103,7 +103,7 @@ public class TestHoodieLogFormatAppendFailure {
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header); HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath) Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits" + ".archive") .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive")
.overBaseCommit("").withFs(fs).build(); .overBaseCommit("").withFs(fs).build();
writer = writer.appendBlock(dataBlock); writer = writer.appendBlock(dataBlock);
@@ -134,7 +134,7 @@ public class TestHoodieLogFormatAppendFailure {
// Opening a new Writer right now will throw IOException. The code should handle this, rollover the logfile and // Opening a new Writer right now will throw IOException. The code should handle this, rollover the logfile and
// return a new writer with a bumped up logVersion // return a new writer with a bumped up logVersion
writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath) writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits" + ".archive") .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive")
.overBaseCommit("").withFs(fs).build(); .overBaseCommit("").withFs(fs).build();
// The log version should be different for this new writer // The log version should be different for this new writer
Assert.assertFalse(writer.getLogFile().getLogVersion() == logFileVersion); Assert.assertFalse(writer.getLogFile().getLogVersion() == logFileVersion);

View File

@@ -30,8 +30,8 @@ import java.util.Map;
*/ */
public class TestHoodieAvroUtils { public class TestHoodieAvroUtils {
private static String EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"testrec\"," + "\"fields\": [ " private static String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+ "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"}," + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}"; + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}";

View File

@@ -144,11 +144,11 @@ public abstract class AbstractRealtimeRecordReader {
if (w instanceof ArrayWritable) { if (w instanceof ArrayWritable) {
builder.append(arrayWritableToString((ArrayWritable) w)).append(","); builder.append(arrayWritableToString((ArrayWritable) w)).append(",");
} else { } else {
builder.append("\"value" + i + "\":" + "\"" + w + "\"").append(","); builder.append("\"value" + i + "\":\"" + w + "\"").append(",");
if (w == null) { if (w == null) {
builder.append("\"type" + i + "\":" + "\"unknown\"").append(","); builder.append("\"type" + i + "\":\"unknown\"").append(",");
} else { } else {
builder.append("\"type" + i + "\":" + "\"" + w.getClass().getSimpleName() + "\"").append(","); builder.append("\"type" + i + "\":\"" + w.getClass().getSimpleName() + "\"").append(",");
} }
} }
i++; i++;

View File

@@ -99,7 +99,7 @@ public class HoodieRealtimeFileSplit extends FileSplit {
@Override @Override
public String toString() { public String toString() {
return "HoodieRealtimeFileSplit{" + "DataPath=" + getPath() + ", deltaFilePaths=" + deltaFilePaths return "HoodieRealtimeFileSplit{DataPath=" + getPath() + ", deltaFilePaths=" + deltaFilePaths
+ ", maxCommitTime='" + maxCommitTime + '\'' + ", basePath='" + basePath + '\'' + '}'; + ", maxCommitTime='" + maxCommitTime + '\'' + ", basePath='" + basePath + '\'' + '}';
} }
} }

View File

@@ -108,7 +108,7 @@ public class TestHoodieInputFormat {
InputFormatTestUtil.setupIncremental(jobConf, "100", 1); InputFormatTestUtil.setupIncremental(jobConf, "100", 1);
FileStatus[] files = inputFormat.listStatus(jobConf); FileStatus[] files = inputFormat.listStatus(jobConf);
assertEquals("We should exclude commit 100 when returning incremental pull with start commit time as " + "100", 0, assertEquals("We should exclude commit 100 when returning incremental pull with start commit time as 100", 0,
files.length); files.length);
} }
@@ -152,7 +152,7 @@ public class TestHoodieInputFormat {
InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL); InputFormatTestUtil.setupIncremental(jobConf, "100", HoodieHiveUtil.MAX_COMMIT_ALL);
files = inputFormat.listStatus(jobConf); files = inputFormat.listStatus(jobConf);
assertEquals("Pulling all commits from 100, should get us the 1 file from each of 200,300,400,500,400 " + "commits", assertEquals("Pulling all commits from 100, should get us the 1 file from each of 200,300,400,500,400 commits",
5, files.length); 5, files.length);
ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 600 commit", files, "600", 1); ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 600 commit", files, "600", 1);
ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 500 commit", files, "500", 1); ensureFilesInCommit("Pulling all commits from 100, should get us the 1 files from 500 commit", files, "500", 1);

View File

@@ -50,12 +50,12 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by") @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
public List<String> partitionFields = new ArrayList<>(); public List<String> partitionFields = new ArrayList<>();
@Parameter(names = "--partition-value-extractor", description = "Class which implements " + "PartitionValueExtractor " @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+ "to extract the partition " + "values from HDFS path") + "to extract the partition values from HDFS path")
public String partitionValueExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getName(); public String partitionValueExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getName();
@Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this" @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+ " exists to support " + "backward compatibility. If" + " you use hoodie 0.3.x, do " + "not set this parameter") + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
public Boolean assumeDatePartitioning = false; public Boolean assumeDatePartitioning = false;
@Parameter(names = {"--use-pre-apache-input-format"}, @Parameter(names = {"--use-pre-apache-input-format"},
@@ -88,7 +88,7 @@ public class HiveSyncConfig implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "HiveSyncConfig{" + "databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\'' return "HiveSyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
+ ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\'' + ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='" + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning

View File

@@ -187,7 +187,7 @@ public class HoodieHiveClient {
+ ". Check partition strategy. "); + ". Check partition strategy. ");
List<String> partBuilder = new ArrayList<>(); List<String> partBuilder = new ArrayList<>();
for (int i = 0; i < syncConfig.partitionFields.size(); i++) { for (int i = 0; i < syncConfig.partitionFields.size(); i++) {
partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`=" + "'" + partitionValues.get(i) + "'"); partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValues.get(i) + "'");
} }
return partBuilder.stream().collect(Collectors.joining(",")); return partBuilder.stream().collect(Collectors.joining(","));
} }

View File

@@ -34,11 +34,11 @@ import java.util.List;
public class ITTestHoodieDemo extends ITTestBase { public class ITTestHoodieDemo extends ITTestBase {
private static String HDFS_DATA_DIR = "/usr/hive/data/input"; private static String HDFS_DATA_DIR = "/usr/hive/data/input";
private static String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/" + "batch_1.json"; private static String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/batch_1.json";
private static String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/" + "batch_2.json"; private static String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/batch_2.json";
private static String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/" + "presto-table-check.commands"; private static String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/presto-table-check.commands";
private static String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/" + "presto-batch1.commands"; private static String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/presto-batch1.commands";
private static String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/" + "presto-batch2-after-compaction.commands"; private static String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/presto-batch2-after-compaction.commands";
private static String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json"; private static String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
private static String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands"; private static String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands";
@@ -64,7 +64,7 @@ public class ITTestHoodieDemo extends ITTestBase {
private static String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands"; private static String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands";
private static String HIVE_SYNC_CMD_FMT = private static String HIVE_SYNC_CMD_FMT =
" --enable-hive-sync " + " --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 " " --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
+ " --hoodie-conf hoodie.datasource.hive_sync.username=hive " + " --hoodie-conf hoodie.datasource.hive_sync.username=hive "
+ " --hoodie-conf hoodie.datasource.hive_sync.password=hive " + " --hoodie-conf hoodie.datasource.hive_sync.password=hive "
+ " --hoodie-conf hoodie.datasource.hive_sync.partition_fields=%s " + " --hoodie-conf hoodie.datasource.hive_sync.partition_fields=%s "
@@ -143,10 +143,10 @@ public class ITTestHoodieDemo extends ITTestBase {
assertStdOutContains(stdOutErrPair, "| stock_ticks_mor_rt |"); assertStdOutContains(stdOutErrPair, "| stock_ticks_mor_rt |");
assertStdOutContains(stdOutErrPair, assertStdOutContains(stdOutErrPair,
"| partition |\n" + "+----------------+\n" + "| dt=2018-08-31 |\n" + "+----------------+\n", 3); "| partition |\n+----------------+\n| dt=2018-08-31 |\n+----------------+\n", 3);
stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS); stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n" assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n"
+ "| GOOG | 2018-08-31 10:29:00 |\n", 3); + "| GOOG | 2018-08-31 10:29:00 |\n", 3);
assertStdOutContains(stdOutErrPair, assertStdOutContains(stdOutErrPair,
"| symbol | ts | volume | open | close |\n" "| symbol | ts | volume | open | close |\n"
@@ -159,9 +159,9 @@ public class ITTestHoodieDemo extends ITTestBase {
private void testSparkSQLAfterFirstBatch() throws Exception { private void testSparkSQLAfterFirstBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH1_COMMANDS, true); Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH1_COMMANDS, true);
assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow |false |\n" assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow |false |\n"
+ "|default |stock_ticks_mor |false |\n" + "|default |stock_ticks_mor_rt |false |"); + "|default |stock_ticks_mor |false |\n|default |stock_ticks_mor_rt |false |");
assertStdOutContains(stdOutErrPair, assertStdOutContains(stdOutErrPair,
"+------+-------------------+\n" + "|GOOG |2018-08-31 10:29:00|\n" + "+------+-------------------+", 3); "+------+-------------------+\n|GOOG |2018-08-31 10:29:00|\n+------+-------------------+", 3);
assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |", 3); assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |", 3);
assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|", 3); assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|", 3);
} }
@@ -203,9 +203,9 @@ public class ITTestHoodieDemo extends ITTestBase {
private void testHiveAfterSecondBatch() throws Exception { private void testHiveAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS); Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n" assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n"
+ "| GOOG | 2018-08-31 10:29:00 |\n"); + "| GOOG | 2018-08-31 10:29:00 |\n");
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n" assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n"
+ "| GOOG | 2018-08-31 10:59:00 |\n", 2); + "| GOOG | 2018-08-31 10:59:00 |\n", 2);
assertStdOutContains(stdOutErrPair, assertStdOutContains(stdOutErrPair,
"| symbol | ts | volume | open | close |\n" "| symbol | ts | volume | open | close |\n"
@@ -236,7 +236,7 @@ public class ITTestHoodieDemo extends ITTestBase {
private void testHiveAfterSecondBatchAfterCompaction() throws Exception { private void testHiveAfterSecondBatchAfterCompaction() throws Exception {
Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH2_COMMANDS); Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH2_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n" assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n"
+ "| GOOG | 2018-08-31 10:59:00 |", 2); + "| GOOG | 2018-08-31 10:59:00 |", 2);
assertStdOutContains(stdOutErrPair, assertStdOutContains(stdOutErrPair,
"| symbol | ts | volume | open | close |\n" "| symbol | ts | volume | open | close |\n"
@@ -259,12 +259,12 @@ public class ITTestHoodieDemo extends ITTestBase {
private void testSparkSQLAfterSecondBatch() throws Exception { private void testSparkSQLAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH2_COMMANDS, true); Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH2_COMMANDS, true);
assertStdOutContains(stdOutErrPair, assertStdOutContains(stdOutErrPair,
"+------+-------------------+\n" + "|GOOG |2018-08-31 10:59:00|\n" + "+------+-------------------+", 2); "+------+-------------------+\n|GOOG |2018-08-31 10:59:00|\n+------+-------------------+", 2);
assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |", 3); assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 09:59:00|6330 |1230.5 |1230.02 |", 3);
assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021 |1227.1993|1227.215|", 2); assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021 |1227.1993|1227.215|", 2);
assertStdOutContains(stdOutErrPair, assertStdOutContains(stdOutErrPair,
"+------+-------------------+\n" + "|GOOG |2018-08-31 10:29:00|\n" + "+------+-------------------+"); "+------+-------------------+\n|GOOG |2018-08-31 10:29:00|\n+------+-------------------+");
assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|"); assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|");
} }
@@ -291,10 +291,10 @@ public class ITTestHoodieDemo extends ITTestBase {
Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_INCREMENTAL_COMMANDS, true); Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_INCREMENTAL_COMMANDS, true);
assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021 |1227.1993|1227.215|"); assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:59:00|9021 |1227.1993|1227.215|");
assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow |false |\n" assertStdOutContains(stdOutErrPair, "|default |stock_ticks_cow |false |\n"
+ "|default |stock_ticks_derived_mor |false |\n" + "|default |stock_ticks_derived_mor_rt|false |\n" + "|default |stock_ticks_derived_mor |false |\n|default |stock_ticks_derived_mor_rt|false |\n"
+ "|default |stock_ticks_mor |false |\n" + "|default |stock_ticks_mor_rt |false |\n" + "|default |stock_ticks_mor |false |\n|default |stock_ticks_mor_rt |false |\n"
+ "| |stock_ticks_cow_incr |true |"); + "| |stock_ticks_cow_incr |true |");
assertStdOutContains(stdOutErrPair, "|count(1)|\n" + "+--------+\n" + "|99 |", 2); assertStdOutContains(stdOutErrPair, "|count(1)|\n+--------+\n|99 |", 2);
} }
private void scheduleAndRunCompaction() throws Exception { private void scheduleAndRunCompaction() throws Exception {

View File

@@ -98,7 +98,7 @@ public class DataSourceUtils {
} }
} }
throw new HoodieException( throw new HoodieException(
fieldName + "(Part -" + parts[i] + ") field not found in record. " + "Acceptable fields were :" fieldName + "(Part -" + parts[i] + ") field not found in record. Acceptable fields were :"
+ valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList())); + valueNode.getSchema().getFields().stream().map(Field::name).collect(Collectors.toList()));
} }

View File

@@ -53,11 +53,11 @@ public class QuickstartUtils {
private static final String[] DEFAULT_PARTITION_PATHS = private static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+ "{\"name\": \"ts\",\"type\": \"double\"}," + "{\"name\": \"uuid\", \"type\": \"string\"}," + "{\"name\": \"ts\",\"type\": \"double\"},{\"name\": \"uuid\", \"type\": \"string\"},"
+ "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"}," + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
+ "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"}," + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"}," + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
+ "{\"name\":\"fare\",\"type\": \"double\"}]}"; + "{\"name\":\"fare\",\"type\": \"double\"}]}";
static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

View File

@@ -181,7 +181,7 @@ public class HiveIncrementalPuller {
String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next(); String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next();
if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) { if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
LOG.info("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable LOG.info("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable
+ ", which means its pulling from a different table. Fencing this from " + "happening."); + ", which means its pulling from a different table. Fencing this from happening.");
throw new HoodieIncrementalPullSQLException( throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable); "Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable);
} }

View File

@@ -135,7 +135,7 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
LOG.info("Monitoring thread(s) !!"); LOG.info("Monitoring thread(s) !!");
future.get(); future.get();
} catch (ExecutionException ex) { } catch (ExecutionException ex) {
LOG.error("Monitor noticed one or more threads failed." + " Requesting graceful shutdown of other threads", ex); LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
error = true; error = true;
shutdown(false); shutdown(false);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {

View File

@@ -53,9 +53,9 @@ public class Compactor implements Serializable {
long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count(); long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count();
if (numWriteErrors != 0) { if (numWriteErrors != 0) {
// We treat even a single error in compaction as fatal // We treat even a single error in compaction as fatal
LOG.error("Compaction for instant (" + instant + ") failed with write errors. " + "Errors :" + numWriteErrors); LOG.error("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
throw new HoodieException( throw new HoodieException(
"Compaction for instant (" + instant + ") failed with write errors. " + "Errors :" + numWriteErrors); "Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
} }
// Commit compaction // Commit compaction
compactionClient.commitCompaction(instant.getTimestamp(), res, Option.empty()); compactionClient.commitCompaction(instant.getTimestamp(), res, Option.empty());

View File

@@ -300,7 +300,7 @@ public class DeltaSync implements Serializable {
} }
if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
LOG.info("No new data, source checkpoint has not changed. Nothing to commit." + "Old checkpoint=(" LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=("
+ resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"); + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
return null; return null;
} }

View File

@@ -160,7 +160,7 @@ public class HoodieDeltaStreamer implements Serializable {
@Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true) @Parameter(names = {"--target-table"}, description = "name of the target table in Hive", required = true)
public String targetTableName; public String targetTableName;
@Parameter(names = {"--storage-type"}, description = "Type of Storage. " + "COPY_ON_WRITE (or) MERGE_ON_READ", @Parameter(names = {"--storage-type"}, description = "Type of Storage. COPY_ON_WRITE (or) MERGE_ON_READ",
required = true) required = true)
public String storageType; public String storageType;
@@ -213,7 +213,7 @@ public class HoodieDeltaStreamer implements Serializable {
public Operation operation = Operation.UPSERT; public Operation operation = Operation.UPSERT;
@Parameter(names = {"--filter-dupes"}, @Parameter(names = {"--filter-dupes"},
description = "Should duplicate records from source be dropped/filtered out" + "before insert/bulk-insert") description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
public Boolean filterDupes = false; public Boolean filterDupes = false;
@Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive") @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
@@ -229,7 +229,7 @@ public class HoodieDeltaStreamer implements Serializable {
public Boolean continuousMode = false; public Boolean continuousMode = false;
@Parameter(names = {"--min-sync-interval-seconds"}, @Parameter(names = {"--min-sync-interval-seconds"},
description = "the min sync interval of each sync in " + "continuous mode") description = "the min sync interval of each sync in continuous mode")
public Integer minSyncIntervalSeconds = 0; public Integer minSyncIntervalSeconds = 0;
@Parameter(names = {"--spark-master"}, description = "spark master to use.") @Parameter(names = {"--spark-master"}, description = "spark master to use.")
@@ -259,7 +259,7 @@ public class HoodieDeltaStreamer implements Serializable {
* Compaction is enabled for MoR table by default. This flag disables it * Compaction is enabled for MoR table by default. This flag disables it
*/ */
@Parameter(names = {"--disable-compaction"}, @Parameter(names = {"--disable-compaction"},
description = "Compaction is enabled for MoR table by default." + "This flag disables it ") description = "Compaction is enabled for MoR table by default. This flag disables it ")
public Boolean forceDisableCompaction = false; public Boolean forceDisableCompaction = false;
/** /**

View File

@@ -47,10 +47,10 @@ public class SchedulerConfGenerator {
public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"; public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
private static String SPARK_SCHEDULING_PATTERN = private static String SPARK_SCHEDULING_PATTERN =
"<?xml version=\"1.0\"?>\n" + "<allocations>\n" + " <pool name=\"%s\">\n" "<?xml version=\"1.0\"?>\n<allocations>\n <pool name=\"%s\">\n"
+ " <schedulingMode>%s</schedulingMode>\n" + " <weight>%s</weight>\n" + " <minShare>%s</minShare>\n" + " <schedulingMode>%s</schedulingMode>\n <weight>%s</weight>\n <minShare>%s</minShare>\n"
+ " </pool>\n" + " <pool name=\"%s\">\n" + " <schedulingMode>%s</schedulingMode>\n" + " </pool>\n <pool name=\"%s\">\n <schedulingMode>%s</schedulingMode>\n"
+ " <weight>%s</weight>\n" + " <minShare>%s</minShare>\n" + " </pool>\n" + "</allocations>"; + " <weight>%s</weight>\n <minShare>%s</minShare>\n </pool>\n</allocations>";
private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare, private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
Integer compactionMinShare) { Integer compactionMinShare) {

View File

@@ -57,11 +57,11 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
static class Config { static class Config {
// One value from TimestampType above // One value from TimestampType above
private static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen" + ".timebased.timestamp.type"; private static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type";
private static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP = private static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP =
"hoodie.deltastreamer.keygen" + ".timebased.input" + ".dateformat"; "hoodie.deltastreamer.keygen.timebased.input.dateformat";
private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP = private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP =
"hoodie.deltastreamer.keygen" + ".timebased.output" + ".dateformat"; "hoodie.deltastreamer.keygen.timebased.output.dateformat";
} }
public TimestampBasedKeyGenerator(TypedProperties config) { public TimestampBasedKeyGenerator(TypedProperties config) {

View File

@@ -40,8 +40,8 @@ public class FilebasedSchemaProvider extends SchemaProvider {
* Configs supported. * Configs supported.
*/ */
public static class Config { public static class Config {
private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider" + ".source.schema.file"; private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.file";
private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider" + ".target.schema.file"; private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.file";
} }
private final FileSystem fs; private final FileSystem fs;

View File

@@ -55,7 +55,7 @@ public class IncrSourceHelper {
public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath, public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) { int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) {
Preconditions.checkArgument(numInstantsPerFetch > 0, Preconditions.checkArgument(numInstantsPerFetch > 0,
"Make sure the config" + " hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value"); "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
HoodieTableMetaClient srcMetaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), srcBasePath, true); HoodieTableMetaClient srcMetaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), srcBasePath, true);
final HoodieTimeline activeCommitTimeline = final HoodieTimeline activeCommitTimeline =