訂閱
糾錯
加入自媒體

基于XML描述的可編程函數(shù)式ETL實現(xiàn)

2019-07-02 10:24
EAWorld
關注

轉載本文需注明出處:微信公眾號EAWorld,違者必究。

引言:

傳統(tǒng) ETL 主要以 SQL 為主要技術手段,把數(shù)據(jù)經(jīng)抽取、清洗轉換之后加載到數(shù)據(jù)倉庫。但是在如今移動互聯(lián)網(wǎng)大力發(fā)展的場景下,產(chǎn)生大量碎片化和不規(guī)則的數(shù)據(jù)。政府,公安等行業(yè),傳統(tǒng)數(shù)據(jù)庫已經(jīng)遠遠無法滿足需求。數(shù)據(jù)原始文件通過文件導入到基礎庫,再通過大數(shù)據(jù) HQL等技術手段提取出二級庫,這中間的數(shù)據(jù)導入和 SQL ETL 的提取的過程,大量消耗 IO 性能和計算資源,在很多場景下已經(jīng)是數(shù)據(jù)處理的瓶頸所在。

普元在實施公安項目過程中開發(fā)了一種基于 XML 描述的可編程的函數(shù) ETL 轉換方法。主要用于大數(shù)據(jù)文件處理領域,能從原始數(shù)據(jù)文件直接、快速加載到專題庫的技術手段。技術方案主要解決了用 XML 的技術手段描述數(shù)據(jù)文件的格式,包含文件字段切分、字段類型、默認值、異常值校驗、時間格式校驗。在處理時可添加自行開發(fā)的 JAVA UDF 函數(shù),函數(shù)實參支持變量、常量、表達式、函數(shù)和運算符重載。同時函數(shù)支持多層嵌套,即內部函數(shù)的返回值最為外部函數(shù)的實參。該方案實現(xiàn)了 XML 內函數(shù)體的語法解析并在運行過程中直接編譯為 Java 字節(jié)碼的技術。有效的解決了政府、公安、電信行業(yè)巨量的數(shù)據(jù)處理需要的大量計算資源和 IO 性能瓶頸,有效的提高了數(shù)據(jù)處理效率和降低了數(shù)據(jù)處理開發(fā)難度。

目錄:

一、基于 XML 控制文件解析數(shù)據(jù)文件方案介紹

二、XML 控制文件結構和語法

三、函數(shù)和多層嵌套函數(shù)傳參

四、UDF 函數(shù)編寫方法

五、數(shù)據(jù)測試工具

六、FlumeOnYarn 架構和分布式部署

一、基于 XML 控制文件解析數(shù)據(jù)文件方案介紹

對于數(shù)據(jù)開發(fā)項目,我們常常會面臨眾多的數(shù)據(jù)對接,部分場景不僅數(shù)據(jù)量大,且數(shù)據(jù)種類多,數(shù)據(jù)解析開發(fā)工作量巨大。對于大量數(shù)據(jù)對接,一般設計的 RPC 接口和 WebService 一般都達不到數(shù)據(jù)性能要求的。并且他們都是點對點的服務,一旦上下游系統(tǒng)故障,都會造成整個數(shù)據(jù)對接異常。因此大部分都會選擇使用文件的方式進行數(shù)據(jù)對接。

對于非實時數(shù)據(jù)對接需求,這種方式的優(yōu)點:

在數(shù)據(jù)量大的情況下,可以通過文件傳輸,上游只寫入,無需關心數(shù)據(jù)業(yè)務和故障;

方案簡單,避免了網(wǎng)絡協(xié)議相關的概念;

維護簡單,只需保證磁盤寫入穩(wěn)定性即可;

我們常常會面臨基于此架構的數(shù)據(jù)對接。但基于此架構數(shù)據(jù)處理工作都在下游(即數(shù)據(jù)使用方)。

面對大量數(shù)據(jù)對接和眾多的數(shù)據(jù)類型,我們對于每種數(shù)據(jù)文件解析、解碼、清洗消耗大量的人力,并且基于編碼的方式對于較多數(shù)據(jù)類型的場景代碼量大,且難以管理。因此經(jīng)過多次數(shù)據(jù)開發(fā)實踐,我們開發(fā)了一種基于 XML 描述的方式來解析和清洗數(shù)據(jù)文件的實現(xiàn)。

本架構實現(xiàn)適合以下幾個方面:

基于文件的數(shù)據(jù)對接;

文件無法直接導入到目標數(shù)據(jù)庫,需要做轉換,清洗為目標格式;

如上數(shù)據(jù)對接架構圖,F(xiàn)lume 基本實現(xiàn)了基于文件系統(tǒng)的自動掃描和讀取,因此架構實現(xiàn)了基于 Flume Sink 的模塊。本架構也可作為SDK 作為框架集成到現(xiàn)有數(shù)據(jù)處理方案中。

二、XML數(shù)據(jù)控制文件結構和語法

<?xml version="1.0" encoding="UTF-8"?><schema><key>JD_TYPE_V1</key><type>textfile</type><delimiter>,</delimiter><fields><field type="int">exp_flag</field>    <field type="string">sender_id</field>        <field type="string">sender_num</field>  <field type="string" value="unknown">sender_address</field>  <field type="string">receiver_num</field>  <field type="date" pattern="yyyy-MM-dd HH:mm:ss">expect_time</field>  <field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field>    <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field>    <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field>    <field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field>  <field type="string" default="true" value="province_code(sender_province)">sender_province_code</field>  </fields></schema>

(可左右滑動查看全部代碼)

如上 XML 描述了一種數(shù)據(jù)文件類型及該類型的切分方法,數(shù)據(jù)每行經(jīng)過切分后,產(chǎn)生的多個數(shù)據(jù)列的轉換方法。

理論上,每種數(shù)據(jù)類型應該對應一個控制文件,意味著控制文件來描述該種數(shù)據(jù)類型如何解析和轉換。

Key 主要標注該控制文件處理的類型ID;

Delimiter 為文件列切割字符;

Fields 中包含每列的字段描述;

數(shù)據(jù)類型支持Java基本類型和date類型;

Skip為數(shù)據(jù)對齊語法,控制在列中忽略某列的值;

Default = true 屬性為數(shù)據(jù)對齊語法,給某列提供默認值,提供默認值的列在數(shù)據(jù)列中不移動位移;

Value 提供了給該字段提供當列中無值時提供默認值;value=null則指定列值為null;

Date 類型需 pattern 屬性;

三、函數(shù)和多層嵌套函數(shù)傳參

默認值

詞法分析時字段field 的value 屬性值沒有以英文小括號閉合的實體。如下示例中的primeton:

<field type="string" default="true" value="primeton">data_vendor</field>

(可左右滑動查看全部代碼)

函數(shù)

函數(shù)是由一組字符串、數(shù)字、下劃線組成的合法函數(shù)名和0 到多個形式參數(shù)組成。在詞法分析時字段field 的 value 屬性值由英文小括號閉合的實體。如下示例中的:

location(),yn(),concat();<field type="string" default="true" value=" unix_timestamp ">curr_time</field><field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field>    <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field>    <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field>

(可左右滑動查看全部代碼)

函數(shù)名

函數(shù)體小括號前面的部分。一般由字符串、數(shù)字、下劃線組成的一組特定的名稱。如location(receiver_tel),location 即為該函數(shù)的函數(shù)名稱。

函數(shù)的形式參數(shù):

1.無參數(shù)

詞法分析時value的值滿足函數(shù)條件且函數(shù)體內無參數(shù)。如下示例中:unix_timestamp() 獲得當前系統(tǒng)內的 Unix 時間戳;

<field type="string" default="true" value=" unix_timestamp()">curr_time</field>

(可左右滑動查看全部代碼)

2.常量型形參

詞法分析時函數(shù)體內以英文單引號引用的值為函數(shù)體的常量型形參。如’100’,函數(shù)示例為:random_int(‘100’),生成 0-100 以內的隨機整形數(shù)值;

<field type="string" default="true" value="random_int(‘100’)">rand_num</field>

(可左右滑動查看全部代碼)

3.變量型形參

詞法分析時函數(shù)體內參數(shù)沒有英文單引號引用并且不以英文小括號閉合的為函數(shù)體的變量型形參。如下示例中的receiver_tel;

<field type="string" default="true" value="location(receiver_tel)">r_num_loc</field>

(可左右滑動查看全部代碼)

4.函數(shù)型形參

詞法分析時函數(shù)體內沒有英文單引號并且以英文小括號閉合的參數(shù)類型參數(shù)為函數(shù)體的函數(shù)型參數(shù)。如下示例中的:none(sender_num)和none(receiver_num);

<field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field>

(可左右滑動查看全部代碼)

詞法分析獲得到函數(shù)體的同時,使用函數(shù)名調用UdfRegistors.getUdf(udfName) 函數(shù),以檢驗當前系統(tǒng)必要存在該函數(shù),否則則拋出無法識別的函數(shù)異常。

5.類型校驗

詞法分析階段獲得了字段 field 的取值是默認值或者函數(shù),下一步需校驗其默認值或函數(shù)的返回值是否能和定義的字段類型相匹配。如果是函數(shù)同時校驗函數(shù)的形參和實參類型是否相匹配。

<field type="string" default="true" value="primeton">data_vendor</field><field type="int"    default="true" value="2">call_flag</field>

(可左右滑動查看全部代碼)

如上示例中的primeton 需能轉換為 string 類型,call_flag 需能轉換為 int 類型。如果類型不能轉換,則會拋出類型無法轉換異常。對于函數(shù),通過 returnType 返回類型和字段類型進行校驗,可匹配或者是該類型的子類型則類型驗證通過。

四、UDF 函數(shù)編寫方法

編寫一個UDF函數(shù)的步驟:

繼承 UDF 類,實現(xiàn) eval 方法;

Eval 方法傳入的是一個數(shù)組參數(shù);

判斷參數(shù)長度是否和預期的一致;

判斷位置參數(shù)類型是否和預期的一致;

實現(xiàn)函數(shù)體;

返回eval函數(shù)執(zhí)行的返回值,理論上該返回值的類型應該一致,不應該同一函數(shù)返回多種類型值;

函數(shù)編寫者應該保證函數(shù)體內是線程安全的;

UDF 實現(xiàn)如下:

public abstract class UDF {   /**   * 是否支持該組參數(shù)類型,不支持拋出UnsupportedTypeException異常。默認返回 true   */   public void support(Class<?>... paramsClass)throws UnsupportedTypeException;   /*** 該 UDF 返回值類型,用于校驗嵌套函數(shù)類型是否匹配。可返回簡單類型,map,array,record 等類型.默認返回 String 類型*/   public Class<?> returnType();/*** UDF 執(zhí)行函數(shù),當輸入不符合預期時,向外拋出異常* @param params 函數(shù)的輸入實參* @return 函數(shù)輸出結果,簡單類型或者復雜類型,支持簡單類型,map,array,record 類型*/public abstract Object eval(Object... params);}

(可左右滑動查看全部代碼)

一個判斷是否包含子串的UDF 寫法:

所有的UDF都通過一個核心注冊類(這點類似 Hive 的FunctionRegistry)

public final class UdfRegistors {   /**    * UDF 函數(shù)映射    */static final Map<String, UDF> UDF_CACHED = new HashMap<String, UDF>();    static {UDF_CACHED.put("copy", new CopyUDF());  // 復制一個變量的值      UDF_CACHED.put("eq", new EqUDF()); // 判斷兩個變量是否相等      UDF_CACHED.put("yn", new YnUDF()); // 根據(jù)輸入true,false 轉換為 Y、NUDF_CACHED.put("null", new NullUDF()); // 判斷變量是否為null// add udf methodUDF_CACHED.put("location", new LocationUDF());     // 獲得手機號碼的歸屬地   UDF_CACHED.put("nation_code", new NationCodeUDF()); // 根據(jù)國家名稱獲取國家代碼    UDF_CACHED.put("province_code", new ProvinceCodeUDF()); //根據(jù)省名稱獲取省代碼    UDF_CACHED.put("city_code", new CityCodeUDF());    // 根據(jù)城市名稱獲取城市代碼    UDF_CACHED.put("phone_num", new PhoneNumUDF());  // 校驗是否是手機號或者固話UDF_CACHED.put("number_format", new NumberFormatUDF()); //校驗是否可以轉化成數(shù)字}/*** 添加一個UDF函數(shù)     * @param key UDF 函數(shù)     * @param value UDF 函數(shù) eval 應線程安全    * @return     */    public static boolean addUdf(String key, UDF value) {        return UDF_CACHED.put(Optional.of(key).map((it)->it.toLowerCase()).get(), value) 。 null;    }    /**     * 獲得內置的 udf 函數(shù)     */    public static UDF getUdf(String udfName) {        return UDF_CACHED.get(udfName.toLowerCase());    }}

(可左右滑動查看全部代碼)

UDF 函數(shù)注冊時期:

可在編譯期綁定內置的 UDF 函數(shù);

可在系統(tǒng)啟動時配置自加載的 UDF 函數(shù);

可在運行期動態(tài)注入UDF 函數(shù);

五、數(shù)據(jù)測試工具

數(shù)據(jù)對接過程,面對數(shù)據(jù)是否能轉換為目標結果常常無從所知。基于XML 控制文件的數(shù)據(jù)解析,可實現(xiàn)一個測試工具。該工具通過上傳數(shù)據(jù)文件和上傳 XML 控制文件,可對數(shù)據(jù)文件隨機的讀取行進行匹配測試,只要數(shù)據(jù)列和目標 XML文件能通過列匹配測試,則數(shù)據(jù)可通過 ETL 解析清洗。否則繼續(xù)修改 XML 控制文件,直到順利通過匹配。

六、FlumeOnYarn 架構和分布式部署

本架構適合以文件作為數(shù)據(jù)對接的方案,另一方面,通過擴展 Flume 即可實現(xiàn)拿來主義。Flume 內部實現(xiàn)對 Channel 的 Transaction,對于每個以文件構造的 Event 對象是原子操作,要么全部成功,要么失敗。flume依賴事務來保證event的可靠性。Flume 默認沒有分布式實現(xiàn),因此開發(fā)了 FlumeOnYarn 的架構,用于支持 Flume 的分布式部署。

FlumeOnYarn優(yōu)勢:

無需每個節(jié)點安裝 Flume,可一鍵啟動和停止;

配置文件在客戶端節(jié)點修改,自動復制到 Yarn 上各實例,無需每個節(jié)點修改;

基于 CDH或HDP的發(fā)行版,即使實現(xiàn)了 Web 可視化化的配置和分布式部署,但是對于 Flume 只能實現(xiàn)單配置文件實例,無法實現(xiàn)多配置實例;

集群的規(guī)?梢愿鶕(jù)數(shù)據(jù)量大小進行實時的調整(增減節(jié)點),實現(xiàn)彈性處理。通過命令或者 api 即可控制(CDH 等需要在頁面添加 host,繁瑣且不易動態(tài)調整);

多個租戶或者同一租戶多個處理實例互不影響,且能隔離(Yarn Container);

FlumeOnYarn 架構

上圖所示,提交FlumeOnYarn 需要客戶端,該客戶端沒有太多和Flume安裝包結構特殊的地方,只是在 lib 下添加了 flume-yarn 的架構支持和 bin 下 flume-on-yarn 的啟動腳本。

Flume OnYarn 客戶端程序

通過 bin/flume-on-yarn 即可提交 FlumeOnYarn Application 集群。如下的命令即可一次性申請多個 Yarn 資源節(jié)點,實現(xiàn)一鍵部署:

bin/flume-on-yarn yarn -s --name agent_name –conf  conf/flume-h(huán)dfs.conf  --num-instances 5

(可左右滑動查看全部代碼)

總結

推薦閱讀

元數(shù)據(jù)新型存儲架構的探索

基于 Spark 的數(shù)據(jù)分析實踐

本地讀寫的多活數(shù)據(jù)存儲架構設計要義

關于作者:震秦,普元資深開發(fā)工程師,專注于大數(shù)據(jù)開發(fā) 8 年,擅長 Hadoop 生態(tài)內各工具的使用和優(yōu)化。參與某公關廣告(上市)公司DMP 建設,負責數(shù)據(jù)分層設計和批處理,調度實現(xiàn),完成交付使用;參與國內多省市公安社交網(wǎng)絡項目部署,負責產(chǎn)品開發(fā)(Spark 分析應用);參與數(shù)據(jù)清洗加工為我方主題庫并部署上層應用。

關于EAWorld:微服務,DevOps,數(shù)據(jù)治理,移動架構原創(chuàng)技術分享。關注微信公眾號EAWorld!

聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯(lián)系舉報。

發(fā)表評論

0條評論,0人參與

請輸入評論內容...

請輸入評論/評論長度6~500個字

您提交的評論過于頻繁,請輸入驗證碼繼續(xù)

暫無評論

暫無評論

人工智能 獵頭職位 更多
掃碼關注公眾號
OFweek人工智能網(wǎng)
獲取更多精彩內容
文章糾錯
x
*文字標題:
*糾錯內容:
聯(lián)系郵箱:
*驗 證 碼:

粵公網(wǎng)安備 44030502002758號