大数据培训Spark SQL配置文件血缘扩展实战案例
2024-12-08 软件
override def parsePlan(sqlText: String): LogicalPlan = {
val lineAgeEnabled = SparkSession.getActiveSession
.get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
logDebug(s"SqlText: $sqlText")
if(sqlText.toLowerCase().contains("insert")){
if(lineAgeEnabled){
if(FIELD_LINE_AGE_SQL_COULD_SET.get()){
//调用本地变数在这里
FIELD_LINE_AGE_SQL.set(sqlText)
}
FIELD_LINE_AGE_SQL_COULD_SET.remove()
}
}
delegate.parsePlan(sqlText)
}
//初始化独有的sqlparser
override def parseExpression(sqlText: String): Expression = {
delegate.parseExpression(sqlText)
}
//初始化独有的sqlparser
override def parseTableIdentifier(sqlText: String): TableIdentifier = {
delegate.parseTableIdentifier(sqlText)
}
//初始化独有的sqlparser
override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
delegate.parseFunctionIdentifier(sqlText)
}
//初始化独有的sqlparser
override def parseTableSchema(sqlText: String): StructType = {
delegate.parseTableSchema(sqlText)
}
//初始化独有的sqlparser
override def parseDataType(sqlText: String): DataType = {
delegate.parseDataType(sqlText)
}
}
3.3 拓展的规范类
case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {
val executor: ThreadPoolExecutor =
ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)
override def apply(plan: LogicalPlan): Unit = {
val sql = FIELD_LINE_AGE_SQL.get
FIELD_LINE_AGE_SQL.remove()
if(sql != null){
//这里我们取得sql然后触发一个调用做到余下的重构战斗任务
val task = new FieldLineageRunnableV3(sparkSession,sql)
executor.execute(task)
}
}
}
很单纯,我们只是取得了 SQL 然后再触发了一个调用去受益 SparkPlan,具体形式化在
FieldLineageRunnableV3。
3.4 具体的做到到新方法
3.4.1 受益 SparkPlan
我们在 run 新方法中所受益 SparkPlan:
override def run(): Unit = {
val parser = sparkSession.sessionState.sqlParser
val yzer = sparkSession.sessionState.yzer
val optimizer = sparkSession.sessionState.optimizer
val planner = sparkSession.sessionState.planner
val newPlan = parser.parsePlan(sql)
PASS_TABLE_AUTH.set(true)
val yzedPlan = yzer.executeAndCheck(newPlan)
val optimizerPlan = optimizer.execute(yzedPlan)
//受益sparkPlan
val sparkPlan = planner.plan(optimizerPlan).next()
if(targetTable != null){
val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
//projection
projectionLineAge(levelProject, sparkPlan.child)
//predication
predicationLineAge(predicates, sparkPlan.child)
为什么要采用 SparkPlan 呢?当初我们权衡的时候,力学原计划拿取的设计文件人关系的时候是相当准的,且交换机相当短也更反之亦然。
在这里补充一下 Spark SQL 重构的过程如下:
经过SqlParser后时会受益形式化原计划,此时所列名、数组等都未重构,还不能执行者;经过Analyzer时会比对一些加载电子邮件,例如所列的测试、的设计文件电子邮件、数组电子邮件;经过Optimizer 后形式化原计划时会根据既定规范被最佳化,这里的规范是RBO,当然 Spark 还赞成CBO的最佳化;经过SparkPlanner后就变成了可执行者的力学原计划。
我们看一个形式化原计划与力学原计划对比的例证:
一个 SQL 解释器:
select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3
形式化原计划是这样的:
力学原计划是这样的:
似乎简化了很多。
受益 SparkPlan 后,我们就可以根据不同的SparkPlan路由器做到乘积处理。
我们将的设计文件血缘关系包含两种类型:projection(select查阅的设计文件)、predication(wehre查阅条件)。
这两种是一种互联的人关系,即从独有所列【关切尚硅谷,轻松学IT】的的设计文件生变成目标所列的的设计文件的相关联人关系。
想象一个查阅是一棵柏树,那么乘积人关系时会如下从柏树的顶端开始乘积,直到柏树的树干路由器,树干路由器即为独有所列:
那么我们乘积查阅的结果应该为
id ->tab1.id ,
name->tab1.name,tabb2.name,
age→tabb2.age。
似乎有该变数
val levelProject = new ArrayBuffer
[ArrayBuffer[NameExpressionHolder]](),通过projecti-onLineAge 乘积后 levelProject 加载了顶层id,name,age相关联的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。
当然也不是单纯的递归乘积,还需权衡特殊持续性例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都需特殊权衡。
例证及视觉效果:
SQL:
with A as (select id,name,age from tab1 where id> 100 ) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age> 28)
insert into tab3
select C.id,concat(C.name,B.name) as name, B.age from
B,C where C.id = B.id
视觉效果:
{
"edges": [
{
"sources": [
3
],
"targets": [
0
],
"expression": "id",
"edgeType": "PROJECTION"
},
{
"sources": [
4,
7
],
"targets": [
1
],
"expression": "name",
"edgeType": "PROJECTION"
},
{
"sources": [
5
],
"targets": [
2
],
"expression": "age",
"edgeType": "PROJECTION"
},
{
"sources": [
6,
3
],
"targets": [
0,
1,
2
],
"expression": "INNER",
"edgeType": "PREDICATE"
},
{
"sources": [
6,
5
],
"targets": [
0,
1,
2
],
"expression": "((((default.tabb2.人口为120人age人口为120人 IS NOT NULL) AND (CAST(default.tabb2.人口为120人age人口为120人 AS INT)> 28)) AND (B.人口为120人id人口为120人> 100)) AND (B.人口为120人id人口为120人 IS NOT NULL))",
"edgeType": "PREDICATE"
},
{
"sources": [
3
],
"targets": [
0,
1,
2
],
"expression": "((default.tab1.人口为120人id人口为120人 IS NOT NULL) AND (default.tab1.人口为120人id人口为120人> 100))",
"edgeType": "PREDICATE"
}
],
"vertices": [
{
"id": 0,
"vertexType": "COLUMN",
"vertexId": "default.tab3.id"
},
{
"id": 1,
"vertexType": "COLUMN",
"vertexId": "default.tab3.name"
},
{
"id": 2,
"vertexType": "COLUMN",
"vertexId": "default.tab3.age"
},
{
"id": 3,
"vertexType": "COLUMN",
"vertexId": "default.tab1.id"
},
{
"id": 4,
"vertexType": "COLUMN",
"vertexId": "default.tab1.name"
},
{
"id": 5,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.age"
},
{
"id": 6,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.id"
},
{
"id": 7,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.name"
}
]
}
四、总结
在 Spark SQL 的的设计文件血缘关系做到到中所,我们通过其自拓展,首先取得了 insert 解释器,在我们自己的健康检查规范中所取得
SQL 解释器,通过SparkSqlParser、Analyzer、Optimizer、SparkPlanner,事与愿违受益了力学原计划。
我们通过乘积力学原计划,根据不同执行者原计划做到相关联的反转,www.atguigu.com然后就受益了的设计文件之两者二者之间的相关联人关系。当年前的做到到是借助于的,的设计文件之两者二者之间是斜向的相关联人关系,中所两者二者之间过程被忽略,如果想做到到的设计文件的反转的整个过程也是未问题的。
文章刊文;也于图表仓库与Python大图表
中所选读到:
大图表共同开发 Spark 可选之SparkSQL
Spark SQL之RDD反转DataFrame的新方法
大图表共同开发之SparkSQL面试篇
大图表共同开发之Spark SQL执行者性能的提高
。人工泪液和眼药水的区别葵花麦枣咀嚼片
健胃消食
达霏欣米诺地尔搽剂女用的
预约挂号
艾拉莫德片有哪些作用功效
痛风吃什么能缓解疼痛
999消痔软膏可以消除肉球吗
双氯芬酸钠凝胶能治颈椎病吗
急支糖浆说明书及用法
- 05-10能做主力机的折叠屏,荣耀Magic V发布即火热,期权也要抢
- 05-10泰坦尼克号唯一活从前的日本人,一生背负骂名,死后真相才被公开
- 05-10三星推出电子衣橱:AirDress AI衣艾玛 即日起上市
- 05-101950年,韶山冲不知道怎么细分伟人的成分?主席:按政策办事
- 05-10办公处白领常备的几个制作甘特图软件
- 05-10的贡献有多了不起?
- 05-10华为永久关闭手机Tizen应用商店 只保留于电视
- 05-10荣耀Magic V发售 万元拉链屏旗舰 是你的菜吗?
- 05-101969年李宗仁逝世,周总理在葬礼报告上删改四个文,毛主席:同意
- 05-10消息称2025年前不要期望玩到《侠III车手6》