詳解Flink CEP的概念及功能
我們在看直播的時候,不管對于主播還是用戶來說,非常重要的一項就是彈幕文化。為了增加直播趣味性和互動性, 各大網(wǎng)絡(luò)直播平臺紛紛采用彈窗彈幕作為用戶實時交流的方式,內(nèi)容豐富且形式多樣的彈幕數(shù)據(jù)中隱含著復(fù)雜的用戶屬性與用戶行為, 研究并理解在線直播平臺用戶具有彈幕內(nèi)容審核與監(jiān)控、輿論熱點(diǎn)預(yù)測、個性化摘要標(biāo)注等多方面的應(yīng)用價值。
本文不分析彈幕數(shù)據(jù)的應(yīng)用價值,只通過彈幕內(nèi)容審核與監(jiān)控案例來了解下Flink CEP的概念及功能。
在用戶發(fā)彈幕時,直播平臺主要實時監(jiān)控識別兩類彈幕內(nèi)容:一類是發(fā)布不友善彈幕的用戶 ;一類是刷屏的用戶。
我們先記住上述需要實時監(jiān)控識別的兩類用戶,接下來介紹Flink CEP的API,然后使用CEP解決上述問題。
Flink CEPFlink CEP 是什么
Flink CEP是一個基于Flink的復(fù)雜事件處理庫,可以從多個數(shù)據(jù)流中發(fā)現(xiàn)復(fù)雜事件,識別有意義的事件(例如機(jī)會或者威脅),并盡快的做出響應(yīng),而不是需要等待幾天或則幾個月相當(dāng)長的時間,才發(fā)現(xiàn)問題。
Flink CEP API
CEP API的核心是Pattern(模式) API,它允許你快速定義復(fù)雜的事件模式。每個模式包含多個階段(stage)或者我們也可稱為狀態(tài)(state)。從一個狀態(tài)切換到另一個狀態(tài),用戶可以指定條件,這些條件可以作用在鄰近的事件或獨(dú)立事件上。
介紹API之前先來理解幾個概念:
1. 模式與模式序列
簡單模式稱為模式,將最終在數(shù)據(jù)流中進(jìn)行搜索匹配的復(fù)雜模式序列稱為模式序列,每個復(fù)雜模式序列是由多個簡單模式組成。
匹配是一系列輸入事件,這些事件通過一系列有效的模式轉(zhuǎn)換,能夠訪問復(fù)雜模式圖的所有模式。
每個模式必須具有唯一的名稱,我們可以使用模式名稱來標(biāo)識該模式匹配到的事件。
2. 單個模式
一個模式既可以是單例的,也可以是循環(huán)的。單例模式接受單個事件,循環(huán)模式可以接受多個事件。
3. 模式示例:
有如下模式:a b+ c?d
其中a,b,c,d這些字母代表的是模式,+代表循環(huán),b+就是循環(huán)模式;?代表可選,c?就是可選模式;
所以上述模式的意思就是:a后面可以跟一個或多個b,后面再可選的跟c,最后跟d。
其中a、c? 、d是單例模式,b+是循環(huán)模式。
一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉(zhuǎn)換為循環(huán)模式。
每個模式可以帶有一個或多個條件,這些條件是基于事件接收進(jìn)行定義的;蛘哒f,每個模式通過一個或多個條件來匹配和接收事件。
了解完上述概念后,接下來介紹下案例中需要用到的幾個CEP API:
案例中用到的CEP API:
Begin:定義一個起始模式狀態(tài)
用法:start = Pattern.<Event>begin("start");
Next:附加一個新的模式狀態(tài)。匹配事件必須直接接續(xù)上一個匹配事件
用法:next = start.next("next");
Where:定義當(dāng)前模式狀態(tài)的過濾條件。僅當(dāng)事件通過過濾器時,它才能與狀態(tài)匹配
用法:patternState.where(_.message == "TMD");
Within: 定義事件序列與模式匹配的最大時間間隔。如果未完成的事件序列超過此時間,則將其丟棄
用法:patternState.within(Time.seconds(10));
Times:一個給定類型的事件出現(xiàn)了指定次數(shù)
用法:patternState.times(5);
API 先介紹以上這幾個,接下來我們解決下文章開頭提到的案例:
監(jiān)測用戶彈幕行為案例
案例一:監(jiān)測惡意用戶
規(guī)則:用戶如果在10s內(nèi),同時輸入 TMD 超過5次,就認(rèn)為用戶為惡意攻擊,識別出該用戶。
使用 Flink CEP 檢測惡意用戶:
import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.cep.PatternSelectFunction
import org.a(chǎn)pache.flink.cep.scala.{CEP, PatternStream}
import org.a(chǎn)pache.flink.cep.scala.pattern.Pattern
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.TimeCharacteristic
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.windowing.time.Time
object BarrageBehavior01 {
case class LoginEvent(userId:String, message:String, timestamp:Long){
override def toString: String = userId
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 使用IngestionTime作為EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 用于觀察測試數(shù)據(jù)處理順序
env.setParallelism(1)
// 模擬數(shù)據(jù)源
val loginEventStream: DataStream[LoginEvent] = env.fromCollection(
List(
LoginEvent("1", "TMD", 1618498576),
LoginEvent("1", "TMD", 1618498577),
LoginEvent("1", "TMD", 1618498579),
LoginEvent("1", "TMD", 1618498582),
LoginEvent("2", "TMD", 1618498583),
LoginEvent("1", "TMD", 1618498585)
)
).a(chǎn)ssignAscendingTimestamps(_.timestamp * 1000)
//定義模式
val loginEventPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin")
.where(_.message == "TMD")
.times(5)
.within(Time.seconds(10))
//匹配模式
val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern)
import scala.collection.Map
val result = patternStream.select((pattern:Map[String, Iterable[LoginEvent]])=> {
val first = pattern.getOrElse("begin", null).iterator.next()
(first.userId, first.timestamp)
})
//惡意用戶,實際處理可將按用戶進(jìn)行禁言等處理,為簡化此處僅打印出該用戶
請輸入評論內(nèi)容...
請輸入評論/評論長度6~500個字
最新活動更多
推薦專題
- 1 Intel宣布40年來最重大轉(zhuǎn)型:年底前裁員15000人、拋掉2/3房產(chǎn)
- 2 因美封殺TikTok,字節(jié)股價骨折!估值僅Meta1/5
- 3 宏山激光重磅發(fā)布行業(yè)解決方案,助力智能制造產(chǎn)業(yè)新飛躍
- 4 國產(chǎn)AI芯片公司破產(chǎn)!白菜價拍賣
- 5 具身智能火了,但規(guī)模落地還需時間
- 6 國產(chǎn)英偉達(dá)們,抓緊沖刺A股
- 7 三次錯失風(fēng)口!OpenAI前員工殺回AI編程賽道,老東家捧金相助
- 8 英特爾賦能智慧醫(yī)療,共創(chuàng)數(shù)字化未來
- 9 英偉達(dá)的麻煩在后頭?
- 10 將“網(wǎng)紅”變成“商品”,AI“爆改”實力拉滿
- 高級軟件工程師 廣東省/深圳市
- 自動化高級工程師 廣東省/深圳市
- 光器件研發(fā)工程師 福建省/福州市
- 銷售總監(jiān)(光器件) 北京市/海淀區(qū)
- 激光器高級銷售經(jīng)理 上海市/虹口區(qū)
- 光器件物理工程師 北京市/海淀區(qū)
- 激光研發(fā)工程師 北京市/昌平區(qū)
- 技術(shù)專家 廣東省/江門市
- 封裝工程師 北京市/海淀區(qū)
- 結(jié)構(gòu)工程師 廣東省/深圳市