[HUDI-2596] Make class names consistent in hudi-client (#4680)

Author:    Raymond Xu
Date:      2022-01-27 17:05:08 -08:00
Committer: GitHub
Parent:    4a9f826382
Commit:    0bd38f26ca

68 changed files with 216 additions and 175 deletions


@@ -24,7 +24,7 @@ import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -93,7 +93,7 @@ public class TestRollbacksCommand extends CLIFunctionalTestHarness {
         .withRollbackUsingMarkers(false)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
-    try (AbstractHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) {
+    try (BaseHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) {
       // Rollback inflight commit3 and commit2
       client.rollback("102");
       client.rollback("101");


@@ -19,8 +19,8 @@
 package org.apache.hudi.async;
 
-import org.apache.hudi.client.AbstractClusteringClient;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseClusterer;
+import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
@@ -44,19 +44,19 @@ public abstract class AsyncClusteringService extends HoodieAsyncService {
   private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
   private final int maxConcurrentClustering;
-  private transient AbstractClusteringClient clusteringClient;
+  private transient BaseClusterer clusteringClient;
 
-  public AsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+  public AsyncClusteringService(BaseHoodieWriteClient writeClient) {
     this(writeClient, false);
   }
 
-  public AsyncClusteringService(AbstractHoodieWriteClient writeClient, boolean runInDaemonMode) {
+  public AsyncClusteringService(BaseHoodieWriteClient writeClient, boolean runInDaemonMode) {
     super(runInDaemonMode);
     this.clusteringClient = createClusteringClient(writeClient);
     this.maxConcurrentClustering = 1;
   }
 
-  protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client);
+  protected abstract BaseClusterer createClusteringClient(BaseHoodieWriteClient client);
 
   /**
    * Start clustering service.
@@ -94,7 +94,7 @@ public abstract class AsyncClusteringService extends HoodieAsyncService {
   /**
    * Update the write client to be used for clustering.
    */
-  public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {
+  public synchronized void updateWriteClient(BaseHoodieWriteClient writeClient) {
     this.clusteringClient.updateWriteClient(writeClient);
   }
 }


@@ -17,8 +17,8 @@
 package org.apache.hudi.async;
 
-import org.apache.hudi.client.AbstractCompactor;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseCompactor;
+import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.engine.EngineProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -48,21 +48,21 @@ public abstract class AsyncCompactService extends HoodieAsyncService {
   public static final String COMPACT_POOL_NAME = "hoodiecompact";
 
   private final int maxConcurrentCompaction;
-  private transient AbstractCompactor compactor;
+  private transient BaseCompactor compactor;
   protected transient HoodieEngineContext context;
 
-  public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) {
+  public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client) {
     this(context, client, false);
   }
 
-  public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client, boolean runInDaemonMode) {
+  public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client, boolean runInDaemonMode) {
     super(runInDaemonMode);
     this.context = context;
     this.compactor = createCompactor(client);
     this.maxConcurrentCompaction = 1;
   }
 
-  protected abstract AbstractCompactor createCompactor(AbstractHoodieWriteClient client);
+  protected abstract BaseCompactor createCompactor(BaseHoodieWriteClient client);
 
   /**
    * Start Compaction Service.
@@ -110,7 +110,7 @@ public abstract class AsyncCompactService extends HoodieAsyncService {
     return false;
   }
 
-  public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {
+  public synchronized void updateWriteClient(BaseHoodieWriteClient writeClient) {
     this.compactor.updateWriteClient(writeClient);
   }
 }


@@ -36,10 +36,10 @@ class AsyncCleanerService extends HoodieAsyncService {
   private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
 
-  private final AbstractHoodieWriteClient writeClient;
+  private final BaseHoodieWriteClient writeClient;
   private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
 
-  protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) {
+  protected AsyncCleanerService(BaseHoodieWriteClient writeClient) {
     this.writeClient = writeClient;
   }
@@ -53,7 +53,7 @@
     }, executor), executor);
   }
 
-  public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
+  public static AsyncCleanerService startAsyncCleaningIfEnabled(BaseHoodieWriteClient writeClient) {
     AsyncCleanerService asyncCleanerService = null;
     if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
       asyncCleanerService = new AsyncCleanerService(writeClient);


@@ -28,13 +28,13 @@ import java.io.Serializable;
 /**
  * Client will run one round of clustering.
  */
-public abstract class AbstractClusteringClient<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+public abstract class BaseClusterer<T extends HoodieRecordPayload, I, K, O> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  protected transient AbstractHoodieWriteClient<T, I, K, O> clusteringClient;
+  protected transient BaseHoodieWriteClient<T, I, K, O> clusteringClient;
 
-  public AbstractClusteringClient(AbstractHoodieWriteClient<T, I, K, O> clusteringClient) {
+  public BaseClusterer(BaseHoodieWriteClient<T, I, K, O> clusteringClient) {
     this.clusteringClient = clusteringClient;
   }
@@ -49,7 +49,7 @@ public abstract class AbstractClusteringClient<T extends HoodieRecordPayload, I,
   /**
    * Update the write client used by async clustering.
    * @param writeClient
    */
-  public void updateWriteClient(AbstractHoodieWriteClient<T, I, K, O> writeClient) {
+  public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
     this.clusteringClient = writeClient;
   }
 }


@@ -27,19 +27,19 @@ import java.io.Serializable;
 /**
  * Run one round of compaction.
  */
-public abstract class AbstractCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+public abstract class BaseCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  protected transient AbstractHoodieWriteClient<T, I, K, O> compactionClient;
+  protected transient BaseHoodieWriteClient<T, I, K, O> compactionClient;
 
-  public AbstractCompactor(AbstractHoodieWriteClient<T, I, K, O> compactionClient) {
+  public BaseCompactor(BaseHoodieWriteClient<T, I, K, O> compactionClient) {
     this.compactionClient = compactionClient;
   }
 
   public abstract void compact(HoodieInstant instant) throws IOException;
 
-  public void updateWriteClient(AbstractHoodieWriteClient<T, I, K, O> writeClient) {
+  public void updateWriteClient(BaseHoodieWriteClient<T, I, K, O> writeClient) {
     this.compactionClient = writeClient;
   }


@@ -40,9 +40,9 @@ import java.io.Serializable;
  * Abstract class taking care of holding common member variables (FileSystem, SparkContext, HoodieConfigs) Also, manages
  * embedded timeline-server if enabled.
  */
-public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {
+public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class);
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieClient.class);
 
   protected final transient FileSystem fs;
   protected final transient HoodieEngineContext context;
@@ -59,11 +59,11 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
   private transient Option<EmbeddedTimelineService> timelineServer;
   private final boolean shouldStopTimelineServer;
 
-  protected AbstractHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+  protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
     this(context, clientConfig, Option.empty());
   }
 
-  protected AbstractHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig,
+  protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig,
                                  Option<EmbeddedTimelineService> timelineServer) {
     this.hadoopConf = context.getHadoopConf().get();
     this.fs = FSUtils.getFs(clientConfig.getBasePath(), hadoopConf);


@@ -98,11 +98,11 @@ import java.util.stream.Stream;
  * @param <K> Type of keys
  * @param <O> Type of outputs
  */
-public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends AbstractHoodieClient {
+public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient {
 
   protected static final String LOOKUP_STR = "lookup";
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieWriteClient.class);
 
   protected final transient HoodieMetrics metrics;
   private final transient HoodieIndex<T, ?, ?, ?> index;
@@ -123,7 +123,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    * @param writeConfig instance of HoodieWriteConfig
    */
   @Deprecated
-  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
+  public BaseHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
     this(context, writeConfig, Option.empty());
   }
@@ -134,7 +134,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    * @param timelineService Timeline Service that runs as part of write client.
    */
   @Deprecated
-  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
+  public BaseHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
                                    Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService);
     this.metrics = new HoodieMetrics(config);
@@ -359,7 +359,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    * table for the very first time (e.g: converting an existing table to Hoodie).
    * <p>
    * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
-   * the numbers of files with less memory compared to the {@link AbstractHoodieWriteClient#insert(I, String)}
+   * the numbers of files with less memory compared to the {@link BaseHoodieWriteClient#insert(I, String)}
    *
    * @param records HoodieRecords to insert
    * @param instantTime Instant time of the commit
@@ -372,7 +372,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    * table for the very first time (e.g: converting an existing table to Hoodie).
    * <p>
    * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
-   * the numbers of files with less memory compared to the {@link AbstractHoodieWriteClient#insert(I, String)}. Optionally
+   * the numbers of files with less memory compared to the {@link BaseHoodieWriteClient#insert(I, String)}. Optionally
    * it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See
    * {@link BulkInsertPartitioner}.
    *
@@ -392,7 +392,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    * duplicates if needed.
    * <p>
    * This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
-   * the numbers of files with less memory compared to the {@link AbstractHoodieWriteClient#insert(I, String)}. Optionally
+   * the numbers of files with less memory compared to the {@link BaseHoodieWriteClient#insert(I, String)}. Optionally
    * it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See
    * {@link BulkInsertPartitioner}.
    *
@@ -606,7 +606,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
   /**
    * @Deprecated
    * Rollback the inflight record changes with the given commit time. This
-   * will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
+   * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)}
    * Adding this api for backwards compatability.
    * @param commitInstantTime Instant time of the commit
    * @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
@@ -620,7 +620,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
   /**
    * @Deprecated
    * Rollback the inflight record changes with the given commit time. This
-   * will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
+   * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)}
    *
    * @param commitInstantTime Instant time of the commit
    * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt.
@@ -714,7 +714,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
    * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
    * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
    * cleaned). This API provides the flexibility to schedule clean instant asynchronously via
-   * {@link AbstractHoodieWriteClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
+   * {@link BaseHoodieWriteClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
    * of clean.
    * @param cleanInstantTime instant time for clean.
   * @param scheduleInline true if needs to be scheduled inline. false otherwise.


@@ -61,7 +61,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_AC
 /**
  * Client to perform admin operations related to compaction.
  */
-public class CompactionAdminClient extends AbstractHoodieClient {
+public class CompactionAdminClient extends BaseHoodieClient {
 
   private static final Logger LOG = LogManager.getLogger(CompactionAdminClient.class);


@@ -29,7 +29,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
+import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -161,9 +161,9 @@ public class KeyGenUtils {
   /**
    * Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed.
    */
-  public static AbstractHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
+  public static BaseHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
     try {
-      return (AbstractHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
+      return (BaseHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
     } catch (Throwable e) {
       throw new IOException("Could not load date time parser class " + parserClass, e);
     }
@@ -196,4 +196,4 @@ public class KeyGenUtils {
     }
     return keyGenerator;
   }
 }
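Not part of the commit: a minimal sketch of how a caller might obtain a parser through the renamed API. The property keys are recalled from TimestampBasedAvroKeyGenerator's Config and are assumptions to verify against your Hudi version; the format values are placeholders.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser;
import org.apache.hudi.keygen.parser.HoodieDateTimeParser;

import java.io.IOException;

public class DateTimeParserSketch {
  public static void main(String[] args) throws IOException {
    TypedProperties props = new TypedProperties();
    // Assumed key names for the properties HoodieDateTimeParser requires.
    props.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING");
    props.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd");
    props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd");

    // createDateTimeParser now returns the renamed BaseHoodieDateTimeParser;
    // HoodieDateTimeParser (formerly HoodieDateTimeParserImpl) is the default implementation.
    BaseHoodieDateTimeParser parser =
        KeyGenUtils.createDateTimeParser(props, HoodieDateTimeParser.class.getName());
    System.out.println(parser.getOutputDateTimeZone());
  }
}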


@@ -26,8 +26,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieKeyGeneratorException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
-import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl;
+import org.apache.hudi.keygen.parser.BaseHoodieDateTimeParser;
+import org.apache.hudi.keygen.parser.HoodieDateTimeParser;
 
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
@@ -56,7 +56,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
   private final String outputDateFormat;
   private transient Option<DateTimeFormatter> inputFormatter;
   private transient DateTimeFormatter partitionFormatter;
-  private final AbstractHoodieDateTimeParser parser;
+  private final BaseHoodieDateTimeParser parser;
 
   // TimeZone detailed settings reference
   // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
@@ -99,7 +99,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
   TimestampBasedAvroKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
     super(config, recordKeyField, partitionPathField);
-    String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName());
+    String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParser.class.getName());
     this.parser = KeyGenUtils.createDateTimeParser(config, dateTimeParserClass);
     this.inputDateTimeZone = parser.getInputDateTimeZone();
     this.outputDateTimeZone = parser.getOutputDateTimeZone();


@@ -25,12 +25,12 @@ import org.joda.time.format.DateTimeFormatter;
 import java.io.Serializable;
 
-public abstract class AbstractHoodieDateTimeParser implements Serializable {
+public abstract class BaseHoodieDateTimeParser implements Serializable {
 
   protected final TypedProperties config;
   protected final String configInputDateFormatDelimiter;
 
-  public AbstractHoodieDateTimeParser(TypedProperties config) {
+  public BaseHoodieDateTimeParser(TypedProperties config) {
     this.config = config;
     this.configInputDateFormatDelimiter = initInputDateFormatDelimiter();
   }


@@ -32,7 +32,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.TimeZone;
 
-public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
+public class HoodieDateTimeParser extends BaseHoodieDateTimeParser {
 
   private String configInputDateFormatList;
@@ -40,7 +40,7 @@ public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
   // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
   private final DateTimeZone inputDateTimeZone;
 
-  public HoodieDateTimeParserImpl(TypedProperties config) {
+  public HoodieDateTimeParser(TypedProperties config) {
     super(config);
     KeyGenUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
     this.inputDateTimeZone = getInputDateTimeZone();


@@ -24,7 +24,7 @@ import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
@@ -682,7 +682,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
    * deltacommit.
    */
-  protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String instantTime) {
+  protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
     // finish off any pending compactions if any from previous attempt.
     writeClient.runAnyPendingCompactions();
@@ -706,7 +706,7 @@
     }
   }
 
-  protected void cleanIfNecessary(AbstractHoodieWriteClient writeClient, String instantTime) {
+  protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) {
     Option<HoodieInstant> lastCompletedCompactionInstant = metadataMetaClient.reloadActiveTimeline()
         .getCommitTimeline().filterCompletedInstants().lastInstant();
     if (lastCompletedCompactionInstant.isPresent()


@@ -23,12 +23,12 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter;
+import org.apache.hudi.metrics.custom.CustomizableMetricsReporter;
 import org.apache.hudi.metrics.datadog.DatadogMetricsReporter;
-import com.codahale.metrics.MetricRegistry;
 import org.apache.hudi.metrics.prometheus.PrometheusReporter;
 import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter;
-import org.apache.hudi.metrics.userdefined.AbstractUserDefinedMetricsReporter;
+import com.codahale.metrics.MetricRegistry;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -47,9 +47,9 @@ public class MetricsReporterFactory {
     if (!StringUtils.isNullOrEmpty(reporterClassName)) {
       Object instance = ReflectionUtils.loadClass(
           reporterClassName, new Class<?>[] {Properties.class, MetricRegistry.class}, config.getProps(), registry);
-      if (!(instance instanceof AbstractUserDefinedMetricsReporter)) {
+      if (!(instance instanceof CustomizableMetricsReporter)) {
         throw new HoodieException(config.getMetricReporterClassName()
-            + " is not a subclass of AbstractUserDefinedMetricsReporter");
+            + " is not a subclass of CustomizableMetricsReporter");
       }
       return (MetricsReporter) instance;
     }


@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.custom;
+
+import org.apache.hudi.metrics.MetricsReporter;
+
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.Properties;
+
+/**
+ * Extensible metrics reporter for custom implementation.
+ */
+public abstract class CustomizableMetricsReporter extends MetricsReporter {
+  private Properties props;
+  private MetricRegistry registry;
+
+  public CustomizableMetricsReporter(Properties props, MetricRegistry registry) {
+    this.props = props;
+    this.registry = registry;
+  }
+
+  public Properties getProps() {
+    return props;
+  }
+
+  public MetricRegistry getRegistry() {
+    return registry;
+  }
+}
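Not part of the commit: a hypothetical skeleton showing how a user-defined reporter plugs into the new base class. It is kept abstract because the remaining MetricsReporter hooks (start/report/stop in this era of Hudi) vary by release and are not shown in this diff; the package and metric name are made up for illustration.

package com.example.metrics; // hypothetical package

import org.apache.hudi.metrics.custom.CustomizableMetricsReporter;

import com.codahale.metrics.MetricRegistry;

import java.util.Properties;

public abstract class LoggingMetricsReporterSketch extends CustomizableMetricsReporter {

  public LoggingMetricsReporterSketch(Properties props, MetricRegistry registry) {
    // MetricsReporterFactory instantiates reporters reflectively through exactly this
    // (Properties, MetricRegistry) constructor shape and then checks
    // instanceof CustomizableMetricsReporter, as the factory hunk above shows.
    super(props, registry);
  }

  protected long commitCount() {
    // getRegistry() is inherited from CustomizableMetricsReporter; "commits" is a made-up metric name.
    return getRegistry().counter("commits").getCount();
  }
}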


@@ -7,38 +7,31 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.metrics.userdefined;
 
+import org.apache.hudi.metrics.custom.CustomizableMetricsReporter;
+
 import com.codahale.metrics.MetricRegistry;
-import org.apache.hudi.metrics.MetricsReporter;
 
 import java.util.Properties;
 
 /**
- * Abstract class of user defined metrics reporter.
+ * @deprecated Extend {@link CustomizableMetricsReporter} instead.
  */
-public abstract class AbstractUserDefinedMetricsReporter extends MetricsReporter {
-  private Properties props;
-  private MetricRegistry registry;
-
+@Deprecated
+public abstract class AbstractUserDefinedMetricsReporter extends CustomizableMetricsReporter {
   public AbstractUserDefinedMetricsReporter(Properties props, MetricRegistry registry) {
-    this.props = props;
-    this.registry = registry;
+    super(props, registry);
   }
-
-  public Properties getProps() {
-    return props;
-  }
-
-  public MetricRegistry getRegistry() {
-    return registry;
-  }
 }


@@ -26,7 +26,7 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-public abstract class AbstractBulkInsertHelper<T extends HoodieRecordPayload, I, K, O, R> {
+public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K, O, R> {
 
   /**
    * Mark instant as inflight, write input records, update index and return result.


@@ -72,7 +72,7 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I,
     this.operationType = operationType;
     this.extraMetadata = extraMetadata;
     this.taskContextSupplier = context.getTaskContextSupplier();
-    // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link AbstractHoodieWriteClient}.
+    // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
     this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
     this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
     if (table.getStorageLayout().doesNotSupport(operationType)) {


@@ -29,7 +29,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
  *
  * @param <T>
  */
-public abstract class AbstractDeleteHelper<T extends HoodieRecordPayload, I, K, O, R> {
+public abstract class BaseDeleteHelper<T extends HoodieRecordPayload, I, K, O, R> {
 
   /**
    * Deduplicate Hoodie records, using the given deduplication function.


@@ -47,7 +47,7 @@ import java.util.Iterator;
 /**
  * Helper to read records from previous version of base file and run Merge.
  */
-public abstract class AbstractMergeHelper<T extends HoodieRecordPayload, I, K, O> {
+public abstract class BaseMergeHelper<T extends HoodieRecordPayload, I, K, O> {
 
   /**
    * Read records from previous version of base file and merge.


@@ -30,7 +30,7 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import java.time.Duration;
 import java.time.Instant;
 
-public abstract class AbstractWriteHelper<T extends HoodieRecordPayload, I, K, O, R> {
+public abstract class BaseWriteHelper<T extends HoodieRecordPayload, I, K, O, R> {
 
   public HoodieWriteMetadata<O> write(String instantTime,
                                       I inputRecords,


@@ -35,10 +35,10 @@ public interface DowngradeHandler {
    * @param config instance of {@link HoodieWriteConfig} to be used.
    * @param context instance of {@link HoodieEngineContext} to be used.
    * @param instantTime current instant time that should not touched.
-   * @param upgradeDowngradeHelper instance of {@link BaseUpgradeDowngradeHelper} to be used.
+   * @param upgradeDowngradeHelper instance of {@link SupportsUpgradeDowngrade} to be used.
    * @return Map of config properties and its values to be added to table properties.
    */
   Map<ConfigProperty, String> downgrade(
       HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
-      BaseUpgradeDowngradeHelper upgradeDowngradeHelper);
+      SupportsUpgradeDowngrade upgradeDowngradeHelper);
 }


@@ -35,7 +35,7 @@ public class OneToTwoUpgradeHandler implements UpgradeHandler {
   @Override
   public Map<ConfigProperty, String> upgrade(
       HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
-      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+      SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
     tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS, upgradeDowngradeHelper.getPartitionColumns(config));
     tablePropsToAdd.put(HoodieTableConfig.RECORDKEY_FIELDS, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));


@@ -40,7 +40,7 @@ public class OneToZeroDowngradeHandler implements DowngradeHandler {
   @Override
   public Map<ConfigProperty, String> downgrade(
       HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
-      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+      SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
     // fetch pending commit info
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();


@@ -26,7 +26,7 @@ import org.apache.hudi.table.HoodieTable;
 /**
  * Interface for engine-specific logic needed for upgrade and downgrade actions.
  */
-public interface BaseUpgradeDowngradeHelper {
+public interface SupportsUpgradeDowngrade {
 
   /**
    * @param config Write config.
    * @param context {@link HoodieEngineContext} instance to use.


@@ -33,7 +33,7 @@ import java.util.Map;
 public class ThreeToTwoDowngradeHandler implements DowngradeHandler {
 
   @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     if (config.isMetadataTableEnabled()) {
       // Metadata Table in version 3 is synchronous and in version 2 is asynchronous. Downgrading to asynchronous
       // removes the checks in code to decide whether to use a LogBlock or not. Also, the schema for the


@@ -54,7 +54,7 @@ public class TwoToOneDowngradeHandler implements DowngradeHandler {
   @Override
   public Map<ConfigProperty, String> downgrade(
       HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
-      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+      SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
     HoodieTableMetaClient metaClient = table.getMetaClient();


@@ -35,7 +35,7 @@ import java.util.Map;
  */
 public class TwoToThreeUpgradeHandler implements UpgradeHandler {
   @Override
-  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     if (config.isMetadataTableEnabled()) {
       // Metadata Table in version 2 is asynchronous and in version 3 is synchronous. Synchronous table will not
       // sync any instants not already synced. So its simpler to re-bootstrap the table. Also, the schema for the


@@ -42,7 +42,7 @@ public class UpgradeDowngrade {
   private static final Logger LOG = LogManager.getLogger(UpgradeDowngrade.class);
   public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated";
 
-  private final BaseUpgradeDowngradeHelper upgradeDowngradeHelper;
+  private final SupportsUpgradeDowngrade upgradeDowngradeHelper;
   private HoodieTableMetaClient metaClient;
   protected HoodieWriteConfig config;
   protected HoodieEngineContext context;
@@ -52,7 +52,7 @@
   public UpgradeDowngrade(
       HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context,
-      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+      SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     this.metaClient = metaClient;
     this.config = config;
     this.context = context;

@@ -35,10 +35,10 @@ public interface UpgradeHandler {
    * @param config instance of {@link HoodieWriteConfig} to be used.
    * @param context instance of {@link HoodieEngineContext} to be used.
    * @param instantTime current instant time that should not be touched.
-   * @param upgradeDowngradeHelper instance of {@link BaseUpgradeDowngradeHelper} to be used.
+   * @param upgradeDowngradeHelper instance of {@link SupportsUpgradeDowngrade} to be used.
    * @return Map of config properties and its values to be added to table properties.
    */
   Map<ConfigProperty, String> upgrade(
       HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
-      BaseUpgradeDowngradeHelper upgradeDowngradeHelper);
+      SupportsUpgradeDowngrade upgradeDowngradeHelper);
 }

View File

@@ -57,7 +57,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
   @Override
   public Map<ConfigProperty, String> upgrade(
       HoodieWriteConfig config, HoodieEngineContext context, String instantTime,
-      BaseUpgradeDowngradeHelper upgradeDowngradeHelper) {
+      SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     // fetch pending commit info
     HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();


@@ -21,10 +21,10 @@ package org.apache.hudi.metrics;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.custom.CustomizableMetricsReporter;
 
 import com.codahale.metrics.MetricRegistry;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.metrics.userdefined.AbstractUserDefinedMetricsReporter;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -63,7 +63,7 @@ public class TestMetricsReporterFactory {
     when(config.getProps()).thenReturn(props);
 
     MetricsReporter reporter = MetricsReporterFactory.createReporter(config, registry);
-    assertTrue(reporter instanceof AbstractUserDefinedMetricsReporter);
+    assertTrue(reporter instanceof CustomizableMetricsReporter);
     assertEquals(props, ((DummyMetricsReporter) reporter).getProps());
     assertEquals(registry, ((DummyMetricsReporter) reporter).getRegistry());
   }
@@ -75,7 +75,7 @@
     assertThrows(HoodieException.class, () -> MetricsReporterFactory.createReporter(config, registry));
   }
 
-  public static class DummyMetricsReporter extends AbstractUserDefinedMetricsReporter {
+  public static class DummyMetricsReporter extends CustomizableMetricsReporter {
 
     public DummyMetricsReporter(Properties props, MetricRegistry registry) {
       super(props, registry);


@@ -19,12 +19,12 @@
 package org.apache.hudi.testutils.providers;
 
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import java.io.IOException;
 
 public interface HoodieWriteClientProvider {
 
-  AbstractHoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException;
+  BaseHoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException;
 }
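Not part of the commit: a hypothetical implementation of this provider against the renamed supertype, assuming the Spark engine and assuming getHoodieWriteClient is the interface's only abstract method. The SparkRDDWriteClient(HoodieEngineContext, HoodieWriteConfig) constructor is the one used in TestRollbacksCommand above.

import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;

import java.io.IOException;

public class SparkWriteClientProviderSketch implements HoodieWriteClientProvider {

  private final HoodieEngineContext engineContext;

  public SparkWriteClientProviderSketch(HoodieEngineContext engineContext) {
    this.engineContext = engineContext;
  }

  @Override
  public BaseHoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
    // SparkRDDWriteClient is one concrete subclass of the renamed BaseHoodieWriteClient.
    return new SparkRDDWriteClient(engineContext, cfg);
  }
}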


@@ -78,7 +78,7 @@ import java.util.stream.Collectors;
 @SuppressWarnings("checkstyle:LineLength")
 public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
-    AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+    BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkWriteClient.class);


@@ -132,7 +132,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
         throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
       }
     });
-    // flink does not support auto-commit yet, also the auto commit logic is not complete as AbstractHoodieWriteClient now.
+    // flink does not support auto-commit yet, also the auto commit logic is not complete as BaseHoodieWriteClient now.
     writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
 
     // reload timeline


@@ -43,7 +43,7 @@ import java.util.stream.Collectors;
 @SuppressWarnings("checkstyle:LineLength")
 public class FlinkDeleteHelper<R> extends
-    AbstractDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
+    BaseDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
 
   private FlinkDeleteHelper() {
   }


@@ -45,7 +45,7 @@ import java.util.Iterator;
 import scala.collection.immutable.List;
 
-public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, List<HoodieRecord<T>>,
+public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, List<HoodieRecord<T>>,
     List<HoodieKey>, List<WriteStatus>> {
 
   private FlinkMergeHelper() {


@@ -48,7 +48,7 @@ import java.util.stream.Collectors;
  * <p>Computing the records batch locations all at a time is a pressure to the engine,
  * we should avoid that in streaming system.
  */
-public class FlinkWriteHelper<T extends HoodieRecordPayload, R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
+public class FlinkWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, List<HoodieRecord<T>>,
     List<HoodieKey>, List<WriteStatus>, R> {
 
   private FlinkWriteHelper() {


@@ -29,7 +29,7 @@ import org.apache.hudi.table.HoodieTable;
 /**
  * Flink upgrade and downgrade helper.
  */
-public class FlinkUpgradeDowngradeHelper implements BaseUpgradeDowngradeHelper {
+public class FlinkUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
 
   private static final FlinkUpgradeDowngradeHelper SINGLETON_INSTANCE =
       new FlinkUpgradeDowngradeHelper();


@@ -50,7 +50,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
-    AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+    BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
 
   public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
     super(context, clientConfig);


@@ -39,12 +39,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * A java implementation of {@link AbstractBulkInsertHelper}.
+ * A java implementation of {@link BaseBulkInsertHelper}.
  *
  * @param <T>
  */
 @SuppressWarnings("checkstyle:LineLength")
-public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends AbstractBulkInsertHelper<T, List<HoodieRecord<T>>,
+public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends BaseBulkInsertHelper<T, List<HoodieRecord<T>>,
     List<HoodieKey>, List<WriteStatus>, R> {
 
   private JavaBulkInsertHelper() {


@@ -43,7 +43,7 @@ import java.util.stream.Collectors;
 @SuppressWarnings("checkstyle:LineLength")
 public class JavaDeleteHelper<R> extends
-    AbstractDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
+    BaseDeleteHelper<EmptyHoodieRecordPayload, List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>, R> {
 
   private JavaDeleteHelper() {
   }

View File

@@ -44,7 +44,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-public class JavaMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, List<HoodieRecord<T>>,
+public class JavaMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> {
private JavaMergeHelper() {

View File
@@ -33,7 +33,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
-public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
+public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends BaseWriteHelper<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>, R> {
private JavaWriteHelper() {

View File
@@ -19,8 +19,8 @@
package org.apache.hudi.async;
-import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.BaseClusterer;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieSparkClusteringClient;
/**
@@ -28,12 +28,12 @@ import org.apache.hudi.client.HoodieSparkClusteringClient;
*/
public class SparkAsyncClusteringService extends AsyncClusteringService {
-public SparkAsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+public SparkAsyncClusteringService(BaseHoodieWriteClient writeClient) {
super(writeClient);
}
@Override
-protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) {
+protected BaseClusterer createClusteringClient(BaseHoodieWriteClient client) {
return new HoodieSparkClusteringClient(client);
}
}

View File
@@ -18,19 +18,19 @@
package org.apache.hudi.async;
-import org.apache.hudi.client.AbstractCompactor;
+import org.apache.hudi.client.BaseCompactor;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieSparkCompactor;
import org.apache.hudi.common.engine.HoodieEngineContext;
public class SparkAsyncCompactService extends AsyncCompactService {
-public SparkAsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) {
+public SparkAsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client) {
super(context, client);
}
@Override
-protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
+protected BaseCompactor createCompactor(BaseHoodieWriteClient client) {
return new HoodieSparkCompactor(client, this.context);
}
}

View File
@@ -38,12 +38,12 @@ import java.util.stream.Stream;
* Async clustering client for Spark datasource.
*/
public class HoodieSparkClusteringClient<T extends HoodieRecordPayload> extends
-AbstractClusteringClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+BaseClusterer<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(HoodieSparkClusteringClient.class);
public HoodieSparkClusteringClient(
-AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> clusteringClient) {
+BaseHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> clusteringClient) {
super(clusteringClient);
}

View File
@@ -31,12 +31,12 @@ import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
-public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends AbstractCompactor<T,
+public class HoodieSparkCompactor<T extends HoodieRecordPayload> extends BaseCompactor<T,
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(HoodieSparkCompactor.class);
private transient HoodieEngineContext context;
-public HoodieSparkCompactor(AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> compactionClient,
+public HoodieSparkCompactor(BaseHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> compactionClient,
HoodieEngineContext context) {
super(compactionClient);
this.context = context;

View File
@@ -74,7 +74,7 @@ import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
-AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+BaseHoodieWriteClient<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(SparkRDDWriteClient.class);
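The rename is mechanical for downstream code: a variable, field, or return type previously declared as AbstractHoodieWriteClient is now declared as BaseHoodieWriteClient with the same generic parameters, as in this hedged sketch (the context and config arguments are placeholders, not part of the commit).

```java
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;

class RenameMigrationSketch {
  // Before this commit the return type would have been AbstractHoodieWriteClient<...>;
  // only the class name changes, the generic signature and behavior stay the same.
  BaseHoodieWriteClient<HoodieAvroPayload, JavaRDD<HoodieRecord<HoodieAvroPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>
      newWriteClient(HoodieSparkEngineContext context, HoodieWriteConfig config) {
    return new SparkRDDWriteClient<HoodieAvroPayload>(context, config);
  }
}
```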
View File
@@ -41,12 +41,12 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
-* A spark implementation of {@link AbstractBulkInsertHelper}.
+* A spark implementation of {@link BaseBulkInsertHelper}.
*
* @param <T>
*/
@SuppressWarnings("checkstyle:LineLength")
-public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends AbstractBulkInsertHelper<T, JavaRDD<HoodieRecord<T>>,
+public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends BaseBulkInsertHelper<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
private SparkBulkInsertHelper() {

View File
@@ -42,13 +42,13 @@ import java.time.Instant;
import java.util.HashMap;
/**
-* A spark implementation of {@link AbstractDeleteHelper}.
+* A spark implementation of {@link BaseDeleteHelper}.
*
* @param <T>
*/
@SuppressWarnings("checkstyle:LineLength")
public class SparkDeleteHelper<T extends HoodieRecordPayload,R> extends
-AbstractDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
+BaseDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
private SparkDeleteHelper() {
}

View File
@@ -43,7 +43,7 @@ import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.Iterator;
-public class SparkMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, JavaRDD<HoodieRecord<T>>,
+public class SparkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
private SparkMergeHelper() {

View File
@@ -32,11 +32,11 @@ import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
/**
-* A spark implementation of {@link AbstractWriteHelper}.
+* A spark implementation of {@link BaseWriteHelper}.
*
* @param <T>
*/
-public class SparkWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, JavaRDD<HoodieRecord<T>>,
+public class SparkWriteHelper<T extends HoodieRecordPayload,R> extends BaseWriteHelper<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
private SparkWriteHelper() {
}

View File
@@ -43,19 +43,19 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-public abstract class AbstractSparkDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
+public abstract class BaseSparkDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
-private static final Logger LOG = LogManager.getLogger(AbstractSparkDeltaCommitActionExecutor.class);
+private static final Logger LOG = LogManager.getLogger(BaseSparkDeltaCommitActionExecutor.class);
// UpsertPartitioner for MergeOnRead table type
private SparkUpsertDeltaCommitPartitioner mergeOnReadUpsertPartitioner;
-public AbstractSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
+public BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType) {
this(context, config, table, instantTime, operationType, Option.empty());
}
-public AbstractSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
+public BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, operationType, extraMetadata);

View File
@@ -18,8 +18,6 @@
package org.apache.hudi.table.action.deltacommit;
-import java.util.Map;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
@@ -28,15 +26,17 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.spark.api.java.JavaRDD;
+import java.util.Map;
public class SparkBulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
-extends AbstractSparkDeltaCommitActionExecutor<T> {
+extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;

View File
@@ -26,15 +26,15 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.spark.api.java.JavaRDD;
public class SparkBulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
-extends AbstractSparkDeltaCommitActionExecutor<T> {
+extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
@@ -61,4 +61,4 @@ public class SparkBulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRec
}
}
}

View File
@@ -25,13 +25,13 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.SparkDeleteHelper;
import org.apache.spark.api.java.JavaRDD;
public class SparkDeleteDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
-extends AbstractSparkDeltaCommitActionExecutor<T> {
+extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieKey> keys;

View File
@@ -25,13 +25,13 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.SparkWriteHelper;
import org.apache.spark.api.java.JavaRDD;
public class SparkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
-extends AbstractSparkDeltaCommitActionExecutor<T> {
+extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;

View File
@@ -26,10 +26,11 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
public class SparkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
-extends AbstractSparkDeltaCommitActionExecutor<T> {
+extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedRecords;

View File
@@ -24,13 +24,13 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.SparkWriteHelper;
import org.apache.spark.api.java.JavaRDD;
public class SparkUpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
-extends AbstractSparkDeltaCommitActionExecutor<T> {
+extends BaseSparkDeltaCommitActionExecutor<T> {
private JavaRDD<HoodieRecord<T>> inputRecordsRDD;

View File
@@ -26,10 +26,11 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
public class SparkUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
-extends AbstractSparkDeltaCommitActionExecutor<T> {
+extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedRecords;

View File
@@ -28,7 +28,7 @@ import org.apache.hudi.table.HoodieTable;
/**
* Spark upgrade and downgrade helper.
*/
-public class SparkUpgradeDowngradeHelper implements BaseUpgradeDowngradeHelper {
+public class SparkUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
private static final SparkUpgradeDowngradeHelper SINGLETON_INSTANCE =
new SparkUpgradeDowngradeHelper();

View File
@@ -18,7 +18,6 @@
package org.apache.hudi.client.functional;
-import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -90,6 +89,7 @@ import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FSDataOutputStream;

View File
@@ -21,7 +21,7 @@ package org.apache.hudi.client.functional;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
@@ -687,7 +687,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
/**
-* Test one of HoodieConcatHandle w/ {@link AbstractHoodieWriteClient#insert(Object, String)} API.
+* Test one of HoodieConcatHandle w/ {@link BaseHoodieWriteClient#insert(Object, String)} API.
*
* @param config Write Config
* @throws Exception in case of error
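The javadoc above refers to the insert(records, instantTime) API on the renamed write client. Below is a hedged sketch of how a caller typically exercises it; the startCommit/commit calls and the placeholder context, config, and records arguments are assumptions about the surrounding setup, not code from this commit.

```java
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;

// Illustrative only: drives the insert() API named in the javadoc end to end.
class InsertApiSketch {
  JavaRDD<WriteStatus> insertOnce(HoodieSparkEngineContext context,
                                  HoodieWriteConfig config,
                                  JavaRDD<HoodieRecord<HoodieAvroPayload>> records) {
    try (SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(context, config)) {
      String instantTime = client.startCommit();                          // open a new commit on the timeline
      JavaRDD<WriteStatus> statuses = client.insert(records, instantTime); // the insert(records, instantTime) call
      client.commit(instantTime, statuses);                               // finalize the commit
      return statuses;
    }
  }
}
```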
View File
@@ -44,7 +44,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
-import org.apache.hudi.table.action.deltacommit.AbstractSparkDeltaCommitActionExecutor;
+import org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
@@ -552,7 +552,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
// initialize partitioner
hoodieTable.getHoodieView().sync();
-AbstractSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context(), cfg, hoodieTable,
+BaseSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context(), cfg, hoodieTable,
newDeleteTime, deleteRDD);
actionExecutor.getUpsertPartitioner(new WorkloadProfile(buildProfile(deleteRDD)));
final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Arrays.asList(1)).map(x -> {

View File
@@ -19,8 +19,8 @@
package org.apache.hudi.async;
-import org.apache.hudi.client.AbstractClusteringClient;
+import org.apache.hudi.client.BaseClusterer;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieSparkClusteringClient;
/**
@@ -31,12 +31,12 @@ public class SparkStreamingAsyncClusteringService extends AsyncClusteringService
private static final long serialVersionUID = 1L;
-public SparkStreamingAsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+public SparkStreamingAsyncClusteringService(BaseHoodieWriteClient writeClient) {
super(writeClient, true);
}
@Override
-protected AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client) {
+protected BaseClusterer createClusteringClient(BaseHoodieWriteClient client) {
return new HoodieSparkClusteringClient(client);
}
}

View File
@@ -18,8 +18,8 @@
package org.apache.hudi.async;
-import org.apache.hudi.client.AbstractCompactor;
+import org.apache.hudi.client.BaseCompactor;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieSparkCompactor;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -31,12 +31,12 @@ public class SparkStreamingAsyncCompactService extends AsyncCompactService {
private static final long serialVersionUID = 1L;
-public SparkStreamingAsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) {
+public SparkStreamingAsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client) {
super(context, client, true);
}
@Override
-protected AbstractCompactor createCompactor(AbstractHoodieWriteClient client) {
+protected BaseCompactor createCompactor(BaseHoodieWriteClient client) {
return new HoodieSparkCompactor(client, this.context);
}
}