Posted on 2019-04-11 14:14
大大毛 閱讀(476)
評(píng)論(0) 編輯 收藏 所屬分類(lèi):
Nifi
需求:
接收Kafka資料,資料具有Key列(多列),有新增、修改但無(wú)刪除,需要同步落地至MariaDB解決方案(僅新增、修改):
這個(gè)場(chǎng)景是最常見(jiàn)的,資料不會(huì)有被刪除的狀態(tài),所有的更新就只有Insert Or Update這兩種狀態(tài),先上實(shí)例的圖 (兩邊的LogMessage是為了接收Fail,有感嘆號(hào)是避免一起開(kāi)啟的時(shí)候它也被開(kāi)啟----這樣failure的訊息就不會(huì)再卡在Connection中了)

思路:
因?yàn)橛涗浿挥行略龊托薷膬煞N狀態(tài),理論上說(shuō)這兩種的SQL非常接近,所以可以做以下考量
1. Processor層面是否支援Update Or Insert
> 查網(wǎng)上訊息有個(gè)叫Upsert,不過(guò)在Nifi中查找,只有一個(gè)支援Mongo的組件具有這個(gè)功能
2. DB層面是否支援
> Maria DB有個(gè) "REPLACE INTO" 的語(yǔ)法是可以支持Insert Or Update,雖然簡(jiǎn)單看了下介紹說(shuō)是會(huì)依主鍵或唯一索引去先做定位,如果定位到已經(jīng)存在則先做刪除再進(jìn)行新增(偽Update),但確實(shí)可以達(dá)成我們的目的,不是嗎?Processor及其設(shè)定:
ConsumeKafkaRecord,作用是從Kafka中Consume出資料(以Record的形態(tài)),這里使用Record是因?yàn)樵磾?shù)據(jù)就是以Record的方式存上去的 (Avro Schema)

- Kafka Brokers:Kafka的Broker列表,多個(gè)Broker以逗號(hào)分隔,類(lèi)似www.broker1.com:9193,www.broker2.com:9193這樣的形式配置
- Topic Name:需要Consume的Kafka Topic名稱
- Record Reader/Writer:關(guān)于Record所需要設(shè)定的Reader和Writer,要先行在Configure中設(shè)定,當(dāng)然也要設(shè)定好Schema
- Group ID:Consumer所要設(shè)定的ID,這個(gè)的設(shè)定要依Kafka的配置來(lái),現(xiàn)在我們一般就只有單個(gè)的Partition,所以會(huì)要求每個(gè)Processor都設(shè)定有不同
- Offset Reset:需要設(shè)定為"earliest",這樣就會(huì)依GroupID沒(méi)有收過(guò)的資料來(lái)進(jìn)行收取,否則就只會(huì)收新推上去的資料。第一次玩的兄弟經(jīng)常坑在GroupID和Offset Reset這兩項(xiàng)上,若是收不到資料則有 可能就是GroupID沒(méi)有換成新的(舊的已經(jīng)收過(guò)一次就不會(huì)重新再收),或者是Offset Reset = latest又沒(méi)有新資料推上去~~~
- Max Poll Records和SCHEDULING中的Run Schedule:需要根據(jù)實(shí)際接收的速度來(lái)進(jìn)行調(diào)整。經(jīng)過(guò)觀察發(fā)現(xiàn)Consume的速度超快,但整個(gè)Nifi Flow的速度會(huì)卡在其它需要做解析或讀寫(xiě)DB的Processor外 (通常解析JSON會(huì)是前面的關(guān)卡),所以任由Consumer的高速讀取就會(huì)造成整個(gè)Nifi流程在后段被卡住。造成這個(gè)的主要原因其實(shí)就在于kafka處理的高速上,所以當(dāng)有新?lián)QGroupID或新流程時(shí),Kafka上積累的海量資料就會(huì)在一瞬間被接收下來(lái),然后就是各種紅 (其實(shí)紅了也沒(méi)事,它會(huì)自動(dòng)向上推,讓前一個(gè)Processor停止處理)。
- 若是常態(tài)下的資料推送量就已經(jīng)超過(guò)了你的Nifi處理速度,那么就要考量使用多個(gè)線程處理或者是從源頭的Kafka上就把資料分割開(kāi)來(lái)
- SCHEDULING的Cocurrent Tasks:這個(gè)Default=1,就是當(dāng)前Processor需要開(kāi)起來(lái)的線程數(shù)。但是這個(gè)設(shè)置需要當(dāng)心,你需要仔細(xì)考量過(guò)你的資料流是否允許亂序 (多線程時(shí)當(dāng)然不可能還能保證資料處理的順序),所以它是僅適用于不Care資料處理順序的場(chǎng)景,例如每筆Key就只會(huì)有一筆資料,而且哪筆資料先收后收無(wú)所謂
SplitJson,作用就只是簡(jiǎn)單的把一個(gè)JSON數(shù)組切開(kāi)成單個(gè)的JSON。Consume出來(lái)的會(huì)是個(gè)數(shù)組,這跟你存放進(jìn)去的單筆訊息是不是數(shù)組沒(méi)什么關(guān)系。
Connection,就是Processor中的那根帶箭頭的連線,它的作用是連接不同的Processor并且它還具有緩存池的的一個(gè)用途,除了把數(shù)據(jù)從A導(dǎo)流向B外,還可以將B暫時(shí)處理不動(dòng)的資料存放在自帶的緩存池中,若是緩存池達(dá)到上限,則Nifi會(huì)自動(dòng)讓A暫停處理直至B緩過(guò)勁~~~
- Back Pressure Object Threshold / Back Pressure Data Size Threshold:最大緩存的消息筆數(shù) / 最大緩存消息的體積,兩者任一超過(guò)就會(huì)讓上游Processor處理暫停
- Available Prioritizers:出入緩存池的順序控制,Default是空,通常來(lái)說(shuō)都應(yīng)該要設(shè)成FIFO先進(jìn)先出的方式
- 不設(shè)定這個(gè)經(jīng)常會(huì)造成Nifi資料處理丟失的假象,A1,A2,A3,A4,最后看到的不是A4而是A3,會(huì)讓人以為A4被玩掉了,其實(shí)只是A4被先處理,而A3變成了最后一筆狀態(tài)。而且這種錯(cuò)誤很難被發(fā)現(xiàn)!!
EvaluateJsonPath1,這個(gè)元件的作用是解析JSON,它也只能簡(jiǎn)單的解析,想在Value中對(duì)取出來(lái)的值做一些處理好象是不允許的....
- Destination:表示解析出來(lái)的內(nèi)容是成為Attribute,還是直接替換Flow File內(nèi)容,這里設(shè)定是做為屬性,所以Processor處理后就可以在Flow File上看到多出自定義的那些屬性以及它們的值
- Return Type:返回值的類(lèi)型,這種簡(jiǎn)單從JSON中取值的可以使用Auto-detect即可
- Path Not Found Behavior:是說(shuō)如果設(shè)定需要解析的JSON路徑不存在時(shí)的處理行為
- Null Value Representation:這個(gè)對(duì)于Null值的處理, "empty string"會(huì)將null設(shè)為空字符串(MO=),另外一個(gè)"the string 'null'"則是會(huì)將null設(shè)為"null"這樣的字符串 (MO="null")
- MO/MODELFAMILY/....:這些是我手工添加的屬性名稱,需要根據(jù)JSON長(zhǎng)樣來(lái)設(shè),對(duì)應(yīng)Value設(shè)定的$.MO則是表示MO的值來(lái)源于JSON第一層的"MO"節(jié)點(diǎn)。
- 需要注意的一點(diǎn)是屬性名稱貌似是會(huì)區(qū)分大小寫(xiě)的,所以可以看到我全部使用的大寫(xiě)
- 截圖是運(yùn)行時(shí)態(tài)的Procssor,停止運(yùn)行時(shí)PROPERTIES上會(huì)有一個(gè) + 號(hào),點(diǎn)它即可以新增自己的屬性
- 有一點(diǎn)比較奇怪的地方,就是通過(guò)+號(hào)維護(hù)進(jìn)去的多個(gè)屬性,它們的排列順序卻不是你手工新增的順序,這點(diǎn)引發(fā)另外一處的一個(gè)疑問(wèn),會(huì)在下面講
EvaluateJsonPath2,當(dāng)然也是要從JSON中解析,只不過(guò)我是要把整個(gè)JSON的內(nèi)容都保留下來(lái),由于它們要求的設(shè)定不同,所以被迫要撕成兩個(gè)元件來(lái)做
- Destination:這個(gè)設(shè)定仍然是屬性
- Return Type:json,第一個(gè)解析元件雖然可以隨意設(shè)置,但把這兩種合并成一個(gè)元件并使用Auto時(shí)就會(huì)報(bào)錯(cuò),所以看起來(lái)第一種簡(jiǎn)單屬性實(shí)際上只支持Scalar吧...
- JSONDATA:我定義的一個(gè)屬性名稱,注意Value中設(shè)定的"@"符號(hào),它表示整份FlowFile的內(nèi)容(前面已經(jīng)轉(zhuǎn)成一個(gè)JSON)
- 這個(gè)JSONDATA是因?yàn)槲业男枨螅驗(yàn)镵afka上的資料來(lái)源于其它系統(tǒng),而我其實(shí)只需要其中的少量幾個(gè)欄位 (前一個(gè)EvaluateJsonPath解析的那些),為了備查數(shù)據(jù)上的其它欄位以及在后續(xù)使用,所以才要把整份JSON都保留到DB中去 (說(shuō)得這么高端,實(shí)際的原因卻是他們的JSON屬性是用程序硬拼字串拼出來(lái)的,有的東西實(shí)在是在Nifi中搞不出來(lái)......)
UpdateAttribute,元件用途是對(duì)FlowFile的Attrubute進(jìn)行修改,這里是拿來(lái)對(duì)解析出來(lái)的值進(jìn)行再加工以及添加新屬性
- Delete Attributes Expression:這個(gè)屬性如果有設(shè)置就表示該P(yáng)rocessor為Delete屬性的狀態(tài),會(huì)忽略你新加的那些屬性處理,只專心做好一件事"刪除符合條件的屬性"
- PROVIDER:這是一個(gè)新的屬性,它并沒(méi)有包含在JSON中,是為表示數(shù)據(jù)來(lái)源而新加的
- SO:這個(gè)就是前面
EvaluateJsonPath1解析出來(lái)的某個(gè)值,那個(gè)元件無(wú)法直接加工,所以放在這里做的二次加工,去掉前導(dǎo)0
AttributesToJson,作用是將一堆Attribute轉(zhuǎn)換為Json,當(dāng)然就只能是那種簡(jiǎn)單結(jié)構(gòu)的Json,這里使用它是為了配合后面一個(gè)Processor的使用
- Attributes List:拿來(lái)生成JSON的屬性列表,這里我其實(shí)把EvaluateJsonPath1、EvaluateJsonPath2和UpdateAttribute產(chǎn)生的屬性都放上去了 (它們就是我落地MariaDB的Table列)
- 不得不說(shuō)的一個(gè)灰常遺憾的結(jié)果:那就是生成的JSON屬性順序絕對(duì)不是你在List中寫(xiě)的屬性順序,我比較懷疑是在前面幾個(gè)組件生成Attribute的順序,但更讓人遺憾的是它們的順序也不會(huì)是你維護(hù)它們的順序。這個(gè)結(jié)果會(huì)導(dǎo)致我們?cè)诹硗獾腃ase 2中會(huì)碰到一個(gè)不可逾越的障礙~~~~
- Attributes Regular Expression:符合條件的正則表達(dá)式
- Destination: 這個(gè)屬性在
EvaluateJsonPath上
就有, 它可以讓結(jié)果成為一個(gè)新的屬性還是直接替換FlowFile的內(nèi)容, Default是直接換掉FlowFile的內(nèi)容。
ConvertJsonToSQL,作用是根據(jù)JSON內(nèi)容轉(zhuǎn)換成SQL語(yǔ)句以及語(yǔ)句所要的參數(shù),經(jīng)過(guò)這一關(guān)后FlowFile的內(nèi)容就變成SQL語(yǔ)句,然后Attribute中多出一些參數(shù)- JDBC Connection Pool:Configrue中指定的MariaDB連接字符串,那里直接有指定Schema
- Statement Type:這個(gè)有INSERT、UPDATE、DELETE這3個(gè)選項(xiàng),若是Mongo的那個(gè)組件就會(huì)看到有UPSERT(Update or Insert),其它各類(lèi)的都木有~~~,這里我使用的是INSERT選項(xiàng),后面通過(guò)玩的一點(diǎn)小花招把它再折騰為REPLACE INTO
- Update Keys: 這個(gè)屬性可以不填,它是For Update時(shí)使用的
。
- SQL Parameter Attribute Prefix: default = sql,它其實(shí)影響到組件處理后生成的SQL語(yǔ)句參數(shù)叫什么,設(shè)為sql,最后就會(huì)看到生成出來(lái)
。如下圖就是處理后的Attribute樣式,它會(huì)產(chǎn)生sql.args.X.type和sql.args.X.value,這一組合起來(lái)就對(duì)應(yīng)于SQL中第一個(gè)?參數(shù)的類(lèi)型及值,”sql"就是我們這里設(shè)置的前綴名稱 (充分考慮到大家會(huì)想要搞事)

ReplaceText,作用是文本替換,這里就是我們處理Update Or Insert的關(guān)鍵,直接把SQL語(yǔ)句換掉它
- Search Value:在FlowFile中查找的字符串,它支持正則
- Replacement Value:替換的值,這里就是簡(jiǎn)單的把Insert Into (x1,x2,x3) values (?,?,?)處理為Replace Into (x1,x2,x3) values (?,?,?)而已,Replace Or Insert的行為交給DB去做
PutSQL,作用是在DB上執(zhí)行一段SQL語(yǔ)句
- JDBC Connection Pool:前面ConvertJsonToSQL轉(zhuǎn)換SQL時(shí)就有用過(guò),指定數(shù)據(jù)庫(kù)的連接
- SQL Statement:需執(zhí)行的SQL,為空時(shí)表示使用前面?zhèn)鬟f過(guò)來(lái)的FlowFile的內(nèi)容(已經(jīng)是一個(gè)SQL語(yǔ)句)
總結(jié):
這是一個(gè)帶有Key值(多個(gè)Key列)的無(wú)刪除行為的資料接收,所以可以利用AttributeToJSON去將提取出來(lái)的有用屬性重新生成JSON文件,并直接利用ConvertJsonToSQL轉(zhuǎn)換為Insert語(yǔ)句及對(duì)應(yīng)的綁定參數(shù),這里借用了MariaDB提供的Replace Into機(jī)制去自動(dòng)使用表上的Key鍵去做Update更新,所以整個(gè)Nifi Flow還是比較簡(jiǎn)單。在后續(xù)文章中會(huì)講到帶Delete行為的資料接收方法以及無(wú)Key更新的解決方案