[HUDI-3670] free temp views in sql transformers (#5080)
This commit is contained in:
@@ -49,8 +49,10 @@ public class FlatteningTransformer implements Transformer {
|
|||||||
// tmp table name doesn't like dashes
|
// tmp table name doesn't like dashes
|
||||||
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
|
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
|
||||||
LOG.info("Registering tmp table : " + tmpTable);
|
LOG.info("Registering tmp table : " + tmpTable);
|
||||||
rowDataset.registerTempTable(tmpTable);
|
rowDataset.createOrReplaceTempView(tmpTable);
|
||||||
return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
|
Dataset<Row> transformed = sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
|
||||||
|
sparkSession.catalog().dropTempView(tmpTable);
|
||||||
|
return transformed;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String flattenSchema(StructType schema, String prefix) {
|
public String flattenSchema(StructType schema, String prefix) {
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ public class SqlFileBasedTransformer implements Transformer {
|
|||||||
// tmp table name doesn't like dashes
|
// tmp table name doesn't like dashes
|
||||||
final String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
|
final String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
|
||||||
LOG.info("Registering tmp table : " + tmpTable);
|
LOG.info("Registering tmp table : " + tmpTable);
|
||||||
rowDataset.registerTempTable(tmpTable);
|
rowDataset.createOrReplaceTempView(tmpTable);
|
||||||
|
|
||||||
try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) {
|
try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) {
|
||||||
Dataset<Row> rows = null;
|
Dataset<Row> rows = null;
|
||||||
@@ -95,6 +95,8 @@ public class SqlFileBasedTransformer implements Transformer {
|
|||||||
return rows;
|
return rows;
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
throw new HoodieIOException("Error reading transformer SQL file.", ioe);
|
throw new HoodieIOException("Error reading transformer SQL file.", ioe);
|
||||||
|
} finally {
|
||||||
|
sparkSession.catalog().dropTempView(tmpTable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -60,9 +60,11 @@ public class SqlQueryBasedTransformer implements Transformer {
|
|||||||
// tmp table name doesn't like dashes
|
// tmp table name doesn't like dashes
|
||||||
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
|
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
|
||||||
LOG.info("Registering tmp table : " + tmpTable);
|
LOG.info("Registering tmp table : " + tmpTable);
|
||||||
rowDataset.registerTempTable(tmpTable);
|
rowDataset.createOrReplaceTempView(tmpTable);
|
||||||
String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
|
String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
|
||||||
LOG.debug("SQL Query for transformation : (" + sqlStr + ")");
|
LOG.debug("SQL Query for transformation : (" + sqlStr + ")");
|
||||||
return sparkSession.sql(sqlStr);
|
Dataset<Row> transformed = sparkSession.sql(sqlStr);
|
||||||
|
sparkSession.catalog().dropTempView(tmpTable);
|
||||||
|
return transformed;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user