訂閱
糾錯
加入自媒體

一文詳解Flink知識體系

2021-09-13 09:58
園陌
關(guān)注

失敗率重啟策略可以在flink-conf.yaml中設(shè)置下面的配置參數(shù)來啟用:

restart-strategy:failure-rate
配置參數(shù)描述默認(rèn)值restart-strategy.failure-rate.max-failures-per-interval在一個Job認(rèn)定為失敗之前,最大的重啟次數(shù)1restart-strategy.failure-rate.failure-rate-interval計算失敗率的時間間隔1分鐘restart-strategy.failure-rate.delay兩次連續(xù)重啟嘗試之間的時間間隔akka.a(chǎn)sk.timeout

例子:

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

失敗率重啟策略也可以在程序中設(shè)置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
 3, // 每個測量時間間隔最大失敗次數(shù)
 Time.of(5, TimeUnit.MINUTES), //失敗率測量的時間間隔
 Time.of(10, TimeUnit.SECONDS) // 兩次連續(xù)重啟嘗試的時間間隔
))
4) 無重啟策略

Job直接失敗,不會嘗試進(jìn)行重啟

restart-strategy: none

無重啟策略也可以在程序中設(shè)置

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
5) 案例

需求:輸入五次zhangsan,程序掛掉。

代碼:

import org.a(chǎn)pache.flink.a(chǎn)pi.common.restartstrategy.RestartStrategies
import org.a(chǎn)pache.flink.runtime.state.filesystem.FsStateBackend
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala._
object FixDelayRestartStrategiesDemo {
 def main(args: Array[String]): Unit = {
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //如果想要開啟重啟策略,就必須開啟CheckPoint
   env.enableCheckpointing(5000L)
   //指定狀態(tài)存儲后端,默認(rèn)就是內(nèi)存
   //現(xiàn)在指定的是FsStateBackend,支持本地系統(tǒng)、
   //new FsStateBackend要指定存儲系統(tǒng)的協(xié)議:scheme (hdfs://, file://, etc)
   env.setStateBackend(new FsStateBackend(args(0)))
   //如果程序被cancle,保留以前做的checkpoint
   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
   //指定以后存儲多個checkpoint目錄
   env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
   //指定重啟策略,默認(rèn)的重啟策略是不停的重啟
   //程序出現(xiàn)異常是會重啟,重啟五次,每次延遲5秒,如果超過了5次,程序退出
   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000))
   val lines: DataStream[String] = env.socketTextStream(args(1), 8888)
   val result = lines.flatMap(_.split(" ").map(word => {
     if(word.equals("zhangsan")) {
       throw new RuntimeException("zhangsan,程序重啟!");
     }
     (word, 1)
   })).keyBy(0).sum(1)
   result.print()
   env.execute()
 }
}  
3) checkpoint 案例

1. 需求:

假定用戶需要每隔1秒鐘需要統(tǒng)計4秒中窗口中數(shù)據(jù)的量,然后對統(tǒng)計的結(jié)果值進(jìn)行checkpoint處理

2. 數(shù)據(jù)規(guī)劃:

使用自定義算子每秒鐘產(chǎn)生大約10000條數(shù)據(jù)。產(chǎn)生的數(shù)據(jù)為一個四元組(Long,String,String,Integer)—------(id,name,info,count)。數(shù)據(jù)經(jīng)統(tǒng)計后,統(tǒng)計結(jié)果打印到終端輸出。打印輸出的結(jié)果為Long類型的數(shù)據(jù) 。

3. 開發(fā)思路:

source算子每隔1秒鐘發(fā)送10000條數(shù)據(jù),并注入到Window算子中。window算子每隔1秒鐘統(tǒng)計一次最近4秒鐘內(nèi)數(shù)據(jù)數(shù)量。每隔1秒鐘將統(tǒng)計結(jié)果打印到終端。每隔6秒鐘觸發(fā)一次checkpoint,然后將checkpoint的結(jié)果保存到HDFS中。

5. 開發(fā)步驟:

獲取流處理執(zhí)行環(huán)境

設(shè)置檢查點機(jī)制

自定義數(shù)據(jù)源

數(shù)據(jù)分組

劃分時間窗口

數(shù)據(jù)聚合

數(shù)據(jù)打印

觸發(fā)執(zhí)行

示例代碼:

//發(fā)送數(shù)據(jù)形式
case class SEvent(id: Long, name: String, info: String, count: Int)
class SEventSourceWithChk extends RichSourceFunction[SEvent]{
 private var count = 0L
 private var isRunning = true
 private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
 // 任務(wù)取消時調(diào)用
 override def cancel(): Unit = {
   isRunning = false
 }
 //// source算子的邏輯,即:每秒鐘向流圖中注入10000個元組
 override def run(sourceContext: SourceContext[SEvent]): Unit = {
   while(isRunning) {
     for (i <- 0 until 10000) {
       sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1))
       count += 1L
     }
     Thread.sleep(1000)
   }
 }
}
*
該段代碼是流圖定義代碼,具體實現(xiàn)業(yè)務(wù)流程,另外,代碼中窗口的觸發(fā)時間使 用了event time。
 
object FlinkEventTimeAPIChkMain {
 def main(args: Array[String]): Unit ={
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/"))
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   env.getCheckpointConfig.setCheckpointInterval(6000)
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//保留策略:默認(rèn)情況下,檢查點不會被保留,僅用于故障中恢復(fù)作業(yè),可以啟用外部持久化檢查點,同時指定保留策略
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作業(yè)取消時保留檢查點,注意在這種情況下,您必須在取消后手動清理檢查點狀態(tài)
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:當(dāng)作業(yè)被cancel時,刪除檢查點,檢查點狀態(tài)僅在作業(yè)失敗時可用
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
   // 應(yīng)用邏輯
   val source: DataStream[SEvent] = env.a(chǎn)ddSource(new SEventSourceWithChk)
   source.a(chǎn)ssignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
     // 設(shè)置watermark
     override def getCurrentWatermark: Watermark = {
       new Watermark(System.currentTimeMillis())
     }
     // 給每個元組打上時間戳
     override def extractTimestamp(t: SEvent, l: Long): Long = {
       System.currentTimeMillis()
     }
   })
     .keyBy(0)
     .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
     .a(chǎn)pply(new WindowStatisticWithChk)
     .print()
   env.execute()
 }
}
//該數(shù)據(jù)在算子制作快照時用于保存到目前為止算子記錄的數(shù)據(jù)條數(shù)。
// 用戶自定義狀態(tài)
class UDFState extends Serializable{
 private var count = 0L
 // 設(shè)置用戶自定義狀態(tài)
 def setState(s: Long) = count = s
 // 獲取用戶自定狀態(tài)
 def getState = count
}
//該段代碼是window算子的代碼,每當(dāng)觸發(fā)計算時統(tǒng)計窗口中元組數(shù)量。
class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{
 private var total = 0L
 // window算子的實現(xiàn)邏輯,即:統(tǒng)計window中元組的數(shù)量
 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
   var count = 0L
   for (event <- input) {
     count += 1L
   }
   total += count
   out.collect(count)
 }
 // 從自定義快照中恢復(fù)狀態(tài)
 override def restoreState(state: util.List[UDFState]): Unit = {
   val udfState = state.get(0)
   total = udfState.getState
 }
 // 制作自定義狀態(tài)快照
 override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = {
   val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
   val udfState = new UDFState
   udfState.setState(total)
   udfList.a(chǎn)dd(udfState)
   udfList
 }
}
4. 端對端僅處理一次語義

當(dāng)談及僅一次處理時,我們真正想表達(dá)的是每條輸入消息只會影響最終結(jié)果一次!(影響應(yīng)用狀態(tài)一次,而非被處理一次)即使出現(xiàn)機(jī)器故障或軟件崩潰,Flink也要保證不會有數(shù)據(jù)被重復(fù)處理或壓根就沒有被處理從而影響狀態(tài)。

在 Flink 1.4 版本之前,精準(zhǔn)一次處理只限于 Flink 應(yīng)用內(nèi),也就是所有的 Operator 完全由 Flink 狀態(tài)保存并管理的才能實現(xiàn)精確一次處理。但 Flink 處理完數(shù)據(jù)后大多需要將結(jié)果發(fā)送到外部系統(tǒng),比如 Sink 到 Kafka 中,這個過程中 Flink 并不保證精準(zhǔn)一次處理。

在 Flink 1.4 版本正式引入了一個里程碑式的功能:兩階段提交 Sink,即 TwoPhaseCommitSinkFunction 函數(shù)。該 SinkFunction 提取并封裝了兩階段提交協(xié)議中的公共邏輯,自此 Flink 搭配特定 Source 和 Sink(如  Kafka 0.11 版)實現(xiàn)精確一次處理語義(英文簡稱:EOS,即 Exactly-Once Semantics)。

在 Flink 中需要端到端精準(zhǔn)一次處理的位置有三個:

Flink 端到端精準(zhǔn)一次處理

Source 端:數(shù)據(jù)從上一階段進(jìn)入到 Flink 時,需要保證消息精準(zhǔn)一次消費。

Flink 內(nèi)部端:這個我們已經(jīng)了解,利用 Checkpoint 機(jī)制,把狀態(tài)存盤,發(fā)生故障的時候可以恢復(fù),保證內(nèi)部的狀態(tài)一致性。不了解的小伙伴可以看下我之前的文章:

Flink可靠性的基石-checkpoint機(jī)制詳細(xì)解析

Sink 端:將處理完的數(shù)據(jù)發(fā)送到下一階段時,需要保證數(shù)據(jù)能夠準(zhǔn)確無誤發(fā)送到下一階段。

1) Flink端到端精準(zhǔn)一次處理語義(EOS)

以下內(nèi)容適用于 Flink 1.4 及之后版本

對于 Source 端:Source 端的精準(zhǔn)一次處理比較簡單,畢竟數(shù)據(jù)是落到 Flink 中,所以 Flink 只需要保存消費數(shù)據(jù)的偏移量即可, 如消費 Kafka 中的數(shù)據(jù),Flink 將 Kafka Consumer 作為 Source,可以將偏移量保存下來,如果后續(xù)任務(wù)出現(xiàn)了故障,恢復(fù)的時候可以由連接器重置偏移量,重新消費數(shù)據(jù),保證一致性。

對于 Sink 端:Sink 端是最復(fù)雜的,因為數(shù)據(jù)是落地到其他系統(tǒng)上的,數(shù)據(jù)一旦離開 Flink 之后,Flink 就監(jiān)控不到這些數(shù)據(jù)了,所以精準(zhǔn)一次處理語義必須也要應(yīng)用于 Flink 寫入數(shù)據(jù)的外部系統(tǒng),故這些外部系統(tǒng)必須提供一種手段允許提交或回滾這些寫入操作,同時還要保證與 Flink Checkpoint 能夠協(xié)調(diào)使用(Kafka 0.11 版本已經(jīng)實現(xiàn)精確一次處理語義)。

我們以 Flink 與 Kafka 組合為例,Flink 從 Kafka 中讀數(shù)據(jù),處理完的數(shù)據(jù)在寫入 Kafka 中。

為什么以Kafka為例,第一個原因是目前大多數(shù)的 Flink 系統(tǒng)讀寫數(shù)據(jù)都是與 Kafka 系統(tǒng)進(jìn)行的。第二個原因,也是最重要的原因 Kafka 0.11 版本正式發(fā)布了對于事務(wù)的支持,這是與Kafka交互的Flink應(yīng)用要實現(xiàn)端到端精準(zhǔn)一次語義的必要條件。

當(dāng)然,Flink 支持這種精準(zhǔn)一次處理語義并不只是限于與 Kafka 的結(jié)合,可以使用任何 Source/Sink,只要它們提供了必要的協(xié)調(diào)機(jī)制。

2) Flink 與 Kafka 組合

Flink 應(yīng)用示例

如上圖所示,Flink 中包含以下組件:

一個 Source,從 Kafka 中讀取數(shù)據(jù)(即 KafkaConsumer)

一個時間窗口化的聚會操作(Window)

一個 Sink,將結(jié)果寫入到 Kafka(即 KafkaProducer)

若要 Sink 支持精準(zhǔn)一次處理語義(EOS),它必須以事務(wù)的方式寫數(shù)據(jù)到 Kafka,這樣當(dāng)提交事務(wù)時兩次 Checkpoint 間的所有寫入操作當(dāng)作為一個事務(wù)被提交。這確保了出現(xiàn)故障或崩潰時這些寫入操作能夠被回滾。

當(dāng)然了,在一個分布式且含有多個并發(fā)執(zhí)行 Sink 的應(yīng)用中,僅僅執(zhí)行單次提交或回滾是不夠的,因為所有組件都必須對這些提交或回滾達(dá)成共識,這樣才能保證得到一個一致性的結(jié)果。Flink 使用兩階段提交協(xié)議以及預(yù)提交(Pre-commit)階段來解決這個問題。

3) 兩階段提交協(xié)議(2PC)

兩階段提交協(xié)議(Two-Phase Commit,2PC)是很常用的解決分布式事務(wù)問題的方式,它可以保證在分布式事務(wù)中,要么所有參與進(jìn)程都提交事務(wù),要么都取消,即實現(xiàn) ACID 中的 A (原子性)。

在數(shù)據(jù)一致性的環(huán)境下,其代表的含義是:要么所有備份數(shù)據(jù)同時更改某個數(shù)值,要么都不改,以此來達(dá)到數(shù)據(jù)的強(qiáng)一致性。

兩階段提交協(xié)議中有兩個重要角色,協(xié)調(diào)者(Coordinator)和參與者(Participant),其中協(xié)調(diào)者只有一個,起到分布式事務(wù)的協(xié)調(diào)管理作用,參與者有多個。

顧名思義,兩階段提交將提交過程劃分為連續(xù)的兩個階段:表決階段(Voting)和提交階段(Commit)。

兩階段提交協(xié)議過程如下圖所示:

兩階段提交協(xié)議

第一階段:表決階段

協(xié)調(diào)者向所有參與者發(fā)送一個 VOTE_REQUEST 消息。

當(dāng)參與者接收到 VOTE_REQUEST 消息,向協(xié)調(diào)者發(fā)送 VOTE_COMMIT 消息作為回應(yīng),告訴協(xié)調(diào)者自己已經(jīng)做好準(zhǔn)備提交準(zhǔn)備,如果參與者沒有準(zhǔn)備好或遇到其他故障,就返回一個 VOTE_ABORT 消息,告訴協(xié)調(diào)者目前無法提交事務(wù)。

第二階段:提交階段

協(xié)調(diào)者收集來自各個參與者的表決消息。如果所有參與者一致認(rèn)為可以提交事務(wù),那么協(xié)調(diào)者決定事務(wù)的最終提交,在此情形下協(xié)調(diào)者向所有參與者發(fā)送一個 GLOBAL_COMMIT 消息,通知參與者進(jìn)行本地提交;如果所有參與者中有任意一個返回消息是 VOTE_ABORT,協(xié)調(diào)者就會取消事務(wù),向所有參與者廣播一條 GLOBAL_ABORT 消息通知所有的參與者取消事務(wù)。

每個提交了表決信息的參與者等候協(xié)調(diào)者返回消息,如果參與者接收到一個 GLOBAL_COMMIT 消息,那么參與者提交本地事務(wù),否則如果接收到 GLOBAL_ABORT 消息,則參與者取消本地事務(wù)。

4) 兩階段提交協(xié)議在 Flink 中的應(yīng)用

Flink 的兩階段提交思路:

我們從 Flink 程序啟動到消費 Kafka 數(shù)據(jù),最后到 Flink 將數(shù)據(jù) Sink 到 Kafka 為止,來分析 Flink 的精準(zhǔn)一次處理。

當(dāng) Checkpoint 啟動時,JobManager 會將檢查點分界線(checkpoint battier)注入數(shù)據(jù)流,checkpoint barrier 會在算子間傳遞下去,如下如所示:

Flink 精準(zhǔn)一次處理:Checkpoint 啟動

Source 端:Flink Kafka Source 負(fù)責(zé)保存 Kafka 消費 offset,當(dāng) Chckpoint 成功時 Flink 負(fù)責(zé)提交這些寫入,否則就終止取消掉它們,當(dāng) Chckpoint 完成位移保存,它會將 checkpoint barrier(檢查點分界線) 傳給下一個 Operator,然后每個算子會對當(dāng)前的狀態(tài)做個快照,保存到狀態(tài)后端(State Backend)。

對于 Source 任務(wù)而言,就會把當(dāng)前的 offset 作為狀態(tài)保存起來。下次從 Checkpoint 恢復(fù)時,Source 任務(wù)可以重新提交偏移量,從上次保存的位置開始重新消費數(shù)據(jù),如下圖所示:

Flink 精準(zhǔn)一次處理:checkpoint barrier 及 offset 保存Slink 端:從 Source 端開始,每個內(nèi)部的 transform 任務(wù)遇到 checkpoint barrier(檢查點分界線)時,都會把狀態(tài)存到 Checkpoint 里。數(shù)據(jù)處理完畢到 Sink 端時,Sink 任務(wù)首先把數(shù)據(jù)寫入外部 Kafka,這些數(shù)據(jù)都屬于預(yù)提交的事務(wù)(還不能被消費),此時的 Pre-commit 預(yù)提交階段下 Data Sink 在保存狀態(tài)到狀態(tài)后端的同時還必須預(yù)提交它的外部事務(wù),如下圖所示:

Flink 精準(zhǔn)一次處理:預(yù)提交到外部系統(tǒng)

當(dāng)所有算子任務(wù)的快照完成(所有創(chuàng)建的快照都被視為是 Checkpoint 的一部分),也就是這次的 Checkpoint 完成時,JobManager 會向所有任務(wù)發(fā)通知,確認(rèn)這次 Checkpoint 完成,此時 Pre-commit 預(yù)提交階段才算完成。才正式到兩階段提交協(xié)議的第二個階段:commit 階段。該階段中 JobManager 會為應(yīng)用中每個 Operator 發(fā)起 Checkpoint 已完成的回調(diào)邏輯。

本例中的 Data Source 和窗口操作無外部狀態(tài),因此在該階段,這兩個 Opeartor 無需執(zhí)行任何邏輯,但是 Data Sink 是有外部狀態(tài)的,此時我們必須提交外部事務(wù),當(dāng) Sink 任務(wù)收到確認(rèn)通知,就會正式提交之前的事務(wù),Kafka 中未確認(rèn)的數(shù)據(jù)就改為“已確認(rèn)”,數(shù)據(jù)就真正可以被消費了,如下圖所示:

Flink 精準(zhǔn)一次處理:數(shù)據(jù)精準(zhǔn)被消費

注:Flink 由 JobManager 協(xié)調(diào)各個 TaskManager 進(jìn)行 Checkpoint 存儲,Checkpoint 保存在 StateBackend(狀態(tài)后端) 中,默認(rèn) StateBackend 是內(nèi)存級的,也可以改為文件級的進(jìn)行持久化保存。

最后,一張圖總結(jié)下 Flink 的 EOS:

Flink 端到端精準(zhǔn)一次處理

此圖建議保存,總結(jié)全面且簡明扼要,再也不慫面試官!

5) Exactly-Once 案例

Kafka來實現(xiàn)End-to-End Exactly-Once語義:

import java.util.Properties
import org.a(chǎn)pache.flink.a(chǎn)pi.common.serialization.SimpleStringSchema
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.CheckpointingMode
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.CheckpointConfig
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.StreamExecutionEnvironment
import org.a(chǎn)pache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.a(chǎn)pache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
*
* Kafka Producer的容錯-Kafka 0.9 and 0.10
* 如果Flink開啟了checkpoint,針對FlinkKafkaProducer09 和FlinkKafkaProducer010 可以提供 at-least-once的語義,還需要配置下面兩個參數(shù)
* ?setLogFailuresOnly(false)
* ?setFlushOnCheckpoint(true)
*
* 注意:建議修改kafka 生產(chǎn)者的重試次數(shù)
* retries【這個參數(shù)的值默認(rèn)是0】
*
* Kafka Producer的容錯-Kafka 0.11
* 如果Flink開啟了checkpoint,針對FlinkKafkaProducer011 就可以提供 exactly-once的語義
* 但是需要選擇具體的語義
* ?Semantic.NONE
* ?Semantic.AT_LEAST_ONCE【默認(rèn)】
* ?Semantic.EXACTLY_ONCE

object StreamingKafkaSinkScala {
 def main(args: Array[String]): Unit = {
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //隱式轉(zhuǎn)換
   import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
   //checkpoint配置
   env.enableCheckpointing(5000)
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
   env.getCheckpointConfig.setCheckpointTimeout(60000)
   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
   env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
   val text = env.socketTextStream("node01", 9001, '')
   val topic = "test"
   val prop = new Properties()
   prop.setProperty("bootstrap.servers", "node01:9092")
   //設(shè)置事務(wù)超時時間,也可在kafka配置中設(shè)置
   prop.setProperty("transaction.timeout.ms",60000*15+"");
   //使用至少一次語義的形式
   //val myProducer = new FlinkKafkaProducer011(brokerList, topic, new SimpleStringSchema());
   //使用支持僅一次語義的形式
   val myProducer =
   new FlinkKafkaProducer011[String](topic, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
   text.a(chǎn)ddSink(myProducer)
   env.execute("StreamingKafkaSinkScala")
 }
}  

Redis實現(xiàn)End-to-End Exactly-Once語義:

代碼開發(fā)步驟:

獲取流處理執(zhí)行環(huán)境設(shè)置檢查點機(jī)制定義kafkaConsumer數(shù)據(jù)轉(zhuǎn)換:分組,求和數(shù)據(jù)寫入redis觸發(fā)執(zhí)行object ExactlyRedisSink {
 def main(args: Array[String]): Unit = {
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)
   env.enableCheckpointing(5000)
   env.setStateBackend(new FsStateBackend("hdfs://node01:8020/check/11"))
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   env.getCheckpointConfig.setCheckpointTimeout(60000)
   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
   env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
   //設(shè)置kafka,加載kafka數(shù)據(jù)源
   val properties = new Properties()
   properties.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
   properties.setProperty("group.id", "test")
   properties.setProperty("enable.a(chǎn)uto.commit", "false")
   val kafkaConsumer = new FlinkKafkaConsumer011[String]("test2", new SimpleStringSchema(), properties)
   kafkaConsumer.setStartFromLatest()
   //檢查點制作成功,才開始提交偏移量
   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
   val kafkaSource: DataStream[String] = env.a(chǎn)ddSource(kafkaConsumer)
   //數(shù)據(jù)轉(zhuǎn)換
   val sumData: DataStream[(String, Int)] = kafkaSource.flatMap(_.split(" "))
     .map(_ -> 1)
     .keyBy(0)
     .sum(1)
   val set = new util.HashSet[InetSocketAddress]()
   set.a(chǎn)dd(new InetSocketAddress(InetAddress.getByName("node01"),7001))
   set.a(chǎn)dd(new InetSocketAddress(InetAddress.getByName("node01"),7002))
   set.a(chǎn)dd(new InetSocketAddress(InetAddress.getByName("node01"),7003))
   val config: FlinkJedisClusterConfig = new FlinkJedisClusterConfig.Builder()
     .setNodes(set)
     .setMaxIdle(5)
     .setMaxTotal(10)
     .setMinIdle(5)
     .setTimeout(10)
     .build()
   //數(shù)據(jù)寫入
   sumData.a(chǎn)ddSink(new RedisSink(config,new MyRedisSink))
   env.execute()
 }
}
class MyRedisSink extends RedisMapper[(String,Int)] {
 override def getCommandDescription: RedisCommandDescription = {
     new RedisCommandDescription(RedisCommand.HSET,"resink")
 }
 override def getKeyFromData(data: (String, Int)): String = {
   data._1
 }
 override def getValueFromData(data: (String, Int)): String = {
   data._2.toString
 }
}  
八、Flink SQL

Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設(shè)計的一套符合標(biāo)準(zhǔn) SQL 語義的開發(fā)語言。自 2015 年開始,阿里巴巴開始調(diào)研開源流計算引擎,最終決定基于 Flink 打造新一代計算引擎,針對 Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初將最終代碼開源,也就是我們熟知的 Blink。Blink 在原來的 Flink 基礎(chǔ)上最顯著的一個貢獻(xiàn)就是 Flink SQL 的實現(xiàn)。

Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計算領(lǐng)域,比如 Storm、Spark Streaming 都會提供一些 Function 或者 Datastream API,用戶通過 Java 或 Scala 寫業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。

在這個背景下,毫無疑問,SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因為其具有幾個非常重要的特點:

SQL 屬于設(shè)定式語言,用戶只要表達(dá)清楚需求即可,不需要了解具體做法;

SQL 可優(yōu)化,內(nèi)置多種查詢優(yōu)化器,這些查詢優(yōu)化器可為 SQL 翻譯出最優(yōu)執(zhí)行計劃;

SQL 易于理解,不同行業(yè)和領(lǐng)域的人都懂,學(xué)習(xí)成本較低;

SQL 非常穩(wěn)定,在數(shù)據(jù)庫 30 多年的歷史中,SQL 本身變化較少;

流與批的統(tǒng)一,Flink 底層 Runtime 本身就是一個流與批統(tǒng)一的引擎,而 SQL 可以做到 API 層的流與批統(tǒng)一。

1. Flink SQL 常用算子

SELECT:

SELECT 用于從 DataSet/DataStream 中選擇數(shù)據(jù),用于篩選出某些列。

示例:

SELECT * FROM Table; // 取出表中的所有列

SELECT name,age FROM Table; // 取出表中 name 和 age 兩列

與此同時 SELECT 語句中可以使用函數(shù)和別名,例如我們上面提到的 WordCount 中:

SELECT word, COUNT(word) FROM table GROUP BY word;

WHERE:

WHERE 用于從數(shù)據(jù)集/流中過濾數(shù)據(jù),與 SELECT 一起使用,用于根據(jù)某些條件對關(guān)系做水平分割,即選擇符合條件的記錄。

示例:

SELECT name,age FROM Table where name LIKE ‘% 小明 %’;

SELECT * FROM Table WHERE age = 20;

WHERE 是從原數(shù)據(jù)中進(jìn)行過濾,那么在 WHERE 條件中,Flink SQL 同樣支持 =、<、>、、>=、<=,以及 AND、OR 等表達(dá)式的組合,最終滿足過濾條件的數(shù)據(jù)會被選擇出來。并且 WHERE 可以結(jié)合 IN、NOT IN 聯(lián)合使用。舉個例子:

SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)

DISTINCT:

DISTINCT 用于從數(shù)據(jù)集/流中去重根據(jù) SELECT 的結(jié)果進(jìn)行去重。

示例:

SELECT DISTINCT name FROM Table;

對于流式查詢,計算查詢結(jié)果所需的 State 可能會無限增長,用戶需要自己控制查詢的狀態(tài)范圍,以防止?fàn)顟B(tài)過大。

GROUP BY:

GROUP BY 是對數(shù)據(jù)進(jìn)行分組操作。例如我們需要計算成績明細(xì)表中,每個學(xué)生的總分。

示例:

SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;

UNION 和 UNION ALL:

UNION 用于將兩個結(jié)果集合并起來,要求兩個結(jié)果集字段完全一致,包括字段類型、字段順序。不同于 UNION ALL 的是,UNION 會對結(jié)果數(shù)據(jù)去重。

示例:

SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;

JOIN:

JOIN 用于把來自兩個表的數(shù)據(jù)聯(lián)合起來形成結(jié)果表,Flink 支持的 JOIN 類型包括:

JOIN - INNER JOIN

LEFT JOIN - LEFT OUTER JOIN

RIGHT JOIN - RIGHT OUTER JOIN

FULL JOIN - FULL OUTER JOIN

這里的 JOIN 的語義和我們在關(guān)系型數(shù)據(jù)庫中使用的 JOIN 語義一致。

示例:

JOIN(將訂單表數(shù)據(jù)和商品表進(jìn)行關(guān)聯(lián))

SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id

LEFT JOIN 與 JOIN 的區(qū)別是當(dāng)右表沒有與左邊相 JOIN 的數(shù)據(jù)時候,右邊對應(yīng)的字段補(bǔ) NULL 輸出,RIGHT JOIN 相當(dāng)于 LEFT JOIN 左右兩個表交互一下位置。FULL JOIN 相當(dāng)于 RIGHT JOIN 和 LEFT JOIN 之后進(jìn)行 UNION ALL 操作。

示例:

SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id

Group Window:

根據(jù)窗口數(shù)據(jù)劃分的不同,目前 Apache Flink 有如下 3 種 Bounded Window:

Tumble,滾動窗口,窗口數(shù)據(jù)有固定的大小,窗口數(shù)據(jù)無疊加;

Hop,滑動窗口,窗口數(shù)據(jù)有固定大小,并且有固定的窗口重建頻率,窗口數(shù)據(jù)有疊加;

Session,會話窗口,窗口數(shù)據(jù)沒有固定的大小,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無疊加。

Tumble Window:

Tumble 滾動窗口有固定大小,窗口數(shù)據(jù)不重疊,具體語義如下:

Tumble 滾動窗口對應(yīng)的語法如下:

SELECT
   [gk],
   [TUMBLE_START(timeCol, size)],
   [TUMBLE_END(timeCol, size)],
   agg1(col1),
   ...
   aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)                                                          

其中:

[gk] 決定了是否需要按照字段進(jìn)行聚合;

TUMBLE_START 代表窗口開始時間;

TUMBLE_END 代表窗口結(jié)束時間;

timeCol 是流表中表示時間字段;

size 表示窗口的大小,如 秒、分鐘、小時、天。

舉個例子,假如我們要計算每個人每天的訂單量,按照 user 進(jìn)行聚合分組:

SELECT user,
     TUMBLE_START(rowtime, INTERVAL ‘1’ DAY) as wStart,
     SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL ‘1’ DAY), user;

Hop Window:

Hop 滑動窗口和滾動窗口類似,窗口有固定的 size,與滾動窗口不同的是滑動窗口可以通過 slide 參數(shù)控制滑動窗口的新建頻率。因此當(dāng) slide 值小于窗口 size 的值的時候多個滑動窗口會重疊,具體語義如下:

Hop 滑動窗口對應(yīng)語法如下:

SELECT
   [gk],
   [HOP_START(timeCol, slide, size)] ,  
   [HOP_END(timeCol, slide, size)],
   agg1(col1),
   ...
   aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)                                                      

每次字段的意思和 Tumble 窗口類似:

[gk] 決定了是否需要按照字段進(jìn)行聚合;

HOP_START 表示窗口開始時間;

HOP_END 表示窗口結(jié)束時間;

timeCol 表示流表中表示時間字段;

slide 表示每次窗口滑動的大小;

size 表示整個窗口的大小,如 秒、分鐘、小時、天。

舉例說明,我們要每過一小時計算一次過去 24 小時內(nèi)每個商品的銷量:

SELECT product,
     SUM(amount)
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

Session Window:

會話時間窗口沒有固定的持續(xù)時間,但它們的界限由 interval 不活動時間定義,即如果在定義的間隙期間沒有出現(xiàn)事件,則會話窗口關(guān)閉。

Seeeion 會話窗口對應(yīng)語法如下:

SELECT
   [gk],
   SESSION_START(timeCol, gap) AS winStart,  
   SESSION_END(timeCol, gap) AS winEnd,
   agg1(col1),
    ...
   aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)                                                        

[gk] 決定了是否需要按照字段進(jìn)行聚合;

SESSION_START 表示窗口開始時間;

SESSION_END 表示窗口結(jié)束時間;

timeCol 表示流表中表示時間字段;

gap 表示窗口數(shù)據(jù)非活躍周期的時長。

例如,我們需要計算每個用戶訪問時間 12 小時內(nèi)的訂單量:

SELECT user,
     SESSION_START(rowtime, INTERVAL ‘12’ HOUR) AS sStart,
     SESSION_ROWTIME(rowtime, INTERVAL ‘12’ HOUR) AS sEnd,
     SUM(amount)
FROM Orders
GROUP BY SESSION(rowtime, INTERVAL ‘12’ HOUR), user  

Table API和SQL捆綁在flink-table Maven工件中。必須將以下依賴項添加到你的項目才能使用Table API和SQL:

                                                         

另外,你需要為Flink的Scala批處理或流式API添加依賴項。對于批量查詢,您需要添加:

 
2. Flink SQL 實戰(zhàn)案例1) 批數(shù)據(jù)SQL

用法:

構(gòu)建Table運(yùn)行環(huán)境將DataSet注冊為一張表使用Table運(yùn)行環(huán)境的 sqlQuery 方法來執(zhí)行SQL語句

示例:使用Flink SQL統(tǒng)計用戶消費訂單的總金額、最大金額、最小金額、訂單總數(shù)。

訂單id用戶名訂單日期消費金額1Zhangsan2018-10-20 15:30358.5

測試數(shù)據(jù)(訂單ID、用戶名、訂單日期、訂單金額):

Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
Order(3, "lisi", "2018-10-20 16:30", 127.5),
Order(4, "lisi", "2018-10-20 16:30", 328.5),
Order(5, "lisi", "2018-10-20 16:30", 432.5),
Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)

步驟:

獲取一個批處理運(yùn)行環(huán)境獲取一個Table運(yùn)行環(huán)境創(chuàng)建一個樣例類 Order 用來映射數(shù)據(jù)(訂單名、用戶名、訂單日期、訂單金額)基于本地 Order 集合創(chuàng)建一個DataSet source使用Table運(yùn)行環(huán)境將DataSet注冊為一張表使用SQL語句來操作數(shù)據(jù)(統(tǒng)計用戶消費訂單的總金額、最大金額、最小金額、訂單總數(shù))使用TableEnv.toDataSet將Table轉(zhuǎn)換為DataSet打印測試

示例代碼:

import org.a(chǎn)pache.flink.a(chǎn)pi.scala.ExecutionEnvironment
import org.a(chǎn)pache.flink.table.a(chǎn)pi.{Table, TableEnvironment}
import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala.BatchTableEnvironment
import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.types.Row
*
* 使用Flink SQL統(tǒng)計用戶消費訂單的總金額、最大金額、最小金額、訂單總數(shù)。

object BatchFlinkSqlDemo {
 //3. 創(chuàng)建一個樣例類 Order 用來映射數(shù)據(jù)(訂單名、用戶名、訂單日期、訂單金額)
 case class Order(id:Int, userName:String, createTime:String, money:Double)
 def main(args: Array[String]): Unit = {
   *
    * 實現(xiàn)思路:
    * 1. 獲取一個批處理運(yùn)行環(huán)境
    * 2. 獲取一個Table運(yùn)行環(huán)境
    * 3. 創(chuàng)建一個樣例類 Order 用來映射數(shù)據(jù)(訂單名、用戶名、訂單日期、訂單金額)
    * 4. 基于本地 Order 集合創(chuàng)建一個DataSet source
    * 5. 使用Table運(yùn)行環(huán)境將DataSet注冊為一張表
    * 6. 使用SQL語句來操作數(shù)據(jù)(統(tǒng)計用戶消費訂單的總金額、最大金額、最小金額、訂單總數(shù))
    * 7. 使用TableEnv.toDataSet將Table轉(zhuǎn)換為DataSet
    * 8. 打印測試
   
   //1. 獲取一個批處理運(yùn)行環(huán)境
   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
   //2. 獲取一個Table運(yùn)行環(huán)境
   val tabEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
   //4. 基于本地 Order 集合創(chuàng)建一個DataSet source
   val orderDataSet: DataSet[Order] = env.fromElements(
     Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
     Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
     Order(3, "lisi", "2018-10-20 16:30", 127.5),
     Order(4, "lisi", "2018-10-20 16:30", 328.5),
     Order(5, "lisi", "2018-10-20 16:30", 432.5),
     Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
     Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
     Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
     Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
   )
   //5. 使用Table運(yùn)行環(huán)境將DataSet注冊為一張表
   tabEnv.registerDataSet("t_order", orderDataSet)
   //6. 使用SQL語句來操作數(shù)據(jù)(統(tǒng)計用戶消費訂單的總金額、最大金額、最小金額、訂單總數(shù))
   //用戶消費訂單的總金額、最大金額、最小金額、訂單總數(shù)。
   val sql =
     """
       | select
       |   userName,
       |   sum(money) totalMoney,
       |   max(money) maxMoney,
       |   min(money) minMoney,
       |   count(1) totalCount
       |  from t_order
       |  group by userName
       |""".stripMargin  //在scala中stripMargin默認(rèn)是“|”作為多行連接符
   //7. 使用TableEnv.toDataSet將Table轉(zhuǎn)換為DataSet
   val table: Table = tabEnv.sqlQuery(sql)
   table.printSchema()
   tabEnv.toDataSet[Row](table).print()
 }
}
2) 流數(shù)據(jù)SQL

流處理中也可以支持SQL。但是需要注意以下幾點:

要使用流處理的SQL,必須要添加水印時間使用 registerDataStream 注冊表的時候,使用 ' 來指定字段注冊表的時候,必須要指定一個rowtime,否則無法在SQL中使用窗口必須要導(dǎo)入 import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._ 隱式參數(shù)SQL中使用 trumble(時間列名, interval '時間' sencond) 來進(jìn)行定義窗口

示例:使用Flink SQL來統(tǒng)計5秒內(nèi) 用戶的 訂單總數(shù)、訂單的最大金額、訂單的最小金額。

步驟

獲取流處理運(yùn)行環(huán)境獲取Table運(yùn)行環(huán)境設(shè)置處理時間為 EventTime創(chuàng)建一個訂單樣例類 Order ,包含四個字段(訂單ID、用戶ID、訂單金額、時間戳)創(chuàng)建一個自定義數(shù)據(jù)源使用for循環(huán)生成1000個訂單隨機(jī)生成訂單ID(UUID)隨機(jī)生成用戶ID(0-2)隨機(jī)生成訂單金額(0-100)時間戳為當(dāng)前系統(tǒng)時間每隔1秒生成一個訂單添加水印,允許延遲2秒導(dǎo)入 import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._ 隱式參數(shù)使用 registerDataStream 注冊表,并分別指定字段,還要指定rowtime字段編寫SQL語句統(tǒng)計用戶訂單總數(shù)、最大金額、最小金額分組時要使用 tumble(時間列, interval '窗口時間' second) 來創(chuàng)建窗口使用 tableEnv.sqlQuery 執(zhí)行sql語句將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來啟動流處理程序

示例代碼:

import java.util.UUID
import java.util.concurrent.TimeUnit
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.TimeCharacteristic
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.functions.source.{RichSourceFunction, SourceFunction}
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.{DataStream, StreamExecutionEnvironment}
import org.a(chǎn)pache.flink.table.a(chǎn)pi.{Table, TableEnvironment}
import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.functions.AssignerWithPeriodicWatermarks
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.watermark.Watermark
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.windowing.time.Time
import org.a(chǎn)pache.flink.types.Row
import scala.util.Random
*
* 需求:
*  使用Flink SQL來統(tǒng)計5秒內(nèi) 用戶的 訂單總數(shù)、訂單的最大金額、訂單的最小金額
*
*  timestamp是關(guān)鍵字不能作為字段的名字(關(guān)鍵字不能作為字段名字)

object StreamFlinkSqlDemo {
   *
    *  1. 獲取流處理運(yùn)行環(huán)境
    * 2. 獲取Table運(yùn)行環(huán)境
    * 3. 設(shè)置處理時間為 EventTime
    * 4. 創(chuàng)建一個訂單樣例類 Order ,包含四個字段(訂單ID、用戶ID、訂單金額、時間戳)
    * 5. 創(chuàng)建一個自定義數(shù)據(jù)源
    *    使用for循環(huán)生成1000個訂單
    *    隨機(jī)生成訂單ID(UUID)
    *    隨機(jī)生成用戶ID(0-2)
    *    隨機(jī)生成訂單金額(0-100)
    *    時間戳為當(dāng)前系統(tǒng)時間
    *    每隔1秒生成一個訂單
    * 6. 添加水印,允許延遲2秒
    * 7. 導(dǎo)入 import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._ 隱式參數(shù)
    * 8. 使用 registerDataStream 注冊表,并分別指定字段,還要指定rowtime字段
    * 9. 編寫SQL語句統(tǒng)計用戶訂單總數(shù)、最大金額、最小金額
    * 分組時要使用 tumble(時間列, interval '窗口時間' second) 來創(chuàng)建窗口
    * 10. 使用 tableEnv.sqlQuery 執(zhí)行sql語句
    * 11. 將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來
    * 12. 啟動流處理程序
   
   // 3. 創(chuàng)建一個訂單樣例類`Order`,包含四個字段(訂單ID、用戶ID、訂單金額、時間戳)
   case class Order(orderId:String, userId:Int, money:Long, createTime:Long)
   def main(args: Array[String]): Unit = {
     // 1. 創(chuàng)建流處理運(yùn)行環(huán)境
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     // 2. 設(shè)置處理時間為`EventTime`
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     //獲取table的運(yùn)行環(huán)境
     val tableEnv = TableEnvironment.getTableEnvironment(env)
     // 4. 創(chuàng)建一個自定義數(shù)據(jù)源
     val orderDataStream = env.a(chǎn)ddSource(new RichSourceFunction[Order] {
       var isRunning = true
       override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
         // - 隨機(jī)生成訂單ID(UUID)
         // - 隨機(jī)生成用戶ID(0-2)
         // - 隨機(jī)生成訂單金額(0-100)
         // - 時間戳為當(dāng)前系統(tǒng)時間
         // - 每隔1秒生成一個訂單
         for (i <- 0 until 1000 if isRunning) {
           val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101),
             System.currentTimeMillis())
           TimeUnit.SECONDS.sleep(1)
           ctx.collect(order)
         }
       }
       override def cancel(): Unit = { isRunning = false }
     })
     // 5. 添加水印,允許延遲2秒
     val watermarkDataStream = orderDataStream.a(chǎn)ssignTimestampsAndWatermarks(
       new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) {
         override def extractTimestamp(element: Order): Long = {
           val eventTime = element.createTime
           eventTime
         }
       }
     )
     // 6. 導(dǎo)入`import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._`隱式參數(shù)
     // 7. 使用`registerDataStream`注冊表,并分別指定字段,還要指定rowtime字段
     import org.a(chǎn)pache.flink.table.a(chǎn)pi.scala._
     tableEnv.registerDataStream("t_order", watermarkDataStream, 'orderId, 'userId, 'money,'createTime.rowtime)
     // 8. 編寫SQL語句統(tǒng)計用戶訂單總數(shù)、最大金額、最小金額
     // - 分組時要使用`tumble(時間列, interval '窗口時間' second)`來創(chuàng)建窗口
     val sql =
     """
       |select
       | userId,
       | count(1) as totalCount,
       | max(money) as maxMoney,
       | min(money) as minMoney
       | from
       | t_order
       | group by
       | tumble(createTime, interval '5' second),
       | userId
     """.stripMargin
     // 9. 使用`tableEnv.sqlQuery`執(zhí)行sql語句
     val table: Table = tableEnv.sqlQuery(sql)
     // 10. 將SQL的執(zhí)行結(jié)果轉(zhuǎn)換成DataStream再打印出來
     table.toRetractStream[Row].print()
     env.execute("StreamSQLApp")
   }
}
九、Flink CEP

我們在看直播的時候,不管對于主播還是用戶來說,非常重要的一項就是彈幕文化。為了增加直播趣味性和互動性, 各大網(wǎng)絡(luò)直播平臺紛紛采用彈窗彈幕作為用戶實時交流的方式,內(nèi)容豐富且形式多樣的彈幕數(shù)據(jù)中隱含著復(fù)雜的用戶屬性與用戶行為, 研究并理解在線直播平臺用戶具有彈幕內(nèi)容審核與監(jiān)控、輿論熱點預(yù)測、個性化摘要標(biāo)注等多方面的應(yīng)用價值。

本文不分析彈幕數(shù)據(jù)的應(yīng)用價值,只通過彈幕內(nèi)容審核與監(jiān)控案例來了解下Flink CEP的概念及功能。

在用戶發(fā)彈幕時,直播平臺主要實時監(jiān)控識別兩類彈幕內(nèi)容:一類是發(fā)布不友善彈幕的用戶  ,一類是刷屏的用戶。

我們先記住上述需要實時監(jiān)控識別的兩類用戶,接下來介紹Flink CEP的API,然后使用CEP解決上述問題。

本文首發(fā)于公眾號【五分鐘學(xué)大數(shù)據(jù)】,大數(shù)據(jù)領(lǐng)域原創(chuàng)技術(shù)號

1. Flink CEP 是什么

Flink CEP是一個基于Flink的復(fù)雜事件處理庫,可以從多個數(shù)據(jù)流中發(fā)現(xiàn)復(fù)雜事件,識別有意義的事件(例如機(jī)會或者威脅),并盡快的做出響應(yīng),而不是需要等待幾天或則幾個月相當(dāng)長的時間,才發(fā)現(xiàn)問題。

2. Flink CEP API

CEP API的核心是Pattern(模式) API,它允許你快速定義復(fù)雜的事件模式。每個模式包含多個階段(stage)或者我們也可稱為狀態(tài)(state)。從一個狀態(tài)切換到另一個狀態(tài),用戶可以指定條件,這些條件可以作用在鄰近的事件或獨立事件上。

介紹API之前先來理解幾個概念:

1) 模式與模式序列

簡單模式稱為模式,將最終在數(shù)據(jù)流中進(jìn)行搜索匹配的復(fù)雜模式序列稱為模式序列,每個復(fù)雜模式序列是由多個簡單模式組成。

每個模式必須具有唯一的名稱,我們可以使用模式名稱來標(biāo)識該模式匹配到的事件。

2) 單個模式

一個模式既可以是單例的,也可以是循環(huán)的。單例模式接受單個事件,循環(huán)模式可以接受多個事件。

3) 模式示例:

有如下模式:a b+ c?d

其中a,b,c,d這些字母代表的是模式,+代表循環(huán),b+就是循環(huán)模式;?代表可選,c?就是可選模式;

所以上述模式的意思就是:a后面可以跟一個或多個b,后面再可選的跟c,最后跟d。

其中a、c? 、d是單例模式,b+是循環(huán)模式。

一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉(zhuǎn)換為循環(huán)模式。

每個模式可以帶有一個或多個條件,這些條件是基于事件接收進(jìn)行定義的。或者說,每個模式通過一個或多個條件來匹配和接收事件。

了解完上述概念后,接下來介紹下案例中需要用到的幾個CEP API:

4) 案例中用到的CEP API:

Begin:定義一個起始模式狀態(tài)

用法:start = Pattern.

Next:附加一個新的模式狀態(tài)。匹配事件必須直接接續(xù)上一個匹配事件

用法:next = start.next("next");

Where:定義當(dāng)前模式狀態(tài)的過濾條件。僅當(dāng)事件通過過濾器時,它才能與狀態(tài)匹配

用法:patternState.where(_.message == "yyds");

Within: 定義事件序列與模式匹配的最大時間間隔。如果未完成的事件序列超過此時間,則將其丟棄

用法:patternState.within(Time.seconds(10));

Times:一個給定類型的事件出現(xiàn)了指定次數(shù)

用法:patternState.times(5);

API 先介紹以上這幾個,接下來我們解決下文章開頭提到的案例:

3. 監(jiān)測用戶彈幕行為案例案例一:監(jiān)測惡意用戶

規(guī)則:用戶如果在10s內(nèi),同時輸入 TMD 超過5次,就認(rèn)為用戶為惡意攻擊,識別出該用戶。

使用 Flink CEP 檢測惡意用戶:

import org.a(chǎn)pache.flink.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.cep.PatternSelectFunction
import org.a(chǎn)pache.flink.cep.scala.{CEP, PatternStream}
import org.a(chǎn)pache.flink.cep.scala.pattern.Pattern
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.TimeCharacteristic
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.windowing.time.Time
object BarrageBehavior01 {
 case class  LoginEvent(userId:String, message:String, timestamp:Long){
   override def toString: String = userId
 }
 def main(args: Array[String]): Unit = {
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // 使用IngestionTime作為EventTime
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   // 用于觀察測試數(shù)據(jù)處理順序
   env.setParallelism(1)
   // 模擬數(shù)據(jù)源
   val loginEventStream: DataStream[LoginEvent] = env.fromCollection(
     List(
       LoginEvent("1", "TMD", 1618498576),
       LoginEvent("1", "TMD", 1618498577),
       LoginEvent("1", "TMD", 1618498579),
       LoginEvent("1", "TMD", 1618498582),
       LoginEvent("2", "TMD", 1618498583),
       LoginEvent("1", "TMD", 1618498585)
     )
   ).a(chǎn)ssignAscendingTimestamps(_.timestamp * 1000)
   //定義模式
   val loginEventPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin")
     .where(_.message == "TMD")
     .times(5)
     .within(Time.seconds(10))
   //匹配模式
   val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern)
   import scala.collection.Map
   val result = patternStream.select((pattern:Map[String, Iterable[LoginEvent]])=> {
     val first = pattern.getOrElse("begin", null).iterator.next()
     (first.userId, first.timestamp)
   })
   //惡意用戶,實際處理可將按用戶進(jìn)行禁言等處理,為簡化此處僅打印出該用戶
   result.print("惡意用戶>>>")
   env.execute("BarrageBehavior01")
 }
}
案例二:監(jiān)測刷屏用戶

規(guī)則:用戶如果在10s內(nèi),同時連續(xù)輸入同樣一句話超過5次,就認(rèn)為是惡意刷屏。

使用 Flink CEP檢測刷屏用戶

object BarrageBehavior02 {
 case class Message(userId: String, ip: String, msg: String)
 def main(args: Array[String]): Unit = {
   //初始化運(yùn)行環(huán)境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //設(shè)置并行度
   env.setParallelism(1)
   // 模擬數(shù)據(jù)源
   val loginEventStream: DataStream[Message] = env.fromCollection(
     List(
       Message("1", "192.168.0.1", "beijing"),
       Message("1", "192.168.0.2", "beijing"),
       Message("1", "192.168.0.3", "beijing"),
       Message("1", "192.168.0.4", "beijing"),
       Message("2", "192.168.10.10", "shanghai"),
       Message("3", "192.168.10.10", "beijing"),
       Message("3", "192.168.10.11", "beijing"),
       Message("4", "192.168.10.10", "beijing"),
       Message("5", "192.168.10.11", "shanghai"),
       Message("4", "192.168.10.12", "beijing"),
       Message("5", "192.168.10.13", "shanghai"),
       Message("5", "192.168.10.14", "shanghai"),
       Message("5", "192.168.10.15", "beijing"),
       Message("6", "192.168.10.16", "beijing"),
       Message("6", "192.168.10.17", "beijing"),
       Message("6", "192.168.10.18", "beijing"),
       Message("5", "192.168.10.18", "shanghai"),
       Message("6", "192.168.10.19", "beijing"),
       Message("6", "192.168.10.19", "beijing"),
       Message("5", "192.168.10.18", "shanghai")
     )
   )
   //定義模式
   val loginbeijingPattern = Pattern.begin[Message]("start")
     .where(_.msg != null) //一條登錄失敗
     .times(5).optional  //將滿足五次的數(shù)據(jù)配對打印
     .within(Time.seconds(10))
   //進(jìn)行分組匹配
   val loginbeijingDataPattern = CEP.pattern(loginEventStream.keyBy(_.userId), loginbeijingPattern)
   //查找符合規(guī)則的數(shù)據(jù)
   val loginbeijingResult: DataStream[Option[Iterable[Message]]] = loginbeijingDataPattern.select(patternSelectFun = (pattern: collection.Map[String, Iterable[Message]]) => {
     var loginEventList: Option[Iterable[Message]] = null
     loginEventList = pattern.get("start") match {
       case Some(value) => {
         if (value.toList.map(x => (x.userId, x.msg)).distinct.size == 1) {
           Some(value)
         } else {
           None
         }
       }
     }
     loginEventList
   })
   //打印測試
   loginbeijingResult.filter(x=>x!=None).map(x=>{
     x match {
       case Some(value)=> value
     }
   }).print()
   env.execute("BarrageBehavior02)
 }
}
4. Flink CEP API

除了案例中介紹的幾個API外,我們在介紹下其他的常用API:

1) 條件 API

為了讓傳入事件被模式所接受,給模式指定傳入事件必須滿足的條件,這些條件由事件本身的屬性或者前面匹配過的事件的屬性統(tǒng)計量等來設(shè)定。比如,事件的某個值大于5,或者大于先前接受事件的某個值的平均值。

可以使用pattern.where()、pattern.or()、pattern.until()方法來指定條件。條件既可以是迭代條件IterativeConditions,也可以是簡單條件SimpleConditions。

FlinkCEP支持事件之間的三種臨近條件:

next():嚴(yán)格的滿足條件

示例:模式為begin("first").where(_.name='a').next("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,b時,模式才會被命中。如果數(shù)據(jù)為a,c,b,由于a的后面跟了c,所以a會被直接丟棄,模式不會命中。

followedBy():松散的滿足條件

示例:模式為begin("first").where(_.name='a').followedBy("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,b或者為a,c,b,模式均被命中,中間的c會被忽略掉。

followedByAny():非確定的松散滿足條件

示例:模式為begin("first").where(_.name='a').followedByAny("second").where(.name='b')當(dāng)且僅當(dāng)數(shù)據(jù)為a,c,b,b時,對于followedBy模式而言命中的為{a,b},對于followedByAny而言會有兩次命中{a,b},{a,b}。

2) 量詞 API

還記得我們在上面講解模式的概念時說過的一句話嘛:一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉(zhuǎn)換為循環(huán)模式。這里的量詞就是指的量詞API。

以下這幾個量詞API,可以將模式指定為循環(huán)模式:

pattern.oneOrMore():一個給定的事件有一次或多次出現(xiàn),例如上面提到的b+。

pattern.times(#ofTimes):一個給定類型的事件出現(xiàn)了指定次數(shù),例如4次。

pattern.times(#fromTimes, #toTimes):一個給定類型的事件出現(xiàn)的次數(shù)在指定次數(shù)范圍內(nèi),例如2~4次。

可以使用pattern.greedy()方法將模式變成循環(huán)模式,但是不能讓一組模式都變成循環(huán)模式。greedy:就是盡可能的重復(fù)。

使用pattern.optional()方法將循環(huán)模式變成可選的,即可以是循環(huán)模式也可以是單個模式。

3) 匹配后的跳過策略

所謂的匹配跳過策略,是對多個成功匹配的模式進(jìn)行篩選。也就是說如果多個匹配成功,可能我不需要這么多,按照匹配策略,過濾下就可以。

Flink中有五種跳過策略:

NO_SKIP: 不過濾,所有可能的匹配都會被發(fā)出。

SKIP_TO_NEXT: 丟棄與開始匹配到的事件相同的事件,發(fā)出開始匹配到的事件,即直接跳到下一個模式匹配到的事件,以此類推。

SKIP_PAST_LAST_EVENT: 丟棄匹配開始后但結(jié)束之前匹配到的事件。

SKIP_TO_FIRST[PatternName]: 丟棄匹配開始后但在PatternName模式匹配到的第一個事件之前匹配到的事件。

SKIP_TO_LAST[PatternName]: 丟棄匹配開始后但在PatternName模式匹配到的最后一個事件之前匹配到的事件。

怎么理解上述策略,我們以NO_SKIP和SKIP_PAST_LAST_EVENT為例講解下:

在模式為:begin("start").where(_.name='a').oneOrMore().followedBy("second").where(_.name='b')中,我們輸入數(shù)據(jù):a,a,a,a,b  ,如果是NO_SKIP策略,即不過濾策略,模式匹配到的是:{a,b},{a,a,b},{a,a,a,b},{a,a,a,a,b};如果是SKIP_PAST_LAST_EVENT策略,即丟棄匹配開始后但結(jié)束之前匹配到的事件,模式匹配到的是:{a,a,a,a,b}。

5. Flink CEP 的使用場景

除上述案例場景外,Flink CEP 還廣泛用于網(wǎng)絡(luò)欺詐,故障檢測,風(fēng)險規(guī)避,智能營銷等領(lǐng)域。

1) 實時反作弊和風(fēng)控

對于電商來說,羊毛黨是必不可少的,國內(nèi)拼多多曾爆出 100 元的無門檻券隨便領(lǐng),當(dāng)晚被人褥幾百億,對于這種情況肯定是沒有做好及時的風(fēng)控。另外還有就是商家上架商品時通過頻繁修改商品的名稱和濫用標(biāo)題來提高搜索關(guān)鍵字的排名、批量注冊一批機(jī)器賬號快速刷單來提高商品的銷售量等作弊行為,各種各樣的作弊手法也是需要不斷的去制定規(guī)則去匹配這種行為。

2) 實時營銷

分析用戶在手機(jī) APP 的實時行為,統(tǒng)計用戶的活動周期,通過為用戶畫像來給用戶進(jìn)行推薦。比如用戶在登錄 APP 后 1 分鐘內(nèi)只瀏覽了商品沒有下單;用戶在瀏覽一個商品后,3 分鐘內(nèi)又去查看其他同類的商品,進(jìn)行比價行為;用戶商品下單后 1 分鐘內(nèi)是否支付了該訂單。如果這些數(shù)據(jù)都可以很好的利用起來,那么就可以給用戶推薦瀏覽過的類似商品,這樣可以大大提高購買率。

3) 實時網(wǎng)絡(luò)攻擊檢測

當(dāng)下互聯(lián)網(wǎng)安全形勢仍然嚴(yán)峻,網(wǎng)絡(luò)攻擊屢見不鮮且花樣眾多,這里我們以 DDOS(分布式拒絕服務(wù)攻擊)產(chǎn)生的流入流量來作為遭受攻擊的判斷依據(jù)。對網(wǎng)絡(luò)遭受的潛在攻擊進(jìn)行實時檢測并給出預(yù)警,云服務(wù)廠商的多個數(shù)據(jù)中心會定時向監(jiān)控中心上報其瞬時流量,如果流量在預(yù)設(shè)的正常范圍內(nèi)則認(rèn)為是正常現(xiàn)象,不做任何操作;如果某數(shù)據(jù)中心在 10 秒內(nèi)連續(xù) 5 次上報的流量超過正常范圍的閾值,則觸發(fā)一條警告的事件;如果某數(shù)據(jù)中心 30 秒內(nèi)連續(xù)出現(xiàn) 30 次上報的流量超過正常范圍的閾值,則觸發(fā)嚴(yán)重的告警。

6. Flink CEP 的原理簡單介紹

Apache Flink在實現(xiàn)CEP時借鑒了Efficient Pattern Matching over Event Streams論文中NFA的模型,在這篇論文中,還提到了一些優(yōu)化,我們在這里先跳過,只說下NFA的概念。

在這篇論文中,提到了NFA,也就是Non-determined Finite Automaton,叫做不確定的有限狀態(tài)機(jī),指的是狀態(tài)有限,但是每個狀態(tài)可能被轉(zhuǎn)換成多個狀態(tài)(不確定)。

非確定有限自動狀態(tài)機(jī)

先介紹兩個概念:

狀態(tài):狀態(tài)分為三類,起始狀態(tài)、中間狀態(tài)和最終狀態(tài)。

轉(zhuǎn)換:take/ignore/proceed都是轉(zhuǎn)換的名稱。

在NFA匹配規(guī)則里,本質(zhì)上是一個狀態(tài)轉(zhuǎn)換的過程。三種轉(zhuǎn)換的含義如下所示:

Take: 主要是條件的判斷,當(dāng)過來一條數(shù)據(jù)進(jìn)行判斷,一旦滿足條件,獲取當(dāng)前元素,放入到結(jié)果集中,然后將當(dāng)前狀態(tài)轉(zhuǎn)移到下一個的狀態(tài)。

Proceed:當(dāng)前的狀態(tài)可以不依賴任何的事件轉(zhuǎn)移到下一個狀態(tài),比如說透傳的意思。

Ignore:當(dāng)一條數(shù)據(jù)到來的時候,可以忽略這個消息事件,當(dāng)前的狀態(tài)保持不變,相當(dāng)于自己到自己的一個狀態(tài)。

NFA的特點:在NFA中,給定當(dāng)前狀態(tài),可能有多個下一個狀態(tài)?梢噪S機(jī)選擇下一個狀態(tài),也可以并行(同時)選擇下一個狀態(tài)。輸入符號可以為空。

7. 規(guī)則引擎

規(guī)則引擎:將業(yè)務(wù)決策從應(yīng)用程序代碼中分離出來,并使用預(yù)定義的語義模塊編寫業(yè)務(wù)決策。接受數(shù)據(jù)輸入,解釋業(yè)務(wù)規(guī)則,并根據(jù)業(yè)務(wù)規(guī)則做出業(yè)務(wù)決策。
使用規(guī)則引擎可以通過降低實現(xiàn)復(fù)雜業(yè)務(wù)邏輯的組件的復(fù)雜性,降低應(yīng)用程序的維護(hù)和可擴(kuò)展性成本。

1) Drools

Drools 是一款使用 Java 編寫的開源規(guī)則引擎,通常用來解決業(yè)務(wù)代碼與業(yè)務(wù)規(guī)則的分離,它內(nèi)置的 Drools Fusion 模塊也提供 CEP 的功能。

優(yōu)勢:

功能較為完善,具有如系統(tǒng)監(jiān)控、操作平臺等功能。規(guī)則支持動態(tài)更新。

劣勢:

以內(nèi)存實現(xiàn)時間窗功能,無法支持較長跨度的時間窗。無法有效支持定時觸達(dá)(如用戶在瀏覽發(fā)生一段時間后觸達(dá)條件判斷)。2) Aviator

Aviator 是一個高性能、輕量級的 Java 語言實現(xiàn)的表達(dá)式求值引擎,主要用于各種表達(dá)式的動態(tài)求值。

優(yōu)勢:

支持大部分運(yùn)算操作符。支持函數(shù)調(diào)用和自定義函數(shù)。支持正則表達(dá)式匹配。支持傳入變量并且性能優(yōu)秀。

劣勢:

沒有 if else、do while 等語句,沒有賦值語句,沒有位運(yùn)算符。3) EasyRules

EasyRules 集成了 MVEL 和 SpEL 表達(dá)式的一款輕量級規(guī)則引擎。

優(yōu)勢:

輕量級框架,學(xué)習(xí)成本低; POJO。為定義業(yè)務(wù)引擎提供有用的抽象和簡便的應(yīng)用。支持從簡單的規(guī)則組建成復(fù)雜規(guī)則。4) Esper

Esper 設(shè)計目標(biāo)為 CEP 的輕量級解決方案,可以方便的嵌入服務(wù)中,提供 CEP 功能。

優(yōu)勢:

輕量級可嵌入開發(fā),常用的 CEP 功能簡單好用。EPL 語法與 SQL 類似,學(xué)習(xí)成本較低。

劣勢:

單機(jī)全內(nèi)存方案,需要整合其他分布式和存儲。以內(nèi)存實現(xiàn)時間窗功能,無法支持較長跨度的時間窗。無法有效支持定時觸達(dá)(如用戶在瀏覽發(fā)生一段時間后觸達(dá)條件判斷)。5) Flink CEP

Flink 是一個流式系統(tǒng),具有高吞吐低延遲的特點,Flink CEP 是一套極具通用性、易于使用的實時流式事件處理方案。

優(yōu)勢:

繼承了 Flink 高吞吐的特點。事件支持存儲到外部,可以支持較長跨度的時間窗。可以支持定時觸達(dá)(用 followedBy + PartternTimeoutFunction 實現(xiàn))。十、Flink CDC1. CDC是什么

CDC 是 Change Data Capture(變更數(shù)據(jù)獲取)的簡稱。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(包括數(shù)據(jù)或數(shù)據(jù)表的插入、更新以及刪除等),將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進(jìn)行訂閱及消費。

在廣義的概念上,只要能捕獲數(shù)據(jù)變更的技術(shù),我們都可以稱為 CDC 。通常我們說的 CDC 技術(shù)主要面向數(shù)據(jù)庫的變更,是一種用于捕獲數(shù)據(jù)庫中數(shù)據(jù)變更的技術(shù)。

CDC 技術(shù)應(yīng)用場景非常廣泛:

數(shù)據(jù)同步,用于備份,容災(zāi);

數(shù)據(jù)分發(fā),一個數(shù)據(jù)源分發(fā)給多個下游;

數(shù)據(jù)采集(E),面向數(shù)據(jù)倉庫/數(shù)據(jù)湖的 ETL 數(shù)據(jù)集成。

2. CDC 的種類

CDC 主要分為基于查詢和基于 Binlog 兩種方式,我們主要了解一下這兩種之間的區(qū)別:


基于查詢的 CDC基于 Binlog 的 CDC開源產(chǎn)品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium執(zhí)行模式BatchStreaming是否可以捕獲所有數(shù)據(jù)變化否是延遲性高延遲低延遲是否增加數(shù)據(jù)庫壓力是否3. 傳統(tǒng)CDC與Flink CDC對比1) 傳統(tǒng) CDC ETL 分析

2) 基于 Flink CDC 的 ETL 分析

2) 基于 Flink CDC 的聚合分析

2) 基于 Flink CDC 的數(shù)據(jù)打?qū)?/p>

4. Flink-CDC 案例

Flink 社區(qū)開發(fā)了 flink-cdc-connectors 組件,這是一個可以直接從 MySQL、PostgreSQL等數(shù)據(jù)庫直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的 source 組件。

開源地址:https://github.com/ververica/flink-cdc-connectors。

示例代碼:

import com.a(chǎn)libaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.a(chǎn)libaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.a(chǎn)libaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.a(chǎn)pache.flink.a(chǎn)pi.common.restartstrategy.RestartStrategies;
import org.a(chǎn)pache.flink.runtime.state.filesystem.FsStateBackend;
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.CheckpointingMode;
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.datastream.DataStreamSource;
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.CheckpointConfig;
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkCDC {
public static void main(String[] args) throws Exception {
//1.創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.Flink-CDC 將讀取 binlog 的位置信息以狀態(tài)的方式保存在 CK,如果想要做到斷點
續(xù)傳,需要從 Checkpoint 或者 Savepoint 啟動程序
//2.1 開啟 Checkpoint,每隔 5 秒鐘做一次 CK
env.enableCheckpointing(5000L);
//2.2 指定 CK 的一致性語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//2.3 設(shè)置任務(wù)關(guān)閉的時候保留最后一次 CK 數(shù)據(jù)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);
//2.4 指定從 CK 自動重啟策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
//2.5 設(shè)置狀態(tài)后端
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
//2.6 設(shè)置訪問 HDFS 的用戶名
System.setProperty("HADOOP_USER_NAME", "atguigu");
//3.創(chuàng)建 Flink-MySQL-CDC 的 Source
//initial (default): Performs an initial snapshot on the monitored database tables upon
first startup, and continue to read the latest binlog.
//latest-offset: Never to perform snapshot on the monitored database tables upon first
startup, just read from the end of the binlog which means only have the changes since the
connector was started.
//timestamp: Never to perform snapshot on the monitored database tables upon first
startup, and directly read binlog from the specified timestamp. The consumer will traverse the
binlog from the beginning and ignore change events whose timestamp is smaller than the
specified timestamp.
//specific-offset: Never to perform snapshot on the monitored database tables upon
first startup, and directly read binlog from the specified offset.
DebeziumSourceFunction

雖然實時計算在最近幾年才火起來,但是在早期也有部分公司有實時計算的需求,但是數(shù)據(jù)量比較少,所以在實時方面形成不了完整的體系,基本所有的開發(fā)都是具體問題具體分析,來一個需求做一個,基本不考慮它們之間的關(guān)系,開發(fā)形式如下:

早期實時計算

如上圖所示,拿到數(shù)據(jù)源后,會經(jīng)過數(shù)據(jù)清洗,擴(kuò)維,通過Flink進(jìn)行業(yè)務(wù)邏輯處理,最后直接進(jìn)行業(yè)務(wù)輸出。把這個環(huán)節(jié)拆開來看,數(shù)據(jù)源端會重復(fù)引用相同的數(shù)據(jù)源,后面進(jìn)行清洗、過濾、擴(kuò)維等操作,都要重復(fù)做一遍,唯一不同的是業(yè)務(wù)的代碼邏輯是不一樣的。

隨著產(chǎn)品和業(yè)務(wù)人員對實時數(shù)據(jù)需求的不斷增多,這種開發(fā)模式出現(xiàn)的問題越來越多:

數(shù)據(jù)指標(biāo)越來越多,“煙囪式”的開發(fā)導(dǎo)致代碼耦合問題嚴(yán)重。

需求越來越多,有的需要明細(xì)數(shù)據(jù),有的需要 OLAP 分析。單一的開發(fā)模式難以應(yīng)付多種需求。

每個需求都要申請資源,導(dǎo)致資源成本急速膨脹,資源不能集約有效利用。

缺少完善的監(jiān)控系統(tǒng),無法在對業(yè)務(wù)產(chǎn)生影響之前發(fā)現(xiàn)并修復(fù)問題。

大家看實時數(shù)倉的發(fā)展和出現(xiàn)的問題,和離線數(shù)倉非常類似,后期數(shù)據(jù)量大了之后產(chǎn)生了各種問題,離線數(shù)倉當(dāng)時是怎么解決的?離線數(shù)倉通過分層架構(gòu)使數(shù)據(jù)解耦,多個業(yè)務(wù)可以共用數(shù)據(jù),實時數(shù)倉是否也可以用分層架構(gòu)呢?當(dāng)然是可以的,但是細(xì)節(jié)上和離線的分層還是有一些不同,稍后會講到。

2. 實時數(shù)倉建設(shè)

從方法論來講,實時和離線是非常相似的,離線數(shù)倉早期的時候也是具體問題具體分析,當(dāng)數(shù)據(jù)規(guī)模漲到一定量的時候才會考慮如何治理。分層是一種非常有效的數(shù)據(jù)治理方式,所以在實時數(shù)倉如何進(jìn)行管理的問題上,首先考慮的也是分層的處理邏輯。

實時數(shù)倉的架構(gòu)如下圖:

實時數(shù)倉架構(gòu)

從上圖中我們具體分析下每層的作用:

數(shù)據(jù)源:在數(shù)據(jù)源的層面,離線和實時在數(shù)據(jù)源是一致的,主要分為日志類和業(yè)務(wù)類,日志類又包括用戶日志,埋點日志以及服務(wù)器日志等。

實時明細(xì)層:在明細(xì)層,為了解決重復(fù)建設(shè)的問題,要進(jìn)行統(tǒng)一構(gòu)建,利用離線數(shù)倉的模式,建設(shè)統(tǒng)一的基礎(chǔ)明細(xì)數(shù)據(jù)層,按照主題進(jìn)行管理,明細(xì)層的目的是給下游提供直接可用的數(shù)據(jù),因此要對基礎(chǔ)層進(jìn)行統(tǒng)一的加工,比如清洗、過濾、擴(kuò)維等。

匯總層:匯總層通過Flink的簡潔算子直接可以算出結(jié)果,并且形成匯總指標(biāo)池,所有的指標(biāo)都統(tǒng)一在匯總層加工,所有人按照統(tǒng)一的規(guī)范管理建設(shè),形成可復(fù)用的匯總結(jié)果。

我們可以看出,實時數(shù)倉和離線數(shù)倉的分層非常類似,比如 數(shù)據(jù)源層,明細(xì)層,匯總層,乃至應(yīng)用層,他們命名的模式可能都是一樣的。但仔細(xì)比較不難發(fā)現(xiàn),兩者有很多區(qū)別:

與離線數(shù)倉相比,實時數(shù)倉的層次更少一些:

從目前建設(shè)離線數(shù)倉的經(jīng)驗來看,數(shù)倉的數(shù)據(jù)明細(xì)層內(nèi)容會非常豐富,處理明細(xì)數(shù)據(jù)外一般還會包含輕度匯總層的概念,另外離線數(shù)倉中應(yīng)用層數(shù)據(jù)在數(shù)倉內(nèi)部,但實時數(shù)倉中,app 應(yīng)用層數(shù)據(jù)已經(jīng)落入應(yīng)用系統(tǒng)的存儲介質(zhì)中,可以把該層與數(shù)倉的表分離。

應(yīng)用層少建設(shè)的好處:實時處理數(shù)據(jù)的時候,每建一個層次,數(shù)據(jù)必然會產(chǎn)生一定的延遲。

匯總層少建的好處:在匯總統(tǒng)計的時候,往往為了容忍一部分?jǐn)?shù)據(jù)的延遲,可能會人為的制造一些延遲來保證數(shù)據(jù)的準(zhǔn)確。舉例,在統(tǒng)計跨天相關(guān)的訂單事件中的數(shù)據(jù)時,可能會等到 00:00:05 或者 00:00:10 再統(tǒng)計,確保 00:00 前的數(shù)據(jù)已經(jīng)全部接受到位了,再進(jìn)行統(tǒng)計。所以,匯總層的層次太多的話,就會更大的加重人為造成的數(shù)據(jù)延遲。

與離線數(shù)倉相比,實時數(shù)倉的數(shù)據(jù)源存儲不同:

在建設(shè)離線數(shù)倉的時候,基本整個離線數(shù)倉都是建立在 Hive 表之上。但是,在建設(shè)實時數(shù)倉的時候,同一份表,會使用不同的方式進(jìn)行存儲。比如常見的情況下,明細(xì)數(shù)據(jù)或者匯總數(shù)據(jù)都會存在 Kafka 里面,但是像城市、渠道等維度信息需要借助 Hbase,MySQL 或者其他 KV 存儲等數(shù)據(jù)庫來進(jìn)行存儲。3. Lambda架構(gòu)的實時數(shù)倉

Lambda和Kappa架構(gòu)的概念已在前文中解釋,不了解的小伙伴可點擊鏈接:一文讀懂大數(shù)據(jù)實時計算

下圖是基于 Flink 和 Kafka 的 Lambda 架構(gòu)的具體實踐,上層是實時計算,下層是離線計算,橫向是按計算引擎來分,縱向是按實時數(shù)倉來區(qū)分:

Lambda架構(gòu)的實時數(shù)倉

Lambda架構(gòu)是比較經(jīng)典的架構(gòu),以前實時的場景不是很多,以離線為主,當(dāng)附加了實時場景后,由于離線和實時的時效性不同,導(dǎo)致技術(shù)生態(tài)是不一樣的。Lambda架構(gòu)相當(dāng)于附加了一條實時生產(chǎn)鏈路,在應(yīng)用層面進(jìn)行一個整合,雙路生產(chǎn),各自獨立。這在業(yè)務(wù)應(yīng)用中也是順理成章采用的一種方式。

雙路生產(chǎn)會存在一些問題,比如加工邏輯double,開發(fā)運(yùn)維也會double,資源同樣會變成兩個資源鏈路。因為存在以上問題,所以又演進(jìn)了一個Kappa架構(gòu)。

4. Kappa架構(gòu)的實時數(shù)倉

Kappa架構(gòu)相當(dāng)于去掉了離線計算部分的Lambda架構(gòu),具體如下圖所示:

Kappa架構(gòu)的實時數(shù)倉

Kappa架構(gòu)從架構(gòu)設(shè)計來講比較簡單,生產(chǎn)統(tǒng)一,一套邏輯同時生產(chǎn)離線和實時。但是在實際應(yīng)用場景有比較大的局限性,因為實時數(shù)據(jù)的同一份表,會使用不同的方式進(jìn)行存儲,這就導(dǎo)致關(guān)聯(lián)時需要跨數(shù)據(jù)源,操作數(shù)據(jù)有很大局限性,所以在業(yè)內(nèi)直接用Kappa架構(gòu)生產(chǎn)落地的案例不多見,且場景比較單一。

關(guān)于 Kappa 架構(gòu),熟悉實時數(shù)倉生產(chǎn)的同學(xué),可能會有一個疑問。因為我們經(jīng)常會面臨業(yè)務(wù)變更,所以很多業(yè)務(wù)邏輯是需要去迭代的。之前產(chǎn)出的一些數(shù)據(jù),如果口徑變更了,就需要重算,甚至重刷歷史數(shù)據(jù)。對于實時數(shù)倉來說,怎么去解決數(shù)據(jù)重算問題?

Kappa 架構(gòu)在這一塊的思路是:首先要準(zhǔn)備好一個能夠存儲歷史數(shù)據(jù)的消息隊列,比如 Kafka,并且這個消息隊列是可以支持你從某個歷史的節(jié)點重新開始消費的。接著需要新起一個任務(wù),從原來比較早的一個時間節(jié)點去消費 Kafka 上的數(shù)據(jù),然后當(dāng)這個新的任務(wù)運(yùn)行的進(jìn)度已經(jīng)能夠和現(xiàn)在的正在跑的任務(wù)齊平的時候,你就可以把現(xiàn)在任務(wù)的下游切換到新的任務(wù)上面,舊的任務(wù)就可以停掉,并且原來產(chǎn)出的結(jié)果表也可以被刪掉。

5. 流批結(jié)合的實時數(shù)倉

隨著實時 OLAP 技術(shù)的發(fā)展,目前開源的OLAP引擎在性能,易用等方面有了很大的提升,如Doris、Presto等,加上數(shù)據(jù)湖技術(shù)的迅速發(fā)展,使得流批結(jié)合的方式變得簡單。

如下圖是流批結(jié)合的實時數(shù)倉:

流批結(jié)合的實時數(shù)倉

數(shù)據(jù)從日志統(tǒng)一采集到消息隊列,再到實時數(shù)倉,作為基礎(chǔ)數(shù)據(jù)流的建設(shè)是統(tǒng)一的。之后對于日志類實時特征,實時大屏類應(yīng)用走實時流計算。對于Binlog類業(yè)務(wù)分析走實時OLAP批處理。

我們看到流批結(jié)合的方式與上面幾種架構(gòu)的存儲方式發(fā)生了變化,由Kafka換成了Iceberg,Iceberg是介于上層計算引擎和底層存儲格式之間的一個中間層,我們可以把它定義成一種“數(shù)據(jù)組織格式”,底層存儲還是HDFS,那么為什么加了中間層,就對流批結(jié)合處理的比較好了呢?Iceberg的ACID能力可以簡化整個流水線的設(shè)計,降低整個流水線的延遲,并且所具有的修改、刪除能力能夠有效地降低開銷,提升效率。Iceberg可以有效支持批處理的高吞吐數(shù)據(jù)掃描和流計算按分區(qū)粒度并發(fā)實時處理。

十二、Flink 面試題1. Flink 的容錯機(jī)制(checkpoint)

Checkpoint機(jī)制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現(xiàn)故障時,能夠?qū)⒄麄應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保證應(yīng)用流圖狀態(tài)的一致性。Flink的Checkpoint機(jī)制原理來自“Chandy-Lamport algorithm”算法。

每個需要Checkpoint的應(yīng)用在啟動時,Flink的JobManager為其創(chuàng)建一個 CheckpointCoordinator(檢查點協(xié)調(diào)器),CheckpointCoordinator全權(quán)負(fù)責(zé)本應(yīng)用的快照制作。

CheckpointCoordinator(檢查點協(xié)調(diào)器),CheckpointCoordinator全權(quán)負(fù)責(zé)本應(yīng)用的快照制作。

CheckpointCoordinator(檢查點協(xié)調(diào)器) 周期性的向該流應(yīng)用的所有source算子發(fā)送 barrier(屏障)。

當(dāng)某個source算子收到一個barrier時,便暫停數(shù)據(jù)處理過程,然后將自己的當(dāng)前狀態(tài)制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自己快照制作情況,同時向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理

下游算子收到barrier之后,會暫停自己的數(shù)據(jù)處理過程,然后將自身的相關(guān)狀態(tài)制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自身快照情況,同時向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理。

每個算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。

當(dāng)CheckpointCoordinator收到所有算子的報告之后,認(rèn)為該周期的快照制作成功; 否則,如果在規(guī)定的時間內(nèi)沒有收到所有算子的報告,則認(rèn)為本周期快照制作失敗。

文章推薦:

Flink可靠性的基石-checkpoint機(jī)制詳細(xì)解析

2. Flink Checkpoint與 Spark 的相比,Flink 有什么區(qū)別或優(yōu)勢嗎

Spark Streaming 的 Checkpoint 僅僅是針對 Driver 的故障恢復(fù)做了數(shù)據(jù)和元數(shù)據(jù)的 Checkpoint。而 Flink 的 Checkpoint 機(jī)制要復(fù)雜了很多,它采用的是輕量級的分布式快照,實現(xiàn)了每個算子的快照,及流動中的數(shù)據(jù)的快照。

3. Flink 中的 Time 有哪幾種

Flink中的時間有三種類型,如下圖所示:

Event Time:是事件創(chuàng)建的時間。它通常由事件中的時間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。

Ingestion Time:是數(shù)據(jù)進(jìn)入Flink的時間。

Processing Time:是每一個執(zhí)行基于時間操作的算子的本地系統(tǒng)時間,與機(jī)器相關(guān),默認(rèn)的時間屬性就是Processing Time。

例如,一條日志進(jìn)入Flink的時間為2021-01-22 10:00:00.123,到達(dá)Window的系統(tǒng)時間為2021-01-22 10:00:01.234,日志的內(nèi)容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2

對于業(yè)務(wù)來說,要統(tǒng)計1min內(nèi)的故障日志個數(shù),哪個時間是最有意義的?—— eventTime,因為我們要根據(jù)日志的生成時間進(jìn)行統(tǒng)計。

4. 對于遲到數(shù)據(jù)是怎么處理的

Flink中 WaterMark 和 Window 機(jī)制解決了流式數(shù)據(jù)的亂序問題,對于因為延遲而順序有誤的數(shù)據(jù),可以根據(jù)eventTime進(jìn)行業(yè)務(wù)處理,對于延遲的數(shù)據(jù)Flink也有自己的解決辦法,主要的辦法是給定一個允許延遲的時間,在該時間范圍內(nèi)仍可以接受處理延遲數(shù)據(jù):

設(shè)置允許延遲的時間是通過allowedLateness(lateness: Time)設(shè)置

保存延遲數(shù)據(jù)則是通過sideOutputLateData(outputTag: OutputTag[T])保存

獲取延遲數(shù)據(jù)是通過DataStream.getSideOutput(tag: OutputTag[X])獲取

文章推薦:

Flink 中極其重要的 Time 與 Window 詳細(xì)解析

5. Flink 的運(yùn)行必須依賴 Hadoop 組件嗎

Flink可以完全獨立于Hadoop,在不依賴Hadoop組件下運(yùn)行。但是做為大數(shù)據(jù)的基礎(chǔ)設(shè)施,Hadoop體系是任何大數(shù)據(jù)框架都繞不過去的。Flink可以集成眾多Hadooop 組件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做資源調(diào)度,也可以讀寫HDFS,或者利用HDFS做檢查點。

6. Flink集群有哪些角色?各自有什么作用

有以下三個角色:

JobManager處理器:

也稱之為Master,用于協(xié)調(diào)分布式執(zhí)行,它們用來調(diào)度task,協(xié)調(diào)檢查點,協(xié)調(diào)失敗時恢復(fù)等。Flink運(yùn)行時至少存在一個master處理器,如果配置高可用模式則會存在多個master處理器,它們其中有一個是leader,而其他的都是standby。

TaskManager處理器:

也稱之為Worker,用于執(zhí)行一個dataflow的task(或者特殊的subtask)、數(shù)據(jù)緩沖和data stream的交換,Flink運(yùn)行時至少會存在一個worker處理器。

Clint客戶端:

Client是Flink程序提交的客戶端,當(dāng)用戶提交一個Flink程序時,會首先創(chuàng)建一個Client,該Client首先會對用戶提交的Flink程序進(jìn)行預(yù)處理,并提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,并建立到JobManager的連接,將Flink Job提交給JobManager

7. Flink 資源管理中 Task Slot 的概念

在Flink中每個TaskManager是一個JVM的進(jìn)程, 可以在不同的線程中執(zhí)行一個或多個子任務(wù)。為了控制一個worker能接收多少個task。worker通過task slot(任務(wù)槽)來進(jìn)行控制(一個worker至少有一個task slot)。

8. Flink的重啟策略了解嗎

Flink支持不同的重啟策略,這些重啟策略控制著job失敗后如何重啟:

固定延遲重啟策略

固定延遲重啟策略會嘗試一個給定的次數(shù)來重啟Job,如果超過了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會等待一個固定的時間。

失敗率重啟策略

失敗率重啟策略在Job失敗后會重啟,但是超過失敗率后,Job會最終被認(rèn)定失敗。在兩個連續(xù)的重啟嘗試之間,重啟策略會等待一個固定的時間。

無重啟策略

Job直接失敗,不會嘗試進(jìn)行重啟。

9. Flink 是如何保證 Exactly-once 語義的

Flink通過實現(xiàn)兩階段提交和狀態(tài)保存來實現(xiàn)端到端的一致性語義。分為以下幾個步驟:

開始事務(wù)(beginTransaction)創(chuàng)建一個臨時文件夾,來寫把數(shù)據(jù)寫入到這個文件夾里面

預(yù)提交(preCommit)將內(nèi)存中緩存的數(shù)據(jù)寫入文件并關(guān)閉

正式提交(commit)將之前寫完的臨時文件放入目標(biāo)目錄下。這代表著最終的數(shù)據(jù)會有一些延遲

丟棄(abort)丟棄臨時文件

若失敗發(fā)生在預(yù)提交成功后,正式提交前。可以根據(jù)狀態(tài)來提交預(yù)提交的數(shù)據(jù),也可刪除預(yù)提交的數(shù)據(jù)。

文章推薦:

八張圖搞懂 Flink 端到端精準(zhǔn)一次處理語義 Exactly-once

10. 如果下級存儲不支持事務(wù),Flink 怎么保證 exactly-once

端到端的 exactly-once 對 sink 要求比較高,具體實現(xiàn)主要有冪等寫入和事務(wù)性寫入兩種方式。

冪等寫入的場景依賴于業(yè)務(wù)邏輯,更常見的是用事務(wù)性寫入。而事務(wù)性寫入又有預(yù)寫日志(WAL)和兩階段提交(2PC)兩種方式。

如果外部系統(tǒng)不支持事務(wù),那么可以用預(yù)寫日志的方式,把結(jié)果數(shù)據(jù)先當(dāng)成狀態(tài)保存,然后在收到 checkpoint 完成的通知時,一次性寫入 sink 系統(tǒng)。

11. Flink是如何處理反壓的

Flink 內(nèi)部是基于 producer-consumer 模型來進(jìn)行消息傳遞的,Flink的反壓設(shè)計也是基于這個模型。Flink 使用了高效有界的分布式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣。下游消費者消費變慢,上游就會受到阻塞。

12. Flink中的狀態(tài)存儲

Flink在做計算的過程中經(jīng)常需要存儲中間狀態(tài),來避免數(shù)據(jù)丟失和狀態(tài)恢復(fù)。選擇的狀態(tài)存儲策略不同,會影響狀態(tài)持久化如何和 checkpoint 交互。Flink提供了三種狀態(tài)存儲方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。

13. Flink是如何支持流批一體的

這道題問的比較開闊,如果知道Flink底層原理,可以詳細(xì)說說,如果不是很了解,就直接簡單一句話:Flink的開發(fā)者認(rèn)為批處理是流處理的一種特殊情況。批處理是有限的流處理。Flink 使用一個引擎支持了 DataSet API 和 DataStream API。

14. Flink的內(nèi)存管理是如何做的

Flink 并不是將大量對象存在堆上,而是將對象都序列化到一個預(yù)分配的內(nèi)存塊上。此外,Flink大量的使用了堆外內(nèi)存。如果需要處理的數(shù)據(jù)超出了內(nèi)存限制,則會將部分?jǐn)?shù)據(jù)存儲到硬盤上。Flink 為了直接操作二進(jìn)制數(shù)據(jù)實現(xiàn)了自己的序列化框架。

15. Flink CEP 編程中當(dāng)狀態(tài)沒有到達(dá)的時候會將數(shù)據(jù)保存在哪里

在流式處理中,CEP 當(dāng)然是要支持 EventTime 的,那么相對應(yīng)的也要支持?jǐn)?shù)據(jù)的遲到現(xiàn)象,也就是watermark的處理邏輯。CEP對未匹配成功的事件序列的處理,和遲到數(shù)據(jù)是類似的。在 Flink CEP的處理邏輯中,狀態(tài)沒有滿足的和遲到的數(shù)據(jù),都會存儲在一個Map數(shù)據(jù)結(jié)構(gòu)中,也就是說,如果我們限定判斷事件序列的時長為5分鐘,那么內(nèi)存中就會存儲5分鐘的數(shù)據(jù),這在我看來,也是對內(nèi)存的極大損傷之一。

最后

第一時間獲取最新大數(shù)據(jù)技術(shù),盡在本公眾號:五分鐘學(xué)大數(shù)據(jù)

<上一頁  1  2  3  4  
聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權(quán)或其他問題,請聯(lián)系舉報。

發(fā)表評論

0條評論,0人參與

請輸入評論內(nèi)容...

請輸入評論/評論長度6~500個字

您提交的評論過于頻繁,請輸入驗證碼繼續(xù)

暫無評論

暫無評論

人工智能 獵頭職位 更多
掃碼關(guān)注公眾號
OFweek人工智能網(wǎng)
獲取更多精彩內(nèi)容
文章糾錯
x
*文字標(biāo)題:
*糾錯內(nèi)容:
聯(lián)系郵箱:
*驗 證 碼:

粵公網(wǎng)安備 44030502002758號