首页 >> 软件 >> 大数据培训Spark SQL配置文件血缘扩展实战案例

大数据培训Spark SQL配置文件血缘扩展实战案例

2024-12-08 软件

ging{

override def parsePlan(sqlText: String): LogicalPlan = {

val lineAgeEnabled = SparkSession.getActiveSession

.get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean

logDebug(s"SqlText: $sqlText")

if(sqlText.toLowerCase().contains("insert")){

if(lineAgeEnabled){

if(FIELD_LINE_AGE_SQL_COULD_SET.get()){

//调用本地变数在这里

FIELD_LINE_AGE_SQL.set(sqlText)

}

FIELD_LINE_AGE_SQL_COULD_SET.remove()

}

}

delegate.parsePlan(sqlText)

}

//初始化独有的sqlparser

override def parseExpression(sqlText: String): Expression = {

delegate.parseExpression(sqlText)

}

//初始化独有的sqlparser

override def parseTableIdentifier(sqlText: String): TableIdentifier = {

delegate.parseTableIdentifier(sqlText)

}

//初始化独有的sqlparser

override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {

delegate.parseFunctionIdentifier(sqlText)

}

//初始化独有的sqlparser

override def parseTableSchema(sqlText: String): StructType = {

delegate.parseTableSchema(sqlText)

}

//初始化独有的sqlparser

override def parseDataType(sqlText: String): DataType = {

delegate.parseDataType(sqlText)

}

}

3.3 拓展的规范类

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {

val executor: ThreadPoolExecutor =

ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)

override def apply(plan: LogicalPlan): Unit = {

val sql = FIELD_LINE_AGE_SQL.get

FIELD_LINE_AGE_SQL.remove()

if(sql != null){

//这里我们取得sql然后触发一个调用做到余下的重构战斗任务

val task = new FieldLineageRunnableV3(sparkSession,sql)

executor.execute(task)

}

}

}

很单纯,我们只是取得了 SQL 然后再触发了一个调用去受益 SparkPlan,具体形式化在

FieldLineageRunnableV3。

3.4 具体的做到到新方法

3.4.1 受益 SparkPlan

我们在 run 新方法中所受益 SparkPlan:

override def run(): Unit = {

val parser = sparkSession.sessionState.sqlParser

val yzer = sparkSession.sessionState.yzer

val optimizer = sparkSession.sessionState.optimizer

val planner = sparkSession.sessionState.planner

val newPlan = parser.parsePlan(sql)

PASS_TABLE_AUTH.set(true)

val yzedPlan = yzer.executeAndCheck(newPlan)

val optimizerPlan = optimizer.execute(yzedPlan)

//受益sparkPlan

val sparkPlan = planner.plan(optimizerPlan).next()

if(targetTable != null){

val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()

val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()

//projection

projectionLineAge(levelProject, sparkPlan.child)

//predication

predicationLineAge(predicates, sparkPlan.child)

为什么要采用 SparkPlan 呢?当初我们权衡的时候,力学原计划拿取的设计文件人关系的时候是相当准的,且交换机相当短也更反之亦然。

在这里补充一下 Spark SQL 重构的过程如下:

经过SqlParser后时会受益形式化原计划,此时所列名、数组等都未重构,还不能执行者;经过Analyzer时会比对一些加载电子邮件,例如所列的测试、的设计文件电子邮件、数组电子邮件;经过Optimizer 后形式化原计划时会根据既定规范被最佳化,这里的规范是RBO,当然 Spark 还赞成CBO的最佳化;经过SparkPlanner后就变成了可执行者的力学原计划。

我们看一个形式化原计划与力学原计划对比的例证:

一个 SQL 解释器:

select item_id,TYPE,v_value,imei from t1

union all

select item_id,TYPE,v_value,imei from t2

union all

select item_id,TYPE,v_value,imei from t3

形式化原计划是这样的:

力学原计划是这样的:

似乎简化了很多。

受益 SparkPlan 后,我们就可以根据不同的SparkPlan路由器做到乘积处理。

我们将的设计文件血缘关系包含两种类型:projection(select查阅的设计文件)、predication(wehre查阅条件)。

这两种是一种互联的人关系,即从独有所列【关切尚硅谷,轻松学IT】的的设计文件生变成目标所列的的设计文件的相关联人关系。

想象一个查阅是一棵柏树,那么乘积人关系时会如下从柏树的顶端开始乘积,直到柏树的树干路由器,树干路由器即为独有所列:

那么我们乘积查阅的结果应该为

id ->tab1.id ,

name->tab1.name,tabb2.name,

age→tabb2.age。

似乎有该变数

val levelProject = new ArrayBuffer

[ArrayBuffer[NameExpressionHolder]](),通过projecti-onLineAge 乘积后 levelProject 加载了顶层id,name,age相关联的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

当然也不是单纯的递归乘积,还需权衡特殊持续性例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都需特殊权衡。

例证及视觉效果:

SQL:

with A as (select id,name,age from tab1 where id> 100 ) ,

C as (select id,name,max(age) from A group by A.id,A.name) ,

B as (select id,name,age from tabb2 where age> 28)

insert into tab3

select C.id,concat(C.name,B.name) as name, B.age from

B,C where C.id = B.id

视觉效果:

{

"edges": [

{

"sources": [

3

],

"targets": [

0

],

"expression": "id",

"edgeType": "PROJECTION"

},

{

"sources": [

4,

7

],

"targets": [

1

],

"expression": "name",

"edgeType": "PROJECTION"

},

{

"sources": [

5

],

"targets": [

2

],

"expression": "age",

"edgeType": "PROJECTION"

},

{

"sources": [

6,

3

],

"targets": [

0,

1,

2

],

"expression": "INNER",

"edgeType": "PREDICATE"

},

{

"sources": [

6,

5

],

"targets": [

0,

1,

2

],

"expression": "((((default.tabb2.人口为120人age人口为120人 IS NOT NULL) AND (CAST(default.tabb2.人口为120人age人口为120人 AS INT)> 28)) AND (B.人口为120人id人口为120人> 100)) AND (B.人口为120人id人口为120人 IS NOT NULL))",

"edgeType": "PREDICATE"

},

{

"sources": [

3

],

"targets": [

0,

1,

2

],

"expression": "((default.tab1.人口为120人id人口为120人 IS NOT NULL) AND (default.tab1.人口为120人id人口为120人> 100))",

"edgeType": "PREDICATE"

}

],

"vertices": [

{

"id": 0,

"vertexType": "COLUMN",

"vertexId": "default.tab3.id"

},

{

"id": 1,

"vertexType": "COLUMN",

"vertexId": "default.tab3.name"

},

{

"id": 2,

"vertexType": "COLUMN",

"vertexId": "default.tab3.age"

},

{

"id": 3,

"vertexType": "COLUMN",

"vertexId": "default.tab1.id"

},

{

"id": 4,

"vertexType": "COLUMN",

"vertexId": "default.tab1.name"

},

{

"id": 5,

"vertexType": "COLUMN",

"vertexId": "default.tabb2.age"

},

{

"id": 6,

"vertexType": "COLUMN",

"vertexId": "default.tabb2.id"

},

{

"id": 7,

"vertexType": "COLUMN",

"vertexId": "default.tabb2.name"

}

]

}

四、总结

在 Spark SQL 的的设计文件血缘关系做到到中所,我们通过其自拓展,首先取得了 insert 解释器,在我们自己的健康检查规范中所取得

SQL 解释器,通过SparkSqlParser、Analyzer、Optimizer、SparkPlanner,事与愿违受益了力学原计划。

我们通过乘积力学原计划,根据不同执行者原计划做到相关联的反转,www.atguigu.com然后就受益了的设计文件之两者二者之间的相关联人关系。当年前的做到到是借助于的,的设计文件之两者二者之间是斜向的相关联人关系,中所两者二者之间过程被忽略,如果想做到到的设计文件的反转的整个过程也是未问题的。

文章刊文;也于图表仓库与Python大图表

中所选读到:

大图表共同开发 Spark 可选之SparkSQL

Spark SQL之RDD反转DataFrame的新方法

大图表共同开发之SparkSQL面试篇

大图表共同开发之Spark SQL执行者性能的提高

人工泪液和眼药水的区别
葵花麦枣咀嚼片
健胃消食
达霏欣米诺地尔搽剂女用的
预约挂号
艾拉莫德片有哪些作用功效
痛风吃什么能缓解疼痛
999消痔软膏可以消除肉球吗
双氯芬酸钠凝胶能治颈椎病吗
急支糖浆说明书及用法
友情链接