1
0

[HUDI-379] Refactor the codes based on new JavadocStyle code style rule (#1079)

This commit is contained in:
lamber-ken
2019-12-06 12:59:28 +08:00
committed by leesf
parent c06d89b648
commit 2745b7552f
137 changed files with 434 additions and 433 deletions

View File

@@ -62,7 +62,7 @@ import java.util.Properties;
import scala.Tuple2;
/**
* Loads data from Parquet Sources
* Loads data from Parquet Sources.
*/
public class HDFSParquetImporter implements Serializable {
@@ -190,7 +190,7 @@ public class HDFSParquetImporter implements Serializable {
}
/**
* Imports records to Hoodie dataset
* Imports records to Hoodie dataset.
*
* @param client Hoodie Client
* @param instantTime Instant Time

View File

@@ -41,17 +41,17 @@ public class HoodieCleaner {
private static volatile Logger log = LogManager.getLogger(HoodieCleaner.class);
/**
* Config for Cleaner
* Config for Cleaner.
*/
private final Config cfg;
/**
* Filesystem used
* Filesystem used.
*/
private transient FileSystem fs;
/**
* Spark context
* Spark context.
*/
private transient JavaSparkContext jssc;

View File

@@ -56,7 +56,7 @@ public class HoodieCompactionAdminTool {
}
/**
* Executes one of compaction admin operations
* Executes one of compaction admin operations.
*/
public void run(JavaSparkContext jsc) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
@@ -118,7 +118,7 @@ public class HoodieCompactionAdminTool {
}
/**
* Print Operation Result
* Print Operation Result.
*
* @param initialLine Initial Line
* @param result Result
@@ -131,14 +131,14 @@ public class HoodieCompactionAdminTool {
}
/**
* Operation Types
* Operation Types.
*/
public enum Operation {
VALIDATE, UNSCHEDULE_PLAN, UNSCHEDULE_FILE, REPAIR
}
/**
* Admin Configuration Options
* Admin Configuration Options.
*/
public static class Config implements Serializable {

View File

@@ -55,7 +55,7 @@ import java.util.List;
import java.util.Map;
/**
* Bunch of helper methods
* Bunch of helper methods.
*/
public class UtilHelpers {
private static Logger logger = LogManager.getLogger(UtilHelpers.class);
@@ -115,7 +115,7 @@ public class UtilHelpers {
}
/**
* Parse Schema from file
* Parse Schema from file.
*
* @param fs File System
* @param schemaFile Schema File
@@ -167,7 +167,7 @@ public class UtilHelpers {
}
/**
* Build Spark Context for ingestion/compaction
* Build Spark Context for ingestion/compaction.
*
* @return
*/
@@ -178,7 +178,7 @@ public class UtilHelpers {
}
/**
* Build Hoodie write client
* Build Hoodie write client.
*
* @param jsc Java Spark Context
* @param basePath Base Path

View File

@@ -41,7 +41,7 @@ import java.util.Properties;
/**
* This is an one-time use class meant for migrating the configuration for "hoodie.compaction.payload.class" in
* .hoodie/hoodie.properties from com.uber.hoodie to org.apache.hudi It takes in a file containing base-paths for a set
* .hoodie/hoodie.properties from com.uber.hoodie to org.apache.hudi . It takes in a file containing base-paths for a set
* of hudi datasets and does the migration
*/
public class UpgradePayloadFromUberToApache implements Serializable {

View File

@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
* Base Class for running delta-sync/compaction in separate thread and controlling their life-cyle
* Base Class for running delta-sync/compaction in separate thread and controlling their life-cyle.
*/
public abstract class AbstractDeltaStreamerService implements Serializable {
@@ -116,14 +116,14 @@ public abstract class AbstractDeltaStreamerService implements Serializable {
}
/**
* Service implementation
* Service implementation.
*
* @return
*/
protected abstract Pair<CompletableFuture, ExecutorService> startService();
/**
* A monitor thread is started which would trigger a callback if the service is shutdown
* A monitor thread is started which would trigger a callback if the service is shutdown.
*
* @param onShutdownCallback
*/

View File

@@ -33,7 +33,7 @@ import java.io.IOException;
import java.io.Serializable;
/**
* Run one round of compaction
* Run one round of compaction.
*/
public class Compactor implements Serializable {

View File

@@ -78,9 +78,8 @@ import scala.collection.JavaConversions;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
/**
* Sync's one batch of data to hoodie dataset
* Sync's one batch of data to hoodie dataset.
*/
public class DeltaSync implements Serializable {
@@ -89,12 +88,12 @@ public class DeltaSync implements Serializable {
public static String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
/**
* Delta Sync Config
* Delta Sync Config.
*/
private final HoodieDeltaStreamer.Config cfg;
/**
* Source to pull deltas from
* Source to pull deltas from.
*/
private transient SourceFormatAdapter formatAdapter;
@@ -104,32 +103,32 @@ public class DeltaSync implements Serializable {
private transient SchemaProvider schemaProvider;
/**
* Allows transforming source to target dataset before writing
* Allows transforming source to target dataset before writing.
*/
private transient Transformer transformer;
/**
* Extract the key for the target dataset
* Extract the key for the target dataset.
*/
private KeyGenerator keyGenerator;
/**
* Filesystem used
* Filesystem used.
*/
private transient FileSystem fs;
/**
* Spark context
* Spark context.
*/
private transient JavaSparkContext jssc;
/**
* Spark Session
* Spark Session.
*/
private transient SparkSession sparkSession;
/**
* Hive Config
* Hive Config.
*/
private transient HiveConf hiveConf;
@@ -139,22 +138,22 @@ public class DeltaSync implements Serializable {
private final TypedProperties props;
/**
* Callback when write client is instantiated
* Callback when write client is instantiated.
*/
private transient Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient;
/**
* Timeline with completed commits
* Timeline with completed commits.
*/
private transient Option<HoodieTimeline> commitTimelineOpt;
/**
* Write Client
* Write Client.
*/
private transient HoodieWriteClient writeClient;
/**
* Table Type
* Table Type.
*/
private final HoodieTableType tableType;
@@ -190,7 +189,7 @@ public class DeltaSync implements Serializable {
}
/**
* Refresh Timeline
* Refresh Timeline.
*/
private void refreshTimeline() throws IOException {
if (fs.exists(new Path(cfg.targetBasePath))) {
@@ -204,7 +203,7 @@ public class DeltaSync implements Serializable {
}
/**
* Run one round of delta sync and return new compaction instant if one got scheduled
* Run one round of delta sync and return new compaction instant if one got scheduled.
*/
public Option<String> syncOnce() throws Exception {
Option<String> scheduledCompaction = Option.empty();
@@ -236,7 +235,7 @@ public class DeltaSync implements Serializable {
}
/**
* Read from Upstream Source and apply transformation if needed
* Read from Upstream Source and apply transformation if needed.
*/
private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
Option<HoodieTimeline> commitTimelineOpt) throws Exception {
@@ -321,7 +320,7 @@ public class DeltaSync implements Serializable {
}
/**
* Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed
* Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed.
*
* @param records Input Records
* @param checkpointStr Checkpoint String
@@ -434,7 +433,7 @@ public class DeltaSync implements Serializable {
}
/**
* Sync to Hive
* Sync to Hive.
*/
private void syncHive() throws ClassNotFoundException {
if (cfg.enableHiveSync) {
@@ -462,7 +461,7 @@ public class DeltaSync implements Serializable {
}
/**
* Helper to construct Write Client config
* Helper to construct Write Client config.
*
* @param schemaProvider Schema Provider
*/
@@ -491,7 +490,7 @@ public class DeltaSync implements Serializable {
}
/**
* Register Avro Schemas
* Register Avro Schemas.
*
* @param schemaProvider Schema Provider
*/
@@ -510,7 +509,7 @@ public class DeltaSync implements Serializable {
}
/**
* Close all resources
* Close all resources.
*/
public void close() {
if (null != writeClient) {

View File

@@ -107,7 +107,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
/**
* Main method to start syncing
* Main method to start syncing.
*
* @throws Exception
*/
@@ -306,7 +306,7 @@ public class HoodieDeltaStreamer implements Serializable {
public static class DeltaSyncService extends AbstractDeltaStreamerService {
/**
* Delta Sync Config
* Delta Sync Config.
*/
private final HoodieDeltaStreamer.Config cfg;
@@ -316,12 +316,12 @@ public class HoodieDeltaStreamer implements Serializable {
private transient SchemaProvider schemaProvider;
/**
* Spark Session
* Spark Session.
*/
private transient SparkSession sparkSession;
/**
* Spark context
* Spark context.
*/
private transient JavaSparkContext jssc;
@@ -331,17 +331,17 @@ public class HoodieDeltaStreamer implements Serializable {
TypedProperties props;
/**
* Async Compactor Service
* Async Compactor Service.
*/
private AsyncCompactService asyncCompactService;
/**
* Table Type
* Table Type.
*/
private final HoodieTableType tableType;
/**
* Delta Sync
* Delta Sync.
*/
private transient DeltaSync deltaSync;
@@ -419,7 +419,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
/**
* Shutdown compactor as DeltaSync is shutdown
* Shutdown compactor as DeltaSync is shutdown.
*/
private void shutdownCompactor(boolean error) {
log.info("Delta Sync shutdown. Error ?" + error);
@@ -430,7 +430,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
/**
* Callback to initialize write client and start compaction service if required
* Callback to initialize write client and start compaction service if required.
*
* @param writeClient HoodieWriteClient
* @return
@@ -458,7 +458,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
/**
* Close all resources
* Close all resources.
*/
public void close() {
if (null != deltaSync) {
@@ -507,14 +507,14 @@ public class HoodieDeltaStreamer implements Serializable {
}
/**
* Enqueues new Pending compaction
* Enqueues new Pending compaction.
*/
public void enqueuePendingCompaction(HoodieInstant instant) {
pendingCompactions.add(instant);
}
/**
* Wait till outstanding pending compactions reduces to the passed in value
* Wait till outstanding pending compactions reduces to the passed in value.
*
* @param numPendingCompactions Maximum pending compactions allowed
* @throws InterruptedException
@@ -531,7 +531,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
/**
* Fetch Next pending compaction if available
* Fetch Next pending compaction if available.
*
* @return
* @throws InterruptedException
@@ -552,7 +552,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
/**
* Start Compaction Service
* Start Compaction Service.
*/
protected Pair<CompletableFuture, ExecutorService> startService() {
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction);

View File

@@ -60,7 +60,7 @@ public class SchedulerConfGenerator {
}
/**
* Helper to set Spark Scheduling Configs dynamically
* Helper to set Spark Scheduling Configs dynamically.
*
* @param cfg Config
*/

View File

@@ -39,7 +39,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
/**
* Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer)
* Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer).
*/
public final class SourceFormatAdapter {

View File

@@ -52,7 +52,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
private final String outputDateFormat;
/**
* Supported configs
* Supported configs.
*/
static class Config {

View File

@@ -32,12 +32,12 @@ import java.io.IOException;
import java.util.Collections;
/**
* A simple schema provider, that reads off files on DFS
* A simple schema provider, that reads off files on DFS.
*/
public class FilebasedSchemaProvider extends SchemaProvider {
/**
* Configs supported
* Configs supported.
*/
public static class Config {
private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider" + ".source.schema.file";

View File

@@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
/**
* Class to provide schema for reading data and also writing into a Hoodie table
* Class to provide schema for reading data and also writing into a Hoodie table.
*/
public abstract class SchemaProvider implements Serializable {

View File

@@ -32,14 +32,14 @@ import java.net.URL;
import java.util.Collections;
/**
* Obtains latest schema from the Confluent/Kafka schema-registry
* Obtains latest schema from the Confluent/Kafka schema-registry.
*
* https://github.com/confluentinc/schema-registry
*/
public class SchemaRegistryProvider extends SchemaProvider {
/**
* Configs supported
* Configs supported.
*/
public static class Config {

View File

@@ -34,7 +34,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
* DFS Source that reads avro data
* DFS Source that reads avro data.
*/
public class AvroDFSSource extends AvroSource {

View File

@@ -36,7 +36,7 @@ import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry
* Reads avro serialized Kafka data, based on the confluent schema-registry.
*/
public class AvroKafkaSource extends AvroSource {

View File

@@ -66,7 +66,7 @@ public class HiveIncrPullSource extends AvroSource {
private final String incrPullRootPath;
/**
* Configs supported
* Configs supported.
*/
static class Config {

View File

@@ -40,25 +40,25 @@ public class HoodieIncrSource extends RowSource {
protected static class Config {
/**
* {@value #HOODIE_SRC_BASE_PATH} is the base-path for the source Hoodie table
* {@value #HOODIE_SRC_BASE_PATH} is the base-path for the source Hoodie table.
*/
private static final String HOODIE_SRC_BASE_PATH = "hoodie.deltastreamer.source.hoodieincr.path";
/**
* {@value #NUM_INSTANTS_PER_FETCH} allows the max number of instants whose changes can be incrementally fetched
* {@value #NUM_INSTANTS_PER_FETCH} allows the max number of instants whose changes can be incrementally fetched.
*/
private static final String NUM_INSTANTS_PER_FETCH = "hoodie.deltastreamer.source.hoodieincr.num_instants";
private static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1;
/**
* {@value #HOODIE_SRC_PARTITION_FIELDS} specifies partition fields that needs to be added to source table after
* parsing _hoodie_partition_path
* parsing _hoodie_partition_path.
*/
private static final String HOODIE_SRC_PARTITION_FIELDS = "hoodie.deltastreamer.source.hoodieincr.partition.fields";
/**
* {@value #HOODIE_SRC_PARTITION_EXTRACTORCLASS} PartitionValueExtractor class to extract partition fields from
* _hoodie_partition_path
* _hoodie_partition_path.
*/
private static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS =
"hoodie.deltastreamer.source.hoodieincr.partition.extractor.class";
@@ -90,7 +90,7 @@ public class HoodieIncrSource extends RowSource {
* props.getStringList(Config.HOODIE_SRC_PARTITION_FIELDS, ",", new ArrayList<>()); PartitionValueExtractor
* extractor = DataSourceUtils.createPartitionExtractor(props.getString( Config.HOODIE_SRC_PARTITION_EXTRACTORCLASS,
* Config.DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS));
**/
*/
String srcPath = props.getString(Config.HOODIE_SRC_BASE_PATH);
int numInstantsPerFetch = props.getInteger(Config.NUM_INSTANTS_PER_FETCH, Config.DEFAULT_NUM_INSTANTS_PER_FETCH);
boolean readLatestOnMissingCkpt = props.getBoolean(Config.READ_LATEST_INSTANT_ON_MISSING_CKPT,
@@ -136,7 +136,7 @@ public class HoodieIncrSource extends RowSource {
* RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));
*
* log.info("Validated Source Schema :" + validated.schema());
**/
*/
// Remove Hoodie meta columns except partition path from input source
final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream()

View File

@@ -29,7 +29,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
* DFS Source that reads json data
* DFS Source that reads json data.
*/
public class JsonDFSSource extends JsonSource {

View File

@@ -34,7 +34,7 @@ import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
/**
* Read json kafka data
* Read json kafka data.
*/
public class JsonKafkaSource extends JsonSource {

View File

@@ -32,7 +32,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
/**
* DFS Source that reads parquet data
* DFS Source that reads parquet data.
*/
public class ParquetDFSSource extends ParquetSource {

View File

@@ -63,7 +63,7 @@ public abstract class Source<T> implements Serializable {
protected abstract InputBatch<T> fetchNewData(Option<String> lastCkptStr, long sourceLimit);
/**
* Main API called by Hoodie Delta Streamer to fetch records
* Main API called by Hoodie Delta Streamer to fetch records.
*
* @param lastCkptStr Last Checkpoint
* @param sourceLimit Source Limit

View File

@@ -35,20 +35,20 @@ import java.io.Serializable;
public class AvroConvertor implements Serializable {
/**
* To be lazily inited on executors
* To be lazily inited on executors.
*/
private transient Schema schema;
private final String schemaStr;
/**
* To be lazily inited on executors
* To be lazily inited on executors.
*/
private transient MercifulJsonConverter jsonConverter;
/**
* To be lazily inited on executors
* To be lazily inited on executors.
*/
private transient Injection<GenericRecord, byte[]> recordInjection;

View File

@@ -43,7 +43,7 @@ import java.util.stream.Collectors;
public class DFSPathSelector {
/**
* Configs supported
* Configs supported.
*/
static class Config {

View File

@@ -31,7 +31,7 @@ import org.apache.spark.sql.Row;
public class IncrSourceHelper {
/**
* Get a timestamp which is the next value in a descending sequence
* Get a timestamp which is the next value in a descending sequence.
*
* @param timestamp Timestamp
*/
@@ -43,7 +43,7 @@ public class IncrSourceHelper {
}
/**
* Find begin and end instants to be set for the next fetch
* Find begin and end instants to be set for the next fetch.
*
* @param jssc Java Spark Context
* @param srcBasePath Base path of Hudi source table
@@ -77,7 +77,7 @@ public class IncrSourceHelper {
}
/**
* Validate instant time seen in the incoming row
* Validate instant time seen in the incoming row.
*
* @param row Input Row
* @param instantTime Hoodie Instant time of the row

View File

@@ -47,7 +47,7 @@ import scala.collection.mutable.StringBuilder;
import scala.util.Either;
/**
* Source to read data from Kafka, incrementally
* Source to read data from Kafka, incrementally.
*/
public class KafkaOffsetGen {
@@ -162,7 +162,7 @@ public class KafkaOffsetGen {
}
/**
* Kafka reset offset strategies
* Kafka reset offset strategies.
*/
enum KafkaResetOffsetStrategies {
LARGEST, SMALLEST

View File

@@ -39,7 +39,9 @@ public class FlatteningTransformer implements Transformer {
private static final String TMP_TABLE = "HUDI_SRC_TMP_TABLE_";
private static volatile Logger log = LogManager.getLogger(SqlQueryBasedTransformer.class);
/** Configs supported */
/**
* Configs supported.
*/
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {

View File

@@ -26,7 +26,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* Identity transformer
* Identity transformer.
*/
public class IdentityTransformer implements Transformer {

View File

@@ -42,7 +42,7 @@ public class SqlQueryBasedTransformer implements Transformer {
private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
/**
* Configs supported
* Configs supported.
*/
static class Config {

View File

@@ -26,12 +26,12 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* Transform source to target dataset before writing
* Transform source to target dataset before writing.
*/
public interface Transformer {
/**
* Transform source RDD to target RDD
* Transform source RDD to target RDD.
*
* @param jsc JavaSparkContext
* @param sparkSession Spark Session

View File

@@ -560,12 +560,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
/**
* UDF to calculate Haversine distance
* UDF to calculate Haversine distance.
*/
public static class DistanceUDF implements UDF4<Double, Double, Double, Double, Double> {
/**
* Returns some random number as distance between the points
* Returns some random number as distance between the points.
*
* @param lat1 Latitiude of source
* @param lat2 Latitude of destination
@@ -580,7 +580,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
/**
* Adds a new field "haversine_distance" to the row
* Adds a new field "haversine_distance" to the row.
*/
public static class TripsWithDistanceTransformer implements Transformer {
@@ -601,7 +601,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
/**
* Return empty dataset
* Return empty dataset.
*/
public static class DropAllTransformer implements Transformer {

View File

@@ -119,7 +119,7 @@ public class UtilitiesTestBase {
}
/**
* Helper to get hive sync config
* Helper to get hive sync config.
*
* @param basePath
* @param tableName
@@ -140,7 +140,7 @@ public class UtilitiesTestBase {
}
/**
* Initialize Hive DB
* Initialize Hive DB.
*
* @throws IOException
*/

View File

@@ -48,7 +48,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Basic tests against all subclasses of {@link JsonDFSSource} and {@link ParquetDFSSource}
* Basic tests against all subclasses of {@link JsonDFSSource} and {@link ParquetDFSSource}.
*/
public class TestDFSSource extends UtilitiesTestBase {

View File

@@ -48,7 +48,7 @@ import java.util.HashMap;
import static org.junit.Assert.assertEquals;
/**
* Tests against {@link AvroKafkaSource}
* Tests against {@link AvroKafkaSource}.
*/
public class TestKafkaSource extends UtilitiesTestBase {

View File

@@ -19,7 +19,7 @@
package org.apache.hudi.utilities.sources.config;
/**
* Configurations for Test Data Sources
* Configurations for Test Data Sources.
*/
public class TestSourceConfig {