訂閱
糾錯(cuò)
加入自媒體

Flink未來將與 Pulsar集成提供大規(guī)模的彈性數(shù)據(jù)處理

未來整合

Pulsar可以以不同的方式與Apache Flink集成。一些潛在的集成包括使用流式連接器為流式工作負(fù)載提供支持,并使用批量源連接器支持批量工作負(fù)載。Pulsar還提供對(duì)schema 的本地支持,可以與Flink集成并提供對(duì)數(shù)據(jù)的結(jié)構(gòu)化訪問,例如使用Flink SQL作為在Pulsar中查詢數(shù)據(jù)的方式。最后,集成這些技術(shù)的另一種方法可能包括使用Pulsar作為Flink的狀態(tài)后端。由于Pulsar具有分層架構(gòu)(Streams和Segmented Streams,由Apache Bookkeeper提供支持),因此將Pulsar用作存儲(chǔ)層并存儲(chǔ)Flink狀態(tài)變得很自然。

從體系結(jié)構(gòu)的角度來看,我們可以想象兩個(gè)框架之間的集成,它使用Apache Pulsar作為統(tǒng)一的數(shù)據(jù)層視圖,Apache Flink作為統(tǒng)一的計(jì)算和數(shù)據(jù)處理框架和API。

現(xiàn)有集成

兩個(gè)框架之間的集成正在進(jìn)行中,開發(fā)人員已經(jīng)可以通過多種方式將Pulsar與Flink結(jié)合使用。例如,Pulsar可用作Flink DataStream應(yīng)用程序中的流媒體源和流式接收器。開發(fā)人員可以將Pulsar中的數(shù)據(jù)提取到Flink作業(yè)中,該作業(yè)可以計(jì)算和處理實(shí)時(shí)數(shù)據(jù),然后將數(shù)據(jù)作為流式接收器發(fā)送回Pulsar主題。這樣的例子如下所示:

// create and configure Pulsar consumer

PulsarSourceBuilder<String>builder = PulsarSourceBuilder

.builder(new SimpleStringSchema())

.serviceUrl(serviceUrl)

.topic(inputTopic)

.subscriptionName(subscription);

SourceFunction<String> src = builder.build();

// ingest DataStream with Pulsar consumer

DataStream<String> words = env.a(chǎn)ddSource(src);

// perform computation on DataStream (here a simple WordCount)

DataStream<WordWithCount> wc = words

.flatMap((FlatMapFunction<String, WordWithCount>) (word, collector) -> {

collector.collect(new WordWithCount(word, 1));

})

.returns(WordWithCount.class)

.keyBy("word")

.timeWindow(Time.seconds(5))

.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->

new WordWithCount(c1.word, c1.count + c2.count));

// emit result via Pulsar producer

wc.a(chǎn)ddSink(new FlinkPulsarProducer<>(

serviceUrl,

outputTopic,

new AuthenticationDisabled(),

wordWithCount -> wordWithCount.toString().getBytes(UTF_8),

wordWithCount -> wordWithCount.word)

);

開發(fā)人員可以利用的兩個(gè)框架之間的另一個(gè)集成包括將Pulsar用作Flink SQL或Table API查詢的流式源和流式表接收器,如下例所示:

// obtain a DataStream with words

DataStream<String> words = ...

// register DataStream as Table "words" with two attributes ("word", "ts").

//   "ts" is an event-time timestamp.

tableEnvironment.registerDataStream("words", words, "word, ts.rowtime");

// create a TableSink that produces to Pulsar

TableSink sink = new PulsarJsonTableSink(

serviceUrl,

outputTopic,

new AuthenticationDisabled(),

ROUTING_KEY);

// register Pulsar TableSink as table "wc"

tableEnvironment.registerTableSink(

"wc",

sink.configure(

new String[]{"word", "cnt"},

new TypeInformation[]{Types.STRING, Types.LONG}));

// count words per 5 seconds and write result to table "wc"

tableEnvironment.sqlUpdate(

"INSERT INTO wc " +

"SELECT word, COUNT(*) AS cnt " +

"FROM words " +

"GROUP BY word, TUMBLE(ts, INTERVAL '5' SECOND)");

最后,F(xiàn)link將批量工作負(fù)載與Pulsar集成為批處理接收器,其中所有結(jié)果在Apache Flink完成靜態(tài)數(shù)據(jù)集中的計(jì)算后被推送到Pulsar。這樣的例子如下所示:

// obtain DataSet from arbitrary computation

DataSet<WordWithCount> wc = ...

// create PulsarOutputFormat instance

OutputFormat pulsarOutputFormat = new PulsarOutputFormat(

serviceUrl,

topic,

new AuthenticationDisabled(),

wordWithCount -> wordWithCount.toString().getBytes());

// write DataSet to Pulsar

wc.output(pulsarOutputFormat);

結(jié)論

Pulsar和Flink都對(duì)應(yīng)用程序的數(shù)據(jù)和計(jì)算級(jí)別如何以批量作為特殊情況流“流式傳輸”方式分享了類似的觀點(diǎn)。通過Pulsar的Segmented Streams方法和Flink在一個(gè)框架下統(tǒng)一批處理和流處理工作負(fù)載的步驟,有許多方法將這兩種技術(shù)集成在一起,以提供大規(guī)模的彈性數(shù)據(jù)處理。

<上一頁  1  2  
聲明: 本文由入駐維科號(hào)的作者撰寫,觀點(diǎn)僅代表作者本人,不代表OFweek立場(chǎng)。如有侵權(quán)或其他問題,請(qǐng)聯(lián)系舉報(bào)。

發(fā)表評(píng)論

0條評(píng)論,0人參與

請(qǐng)輸入評(píng)論內(nèi)容...

請(qǐng)輸入評(píng)論/評(píng)論長度6~500個(gè)字

您提交的評(píng)論過于頻繁,請(qǐng)輸入驗(yàn)證碼繼續(xù)

  • 看不清,點(diǎn)擊換一張  刷新

暫無評(píng)論

暫無評(píng)論

人工智能 獵頭職位 更多
掃碼關(guān)注公眾號(hào)
OFweek人工智能網(wǎng)
獲取更多精彩內(nèi)容
文章糾錯(cuò)
x
*文字標(biāo)題:
*糾錯(cuò)內(nèi)容:
聯(lián)系郵箱:
*驗(yàn) 證 碼:

粵公網(wǎng)安備 44030502002758號(hào)