訂閱
糾錯(cuò)
加入自媒體

一文詳解Flink知識(shí)體系

2021-09-13 09:58
園陌
關(guān)注

4) Flink 關(guān)聯(lián) Hive 分區(qū)表

Flink 1.12 支持了 Hive 最新的分區(qū)作為時(shí)態(tài)表的功能,可以通過(guò) SQL 的方式直接關(guān)聯(lián) Hive 分區(qū)表的最新分區(qū),并且會(huì)自動(dòng)監(jiān)聽最新的 Hive 分區(qū),當(dāng)監(jiān)控到新的分區(qū)后,會(huì)自動(dòng)地做維表數(shù)據(jù)的全量替換。通過(guò)這種方式,用戶無(wú)需編寫 DataStream 程序即可完成 Kafka 流實(shí)時(shí)關(guān)聯(lián)最新的 Hive 分區(qū)實(shí)現(xiàn)數(shù)據(jù)打?qū)挕?/p>

具體用法:

在 Sql Client 中注冊(cè) HiveCatalog:

vim conf/sql-client-defaults.yaml
catalogs:
 - name: hive_catalog
   type: hive
   hive-conf-dir: /disk0/soft/hive-conf/ #該目錄需要包hive-site.xml文件

創(chuàng)建 Kafka 表


CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (  
   master Row

Flink 事實(shí)表與 Hive 最新分區(qū)數(shù)據(jù)關(guān)聯(lián)

dim_extend_shop_info 是 Hive 中已存在的表,所以我們用 table hint 動(dòng)態(tài)地開啟維表參數(shù)。


CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  
SELECT * FROM  
(select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,  
    ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn  
   from hive_catalog.flink_db.kfk_fact_bill_master_12 t1  
      JOIN hive_catalog.flink_db.dim_extend_shop_info  
 + OPTIONS('streaming-source.enable'='true',  
    'streaming-source.partition.include' = 'latest',  
    'streaming-source.monitor-interval' = '1 h',
    'streaming-source.partition-order' = 'partition-name')
   FOR SYSTEM_TIME AS OF t1.proctime AS t2 --時(shí)態(tài)表  
   ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id  
   where groupID in (202042)) t  where t.rn = 1

參數(shù)解釋:

streaming-source.enable 開啟流式讀取 Hive 數(shù)據(jù)。

streaming-source.partition.include 有以下兩個(gè)值:

latest 屬性: 只讀取最新分區(qū)數(shù)據(jù)。all: 讀取全量分區(qū)數(shù)據(jù) ,默認(rèn)值為 all,表示讀所有分區(qū),latest 只能用在 temporal join 中,用于讀取最新分區(qū)作為維表,不能直接讀取最新分區(qū)數(shù)據(jù)。

streaming-source.monitor-interval 監(jiān)聽新分區(qū)生成的時(shí)間、不宜過(guò)短 、最短是1 個(gè)小時(shí),因?yàn)槟壳暗膶?shí)現(xiàn)是每個(gè) task 都會(huì)查詢 metastore,高頻的查可能會(huì)對(duì)metastore 產(chǎn)生過(guò)大的壓力。需要注意的是,1.12.1 放開了這個(gè)限制,但仍建議按照實(shí)際業(yè)務(wù)不要配個(gè)太短的 interval。

streaming-source.partition-order 分區(qū)策略,主要有以下 3 種,其中最為推薦的是 partition-name:

partition-name 使用默認(rèn)分區(qū)名稱順序加載最新分區(qū)create-time 使用分區(qū)文件創(chuàng)建時(shí)間順序partition-time 使用分區(qū)時(shí)間順序六、Flink 狀態(tài)管理

我們前面寫的 wordcount 的例子,沒有包含狀態(tài)管理。如果一個(gè)task在處理過(guò)程中掛掉了,那么它在內(nèi)存中的狀態(tài)都會(huì)丟失,所有的數(shù)據(jù)都需要重新計(jì)算。從容錯(cuò)和消息處理的語(yǔ)義上(at least once, exactly once),Flink引入了state和checkpoint。

因此可以說(shuō)flink因?yàn)橐肓藄tate和checkpoint所以才支持的exactly once

首先區(qū)分一下兩個(gè)概念:

state:

state一般指一個(gè)具體的task/operator的狀態(tài):

state數(shù)據(jù)默認(rèn)保存在java的堆內(nèi)存中,TaskManage節(jié)點(diǎn)的內(nèi)存中。

operator表示一些算子在運(yùn)行的過(guò)程中會(huì)產(chǎn)生的一些中間結(jié)果。

checkpoint:

checkpoint可以理解為checkpoint是把state數(shù)據(jù)定時(shí)持久化存儲(chǔ)了,則表示了一個(gè)Flink Job在一個(gè)特定時(shí)刻的一份全局狀態(tài)快照,即包含了所有task/operator的狀態(tài)。

注意:task(subTask)是Flink中執(zhí)行的基本單位。operator指算子(transformation)

State可以被記錄,在失敗的情況下數(shù)據(jù)還可以恢復(fù)。

Flink中有兩種基本類型的State:

Keyed State

Operator State

Keyed State和Operator State,可以以兩種形式存在:

原始狀態(tài)(raw state)

托管狀態(tài)(managed state)

托管狀態(tài)是由Flink框架管理的狀態(tài)。

我們說(shuō)operator算子保存了數(shù)據(jù)的中間結(jié)果,中間結(jié)果保存在什么類型中,如果我們這里是托管狀態(tài),則由flink框架自行管理

原始狀態(tài)由用戶自行管理狀態(tài)具體的數(shù)據(jù)結(jié)構(gòu),框架在做checkpoint的時(shí)候,使用byte[]來(lái)讀寫狀態(tài)內(nèi)容,對(duì)其內(nèi)部數(shù)據(jù)結(jié)構(gòu)一無(wú)所知。

通常在DataStream上的狀態(tài)推薦使用托管的狀態(tài),當(dāng)實(shí)現(xiàn)一個(gè)用戶自定義的operator時(shí),會(huì)使用到原始狀態(tài)。

1. State-Keyed State

基于KeyedStream上的狀態(tài)。這個(gè)狀態(tài)是跟特定的key綁定的,對(duì)KeyedStream流上的每一個(gè)key,都對(duì)應(yīng)一個(gè)state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解為分區(qū)過(guò)的Operator State。

保存state的數(shù)據(jù)結(jié)構(gòu):

ValueState:即類型為T的單值狀態(tài)。這個(gè)狀態(tài)與對(duì)應(yīng)的key綁定,是最簡(jiǎn)單的狀態(tài)了。它可以通過(guò)update方法更新狀態(tài)值,通過(guò)value()方法獲取狀態(tài)值。

ListState:即key上的狀態(tài)值為一個(gè)列表?梢酝ㄟ^(guò)add方法往列表中附加值;也可以通過(guò)get()方法返回一個(gè)Iterable來(lái)遍歷狀態(tài)值。

ReducingState:這種狀態(tài)通過(guò)用戶傳入的reduceFunction,每次調(diào)用add方法添加值的時(shí)候,會(huì)調(diào)用reduceFunction,最后合并到一個(gè)單一的狀態(tài)值。

MapState

需要注意的是,以上所述的State對(duì)象,僅僅用于與狀態(tài)進(jìn)行交互(更新、刪除、清空等),而真正的狀態(tài)值,有可能是存在內(nèi)存、磁盤、或者其他分布式存儲(chǔ)系統(tǒng)中。相當(dāng)于我們只是持有了這個(gè)狀態(tài)的句柄。

1. ValueState

使用ValueState保存中間結(jié)果對(duì)下面數(shù)據(jù)進(jìn)行分組求和。

開發(fā)步驟:

1. 獲取流處理執(zhí)行環(huán)境
 2. 加載數(shù)據(jù)源
 3. 數(shù)據(jù)分組
 4. 數(shù)據(jù)轉(zhuǎn)換,定義ValueState,保存中間結(jié)果
 5. 數(shù)據(jù)打印
 6. 觸發(fā)執(zhí)行

ValueState:測(cè)試數(shù)據(jù)源:

List(
  (1L, 4L),
  (2L, 3L),
  (3L, 1L),
  (1L, 2L),
  (3L, 2L),
  (1L, 2L),
  (2L, 2L),
  (2L, 9L)
)

示例代碼:

import org.a(chǎn)pache.flink.a(chǎn)pi.common.functions.RichFlatMapFunction
import org.a(chǎn)pache.flink.a(chǎn)pi.common.state.{ValueState, ValueStateDescriptor}
import org.a(chǎn)pache.flink.a(chǎn)pi.common.typeinfo.{TypeHint, TypeInformation}
import org.a(chǎn)pache.flink.configuration.Configuration
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala._
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.{DataStream, StreamExecutionEnvironment}
import org.a(chǎn)pache.flink.util.Collector
object TestKeyedState {
 class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
   *
    * ValueState狀態(tài)句柄. 第一個(gè)值為count,第二個(gè)值為sum。
   
   private var sum: ValueState[(Long, Long)] = _
   override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
     // 獲取當(dāng)前狀態(tài)值
     val tmpCurrentSum: (Long, Long) = sum.value
     // 狀態(tài)默認(rèn)值
     val currentSum = if (tmpCurrentSum != null) {
       tmpCurrentSum
     } else {
       (0L, 0L)
     }
     // 更新
     val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
     // 更新狀態(tài)值
     sum.update(newSum)
     // 如果count >=3 清空狀態(tài)值,重新計(jì)算
     if (newSum._1 >= 3) {
       out.collect((input._1, newSum._2 / newSum._1))
       sum.clear()
     }
   }
   override def open(parameters: Configuration): Unit = {
     sum = getRuntimeContext.getState(
       new ValueStateDescriptor[(Long, Long)]("average", // 狀態(tài)名稱
         TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 狀態(tài)類型
     )
   }
 }  
 def main(args: Array[String]): Unit = {
   //初始化執(zhí)行環(huán)境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //構(gòu)建數(shù)據(jù)源
   val inputStream: DataStream[(Long, Long)] = env.fromCollection(
     List(
       (1L, 4L),
       (2L, 3L),
       (3L, 1L),
       (1L, 2L),
       (3L, 2L),
       (1L, 2L),
       (2L, 2L),
       (2L, 9L))
   )
   //執(zhí)行數(shù)據(jù)處理
   inputStream.keyBy(0)
     .flatMap(new CountWithKeyedState)
     .setParallelism(1)
     .print
   //運(yùn)行任務(wù)
   env.execute
 }
}  
2. MapState

使用MapState保存中間結(jié)果對(duì)下面數(shù)據(jù)進(jìn)行分組求和:

1. 獲取流處理執(zhí)行環(huán)境
 2. 加載數(shù)據(jù)源
 3. 數(shù)據(jù)分組
 4. 數(shù)據(jù)轉(zhuǎn)換,定義MapState,保存中間結(jié)果
 5. 數(shù)據(jù)打印
 6. 觸發(fā)執(zhí)行

MapState:測(cè)試數(shù)據(jù)源:

List(
  ("java", 1),
  ("python", 3),
  ("java", 2),
  ("scala", 2),
  ("python", 1),
  ("java", 1),
  ("scala", 2)
)  

示例代碼:

object MapState {
 def main(args: Array[String]): Unit = {
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)
   *
     * 使用MapState保存中間結(jié)果對(duì)下面數(shù)據(jù)進(jìn)行分組求和
     * 1.獲取流處理執(zhí)行環(huán)境
     * 2.加載數(shù)據(jù)源
     * 3.?dāng)?shù)據(jù)分組
     * 4.?dāng)?shù)據(jù)轉(zhuǎn)換,定義MapState,保存中間結(jié)果
     * 5.?dāng)?shù)據(jù)打印
     * 6.觸發(fā)執(zhí)行
     
   val source: DataStream[(String, Int)] = env.fromCollection(List(
     ("java", 1),
     ("python", 3),
     ("java", 2),
     ("scala", 2),
     ("python", 1),
     ("java", 1),
     ("scala", 2)))
 
   source.keyBy(0)
     .map(new RichMapFunction[(String, Int), (String, Int)] {
       var mste: MapState[String, Int] = _
       override def open(parameters: Configuration): Unit = {
         val msState = new MapStateDescriptor[String, Int]("ms",
           TypeInformation.of(new TypeHint[(String)] {}),
           TypeInformation.of(new TypeHint[(Int)] {}))
         mste = getRuntimeContext.getMapState(msState)
       }
       override def map(value: (String, Int)): (String, Int) = {
         val i: Int = mste.get(value._1)
         mste.put(value._1, value._2 + i)
         (value._1, value._2 + i)
       }
     }).print()
   env.execute()
 }
}  
2. State-Operator State

與Key無(wú)關(guān)的State,與Operator綁定的state,整個(gè)operator只對(duì)應(yīng)一個(gè)state。

保存state的數(shù)據(jù)結(jié)構(gòu):

ListState

舉例來(lái)說(shuō),Flink中的 Kafka Connector,就使用了operator state。它會(huì)在每個(gè)connector實(shí)例中,保存該實(shí)例中消費(fèi)topic的所有(partition, offset)映射。

步驟:

獲取執(zhí)行環(huán)境

設(shè)置檢查點(diǎn)機(jī)制:路徑,重啟策略

自定義數(shù)據(jù)源

需要繼承并行數(shù)據(jù)源和CheckpointedFunction設(shè)置listState,通過(guò)上下文對(duì)象context獲取數(shù)據(jù)處理,保留offset制作快照

數(shù)據(jù)打印

觸發(fā)執(zhí)行

示例代碼:

import java.util
import org.a(chǎn)pache.flink.a(chǎn)pi.common.restartstrategy.RestartStrategies
import org.a(chǎn)pache.flink.a(chǎn)pi.common.state.{ListState, ListStateDescriptor}
import org.a(chǎn)pache.flink.a(chǎn)pi.common.time.Time
import org.a(chǎn)pache.flink.a(chǎn)pi.common.typeinfo.{TypeHint, TypeInformation}
import org.a(chǎn)pache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.a(chǎn)pache.flink.runtime.state.filesystem.FsStateBackend
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.CheckpointingMode
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.checkpoint.CheckpointedFunction
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.environment.CheckpointConfig
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala.StreamExecutionEnvironment
import org.a(chǎn)pache.flink.streaming.a(chǎn)pi.scala._
object ListOperate {
 def main(args: Array[String]): Unit = {
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)
   env.enableCheckpointing(5000)
   env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
   env.getCheckpointConfig.setCheckpointTimeout(60000)
   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
   env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
   //重啟策略
   env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))
   //模擬kakfa偏移量
   env.a(chǎn)ddSource(new MyRichParrelSourceFun)
     .print()
   env.execute()
 }
}
class MyRichParrelSourceFun extends RichParallelSourceFunction[String]
 with CheckpointedFunction {
 var listState: ListState[Long] = _
 var offset: Long = 0L
 //任務(wù)運(yùn)行
 override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
   val iterState: util.Iterator[Long] = listState.get().iterator()
   while (iterState.hasNext) {
     offset = iterState.next()
   }
   while (true) {
     offset += 1
     ctx.collect("offset:"+offset)
     Thread.sleep(1000)
     if(offset > 10){
       1/0
     }
   }
 }
 //取消任務(wù)
 override def cancel(): Unit = ???
 //制作快照
 override def snapshotState(context: FunctionSnapshotContext): Unit = {
   listState.clear()
   listState.a(chǎn)dd(offset)
 }
 //初始化狀態(tài)
 override def initializeState(context: FunctionInitializationContext): Unit = {
   listState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long](
     "listState", TypeInformation.of(new TypeHint[Long] {})
   ))
 }
}
3. Broadcast State

Broadcast State 是 Flink 1.5 引入的新特性。在開發(fā)過(guò)程中,如果遇到需要下發(fā)/廣播配置、規(guī)則等低吞吐事件流到下游所有 task 時(shí),就可以使用 Broadcast State 特性。下游的 task 接收這些配置、規(guī)則并保存為 BroadcastState, 將這些配置應(yīng)用到另一個(gè)數(shù)據(jù)流的計(jì)算中 。

1) API介紹

通常,我們首先會(huì)創(chuàng)建一個(gè)Keyed或Non-Keyed的Data Stream,然后再創(chuàng)建一個(gè)Broadcasted Stream,最后通過(guò)Data Stream來(lái)連接(調(diào)用connect方法)到Broadcasted Stream上,這樣實(shí)現(xiàn)將Broadcast State廣播到Data Stream下游的每個(gè)Task中。

如果Data Stream是Keyed Stream,則連接到Broadcasted Stream后,添加處理ProcessFunction時(shí)需要使用KeyedBroadcastProcessFunction來(lái)實(shí)現(xiàn),下面是KeyedBroadcastProcessFunction的API,代碼如下所示:

public abstract class KeyedBroadcastProcessFunction

上面泛型中的各個(gè)參數(shù)的含義,說(shuō)明如下:

KS:表示Flink程序從最上游的Source Operator開始構(gòu)建Stream,當(dāng)調(diào)用keyBy時(shí)所依賴的Key的類型;IN1:表示非Broadcast的Data Stream中的數(shù)據(jù)記錄的類型;IN2:表示Broadcast Stream中的數(shù)據(jù)記錄的類型;OUT:表示經(jīng)過(guò)KeyedBroadcastProcessFunction的processElement()和processBroadcastElement()方法處理后輸出結(jié)果數(shù)據(jù)記錄的類型。

如果Data Stream是Non-Keyed Stream,則連接到Broadcasted Stream后,添加處理ProcessFunction時(shí)需要使用BroadcastProcessFunction來(lái)實(shí)現(xiàn),下面是BroadcastProcessFunction的API,代碼如下所示:

public abstract class BroadcastProcessFunction

上面泛型中的各個(gè)參數(shù)的含義,與前面KeyedBroadcastProcessFunction的泛型類型中的后3個(gè)含義相同,只是沒有調(diào)用keyBy操作對(duì)原始Stream進(jìn)行分區(qū)操作,就不需要KS泛型參數(shù)。

注意事項(xiàng):

Broadcast State 是Map類型,即K-V類型。

Broadcast State 只有在廣播一側(cè)的方法中processBroadcastElement可以修改;在非廣播一側(cè)方法中processElement只讀。

Broadcast State在運(yùn)行時(shí)保存在內(nèi)存中。

2) 場(chǎng)景舉例

動(dòng)態(tài)更新計(jì)算規(guī)則: 如事件流需要根據(jù)最新的規(guī)則進(jìn)行計(jì)算,則可將規(guī)則作為廣播狀態(tài)廣播到下游Task中。

實(shí)時(shí)增加額外字段: 如事件流需要實(shí)時(shí)增加用戶的基礎(chǔ)信息,則可將用戶的基礎(chǔ)信息作為廣播狀態(tài)廣播到下游Task中。

七、Flink的容錯(cuò)1. Checkpoint介紹

checkpoint機(jī)制是Flink可靠性的基石,可以保證Flink集群在某個(gè)算子因?yàn)槟承┰?如 異常退出)出現(xiàn)故障時(shí),能夠?qū)⒄麄(gè)應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保 證應(yīng)用流圖狀態(tài)的一致性。Flink的checkpoint機(jī)制原理來(lái)自“Chandy-Lamport algorithm”算法。

每個(gè)需要checkpoint的應(yīng)用在啟動(dòng)時(shí),Flink的JobManager為其創(chuàng)建一個(gè) CheckpointCoordinator(檢查點(diǎn)協(xié)調(diào)器),CheckpointCoordinator全權(quán)負(fù)責(zé)本應(yīng)用的快照制作。

CheckpointCoordinator(檢查點(diǎn)協(xié)調(diào)器) 周期性的向該流應(yīng)用的所有source算子發(fā)送 barrier(屏障)。

當(dāng)某個(gè)source算子收到一個(gè)barrier時(shí),便暫停數(shù)據(jù)處理過(guò)程,然后將自己的當(dāng)前狀態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中,最后向CheckpointCoordinator報(bào)告自己快照制作情況,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理

下游算子收到barrier之后,會(huì)暫停自己的數(shù)據(jù)處理過(guò)程,然后將自身的相關(guān)狀態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中,最后向CheckpointCoordinator報(bào)告自身快照情況,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理。

每個(gè)算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。

當(dāng)CheckpointCoordinator收到所有算子的報(bào)告之后,認(rèn)為該周期的快照制作成功; 否則,如果在規(guī)定的時(shí)間內(nèi)沒有收到所有算子的報(bào)告,則認(rèn)為本周期快照制作失敗。

如果一個(gè)算子有兩個(gè)輸入源,則暫時(shí)阻塞先收到barrier的輸入源,等到第二個(gè)輸入源相 同編號(hào)的barrier到來(lái)時(shí),再制作自身快照并向下游廣播該barrier。具體如下圖所示:

假設(shè)算子C有A和B兩個(gè)輸入源

在第i個(gè)快照周期中,由于某些原因(如處理時(shí)延、網(wǎng)絡(luò)時(shí)延等)輸入源A發(fā)出的 barrier 先到來(lái),這時(shí)算子C暫時(shí)將輸入源A的輸入通道阻塞,僅收輸入源B的數(shù)據(jù)。

當(dāng)輸入源B發(fā)出的barrier到來(lái)時(shí),算子C制作自身快照并向 CheckpointCoordinator 報(bào)告自身的快照制作情況,然后將兩個(gè)barrier合并為一個(gè),向下游所有的算子廣播。

當(dāng)由于某些原因出現(xiàn)故障時(shí),CheckpointCoordinator通知流圖上所有算子統(tǒng)一恢復(fù)到某個(gè)周期的checkpoint狀態(tài),然后恢復(fù)數(shù)據(jù)流處理。分布式checkpoint機(jī)制保證了數(shù)據(jù)僅被處理一次(Exactly Once)。

2. 持久化存儲(chǔ)1) MemStateBackend

該持久化存儲(chǔ)主要將快照數(shù)據(jù)保存到JobManager的內(nèi)存中,僅適合作為測(cè)試以及快照的數(shù)據(jù)量非常小時(shí)使用,并不推薦用作大規(guī)模商業(yè)部署。

MemoryStateBackend 的局限性:

默認(rèn)情況下,每個(gè)狀態(tài)的大小限制為 5 MB。可以在MemoryStateBackend的構(gòu)造函數(shù)中增加此值。

無(wú)論配置的最大狀態(tài)大小如何,狀態(tài)都不能大于akka幀的大小(請(qǐng)參閱配置)。

聚合狀態(tài)必須適合 JobManager 內(nèi)存。

建議MemoryStateBackend 用于:

本地開發(fā)和調(diào)試。

狀態(tài)很少的作業(yè),例如僅包含一次記錄功能的作業(yè)(Map,FlatMap,Filter,...),kafka的消費(fèi)者需要很少的狀態(tài)。

2) FsStateBackend

該持久化存儲(chǔ)主要將快照數(shù)據(jù)保存到文件系統(tǒng)中,目前支持的文件系統(tǒng)主要是 HDFS和本地文件。如果使用HDFS,則初始化FsStateBackend時(shí),需要傳入以 “hdfs://”開頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,則需要傳入以“file://”開頭的路徑(即:new FsStateBackend("file:///Data"))。在分布式情況下,不推薦使用本地文件。如果某 個(gè)算子在節(jié)點(diǎn)A上失敗,在節(jié)點(diǎn)B上恢復(fù),使用本地文件時(shí),在B上無(wú)法讀取節(jié)點(diǎn) A上的數(shù)據(jù),導(dǎo)致狀態(tài)恢復(fù)失敗。

建議FsStateBackend:

具有大狀態(tài),長(zhǎng)窗口,大鍵 / 值狀態(tài)的作業(yè)。

所有高可用性設(shè)置。

3) RocksDBStateBackend

RocksDBStatBackend介于本地文件和HDFS之間,平時(shí)使用RocksDB的功能,將數(shù) 據(jù)持久化到本地文件中,當(dāng)制作快照時(shí),將本地?cái)?shù)據(jù)制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用戶特別指明,只需在初始化時(shí)傳入HDFS 或本地路徑即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。

如果用戶使用自定義窗口(window),不推薦用戶使用RocksDBStateBackend。在自定義窗口中,狀態(tài)以ListState的形式保存在StatBackend中,如果一個(gè)key值中有多個(gè)value值,則RocksDB讀取該種ListState非常緩慢,影響性能。用戶可以根據(jù)應(yīng)用的具體情況選擇FsStateBackend+HDFS或RocksStateBackend+HDFS。

4) 語(yǔ)法val env = StreamExecutionEnvironment.getExecutionEnvironment()
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// 設(shè)置checkpoint的執(zhí)行模式,最多執(zhí)行一次或者至少執(zhí)行一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 設(shè)置checkpoint的超時(shí)時(shí)間
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 如果在只做快照過(guò)程中出現(xiàn)錯(cuò)誤,是否讓整體任務(wù)失敗:true是  false不是
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
//設(shè)置同一時(shí)間有多少 個(gè)checkpoint可以同時(shí)執(zhí)行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
5) 修改State Backend的兩種方式

第一種:單任務(wù)調(diào)整

修改當(dāng)前任務(wù)代碼

env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

或者new MemoryStateBackend()

或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】

第二種:全局調(diào)整

修改flink-conf.yaml

state.backend: filesystem

state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

6) Checkpoint的高級(jí)選項(xiàng)

默認(rèn)checkpoint功能是disabled的,想要使用的時(shí)候需要先啟用checkpoint開啟之后,默認(rèn)的checkPointMode是Exactly-once

//配置一秒鐘開啟一個(gè)checkpoint
env.enableCheckpointing(1000)
//指定checkpoint的執(zhí)行模式
//兩種可選:
//CheckpointingMode.EXACTLY_ONCE:默認(rèn)值
//CheckpointingMode.AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
一般情況下選擇CheckpointingMode.EXACTLY_ONCE,除非場(chǎng)景要求極低的延遲(幾毫秒)
注意:如果需要保證EXACTLY_ONCE,source和sink要求必須同時(shí)保證EXACTLY_ONCE
//如果程序被cancle,保留以前做的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
默認(rèn)情況下,檢查點(diǎn)不被保留,僅用于在故障中恢復(fù)作業(yè),可以啟用外部持久化檢查點(diǎn),同時(shí)指定保留策略:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作業(yè)取消時(shí)保留檢查點(diǎn),注意,在這種情況下,您必須在取消后手動(dòng)清理檢查點(diǎn)狀態(tài)
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:當(dāng)作業(yè)在被cancel時(shí),刪除檢查點(diǎn),檢查點(diǎn)僅在作業(yè)失敗時(shí)可用
//設(shè)置checkpoint超時(shí)時(shí)間
env.getCheckpointConfig.setCheckpointTimeout(60000)
//Checkpointing的超時(shí)時(shí)間,超時(shí)時(shí)間內(nèi)沒有完成則被終止
//Checkpointing最小時(shí)間間隔,用于指定上一個(gè)checkpoint完成之后
//最小等多久可以觸發(fā)另一個(gè)checkpoint,當(dāng)指定這個(gè)參數(shù)時(shí),maxConcurrentCheckpoints的值為1
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//設(shè)置同一個(gè)時(shí)間是否可以有多個(gè)checkpoint執(zhí)行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
指定運(yùn)行中的checkpoint最多可以有多少個(gè)
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
用于指定在checkpoint發(fā)生異常的時(shí)候,是否應(yīng)該fail該task,默認(rèn)是true,如果設(shè)置為false,則task會(huì)拒絕checkpoint然后繼續(xù)運(yùn)行
2. Flink的重啟策略

Flink支持不同的重啟策略,這些重啟策略控制著job失敗后如何重啟。集群可以通過(guò)默認(rèn)的重啟策略來(lái)重啟,這個(gè)默認(rèn)的重啟策略通常在未指定重啟策略的情況下使用,而如果Job提交的時(shí)候指定了重啟策略,這個(gè)重啟策略就會(huì)覆蓋掉集群的默認(rèn)重啟策略。

1) 概覽

默認(rèn)的重啟策略是通過(guò)Flink的 flink-conf.yaml 來(lái)指定的,這個(gè)配置參數(shù) restart-strategy 定義了哪種策略會(huì)被采用。如果checkpoint未啟動(dòng),就會(huì)采用 no restart 策略,如果啟動(dòng)了checkpoint機(jī)制,但是未指定重啟策略的話,就會(huì)采用 fixed-delay 策略,重試 Integer.MAX_VALUE 次。請(qǐng)參考下面的可用重啟策略來(lái)了解哪些值是支持的。

每個(gè)重啟策略都有自己的參數(shù)來(lái)控制它的行為,這些值也可以在配置文件中設(shè)置,每個(gè)重啟策略的描述都包含著各自的配置值信息。

重啟策略重啟策略值Fixed delayfixed-delayFailure ratefailure-rateNo restartNone

除了定義一個(gè)默認(rèn)的重啟策略之外,你還可以為每一個(gè)Job指定它自己的重啟策略,這個(gè)重啟策略可以在 ExecutionEnvironment 中調(diào)用 setRestartStrategy() 方法來(lái)程序化地調(diào)用,注意這種方式同樣適用于 StreamExecutionEnvironment。

下面的例子展示了如何為Job設(shè)置一個(gè)固定延遲重啟策略,一旦有失敗,系統(tǒng)就會(huì)嘗試每10秒重啟一次,重啟3次。

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
 3, // 重啟次數(shù)
 Time.of(10, TimeUnit.SECONDS) // 延遲時(shí)間間隔
))
2) 固定延遲重啟策略(Fixed Delay Restart Strategy)

固定延遲重啟策略會(huì)嘗試一個(gè)給定的次數(shù)來(lái)重啟Job,如果超過(guò)了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間。

重啟策略可以配置flink-conf.yaml的下面配置參數(shù)來(lái)啟用,作為默認(rèn)的重啟策略:

restart-strategy: fixed-delay
配置參數(shù)描述默認(rèn)值restart-strategy.fixed-delay.a(chǎn)ttempts在Job最終宣告失敗之前,Flink嘗試執(zhí)行的次數(shù)1,如果啟用checkpoint的話是Integer.MAX_VALUErestart-strategy.fixed-delay.delay延遲重啟意味著一個(gè)執(zhí)行失敗之后,并不會(huì)立即重啟,而是要等待一段時(shí)間。akka.a(chǎn)sk.timeout,如果啟用checkpoint的話是1s

例子:

restart-strategy.fixed-delay.a(chǎn)ttempts: 3
restart-strategy.fixed-delay.delay: 10 s

固定延遲重啟也可以在程序中設(shè)置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
 3, // 重啟次數(shù)
 Time.of(10, TimeUnit.SECONDS) // 重啟時(shí)間間隔
))
3) 失敗率重啟策略

失敗率重啟策略在Job失敗后會(huì)重啟,但是超過(guò)失敗率后,Job會(huì)最終被認(rèn)定失敗。在兩個(gè)連續(xù)的重啟嘗試之間,重啟策略會(huì)等待一個(gè)固定的時(shí)間。

聲明: 本文由入駐維科號(hào)的作者撰寫,觀點(diǎn)僅代表作者本人,不代表OFweek立場(chǎng)。如有侵權(quán)或其他問題,請(qǐng)聯(lián)系舉報(bào)。

發(fā)表評(píng)論

0條評(píng)論,0人參與

請(qǐng)輸入評(píng)論內(nèi)容...

請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字

您提交的評(píng)論過(guò)于頻繁,請(qǐng)輸入驗(yàn)證碼繼續(xù)

  • 看不清,點(diǎn)擊換一張  刷新

暫無(wú)評(píng)論

暫無(wú)評(píng)論

人工智能 獵頭職位 更多
掃碼關(guān)注公眾號(hào)
OFweek人工智能網(wǎng)
獲取更多精彩內(nèi)容
文章糾錯(cuò)
x
*文字標(biāo)題:
*糾錯(cuò)內(nèi)容:
聯(lián)系郵箱:
*驗(yàn) 證 碼:

粵公網(wǎng)安備 44030502002758號(hào)