Spark簡(jiǎn)介
Spark是整個(gè)BDAS的核心組件,是一個(gè)大數(shù)據(jù)分布式編程框架,不僅實(shí)現(xiàn)了MapReduce的算子map 函數(shù)和reduce函數(shù)及計(jì)算模型,還提供更為豐富的算子,如filter、join、groupByKey等。是一個(gè)用來(lái)實(shí)現(xiàn)快速而同用的集群計(jì)算的平臺(tái)。
Spark將分布式數(shù)據(jù)抽象為彈性分布式數(shù)據(jù)集(RDD),實(shí)現(xiàn)了應(yīng)用任務(wù)調(diào)度、RPC、序列化和壓縮,并為運(yùn)行在其上的上層組件提供API。其底層采用Scala這種函數(shù)式語(yǔ)言書(shū)寫(xiě)而成,并且所提供的API深度借鑒Scala函數(shù)式的編程思想,提供與Scala類(lèi)似的編程接口
Sparkon Yarn

從用戶(hù)提交作業(yè)到作業(yè)運(yùn)行結(jié)束整個(gè)運(yùn)行期間的過(guò)程分析。
一、客戶(hù)端進(jìn)行操作
根據(jù)yarnConf來(lái)初始化yarnClient,并啟動(dòng)yarnClient
創(chuàng)建客戶(hù)端Application,并獲取Application的ID,進(jìn)一步判斷集群中的資源是否滿(mǎn)足executor和ApplicationMaster申請(qǐng)的資源,如果不滿(mǎn)足則拋出IllegalArgumentException;
設(shè)置資源、環(huán)境變量:其中包括了設(shè)置Application的Staging目錄、準(zhǔn)備本地資源(jar文件、log4j.properties)、設(shè)置Application其中的環(huán)境變量、創(chuàng)建Container啟動(dòng)的Context等;
設(shè)置Application提交的Context,包括設(shè)置應(yīng)用的名字、隊(duì)列、AM的申請(qǐng)的Container、標(biāo)記該作業(yè)的類(lèi)型為Spark;
申請(qǐng)Memory,并最終通過(guò)yarnClient.submitApplication向ResourceManager提交該Application。
當(dāng)作業(yè)提交到Y(jié)ARN上之后,客戶(hù)端就沒(méi)事了,甚至在終端關(guān)掉那個(gè)進(jìn)程也沒(méi)事,因?yàn)檎麄€(gè)作業(yè)運(yùn)行在YARN集群上進(jìn)行,運(yùn)行的結(jié)果將會(huì)保存到HDFS或者日志中。
二、提交到Y(jié)ARN集群,YARN操作
運(yùn)行ApplicationMaster的run方法;
設(shè)置好相關(guān)的環(huán)境變量。
創(chuàng)建amClient,并啟動(dòng);
在Spark UI啟動(dòng)之前設(shè)置Spark UI的AmIpFilter;
在startUserClass函數(shù)專(zhuān)門(mén)啟動(dòng)了一個(gè)線程(名稱(chēng)為Driver的線程)來(lái)啟動(dòng)用戶(hù)提交的Application,也就是啟動(dòng)了Driver。在Driver中將會(huì)初始化SparkContext;
等待SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次數(shù)(默認(rèn)為10),如果等待了的次數(shù)超過(guò)了配置的,程序?qū)?huì)退出;否則用SparkContext初始化yarnAllocator;
當(dāng)SparkContext、Driver初始化完成的時(shí)候,通過(guò)amClient向ResourceManager注冊(cè)ApplicationMaster
分配并啟動(dòng)Executeors。在啟動(dòng)Executeors之前,先要通過(guò)yarnAllocator獲取到numExecutors個(gè)Container,然后在Container中啟動(dòng)Executeors。
那么這個(gè)Application將失敗,將Application Status標(biāo)明為FAILED,并將關(guān)閉SparkContext。其實(shí),啟動(dòng)Executeors是通過(guò)ExecutorRunnable實(shí)現(xiàn)的,而ExecutorRunnable內(nèi)部是啟動(dòng)CoarseGrainedExecutorBackend的。
最后,Task將在CoarseGrainedExecutorBackend里面運(yùn)行,然后運(yùn)行狀況會(huì)通過(guò)Akka通知CoarseGrainedScheduler,直到作業(yè)運(yùn)行完成。
Spark節(jié)點(diǎn)的概念
一、Spark驅(qū)動(dòng)器是執(zhí)行程序中的main()方法的進(jìn)程。它執(zhí)行用戶(hù)編寫(xiě)的用來(lái)創(chuàng)建SparkContext(初始化)、創(chuàng)建RDD,以及運(yùn)行RDD的轉(zhuǎn)化操作和行動(dòng)操作的代碼。
驅(qū)動(dòng)器節(jié)點(diǎn)driver的職責(zé):
把用戶(hù)程序轉(zhuǎn)為任務(wù)task(driver)
Spark驅(qū)動(dòng)器程序負(fù)責(zé)把用戶(hù)程序轉(zhuǎn)化為多個(gè)物理執(zhí)行單元,這些單元也被稱(chēng)之為任務(wù)task(詳解見(jiàn)備注)
為執(zhí)行器節(jié)點(diǎn)調(diào)度任務(wù)(executor)
有了物理計(jì)劃之后,Spark驅(qū)動(dòng)器在各個(gè)執(zhí)行器節(jié)點(diǎn)進(jìn)程間協(xié)調(diào)任務(wù)的調(diào)度。Spark驅(qū)動(dòng)器程序會(huì)根據(jù)當(dāng)前的執(zhí)行器節(jié)點(diǎn),把所有任務(wù)基于數(shù)據(jù)所在位置分配給合適的執(zhí)行器進(jìn)程。當(dāng)執(zhí)行任務(wù)時(shí),執(zhí)行器進(jìn)程會(huì)把緩存的數(shù)據(jù)存儲(chǔ)起來(lái),而驅(qū)動(dòng)器進(jìn)程同樣會(huì)跟蹤這些緩存數(shù)據(jù)的位置,并利用這些位置信息來(lái)調(diào)度以后的任務(wù),以盡量減少數(shù)據(jù)的網(wǎng)絡(luò)傳輸。(就是所謂的移動(dòng)計(jì)算,而不移動(dòng)數(shù)據(jù))。
二、執(zhí)行器節(jié)點(diǎn)
作用:
負(fù)責(zé)運(yùn)行組成Spark應(yīng)用的任務(wù),并將結(jié)果返回給驅(qū)動(dòng)器進(jìn)程;
通過(guò)自身的塊管理器(blockManager)為用戶(hù)程序中要求緩存的RDD提供內(nèi)存式存儲(chǔ)。RDD是直接緩存在執(zhí)行器進(jìn)程內(nèi)的,因此任務(wù)可以在運(yùn)行時(shí)充分利用緩存數(shù)據(jù)加快運(yùn)算。
驅(qū)動(dòng)器的職責(zé):
所有的Spark程序都遵循同樣的結(jié)構(gòu):程序從輸入數(shù)據(jù)創(chuàng)建一系列RDD,再使用轉(zhuǎn)化操作派生成新的RDD,最后使用行動(dòng)操作手機(jī)或存儲(chǔ)結(jié)果RDD,Spark程序其實(shí)是隱式地創(chuàng)建出了一個(gè)由操作組成的邏輯上的有向無(wú)環(huán)圖DAG。當(dāng)驅(qū)動(dòng)器程序執(zhí)行時(shí),它會(huì)把這個(gè)邏輯圖轉(zhuǎn)為物理執(zhí)行計(jì)劃。
這樣 Spark就把邏輯計(jì)劃轉(zhuǎn)為一系列步驟(stage),而每個(gè)步驟又由多個(gè)任務(wù)組成。這些任務(wù)會(huì)被打包送到集群中。
Spark初始化
每個(gè)Spark應(yīng)用都由一個(gè)驅(qū)動(dòng)器程序來(lái)發(fā)起集群上的各種并行操作。驅(qū)動(dòng)器程序包含應(yīng)用的main函數(shù),并且定義了集群上的分布式數(shù)據(jù)集,以及對(duì)該分布式數(shù)據(jù)集應(yīng)用了相關(guān)操作。
驅(qū)動(dòng)器程序通過(guò)一個(gè)SparkContext對(duì)象來(lái)訪問(wèn)spark,這個(gè)對(duì)象代表對(duì)計(jì)算集群的一個(gè)連接。(比如在sparkshell啟動(dòng)時(shí)已經(jīng)自動(dòng)創(chuàng)建了一個(gè)SparkContext對(duì)象,是一個(gè)叫做SC的變量。(下圖,查看變量sc)

一旦創(chuàng)建了sparkContext,就可以用它來(lái)創(chuàng)建RDD。比如調(diào)用sc.textFile()來(lái)創(chuàng)建一個(gè)代表文本中各行文本的RDD。(比如vallinesRDD = sc.textFile(“yangsy.text”),val spark = linesRDD.filter(line=>line.contains(“spark”),spark.count())
執(zhí)行這些操作,驅(qū)動(dòng)器程序一般要管理多個(gè)執(zhí)行器,就是我們所說(shuō)的executor節(jié)點(diǎn)。
在初始化SparkContext的同時(shí),加載sparkConf對(duì)象來(lái)加載集群的配置,從而創(chuàng)建sparkContext對(duì)象。
從源碼中可以看到,在啟動(dòng)thriftserver時(shí),調(diào)用了spark- daemon.sh文件,該文件源碼如左圖,加載spark_home下的conf中的文件。

(在執(zhí)行后臺(tái)代碼時(shí),需要首先創(chuàng)建conf對(duì)象,加載相應(yīng)參數(shù), val sparkConf = newSparkConf().setMaster("local").setAppName("cocapp").set("spark.executor.memory","1g"), val sc: SparkContext = new SparkContext(sparkConf))
RDD工作原理:
RDD(Resilient DistributedDatasets)[1] ,彈性分布式數(shù)據(jù)集,是分布式內(nèi)存的一個(gè)抽象概念,RDD提供了一種高度受限的共享內(nèi)存模型,即RDD是只讀的記錄分區(qū)的集合,只能通過(guò)在其他RDD執(zhí)行確定的轉(zhuǎn)換操作(如map、join和group by)而創(chuàng)建,然而這些限制使得實(shí)現(xiàn)容錯(cuò)的開(kāi)銷(xiāo)很低。對(duì)開(kāi)發(fā)者而言,RDD可以看作是Spark的一個(gè)對(duì)象,它本身運(yùn)行于內(nèi)存中,如讀文件是一個(gè)RDD,對(duì)文件計(jì)算是一個(gè)RDD,結(jié)果集也是一個(gè)RDD ,不同的分片、數(shù)據(jù)之間的依賴(lài)、key-value類(lèi)型的map數(shù)據(jù)都可以看做RDD。
主要分為三部分:創(chuàng)建RDD對(duì)象,DAG調(diào)度器創(chuàng)建執(zhí)行計(jì)劃,Task調(diào)度器分配任務(wù)并調(diào)度Worker開(kāi)始運(yùn)行。
SparkContext(RDD相關(guān)操作)→通過(guò)(提交作業(yè))→(遍歷RDD拆分stage→生成作業(yè))DAGScheduler→通過(guò)(提交任務(wù)集)→任務(wù)調(diào)度管理(TaskScheduler)→通過(guò)(按照資源獲取任務(wù))→任務(wù)調(diào)度管理(TaskSetManager)
Transformation返回值還是一個(gè)RDD。它使用了鏈?zhǔn)秸{(diào)用的設(shè)計(jì)模式,對(duì)一個(gè)RDD進(jìn)行計(jì)算后,變換成另外一個(gè)RDD,然后這個(gè)RDD又可以進(jìn)行另外一次轉(zhuǎn)換。這個(gè)過(guò)程是分布式的。
Action返回值不是一個(gè)RDD。它要么是一個(gè)Scala的普通集合,要么是一個(gè)值,要么是空,最終或返回到Driver程序,或把RDD寫(xiě)入到文件系統(tǒng)中
轉(zhuǎn)換(Transformations)(如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說(shuō)從一個(gè)RDD轉(zhuǎn)換生成另一個(gè)RDD的操作不是馬上執(zhí)行,Spark在遇到Transformations操作時(shí)只會(huì)記錄需要這樣的操作,并不會(huì)去執(zhí)行,需要等到有Actions操作的時(shí)候才會(huì)真正啟動(dòng)計(jì)算過(guò)程進(jìn)行計(jì)算。
操作(Actions)(如:count, collect, save等),Actions操作會(huì)返回結(jié)果或把RDD數(shù)據(jù)寫(xiě)到存儲(chǔ)系統(tǒng)中。Actions是觸發(fā)Spark啟動(dòng)計(jì)算的動(dòng)因。
它們本質(zhì)區(qū)別是:Transformation返回值還是一個(gè)RDD。它使用了鏈?zhǔn)秸{(diào)用的設(shè)計(jì)模式,對(duì)一個(gè)RDD進(jìn)行計(jì)算后,變換成另外一個(gè)RDD,然后這個(gè)RDD又可以進(jìn)行另外一次轉(zhuǎn)換。這個(gè)過(guò)程是分布式的。Action返回值不是一個(gè)RDD。它要么是一個(gè)Scala的普通集合,要么是一個(gè)值,要么是空,最終或返回到Driver程序,或把RDD寫(xiě)入到文件系統(tǒng)中。關(guān)于這兩個(gè)動(dòng)作,在Spark開(kāi)發(fā)指南中會(huì)有就進(jìn)一步的詳細(xì)介紹,它們是基于Spark開(kāi)發(fā)的核心。
RDD基礎(chǔ)
Spark中的RDD就是一個(gè)不可變的分布式對(duì)象集合。每個(gè)RDD都被分為多個(gè)分區(qū),這些分區(qū)運(yùn)行在集群的不同節(jié)點(diǎn)上。創(chuàng)建RDD的方法有兩種:一種是讀取一個(gè)外部數(shù)據(jù)集;一種是在群東程序里分發(fā)驅(qū)動(dòng)器程序中的對(duì)象集合,不如剛才的示例,讀取文本文件作為一個(gè)字符串的RDD的示例。
創(chuàng)建出來(lái)后,RDD支持兩種類(lèi)型的操作:轉(zhuǎn)化操作和行動(dòng)操作
轉(zhuǎn)化操作會(huì)由一個(gè)RDD生成一個(gè)新的RDD。(比如剛才的根據(jù)謂詞篩選)
行動(dòng)操作會(huì)對(duì)RDD計(jì)算出一個(gè)結(jié)果,并把結(jié)果返回到驅(qū)動(dòng)器程序中,或把結(jié)果存儲(chǔ)到外部存儲(chǔ)系統(tǒng)(比如HDFS)中。比如first()操作就是一個(gè)行動(dòng)操作,會(huì)返回RDD的第一個(gè)元素。
注:轉(zhuǎn)化操作與行動(dòng)操作的區(qū)別在于Spark計(jì)算RDD的方式不同。雖然你可以在任何時(shí)候定義一個(gè)新的RDD,但Spark只會(huì)惰性計(jì)算這些RDD。它們只有第一個(gè)在一個(gè)行動(dòng)操作中用到時(shí),才會(huì)真正的計(jì)算。之所以這樣設(shè)計(jì),是因?yàn)楸热鐒偛耪{(diào)用sc.textFile(...)時(shí)就把文件中的所有行都讀取并存儲(chǔ)起來(lái),就會(huì)消耗很多存儲(chǔ)空間,而我們馬上又要篩選掉其中的很多數(shù)據(jù)。
這里還需要注意的一點(diǎn)是,spark會(huì)在你每次對(duì)它們進(jìn)行行動(dòng)操作時(shí)重新計(jì)算。如果想在多個(gè)行動(dòng)操作中重用同一個(gè)RDD,那么可以使用RDD.persist()或RDD.collect()讓Spark把這個(gè)RDD緩存下來(lái)。(可以是內(nèi)存,也可以是磁盤(pán))
Spark會(huì)使用譜系圖來(lái)記錄這些不同RDD之間的依賴(lài)關(guān)系,Spark需要用這些信息來(lái)按需計(jì)算每個(gè)RDD,也可以依靠譜系圖在持久化的RDD丟失部分?jǐn)?shù)據(jù)時(shí)用來(lái)恢復(fù)所丟失的數(shù)據(jù)。(如下圖,過(guò)濾errorsRDD與warningsRDD,最終調(diào)用union()函數(shù))

RDD計(jì)算方式

RDD的寬窄依賴(lài)

窄依賴(lài) (narrowdependencies) 和寬依賴(lài) (widedependencies) 。窄依賴(lài)是指 父 RDD 的每個(gè)分區(qū)都只被子 RDD 的一個(gè)分區(qū)所使用 。相應(yīng)的,那么寬依賴(lài)就是指父 RDD 的分區(qū)被多個(gè)子 RDD 的分區(qū)所依賴(lài)。例如, map 就是一種窄依賴(lài),而 join 則會(huì)導(dǎo)致寬依賴(lài)
這種劃分有兩個(gè)用處。首先,窄依賴(lài)支持在一個(gè)結(jié)點(diǎn)上管道化執(zhí)行。例如基于一對(duì)一的關(guān)系,可以在 filter 之后執(zhí)行 map 。其次,窄依賴(lài)支持更高效的故障還原。因?yàn)閷?duì)于窄依賴(lài),只有丟失的父 RDD 的分區(qū)需要重新計(jì)算。而對(duì)于寬依賴(lài),一個(gè)結(jié)點(diǎn)的故障可能導(dǎo)致來(lái)自所有父 RDD 的分區(qū)丟失,因此就需要完全重新執(zhí)行。因此對(duì)于寬依賴(lài),Spark 會(huì)在持有各個(gè)父分區(qū)的結(jié)點(diǎn)上,將中間數(shù)據(jù)持久化來(lái)簡(jiǎn)化故障還原,就像 MapReduce 會(huì)持久化 map 的輸出一樣。
SparkExample

步驟 1 :創(chuàng)建 RDD 。上面的例子除去最后一個(gè) collect 是個(gè)動(dòng)作,不會(huì)創(chuàng)建 RDD 之外,前面四個(gè)轉(zhuǎn)換都會(huì)創(chuàng)建出新的 RDD 。因此第一步就是創(chuàng)建好所有 RDD( 內(nèi)部的五項(xiàng)信息 ) 。
步驟 2 :創(chuàng)建執(zhí)行計(jì)劃。Spark 會(huì)盡可能地管道化,并基于是否要重新組織數(shù)據(jù)來(lái)劃分 階段 (stage) ,例如本例中的 groupBy() 轉(zhuǎn)換就會(huì)將整個(gè)執(zhí)行計(jì)劃劃分成兩階段執(zhí)行。最終會(huì)產(chǎn)生一個(gè) DAG(directedacyclic graph ,有向無(wú)環(huán)圖 ) 作為邏輯執(zhí)行計(jì)劃。
步驟 3 :調(diào)度任務(wù)。 將各階段劃分成不同的 任務(wù) (task) ,每個(gè)任務(wù)都是數(shù)據(jù)和計(jì)算的合體。在進(jìn)行下一階段前,當(dāng)前階段的所有任務(wù)都要執(zhí)行完成。因?yàn)橄乱浑A段的第一個(gè)轉(zhuǎn)換一定是重新組織數(shù)據(jù)的,所以必須等當(dāng)前階段所有結(jié)果數(shù)據(jù)都計(jì)算出來(lái)了才能繼續(xù)。
假設(shè)本例中的 hdfs://names 下有四個(gè)文件塊,那么 HadoopRDD 中 partitions 就會(huì)有四個(gè)分區(qū)對(duì)應(yīng)這四個(gè)塊數(shù)據(jù),同時(shí) preferedLocations 會(huì)指明這四個(gè)塊的最佳位置?,F(xiàn)在,就可以創(chuàng)建出四個(gè)任務(wù),并調(diào)度到合適的集群結(jié)點(diǎn)上。
Spark數(shù)據(jù)分區(qū)
Spark的特性是對(duì)數(shù)據(jù)集在節(jié)點(diǎn)間的分區(qū)進(jìn)行控制。在分布式系統(tǒng)中,通訊的代價(jià)是巨大的,控制數(shù)據(jù)分布以獲得最少的網(wǎng)絡(luò)傳輸可以極大地提升整體性能。Spark程序可以通過(guò)控制RDD分區(qū)方式來(lái)減少通訊的開(kāi)銷(xiāo)。
Spark中所有的鍵值對(duì)RDD都可以進(jìn)行分區(qū)。確保同一組的鍵出現(xiàn)在同一個(gè)節(jié)點(diǎn)上。比如,使用哈希分區(qū)將一個(gè)RDD分成了100個(gè)分區(qū),此時(shí)鍵的哈希值對(duì)100取模的結(jié)果相同的記錄會(huì)被放在一個(gè)節(jié)點(diǎn)上。
(可使用partitionBy(newHashPartitioner(100)).persist()來(lái)構(gòu)造100個(gè)分區(qū))
Spark中的許多操作都引入了將數(shù)據(jù)根據(jù)鍵跨界點(diǎn)進(jìn)行混洗的過(guò)程。(比如:join(),leftOuterJoin(),groupByKey(),reducebyKey()等)對(duì)于像reduceByKey()這樣只作用于單個(gè)RDD的操作,運(yùn)行在未分區(qū)的RDD上的時(shí)候會(huì)導(dǎo)致每個(gè)鍵的所有對(duì)應(yīng)值都在每臺(tái)機(jī)器上進(jìn)行本地計(jì)算。
SparkSQL的shuffle過(guò)程

Spark SQL的核心是把已有的RDD,帶上Schema信息,然后注冊(cè)成類(lèi)似sql里的”Table”,對(duì)其進(jìn)行sql查詢(xún)。這里面主要分兩部分,一是生成SchemaRD,二是執(zhí)行查詢(xún)。
如果是spark-hive項(xiàng)目,那么讀取metadata信息作為Schema、讀取hdfs上數(shù)據(jù)的過(guò)程交給Hive完成,然后根據(jù)這倆部分生成SchemaRDD,在HiveContext下進(jìn)行hql()查詢(xún)。
SparkSQL結(jié)構(gòu)化數(shù)據(jù)
首先說(shuō)一下ApacheHive,Hive可以在HDFS內(nèi)或者在其他存儲(chǔ)系統(tǒng)上存儲(chǔ)多種格式的表。SparkSQL可以讀取Hive支持的任何表。要把Spark SQL連接已有的hive上,需要提供Hive的配置文件。hive-site.xml文件復(fù)制到spark的conf文件夾下。再創(chuàng)建出HiveContext對(duì)象(sparksql的入口),然后就可以使用HQL來(lái)對(duì)表進(jìn)行查詢(xún),并以由行足證的RDD的形式拿到返回的數(shù)據(jù)。
創(chuàng)建Hivecontext并查詢(xún)數(shù)據(jù)
importorg.apache.spark.sql.hive.HiveContext
valhiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
valrows = hiveCtx.sql(“SELECT name,age FROM users”)
valfitstRow – rows.first()
println(fitstRow.getSgtring(0)) //字段0是name字段
通過(guò)jdbc連接外部數(shù)據(jù)源更新與加載
Class.forName("com.mysql.jdbc.Driver")
val conn =DriverManager.getConnection(mySQLUrl)
val stat1 =conn.createStatement()
stat1.execute("UPDATE CI_LABEL_INFO set DATA_STATUS_ID = 2 , DATA_DATE ='" + dataDate +"' where LABEL_ID in ("+allCreatedLabels.mkString(",")+")")
stat1.close()
//加載外部數(shù)據(jù)源數(shù)據(jù)到內(nèi)存
valDIM_COC_INDEX_MODEL_TABLE_CONF =sqlContext.jdbc(mySQLUrl,"DIM_COC_INDEX_MODEL_TABLE_CONF").cache()
val targets =DIM_COC_INDEX_MODEL_TABLE_CONF.filter("TABLE_DATA_CYCLE ="+TABLE_DATA_CYCLE).collect
SparkSQL解析

首先說(shuō)下傳統(tǒng)數(shù)據(jù)庫(kù)的解析,傳統(tǒng)數(shù)據(jù)庫(kù)的解析過(guò)程是按Rusult、Data Source、Operation的次序來(lái)解析的。傳統(tǒng)數(shù)據(jù)庫(kù)先將讀入的SQL語(yǔ)句進(jìn)行解析,分辨出SQL語(yǔ)句中哪些詞是關(guān)鍵字(如select,from,where),哪些是表達(dá)式,哪些是Projection,哪些是Data Source等等。進(jìn)一步判斷SQL語(yǔ)句是否規(guī)范,不規(guī)范就報(bào)錯(cuò),規(guī)范則按照下一步過(guò)程綁定(Bind)。過(guò)程綁定是將SQL語(yǔ)句和數(shù)據(jù)庫(kù)的數(shù)據(jù)字典(列,表,視圖等)進(jìn)行綁定,如果相關(guān)的Projection、Data Source等都存在,就表示這個(gè)SQL語(yǔ)句是可以執(zhí)行的。在執(zhí)行過(guò)程中,有時(shí)候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運(yùn)行剛運(yùn)行過(guò)的SQL語(yǔ)句,直接從數(shù)據(jù)庫(kù)的緩沖池中獲取返回結(jié)果。在數(shù)據(jù)庫(kù)解析的過(guò)程中SQL語(yǔ)句時(shí),將會(huì)把SQL語(yǔ)句轉(zhuǎn)化成一個(gè)樹(shù)形結(jié)構(gòu)來(lái)進(jìn)行處理,會(huì)形成一個(gè)或含有多個(gè)節(jié)點(diǎn)(TreeNode)的Tree,然后再后續(xù)的處理政對(duì)該Tree進(jìn)行一系列的操作。
Spark SQL對(duì)SQL語(yǔ)句的處理和關(guān)系數(shù)據(jù)庫(kù)對(duì)SQL語(yǔ)句的解析采用了類(lèi)似的方法,首先會(huì)將SQL語(yǔ)句進(jìn)行解析,然后形成一個(gè)Tree,后續(xù)如綁定、優(yōu)化等處理過(guò)程都是對(duì)Tree的操作,而操作方法是采用Rule,通過(guò)模式匹配,對(duì)不同類(lèi)型的節(jié)點(diǎn)采用不同的操作。SparkSQL有兩個(gè)分支,sqlContext和hiveContext。sqlContext現(xiàn)在只支持SQL語(yǔ)法解析器(Catalyst),hiveContext支持SQL語(yǔ)法和HiveContext語(yǔ)法解析器。
原文地址:http://mt.sohu.com/20160522/n450849016.shtml