[HUDI-1328] Introduce HoodieFlinkEngineContext to hudi-flink-client (#2161)
This commit is contained in:
@@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.client.common;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hudi.client.common.function.SerializableConsumer;
|
||||
import org.apache.hudi.client.common.function.SerializableFunction;
|
||||
import org.apache.hudi.client.common.function.SerializablePairFunction;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper;
|
||||
import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper;
|
||||
import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapToPairWrapper;
|
||||
import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapWrapper;
|
||||
|
||||
/**
|
||||
* A flink engine implementation of HoodieEngineContext.
|
||||
*/
|
||||
public class HoodieFlinkEngineContext extends HoodieEngineContext {
|
||||
|
||||
public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
|
||||
this(new SerializableConfiguration(new Configuration()), taskContextSupplier);
|
||||
}
|
||||
|
||||
public HoodieFlinkEngineContext(SerializableConfiguration hadoopConf, TaskContextSupplier taskContextSupplier) {
|
||||
super(hadoopConf, taskContextSupplier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
|
||||
return data.stream().parallel().map(throwingMapWrapper(func)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
|
||||
return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
|
||||
data.forEach(throwingForeachWrapper(consumer));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
|
||||
Map<K, V> map = new HashMap<>();
|
||||
data.stream().map(throwingMapToPairWrapper(func)).forEach(x -> map.put(x._1, x._2));
|
||||
return map;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProperty(EngineProperty key, String value) {
|
||||
// no operation for now
|
||||
}
|
||||
|
||||
@Override
|
||||
public Option<String> getProperty(EngineProperty key) {
|
||||
// no operation for now
|
||||
return Option.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setJobStatus(String activeModule, String activityDescription) {
|
||||
// no operation for now
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.client.common.function;
|
||||
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Function wrapper util class, which catches the exception thrown by input function and return a similar function
|
||||
* with no exception thrown.
|
||||
*/
|
||||
public class FunctionWrapper {
|
||||
|
||||
public static <I, O> Function<I, O> throwingMapWrapper(SerializableFunction<I, O> throwingMapFunction) {
|
||||
return v1 -> {
|
||||
try {
|
||||
return throwingMapFunction.apply(v1);
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Error occurs when executing map", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static <I, O> Function<I, Stream<O>> throwingFlatMapWrapper(SerializableFunction<I, Stream<O>> throwingFlatMapFunction) {
|
||||
return v1 -> {
|
||||
try {
|
||||
return throwingFlatMapFunction.apply(v1);
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Error occurs when executing flatMap", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static <I> Consumer<I> throwingForeachWrapper(SerializableConsumer<I> throwingConsumer) {
|
||||
return v1 -> {
|
||||
try {
|
||||
throwingConsumer.accept(v1);
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Error occurs when executing foreach", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public static <I, K, V> Function<I, Tuple2<K, V>> throwingMapToPairWrapper(SerializablePairFunction<I, K, V> throwingPairFunction) {
|
||||
return v1 -> {
|
||||
try {
|
||||
return throwingPairFunction.call(v1);
|
||||
} catch (Exception e) {
|
||||
throw new HoodieException("Error occurs when executing mapToPair", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
###
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
###
|
||||
# Root logger: threshold INFO, output sent to the appender named A1.
log4j.rootLogger=INFO, A1
|
||||
# A1 is set to be a ConsoleAppender.
|
||||
log4j.appender.A1=org.apache.log4j.ConsoleAppender
|
||||
# A1 uses PatternLayout.
|
||||
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
|
||||
# Pattern fields, in order: elapsed milliseconds (%r, left-justified, min width 4),
# thread name (%t), level (%p, left-justified, min width 5), logger name (%c),
# nested diagnostic context (%x), message (%m), line separator (%n).
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
|
||||
Reference in New Issue
Block a user