[HUDI-1419] Add base implementation for hudi java client (#2286)
This commit is contained in:
@@ -55,6 +55,12 @@
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hudi</groupId>
|
||||
<artifactId>hudi-hadoop-mr</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Test -->
|
||||
<dependency>
|
||||
|
||||
@@ -0,0 +1,235 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.client;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
||||
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.index.JavaHoodieIndex;
|
||||
import org.apache.hudi.table.BulkInsertPartitioner;
|
||||
import org.apache.hudi.table.HoodieJavaTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
|
||||
AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
|
||||
|
||||
public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
|
||||
super(context, clientConfig);
|
||||
}
|
||||
|
||||
public HoodieJavaWriteClient(HoodieEngineContext context,
|
||||
HoodieWriteConfig writeConfig,
|
||||
boolean rollbackPending,
|
||||
Option<EmbeddedTimelineService> timelineService) {
|
||||
super(context, writeConfig, rollbackPending, timelineService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
HoodieJavaTable<T> table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context);
|
||||
Timer.Context indexTimer = metrics.getIndexCtx();
|
||||
List<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, context, table);
|
||||
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
|
||||
return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createIndex(HoodieWriteConfig writeConfig) {
|
||||
return JavaHoodieIndex.createIndex(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean commit(String instantTime,
|
||||
List<WriteStatus> writeStatuses,
|
||||
Option<Map<String, String>> extraMetadata,
|
||||
String commitActionType,
|
||||
Map<String, List<String>> partitionToReplacedFileIds) {
|
||||
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
|
||||
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config,
|
||||
Configuration hadoopConf) {
|
||||
return HoodieJavaTable.create(config, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
|
||||
String instantTime) {
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
|
||||
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
|
||||
table.validateUpsertSchema();
|
||||
setOperationType(WriteOperationType.UPSERT);
|
||||
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||
HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
|
||||
if (result.getIndexLookupDuration().isPresent()) {
|
||||
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
|
||||
}
|
||||
return postWrite(result, instantTime, table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
|
||||
String instantTime) {
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
|
||||
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
|
||||
table.validateUpsertSchema();
|
||||
setOperationType(WriteOperationType.UPSERT_PREPPED);
|
||||
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||
HoodieWriteMetadata<List<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
|
||||
return postWrite(result, instantTime, table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
|
||||
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
|
||||
table.validateUpsertSchema();
|
||||
setOperationType(WriteOperationType.INSERT);
|
||||
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
|
||||
if (result.getIndexLookupDuration().isPresent()) {
|
||||
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
|
||||
}
|
||||
return postWrite(result, instantTime, table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
|
||||
String instantTime) {
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
|
||||
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
|
||||
table.validateInsertSchema();
|
||||
setOperationType(WriteOperationType.INSERT_PREPPED);
|
||||
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
|
||||
HoodieWriteMetadata<List<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
|
||||
return postWrite(result, instantTime, table);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records,
|
||||
String instantTime) {
|
||||
throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records,
|
||||
String instantTime,
|
||||
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
|
||||
throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
|
||||
String instantTime,
|
||||
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
|
||||
throw new HoodieNotSupportedException("BulkInsertPreppedRecords is not supported in HoodieJavaClient");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> delete(List<HoodieKey> keys,
|
||||
String instantTime) {
|
||||
throw new HoodieNotSupportedException("Delete is not supported in HoodieJavaClient");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
|
||||
String instantTime,
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
|
||||
if (result.getIndexLookupDuration().isPresent()) {
|
||||
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
|
||||
}
|
||||
if (result.isCommitted()) {
|
||||
// Perform post commit operations.
|
||||
if (result.getFinalizeDuration().isPresent()) {
|
||||
metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
|
||||
result.getWriteStats().get().size());
|
||||
}
|
||||
|
||||
postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty());
|
||||
|
||||
emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());
|
||||
}
|
||||
return result.getWriteStatuses();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitCompaction(String compactionInstantTime,
|
||||
List<WriteStatus> writeStatuses,
|
||||
Option<Map<String, String>> extraMetadata) throws IOException {
|
||||
throw new HoodieNotSupportedException("CommitCompaction is not supported in HoodieJavaClient");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void completeCompaction(HoodieCommitMetadata metadata,
|
||||
List<WriteStatus> writeStatuses,
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
|
||||
String compactionCommitTime) {
|
||||
throw new HoodieNotSupportedException("CompleteCompaction is not supported in HoodieJavaClient");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<WriteStatus> compact(String compactionInstantTime,
|
||||
boolean shouldComplete) {
|
||||
throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaClient");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
|
||||
HoodieTableMetaClient metaClient = createMetaClient(true);
|
||||
// new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
|
||||
return getTableAndInitCtx(metaClient, operationType);
|
||||
}
|
||||
|
||||
private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
|
||||
if (operationType == WriteOperationType.DELETE) {
|
||||
setWriteSchemaForDeletes(metaClient);
|
||||
}
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
HoodieJavaTable<T> table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
|
||||
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
|
||||
writeTimer = metrics.getCommitCtx();
|
||||
} else {
|
||||
writeTimer = metrics.getDeltaCommitCtx();
|
||||
}
|
||||
return table;
|
||||
}
|
||||
}
|
||||
@@ -43,6 +43,10 @@ import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMap
|
||||
*/
|
||||
public class HoodieJavaEngineContext extends HoodieEngineContext {
|
||||
|
||||
public HoodieJavaEngineContext(Configuration conf) {
|
||||
this(conf, new JavaTaskContextSupplier());
|
||||
}
|
||||
|
||||
public HoodieJavaEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) {
|
||||
super(new SerializableConfiguration(conf), taskContextSupplier);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.client.common;
|
||||
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class JavaTaskContextSupplier extends TaskContextSupplier {
|
||||
@Override
|
||||
public Supplier<Integer> getPartitionIdSupplier() {
|
||||
return () -> 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Integer> getStageIdSupplier() {
|
||||
return () -> 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Supplier<Long> getAttemptIdSupplier() {
|
||||
return () -> 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<String> getProperty(EngineProperty prop) {
|
||||
return Option.empty();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.execution;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.TaskContextSupplier;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
|
||||
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.io.WriteHandleFactory;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
public class JavaLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
|
||||
public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
|
||||
boolean areRecordsSorted,
|
||||
HoodieWriteConfig config,
|
||||
String instantTime,
|
||||
HoodieTable hoodieTable,
|
||||
String idPrefix,
|
||||
TaskContextSupplier taskContextSupplier) {
|
||||
super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier);
|
||||
}
|
||||
|
||||
public JavaLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
|
||||
boolean areRecordsSorted,
|
||||
HoodieWriteConfig config,
|
||||
String instantTime,
|
||||
HoodieTable hoodieTable,
|
||||
String idPrefix,
|
||||
TaskContextSupplier taskContextSupplier,
|
||||
WriteHandleFactory writeHandleFactory) {
|
||||
super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<WriteStatus> computeNext() {
|
||||
// Executor service used for launching writer thread.
|
||||
BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
|
||||
null;
|
||||
try {
|
||||
final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
|
||||
bufferedIteratorExecutor =
|
||||
new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema));
|
||||
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
|
||||
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException(e);
|
||||
} finally {
|
||||
if (null != bufferedIteratorExecutor) {
|
||||
bufferedIteratorExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.index;
|
||||
|
||||
import org.apache.hudi.ApiMaturityLevel;
|
||||
import org.apache.hudi.PublicAPIMethod;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.ReflectionUtils;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieIndexException;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public abstract class JavaHoodieIndex<T extends HoodieRecordPayload> extends HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
|
||||
protected JavaHoodieIndex(HoodieWriteConfig config) {
|
||||
super(config);
|
||||
}
|
||||
|
||||
public static JavaHoodieIndex createIndex(HoodieWriteConfig config) {
|
||||
// first use index class config to create index.
|
||||
if (!StringUtils.isNullOrEmpty(config.getIndexClass())) {
|
||||
Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config);
|
||||
if (!(instance instanceof HoodieIndex)) {
|
||||
throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex");
|
||||
}
|
||||
return (JavaHoodieIndex) instance;
|
||||
}
|
||||
|
||||
// TODO more indexes to be added
|
||||
switch (config.getIndexType()) {
|
||||
case INMEMORY:
|
||||
return new JavaInMemoryHashIndex(config);
|
||||
default:
|
||||
throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
|
||||
public abstract List<WriteStatus> updateLocation(List<WriteStatus> writeStatuses,
|
||||
HoodieEngineContext context,
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException;
|
||||
|
||||
@Override
|
||||
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
|
||||
public abstract List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
|
||||
HoodieEngineContext context,
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException;
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.index;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Hoodie Index implementation backed by an in-memory Hash map.
|
||||
* <p>
|
||||
* ONLY USE FOR LOCAL TESTING
|
||||
*/
|
||||
@SuppressWarnings("checkstyle:LineLength")
|
||||
public class JavaInMemoryHashIndex<T extends HoodieRecordPayload> extends JavaHoodieIndex<T> {
|
||||
|
||||
private static ConcurrentMap<HoodieKey, HoodieRecordLocation> recordLocationMap;
|
||||
|
||||
public JavaInMemoryHashIndex(HoodieWriteConfig config) {
|
||||
super(config);
|
||||
synchronized (JavaInMemoryHashIndex.class) {
|
||||
if (recordLocationMap == null) {
|
||||
recordLocationMap = new ConcurrentHashMap<>();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records, HoodieEngineContext context,
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
|
||||
List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
|
||||
records.stream().forEach(record -> {
|
||||
if (recordLocationMap.containsKey(record.getKey())) {
|
||||
record.unseal();
|
||||
record.setCurrentLocation(recordLocationMap.get(record.getKey()));
|
||||
record.seal();
|
||||
}
|
||||
taggedRecords.add(record);
|
||||
});
|
||||
return taggedRecords;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<WriteStatus> updateLocation(List<WriteStatus> writeStatusList,
|
||||
HoodieEngineContext context,
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
|
||||
return writeStatusList.stream().map(writeStatus -> {
|
||||
for (HoodieRecord record : writeStatus.getWrittenRecords()) {
|
||||
if (!writeStatus.isErrored(record.getKey())) {
|
||||
HoodieKey key = record.getKey();
|
||||
Option<HoodieRecordLocation> newLocation = record.getNewLocation();
|
||||
if (newLocation.isPresent()) {
|
||||
recordLocationMap.put(key, newLocation.get());
|
||||
} else {
|
||||
// Delete existing index for a deleted record
|
||||
recordLocationMap.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
return writeStatus;
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rollbackCommit(String instantTime) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only looks up by recordKey.
|
||||
*/
|
||||
@Override
|
||||
public boolean isGlobal() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mapping is available in HBase already.
|
||||
*/
|
||||
@Override
|
||||
public boolean canIndexLogFiles() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Index needs to be explicitly updated after storage write.
|
||||
*/
|
||||
@Override
|
||||
public boolean isImplicitWithStorage() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,178 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
|
||||
import org.apache.hudi.table.action.clean.JavaCleanActionExecutor;
|
||||
import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor;
|
||||
import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor;
|
||||
import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor;
|
||||
import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieJavaTable<T> {
|
||||
protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
|
||||
HoodieEngineContext context,
|
||||
HoodieTableMetaClient metaClient) {
|
||||
super(config, context, metaClient);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> records) {
|
||||
return new JavaUpsertCommitActionExecutor<>(context, config,
|
||||
this, instantTime, records).execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> insert(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> records) {
|
||||
return new JavaInsertCommitActionExecutor<>(context, config,
|
||||
this, instantTime, records).execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> records,
|
||||
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
|
||||
throw new HoodieNotSupportedException("BulkInsert is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
List<HoodieKey> keys) {
|
||||
throw new HoodieNotSupportedException("Delete is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> preppedRecords) {
|
||||
return new JavaUpsertPreppedCommitActionExecutor<>((HoodieJavaEngineContext) context, config,
|
||||
this, instantTime, preppedRecords).execute();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> preppedRecords) {
|
||||
return new JavaInsertPreppedCommitActionExecutor<>((HoodieJavaEngineContext) context, config,
|
||||
this, instantTime, preppedRecords).execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> preppedRecords,
|
||||
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
|
||||
throw new HoodieNotSupportedException("BulkInsertPrepped is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> records) {
|
||||
throw new HoodieNotSupportedException("InsertOverwrite is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> records) {
|
||||
throw new HoodieNotSupportedException("InsertOverwrite is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context,
|
||||
String instantTime,
|
||||
Option<Map<String, String>> extraMetadata) {
|
||||
throw new HoodieNotSupportedException("ScheduleCompaction is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext context,
|
||||
String compactionInstantTime) {
|
||||
throw new HoodieNotSupportedException("Compact is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieBootstrapWriteMetadata<List<WriteStatus>> bootstrap(HoodieEngineContext context,
|
||||
Option<Map<String, String>> extraMetadata) {
|
||||
throw new HoodieNotSupportedException("Bootstrap is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollbackBootstrap(HoodieEngineContext context,
|
||||
String instantTime) {
|
||||
throw new HoodieNotSupportedException("RollbackBootstrap is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieCleanMetadata clean(HoodieEngineContext context,
|
||||
String cleanInstantTime) {
|
||||
return new JavaCleanActionExecutor(context, config, this, cleanInstantTime).execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieRollbackMetadata rollback(HoodieEngineContext context,
|
||||
String rollbackInstantTime,
|
||||
HoodieInstant commitInstant,
|
||||
boolean deleteInstants) {
|
||||
throw new HoodieNotSupportedException("Rollback is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieSavepointMetadata savepoint(HoodieEngineContext context,
|
||||
String instantToSavepoint,
|
||||
String user,
|
||||
String comment) {
|
||||
throw new HoodieNotSupportedException("Savepoint is not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieRestoreMetadata restore(HoodieEngineContext context,
|
||||
String restoreInstantTime,
|
||||
String instantToRestore) {
|
||||
throw new HoodieNotSupportedException("Restore is not supported yet");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends HoodieJavaCopyOnWriteTable<T> {
|
||||
protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
|
||||
super(config, context, metaClient);
|
||||
}
|
||||
// TODO not support yet.
|
||||
}
|
||||
@@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieNotSupportedException;
|
||||
import org.apache.hudi.index.JavaHoodieIndex;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
|
||||
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
|
||||
protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
|
||||
super(config, context, metaClient);
|
||||
}
|
||||
|
||||
public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
|
||||
context.getHadoopConf().get(),
|
||||
config.getBasePath(),
|
||||
true,
|
||||
config.getConsistencyGuardConfig(),
|
||||
Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
|
||||
);
|
||||
return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
|
||||
}
|
||||
|
||||
public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config,
|
||||
HoodieJavaEngineContext context,
|
||||
HoodieTableMetaClient metaClient) {
|
||||
switch (metaClient.getTableType()) {
|
||||
case COPY_ON_WRITE:
|
||||
return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient);
|
||||
case MERGE_ON_READ:
|
||||
throw new HoodieNotSupportedException("MERGE_ON_READ is not supported yet");
|
||||
default:
|
||||
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
|
||||
return JavaHoodieIndex.createIndex(config);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.action.clean;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieActionInstant;
|
||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.common.HoodieCleanStat;
|
||||
import org.apache.hudi.common.model.CleanFileInfo;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.ImmutablePair;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class JavaCleanActionExecutor<T extends HoodieRecordPayload> extends
|
||||
BaseCleanActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(JavaCleanActionExecutor.class);
|
||||
|
||||
public JavaCleanActionExecutor(HoodieEngineContext context,
|
||||
HoodieWriteConfig config,
|
||||
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
|
||||
String instantTime) {
|
||||
super(context, config, table, instantTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
|
||||
|
||||
Iterator<ImmutablePair<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
|
||||
.flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator();
|
||||
|
||||
Stream<Pair<String, PartitionCleanStat>> partitionCleanStats =
|
||||
deleteFilesFunc(filesToBeDeletedPerPartition, table)
|
||||
.collect(Collectors.groupingBy(Pair::getLeft))
|
||||
.entrySet().stream()
|
||||
.map(x -> new ImmutablePair(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get()));
|
||||
|
||||
Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
|
||||
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
|
||||
|
||||
// Return PartitionCleanStat for each partition passed.
|
||||
return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
|
||||
PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
|
||||
? partitionCleanStatsMap.get(partitionPath)
|
||||
: new PartitionCleanStat(partitionPath);
|
||||
HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
|
||||
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
|
||||
.withEarliestCommitRetained(Option.ofNullable(
|
||||
actionInstant != null
|
||||
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
|
||||
actionInstant.getAction(), actionInstant.getTimestamp())
|
||||
: null))
|
||||
.withDeletePathPattern(partitionCleanStat.deletePathPatterns())
|
||||
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
|
||||
.withFailedDeletes(partitionCleanStat.failedDeleteFiles())
|
||||
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
|
||||
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
|
||||
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
|
||||
.build();
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<ImmutablePair<String, CleanFileInfo>> iter, HoodieTable table) {
|
||||
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
|
||||
FileSystem fs = table.getMetaClient().getFs();
|
||||
|
||||
while (iter.hasNext()) {
|
||||
Pair<String, CleanFileInfo> partitionDelFileTuple = iter.next();
|
||||
String partitionPath = partitionDelFileTuple.getLeft();
|
||||
Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath());
|
||||
String deletePathStr = deletePath.toString();
|
||||
Boolean deletedFileResult = null;
|
||||
try {
|
||||
deletedFileResult = deleteFileAndGetResult(fs, deletePathStr);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Delete file failed");
|
||||
}
|
||||
if (!partitionCleanStatMap.containsKey(partitionPath)) {
|
||||
partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
|
||||
}
|
||||
boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile();
|
||||
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
|
||||
if (isBootstrapBasePathFile) {
|
||||
// For Bootstrap Base file deletions, store the full file path.
|
||||
partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
|
||||
partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true);
|
||||
} else {
|
||||
partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
|
||||
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
|
||||
}
|
||||
}
|
||||
return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,329 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.action.commit;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.CommitUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieCommitException;
|
||||
import org.apache.hudi.exception.HoodieUpsertException;
|
||||
import org.apache.hudi.execution.JavaLazyInsertIterable;
|
||||
import org.apache.hudi.io.CreateHandleFactory;
|
||||
import org.apache.hudi.io.HoodieMergeHandle;
|
||||
import org.apache.hudi.io.HoodieSortedMergeHandle;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.WorkloadProfile;
|
||||
import org.apache.hudi.table.WorkloadStat;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload> extends
|
||||
BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(BaseJavaCommitActionExecutor.class);
|
||||
|
||||
public BaseJavaCommitActionExecutor(HoodieEngineContext context,
|
||||
HoodieWriteConfig config,
|
||||
HoodieTable table,
|
||||
String instantTime,
|
||||
WriteOperationType operationType) {
|
||||
super(context, config, table, instantTime, operationType, Option.empty());
|
||||
}
|
||||
|
||||
public BaseJavaCommitActionExecutor(HoodieEngineContext context,
|
||||
HoodieWriteConfig config,
|
||||
HoodieTable table,
|
||||
String instantTime,
|
||||
WriteOperationType operationType,
|
||||
Option extraMetadata) {
|
||||
super(context, config, table, instantTime, operationType, extraMetadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
|
||||
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
|
||||
|
||||
WorkloadProfile profile = null;
|
||||
if (isWorkloadProfileNeeded()) {
|
||||
profile = new WorkloadProfile(buildProfile(inputRecords));
|
||||
LOG.info("Workload profile :" + profile);
|
||||
try {
|
||||
saveWorkloadProfileMetadataToInflight(profile, instantTime);
|
||||
} catch (Exception e) {
|
||||
HoodieTableMetaClient metaClient = table.getMetaClient();
|
||||
HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
|
||||
try {
|
||||
if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
|
||||
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Check file exists failed");
|
||||
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final Partitioner partitioner = getPartitioner(profile);
|
||||
Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
|
||||
|
||||
List<WriteStatus> writeStatuses = new LinkedList<>();
|
||||
partitionedRecords.forEach((partition, records) -> {
|
||||
if (WriteOperationType.isChangingRecords(operationType)) {
|
||||
handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
|
||||
} else {
|
||||
handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
|
||||
}
|
||||
});
|
||||
updateIndex(writeStatuses, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
protected void updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
|
||||
Instant indexStartTime = Instant.now();
|
||||
// Update the index back
|
||||
List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
|
||||
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
|
||||
result.setWriteStatuses(statuses);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getCommitActionType() {
|
||||
return table.getMetaClient().getCommitActionType();
|
||||
}
|
||||
|
||||
private Partitioner getPartitioner(WorkloadProfile profile) {
|
||||
if (WriteOperationType.isChangingRecords(operationType)) {
|
||||
return getUpsertPartitioner(profile);
|
||||
} else {
|
||||
return getInsertPartitioner(profile);
|
||||
}
|
||||
}
|
||||
|
||||
private Map<Integer, List<HoodieRecord<T>>> partition(List<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
|
||||
Map<Integer, List<Pair<Pair<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>>>> partitionedMidRecords = dedupedRecords
|
||||
.stream()
|
||||
.map(record -> Pair.of(Pair.of(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record))
|
||||
.collect(Collectors.groupingBy(x -> partitioner.getPartition(x.getLeft())));
|
||||
Map<Integer, List<HoodieRecord<T>>> results = new LinkedHashMap<>();
|
||||
partitionedMidRecords.forEach((key, value) -> results.put(key, value.stream().map(x -> x.getRight()).collect(Collectors.toList())));
|
||||
return results;
|
||||
}
|
||||
|
||||
protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(List<HoodieRecord<T>> inputRecords) {
|
||||
HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
|
||||
WorkloadStat globalStat = new WorkloadStat();
|
||||
|
||||
Map<Pair<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecords
|
||||
.stream()
|
||||
.map(record -> Pair.of(
|
||||
Pair.of(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
|
||||
.collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
|
||||
|
||||
for (Map.Entry<Pair<String, Option<HoodieRecordLocation>>, Long> e : partitionLocationCounts.entrySet()) {
|
||||
String partitionPath = e.getKey().getLeft();
|
||||
Long count = e.getValue();
|
||||
Option<HoodieRecordLocation> locOption = e.getKey().getRight();
|
||||
|
||||
if (!partitionPathStatMap.containsKey(partitionPath)) {
|
||||
partitionPathStatMap.put(partitionPath, new WorkloadStat());
|
||||
}
|
||||
|
||||
if (locOption.isPresent()) {
|
||||
// update
|
||||
partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count);
|
||||
globalStat.addUpdates(locOption.get(), count);
|
||||
} else {
|
||||
// insert
|
||||
partitionPathStatMap.get(partitionPath).addInserts(count);
|
||||
globalStat.addInserts(count);
|
||||
}
|
||||
}
|
||||
return Pair.of(partitionPathStatMap, globalStat);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
|
||||
commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
|
||||
String actionType = getCommitActionType();
|
||||
LOG.info("Committing " + instantTime + ", action Type " + actionType);
|
||||
result.setCommitted(true);
|
||||
result.setWriteStats(writeStats);
|
||||
// Finalize write
|
||||
finalizeWrite(instantTime, writeStats, result);
|
||||
|
||||
try {
|
||||
LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType());
|
||||
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
|
||||
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
|
||||
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
|
||||
|
||||
activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime),
|
||||
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
LOG.info("Committed " + instantTime);
|
||||
result.setCommitMetadata(Option.of(metadata));
|
||||
} catch (IOException e) {
|
||||
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isWorkloadProfileNeeded() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
|
||||
Partitioner partitioner) {
|
||||
UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner;
|
||||
BucketInfo binfo = upsertPartitioner.getBucketInfo(partition);
|
||||
BucketType btype = binfo.bucketType;
|
||||
try {
|
||||
if (btype.equals(BucketType.INSERT)) {
|
||||
return handleInsert(binfo.fileIdPrefix, recordItr);
|
||||
} else if (btype.equals(BucketType.UPDATE)) {
|
||||
return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
|
||||
} else {
|
||||
throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
|
||||
LOG.error(msg, t);
|
||||
throw new HoodieUpsertException(msg, t);
|
||||
}
|
||||
}
|
||||
|
||||
protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr,
|
||||
Partitioner partitioner) {
|
||||
return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
|
||||
Iterator<HoodieRecord<T>> recordItr)
|
||||
throws IOException {
|
||||
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
|
||||
if (!recordItr.hasNext()) {
|
||||
LOG.info("Empty partition with fileId => " + fileId);
|
||||
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
|
||||
}
|
||||
// these are updates
|
||||
HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
|
||||
return handleUpdateInternal(upsertHandle, fileId);
|
||||
}
|
||||
|
||||
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId)
|
||||
throws IOException {
|
||||
if (upsertHandle.getOldFilePath() == null) {
|
||||
throw new HoodieUpsertException(
|
||||
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
|
||||
} else {
|
||||
JavaMergeHelper.newInstance().runMerge(table, upsertHandle);
|
||||
}
|
||||
|
||||
// TODO(vc): This needs to be revisited
|
||||
if (upsertHandle.getWriteStatus().getPartitionPath() == null) {
|
||||
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
|
||||
+ upsertHandle.getWriteStatus());
|
||||
}
|
||||
return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator();
|
||||
}
|
||||
|
||||
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
|
||||
if (table.requireSortedRecords()) {
|
||||
return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
|
||||
} else {
|
||||
return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier);
|
||||
}
|
||||
}
|
||||
|
||||
protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
|
||||
Map<String, HoodieRecord<T>> keyToNewRecords,
|
||||
HoodieBaseFile dataFileToBeMerged) {
|
||||
return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords,
|
||||
partitionPath, fileId, dataFileToBeMerged, taskContextSupplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
|
||||
throws Exception {
|
||||
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
|
||||
if (!recordItr.hasNext()) {
|
||||
LOG.info("Empty partition");
|
||||
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
|
||||
}
|
||||
return new JavaLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
|
||||
taskContextSupplier, new CreateHandleFactory<>());
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides a partitioner to perform the upsert operation, based on the workload profile.
|
||||
*/
|
||||
public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
|
||||
if (profile == null) {
|
||||
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
|
||||
}
|
||||
return new UpsertPartitioner(profile, context, table, config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides a partitioner to perform the insert operation, based on the workload profile.
|
||||
*/
|
||||
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
|
||||
return getUpsertPartitioner(profile);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.action.commit;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class JavaInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
|
||||
|
||||
private List<HoodieRecord<T>> inputRecords;
|
||||
|
||||
public JavaInsertCommitActionExecutor(HoodieEngineContext context,
|
||||
HoodieWriteConfig config,
|
||||
HoodieTable table,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> inputRecords) {
|
||||
super(context, config, table, instantTime, WriteOperationType.INSERT);
|
||||
this.inputRecords = inputRecords;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> execute() {
|
||||
return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
|
||||
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.action.commit;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class JavaInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
|
||||
extends BaseJavaCommitActionExecutor<T> {
|
||||
|
||||
private final List<HoodieRecord<T>> preppedRecords;
|
||||
|
||||
public JavaInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
|
||||
HoodieWriteConfig config, HoodieTable table,
|
||||
String instantTime, List<HoodieRecord<T>> preppedRecords) {
|
||||
super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
|
||||
this.preppedRecords = preppedRecords;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> execute() {
|
||||
return super.execute(preppedRecords);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,115 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.action.commit;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
|
||||
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.io.HoodieMergeHandle;
|
||||
import org.apache.hudi.io.storage.HoodieFileReader;
|
||||
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.BinaryDecoder;
|
||||
import org.apache.avro.io.BinaryEncoder;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
public class JavaMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, List<HoodieRecord<T>>,
|
||||
List<HoodieKey>, List<WriteStatus>> {
|
||||
|
||||
private JavaMergeHelper() {
|
||||
}
|
||||
|
||||
private static class MergeHelperHolder {
|
||||
private static final JavaMergeHelper JAVA_MERGE_HELPER = new JavaMergeHelper();
|
||||
}
|
||||
|
||||
public static JavaMergeHelper newInstance() {
|
||||
return JavaMergeHelper.MergeHelperHolder.JAVA_MERGE_HELPER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
|
||||
HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> upsertHandle) throws IOException {
|
||||
final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
|
||||
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
|
||||
HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle = upsertHandle;
|
||||
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
|
||||
|
||||
final GenericDatumWriter<GenericRecord> gWriter;
|
||||
final GenericDatumReader<GenericRecord> gReader;
|
||||
Schema readSchema;
|
||||
if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
|
||||
readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
|
||||
gWriter = new GenericDatumWriter<>(readSchema);
|
||||
gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
|
||||
} else {
|
||||
gReader = null;
|
||||
gWriter = null;
|
||||
readSchema = mergeHandle.getWriterSchemaWithMetafields();
|
||||
}
|
||||
|
||||
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
|
||||
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
|
||||
try {
|
||||
final Iterator<GenericRecord> readerIterator;
|
||||
if (baseFile.getBootstrapBaseFile().isPresent()) {
|
||||
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
|
||||
} else {
|
||||
readerIterator = reader.getRecordIterator(readSchema);
|
||||
}
|
||||
|
||||
ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
|
||||
ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
|
||||
wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
|
||||
Option.of(new UpdateHandler(mergeHandle)), record -> {
|
||||
if (!externalSchemaTransformation) {
|
||||
return record;
|
||||
}
|
||||
return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
|
||||
});
|
||||
wrapper.execute();
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException(e);
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
mergeHandle.close();
|
||||
if (null != wrapper) {
|
||||
wrapper.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.action.commit;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class JavaUpsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
|
||||
|
||||
private List<HoodieRecord<T>> inputRecords;
|
||||
|
||||
public JavaUpsertCommitActionExecutor(HoodieEngineContext context,
|
||||
HoodieWriteConfig config,
|
||||
HoodieTable table,
|
||||
String instantTime,
|
||||
List<HoodieRecord<T>> inputRecords) {
|
||||
super(context, config, table, instantTime, WriteOperationType.UPSERT);
|
||||
this.inputRecords = inputRecords;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> execute() {
|
||||
return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
|
||||
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.action.commit;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieJavaEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
import org.apache.hudi.table.action.HoodieWriteMetadata;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class JavaUpsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
|
||||
extends BaseJavaCommitActionExecutor<T> {
|
||||
|
||||
private final List<HoodieRecord<T>> preppedRecords;
|
||||
|
||||
public JavaUpsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
|
||||
HoodieWriteConfig config, HoodieTable table,
|
||||
String instantTime, List<HoodieRecord<T>> preppedRecords) {
|
||||
super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
|
||||
this.preppedRecords = preppedRecords;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HoodieWriteMetadata<List<WriteStatus>> execute() {
|
||||
return super.execute(preppedRecords);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.action.commit;
|
||||
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class JavaWriteHelper<T extends HoodieRecordPayload,R> extends AbstractWriteHelper<T, List<HoodieRecord<T>>,
|
||||
List<HoodieKey>, List<WriteStatus>, R> {
|
||||
|
||||
private JavaWriteHelper() {
|
||||
}
|
||||
|
||||
private static class WriteHelperHolder {
|
||||
private static final JavaWriteHelper JAVA_WRITE_HELPER = new JavaWriteHelper();
|
||||
}
|
||||
|
||||
public static JavaWriteHelper newInstance() {
|
||||
return WriteHelperHolder.JAVA_WRITE_HELPER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HoodieRecord<T>> deduplicateRecords(List<HoodieRecord<T>> records,
|
||||
HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> index,
|
||||
int parallelism) {
|
||||
boolean isIndexingGlobal = index.isGlobal();
|
||||
Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
|
||||
HoodieKey hoodieKey = record.getKey();
|
||||
// If index used is global, then records are expected to differ in their partitionPath
|
||||
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
|
||||
return Pair.of(key, record);
|
||||
}).collect(Collectors.groupingBy(Pair::getLeft));
|
||||
|
||||
return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
|
||||
@SuppressWarnings("unchecked")
|
||||
T reducedData = (T) rec1.getData().preCombine(rec2.getData());
|
||||
// we cannot allow the user to change the key or partitionPath, since that will affect
|
||||
// everything
|
||||
// so pick it from one of the records.
|
||||
return new HoodieRecord<T>(rec1.getKey(), reducedData);
|
||||
}).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,319 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.table.action.commit;
|
||||
|
||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.NumericUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.ImmutablePair;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.WorkloadProfile;
|
||||
import org.apache.hudi.table.WorkloadStat;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Packs incoming records to be upserted, into buckets.
|
||||
*/
|
||||
public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Partitioner {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);
|
||||
|
||||
/**
|
||||
* List of all small files to be corrected.
|
||||
*/
|
||||
protected List<SmallFile> smallFiles = new ArrayList<>();
|
||||
/**
|
||||
* Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into.
|
||||
*/
|
||||
private int totalBuckets = 0;
|
||||
/**
|
||||
* Stat for the current workload. Helps in determining inserts, upserts etc.
|
||||
*/
|
||||
private WorkloadProfile profile;
|
||||
/**
|
||||
* Helps decide which bucket an incoming update should go to.
|
||||
*/
|
||||
private HashMap<String, Integer> updateLocationToBucket;
|
||||
/**
|
||||
* Helps us pack inserts into 1 or more buckets depending on number of incoming records.
|
||||
*/
|
||||
private HashMap<String, List<InsertBucketCumulativeWeightPair>> partitionPathToInsertBucketInfos;
|
||||
/**
|
||||
* Remembers what type each bucket is for later.
|
||||
*/
|
||||
private HashMap<Integer, BucketInfo> bucketInfoMap;
|
||||
|
||||
protected final HoodieTable table;
|
||||
|
||||
protected final HoodieWriteConfig config;
|
||||
|
||||
public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table,
|
||||
HoodieWriteConfig config) {
|
||||
updateLocationToBucket = new HashMap<>();
|
||||
partitionPathToInsertBucketInfos = new HashMap<>();
|
||||
bucketInfoMap = new HashMap<>();
|
||||
this.profile = profile;
|
||||
this.table = table;
|
||||
this.config = config;
|
||||
assignUpdates(profile);
|
||||
assignInserts(profile, context);
|
||||
|
||||
LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
|
||||
+ "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
|
||||
+ "UpdateLocations mapped to buckets =>" + updateLocationToBucket);
|
||||
}
|
||||
|
||||
private void assignUpdates(WorkloadProfile profile) {
|
||||
// each update location gets a partition
|
||||
Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
|
||||
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
|
||||
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
|
||||
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
|
||||
addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int addUpdateBucket(String partitionPath, String fileIdHint) {
|
||||
int bucket = totalBuckets;
|
||||
updateLocationToBucket.put(fileIdHint, bucket);
|
||||
BucketInfo bucketInfo = new BucketInfo();
|
||||
bucketInfo.bucketType = BucketType.UPDATE;
|
||||
bucketInfo.fileIdPrefix = fileIdHint;
|
||||
bucketInfo.partitionPath = partitionPath;
|
||||
bucketInfoMap.put(totalBuckets, bucketInfo);
|
||||
totalBuckets++;
|
||||
return bucket;
|
||||
}
|
||||
|
||||
private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) {
|
||||
// for new inserts, compute buckets depending on how many records we have for each partition
|
||||
Set<String> partitionPaths = profile.getPartitionPaths();
|
||||
long averageRecordSize =
|
||||
averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
|
||||
config);
|
||||
LOG.info("AvgRecordSize => " + averageRecordSize);
|
||||
|
||||
Map<String, List<SmallFile>> partitionSmallFilesMap =
|
||||
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);
|
||||
|
||||
for (String partitionPath : partitionPaths) {
|
||||
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
|
||||
if (pStat.getNumInserts() > 0) {
|
||||
|
||||
List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
|
||||
this.smallFiles.addAll(smallFiles);
|
||||
|
||||
LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
|
||||
|
||||
long totalUnassignedInserts = pStat.getNumInserts();
|
||||
List<Integer> bucketNumbers = new ArrayList<>();
|
||||
List<Long> recordsPerBucket = new ArrayList<>();
|
||||
|
||||
// first try packing this into one of the smallFiles
|
||||
for (SmallFile smallFile : smallFiles) {
|
||||
long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize,
|
||||
totalUnassignedInserts);
|
||||
if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
|
||||
// create a new bucket or re-use an existing bucket
|
||||
int bucket;
|
||||
if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
|
||||
bucket = updateLocationToBucket.get(smallFile.location.getFileId());
|
||||
LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
|
||||
} else {
|
||||
bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
|
||||
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
|
||||
}
|
||||
bucketNumbers.add(bucket);
|
||||
recordsPerBucket.add(recordsToAppend);
|
||||
totalUnassignedInserts -= recordsToAppend;
|
||||
}
|
||||
}
|
||||
|
||||
// if we have anything more, create new insert buckets, like normal
|
||||
if (totalUnassignedInserts > 0) {
|
||||
long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
|
||||
if (config.shouldAutoTuneInsertSplits()) {
|
||||
insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
|
||||
}
|
||||
|
||||
int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);
|
||||
LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts
|
||||
+ ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket);
|
||||
for (int b = 0; b < insertBuckets; b++) {
|
||||
bucketNumbers.add(totalBuckets);
|
||||
recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
|
||||
BucketInfo bucketInfo = new BucketInfo();
|
||||
bucketInfo.bucketType = BucketType.INSERT;
|
||||
bucketInfo.partitionPath = partitionPath;
|
||||
bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
|
||||
bucketInfoMap.put(totalBuckets, bucketInfo);
|
||||
totalBuckets++;
|
||||
}
|
||||
}
|
||||
|
||||
// Go over all such buckets, and assign weights as per amount of incoming inserts.
|
||||
List<InsertBucketCumulativeWeightPair> insertBuckets = new ArrayList<>();
|
||||
double curentCumulativeWeight = 0;
|
||||
for (int i = 0; i < bucketNumbers.size(); i++) {
|
||||
InsertBucket bkt = new InsertBucket();
|
||||
bkt.bucketNumber = bucketNumbers.get(i);
|
||||
bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts();
|
||||
curentCumulativeWeight += bkt.weight;
|
||||
insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, curentCumulativeWeight));
|
||||
}
|
||||
LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
|
||||
partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> partitionPaths, HoodieEngineContext context) {
|
||||
Map<String, List<SmallFile>> partitionSmallFilesMap = new HashMap<>();
|
||||
if (partitionPaths != null && partitionPaths.size() > 0) {
|
||||
context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
|
||||
partitionSmallFilesMap = context.mapToPair(partitionPaths,
|
||||
partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0);
|
||||
}
|
||||
return partitionSmallFilesMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a list of small files in the given partition path.
|
||||
*/
|
||||
protected List<SmallFile> getSmallFiles(String partitionPath) {
|
||||
|
||||
// smallFiles only for partitionPath
|
||||
List<SmallFile> smallFileLocations = new ArrayList<>();
|
||||
|
||||
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
|
||||
|
||||
if (!commitTimeline.empty()) { // if we have some commits
|
||||
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
|
||||
List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
|
||||
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
|
||||
|
||||
for (HoodieBaseFile file : allFiles) {
|
||||
if (file.getFileSize() < config.getParquetSmallFileLimit()) {
|
||||
String filename = file.getFileName();
|
||||
SmallFile sf = new SmallFile();
|
||||
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
|
||||
sf.sizeBytes = file.getFileSize();
|
||||
smallFileLocations.add(sf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return smallFileLocations;
|
||||
}
|
||||
|
||||
public BucketInfo getBucketInfo(int bucketNumber) {
|
||||
return bucketInfoMap.get(bucketNumber);
|
||||
}
|
||||
|
||||
public List<InsertBucketCumulativeWeightPair> getInsertBuckets(String partitionPath) {
|
||||
return partitionPathToInsertBucketInfos.get(partitionPath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumPartitions() {
|
||||
return totalBuckets;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPartition(Object key) {
|
||||
Pair<HoodieKey, Option<HoodieRecordLocation>> keyLocation =
|
||||
(Pair<HoodieKey, Option<HoodieRecordLocation>>) key;
|
||||
if (keyLocation.getRight().isPresent()) {
|
||||
HoodieRecordLocation location = keyLocation.getRight().get();
|
||||
return updateLocationToBucket.get(location.getFileId());
|
||||
} else {
|
||||
String partitionPath = keyLocation.getLeft().getPartitionPath();
|
||||
List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath);
|
||||
// pick the target bucket to use based on the weights.
|
||||
final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
|
||||
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation.getLeft().getRecordKey());
|
||||
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
|
||||
|
||||
int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r));
|
||||
|
||||
if (index >= 0) {
|
||||
return targetBuckets.get(index).getKey().bucketNumber;
|
||||
}
|
||||
|
||||
if ((-1 * index - 1) < targetBuckets.size()) {
|
||||
return targetBuckets.get((-1 * index - 1)).getKey().bucketNumber;
|
||||
}
|
||||
|
||||
// return first one, by default
|
||||
return targetBuckets.get(0).getKey().bucketNumber;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtains the average record size based on records written during previous commits. Used for estimating how many
|
||||
* records pack into one file.
|
||||
*/
|
||||
protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
|
||||
long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
|
||||
long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
|
||||
try {
|
||||
if (!commitTimeline.empty()) {
|
||||
// Go over the reverse ordered commits to get a more recent estimate of average record size.
|
||||
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
|
||||
while (instants.hasNext()) {
|
||||
HoodieInstant instant = instants.next();
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
||||
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
|
||||
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
|
||||
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
|
||||
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// make this fail safe.
|
||||
LOG.error("Error trying to compute average bytes/record ", t);
|
||||
}
|
||||
return avgSize;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user