1
0

[HUDI-1977] Fix Hudi CLI tempview query issue (#4626)

This commit is contained in:
peanut-chenzhong
2022-01-29 10:39:08 +08:00
committed by GitHub
parent e78b2f1b55
commit c0e8b03d93

View File

@@ -20,8 +20,6 @@ package org.apache.hudi.cli.utils;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -31,27 +29,34 @@ import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
+import org.springframework.shell.support.logging.HandlerUtils;

 import java.util.List;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.stream.Collectors;

 public class SparkTempViewProvider implements TempViewProvider {

-  private static final Logger LOG = LogManager.getLogger(SparkTempViewProvider.class);
+  private static final Logger LOG = HandlerUtils.getLogger(SparkTempViewProvider.class);

   private JavaSparkContext jsc;
   private SQLContext sqlContext;

   public SparkTempViewProvider(String appName) {
     try {
+      Handler handler = LOG.getParent().getHandlers()[0];
       SparkConf sparkConf = new SparkConf().setAppName(appName)
           .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
       jsc = new JavaSparkContext(sparkConf);
+      jsc.setLogLevel("ERROR");
       sqlContext = new SQLContext(jsc);
+      if (handler != null) {
+        LOG.getParent().removeHandler(LOG.getParent().getHandlers()[0]);
+        LOG.getParent().addHandler(handler);
+      }
     } catch (Throwable ex) {
       // log full stack trace and rethrow. Without this its difficult to debug failures, if any
-      LOG.error("unable to initialize spark context ", ex);
+      LOG.log(Level.WARNING, "unable to initialize spark context ", ex);
       throw new HoodieException(ex);
     }
   }
@@ -90,7 +95,7 @@ public class SparkTempViewProvider implements TempViewProvider {
       System.out.println("Wrote table view: " + tableName);
     } catch (Throwable ex) {
       // log full stack trace and rethrow. Without this its difficult to debug failures, if any
-      LOG.error("unable to write ", ex);
+      LOG.log(Level.WARNING, "unable to write ", ex);
       throw new HoodieException(ex);
     }
   }
@@ -101,7 +106,7 @@ public class SparkTempViewProvider implements TempViewProvider {
       this.sqlContext.sql(sqlText).show(Integer.MAX_VALUE, false);
     } catch (Throwable ex) {
       // log full stack trace and rethrow. Without this its difficult to debug failures, if any
-      LOG.error("unable to read ", ex);
+      LOG.log(Level.WARNING, "unable to read ", ex);
       throw new HoodieException(ex);
     }
   }
@@ -112,7 +117,7 @@ public class SparkTempViewProvider implements TempViewProvider {
       sqlContext.sql("SHOW TABLES").show(Integer.MAX_VALUE, false);
     } catch (Throwable ex) {
       // log full stack trace and rethrow. Without this its difficult to debug failures, if any
-      LOG.error("unable to get all views ", ex);
+      LOG.log(Level.WARNING, "unable to get all views ", ex);
       throw new HoodieException(ex);
     }
   }
@@ -123,7 +128,7 @@ public class SparkTempViewProvider implements TempViewProvider {
       sqlContext.sql("DROP TABLE IF EXISTS " + tableName);
     } catch (Throwable ex) {
       // log full stack trace and rethrow. Without this its difficult to debug failures, if any
-      LOG.error("unable to initialize spark context ", ex);
+      LOG.log(Level.WARNING, "unable to initialize spark context ", ex);
       throw new HoodieException(ex);
     }
   }