大大毛
的筆記
  DDM's Note
哪怕沒有辦法一定有說法,
就算沒有鴿子一定有烏鴉,
固執無罪 夢想有價,
讓他們驚訝.
posts - 14, comments - 23, trackbacks - 0, articles - 58
::
首頁
:: ::
聯系
:: ::
管理
Kafka資料落地至MariaDB (帶Key的新增、修改和刪除)
Posted on 2019-04-11 15:40
大大毛
閱讀(284)
評論(0)
編輯
收藏
所屬分類:
Nifi
需求:
比較前一篇文章來說,僅加多Delete的行為。例如我僅需要Status=1的資料,所以對于資料落地來講,最合適的莫過于下面這樣,Table也可以自動保持最少量的有效資料
1. 新資料Status=0,執行Insert;
2. 資料修改Status=0,執行Update;
3. 資料狀態變更Status=1,執行Delete;
4. 若資料狀態重新變更為0,則又會執行Insert;
思路:
按理來說,只要通過分支Route就可以將Insert/Update與Delete作業分成兩條Nifi支流(看網上確實有很多這么整法的),但是用Route有一個問題處理不了,那就是資料順序的正確性你是無法保證的。對于小數據量的場景來說,每筆Key的多次操作間隔可能會比較長,所以它不會有什么問題,但大數據量的情況下,兩同相同Key值的資料走Route后被處理的順序混亂就會造成最終資料結果的異常(比如應該是先Insert再Delete,結果卻是發現資料還躺在Table中)。而大數據量在使用Kafka做為數據源時就不可避免會出現:即使業務數據量確實不大,但對于積累了好幾天的數據再進行接收時,那一瞬間的數據量也會是很大的。
所以我們能做的就是動態決定執行Delete和Insert。
解決方案:
雖然與
前一篇
來說差異不大,但Nifi流程上卻有很大不同,下面會詳細描述為什么要這樣做
Processor及其設定:
ConsumeKafkaRecord、SplitJson、Connection、EvaluateJsonPath
,
與前一章的一樣,只是不同數據下解析的屬性有所不同,這里不再詳述。
UpdateAttribute
,
作用是從Kafka中Consume出資料(以Record的形態),這里使用Record是因為源數據就是以Record的方式存上去的 (Avro Schema)
SQL1
:這才是這次花招的關鍵,在這里根據STATUS自行構建SQL語句
Status=0
:構建的就是Delete From xxx Where Key1=? and Key2=? 這樣的刪除語句
Status!=0
:構建的就是Replace Into xxx (Key1,Key2,col3,col4) Values ( ?, ?, ?, ?) 這樣的Insert or Update語句
經此Processor處理后,資料落地所需的SQL就構建好了,后續的問題就是如何去綁定參數和執行
ReplaceText
,
作用是對FlowFile進行文本替換,這里使用它來直接產生我所需的JSON內容
Search Value
:這里使用的是Default的
(?s)(^.*$),作用就是把原先的整份文件全部換掉
Replacement Value
:這里放的就是一個固定結構的JSON,可以看到里面的屬性值都是使用的Attribute (它們的值來源于前面的EvaluateJsonPath從源JSON文件中的提取)
細心的朋友可以發現這里是與前一篇文章的最大不同,這里沒有使用AttributeToJson去直接產生JSON文件,而使用的是更加笨拙的方式
前面的文章有提過,我們產生的Attribute以及AttributeToJson所生成JSON中各屬性的順序問題,結論是怎么搞它都不是我所想象到的順序。但是ConvertJsonToSQL這個東東卻很實在,它確確實實是按JSON中屬性的順序去生成的SQL以及參數名稱(還記得參數名稱sql.args.
1
.value中的這個順序
1
么),所以問題就來了:
SQL由于必須要有Delete和Replace,所以它們的參數個數一定是不同的,而Delete壓的參數又是我們的Key,所以就必須要保證ConvertJsonToSQL生成屬性的順序,這樣我們才能夠保證我們的兩個Key一定會是sql.args.1和sql.args.2
換句話說,如果AttributesToJson若是能夠保證JSON屬性順序的話,那就不用這么費勁
ConvertJsonToSQL
,
與前文一樣,以Insert的方式生成SQL和綁定參數即可
UpdateAttribute
,
終于用到了它的Delete功能,作用是清除掉多余的SQL綁定參數
Delete Attributes Expression
:這里我根據Delete的條件(STATUS=0)去刪除多余的SQL綁定參數
這里的寫法比較死,我Hard-code刪除掉大與2的其它所有參數("
*
"
是一個通配符,"
|
"是一個多條件的間隔符),感覺上還有更好的寫法
至此我們就可以保證綁定參數的數量與SQL語法參數個數一致 (不一致它死給你看)
PutSQL
,
這里仍然是執行SQL,這里使用配置參數的形式讓它執行我們的SQL
SQL Statement
:前面用UpdateAttribute產生的SQL1參數,它會根據STATUS=0去判斷是使用DELETE還是REPLACE語法
這個屬性壓上后,無論SQL1是不是為空,這個組件都不會再去管FlowFile的內容(空屬性時是把FlowFile的內容當成SQL去執行的)
新用戶注冊
刷新評論列表
只有注冊用戶
登錄
后才能發表評論。
網站導航:
博客園
IT新聞
Chat2DB
C++博客
博問
管理
相關文章:
Nifi同步數據的幾種方法
RouteOnAttribute的用法
Kafka資料落地至MariaDB (帶Key的新增、修改和刪除)
Kafka資料落地至MariaDB (帶Key的新增、修改)
Oracle資料推送MQTT
Powered by:
BlogJava
Copyright © 大大毛
日歷
<
2025年5月
>
日
一
二
三
四
五
六
27
28
29
30
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1
2
3
4
5
6
7
公告
果然是不能想得太好。
隨筆分類
(4)
VB培訓(4)
文章分類
(59)
JAVA
(6)
Spring
(3)
Hibernate
Struts
(12)
NET
VB
(2)
ASP
(1)
ASP.NET
(6)
HTML
(3)
400
(2)
I4.0
Nifi
(5)
Angular
(1)
SQL
(15)
常用算法
(1)
其它
(2)
積分與排名
積分 - 60244
排名 - 871
最新評論
1.?re: 手工添加MyEclipse的XML文件模板[未登錄]
請問,eclipse下面有沒有呢?現在想要實現eclipse的xml的模板進行配置修改,簡單說,就是把新建時候的名字作為其中的一個tag;找了很久沒有找到方法
--allen
2.?re: 第二章 Visual Basic 基礎語法
受益匪淺,多謝!
--yuleself
3.?re: 數字填空
評論內容較長,點擊標題查看
--去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去
4.?re: Checkbox聯動演示
dcdc
--dcd
5.?re: 利用TN5250NF下載檔案的自動化處理
請教若密碼要動態生成,是否有辦法呢?
謝謝
--江佳桂
i am ddm
主站蜘蛛池模板:
色爽黄1000部免费软件下载
|
91免费福利视频
|
亚洲第一黄片大全
|
热久久这里是精品6免费观看
|
亚洲av无码国产精品色午夜字幕
|
一色屋成人免费精品网站
|
国产成人+综合亚洲+天堂
|
亚洲成AV人在线观看天堂无码
|
日日麻批免费40分钟日本的
|
黄网站在线播放视频免费观看
|
亚洲an天堂an在线观看
|
成年人免费网站在线观看
|
中文在线观看永久免费
|
2020国产精品亚洲综合网
|
不卡精品国产_亚洲人成在线
|
99免费视频观看
|
欧美亚洲国产SUV
|
亚洲福利视频导航
|
国产亚洲福利一区二区免费看
|
男人j进入女人j内部免费网站
|
四虎国产精品永免费
|
亚洲婷婷天堂在线综合
|
亚洲成av人在片观看
|
97免费人妻无码视频
|
国产麻豆成人传媒免费观看
|
亚洲AV日韩AV一区二区三曲
|
亚洲福利在线视频
|
伊人久久精品亚洲午夜
|
好吊妞视频免费视频
|
9420免费高清在线视频
|
一级毛片a免费播放王色
|
最新亚洲卡一卡二卡三新区
|
亚洲AV永久纯肉无码精品动漫
|
久久亚洲中文字幕精品一区四
|
国拍在线精品视频免费观看
|
两个人看www免费视频
|
黄色a三级三级三级免费看
|
色噜噜亚洲男人的天堂
|
亚洲毛片在线观看
|
亚洲一区二区三区自拍公司
|
国产在线播放免费
|