跳到主要內容

淺談 Cloud Dataflow & Apache Beam 處理資料流 | Data Pipeline using Apache Beam and Cloud Dataflow

 


前言

在這個資訊爆炸的時代, 資料的產生速度飛快, 面對這些海量的資訊, 若以傳統的方法做資料分析或是分析前的資料處理(Extract Load Transform, ETL), 勢必會面臨到硬體資源不足(記憶體不夠)或是資料處理速度太慢的問題


記憶體不夠時, 將虛擬機做垂直擴展 scaling up 就能夠暫時解決, 但若要加快資料處理的速度, 就會需要一個分散式的資料處理引擎 Spark 或是 Cloud Dataflow, 透過動態的水平擴展 Scaling out 的方式來增加資料處理的 worker 去消化大量資訊


Apache Beam

2016 年時 Google 開源出一套分散式資料處理框架, 定義在 Spark, Flink 以及 Cloud Dataflow 等等分散式資料處理引擎的上層, 讓開發者可以輕鬆地設計資料處理的 pipeline 並運行在分散式的環境裡




它有以下的優點

開源
背後社群龐大, 容易找到資源

移植性
同一個資料處理邏輯可以跑在各大分散式資料處理引擎上 (Flink, Spark, Dataflow...)

擴充性
除了使用 built-in 的 I/O Connector 之外, 開發者也可以自由地實作客製化版本的 Connector 模組



Cloud Dataflow

Google 完全託管式的(Fully Managed) 分散式資料處理引擎, No-Ops 的服務特色強調不需要額外的維運人員來維護底層的基礎設施

Dataflow 的上層主要是運行 Apache Beam 的程式碼, 並支援在 Runtime 時動態的彈性擴充底層的運算資源, 因此使用 Dataflow 的開發人員可以更專注地開發資料處理的邏輯以及 pipeline 的設計


開始寫 Apache Beam 程式

Step 1. 建立實驗環境

為了簡化說明, 接下來的示範會跑在 Jupyter Notebook 的環境裡

因此首先會需要在 GCP 上用 Dataflow 的 Notebook 功能建立一個可以跑 Jupyter Notebook 的環境



建立完成後選擇 Open Jupyterlab 進入筆記本



接下來 Step 2. & Step 3. 的程式碼可以直接複製貼到筆記本上執行

Step 2. import 必須的套件

程式碼如下


Step 3. 定義 pipeline 

程式碼如下



在 Jupyter Notebook 上顯示執行結果

在 notebook 上跑實驗的好處是, Jupyter Notebook 有非常多的插件可以用, 這些插件可以快速地將執行結果視覺化


程式碼如下


執行完後可以看到結果如下


在 Dataflow 上運行 Apache Beam 程式碼

Step 1. 建立 hello-world.py

在 Notebook 的終端機上執行以下指令, 將 Apache Beam 的程式碼寫入到 hello-world.py 中


$cat << EOF >> hello-world.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

import re

class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')    

pipeline_options = PipelineOptions(['--output', './result.txt'])
p = beam.Pipeline(options=pipeline_options,runner=InteractiveRunner())

wordcount_options = pipeline_options.view_as(WordcountOptions)

count = (p
         | 'ReadCollection' >> beam.io.ReadFromText(wordcount_options.input)
         | 'findWord' >> beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE))
         | "lower" >> beam.Map(lambda word: word.lower())
         | "lower_count" >> beam.combiners.Count.PerElement())
EOF


Step 2. 建立 Dataflow template

Apache Beam 的程式碼必須打包成 dataflow template 才可以被執行


$python -m hello-world \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

 

指令執行完後會輸出 Dataflow template 到 --template_location 指定的路徑底下


Step 3. 執行 Dataflow template

最後用 glcoud 指令執行 template 跑資料處理的 pipeline


$gcloud dataflow jobs run hello-world-job \
    --gcs-location gs://andy-bucket/templates/hello-world-job \
    --region us-central1 \
    --staging-location gs://my-tmp

 


Reference

https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python

https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development

https://www.infoq.cn/article/why-google-replace-beam-with-apache-mapreduce?utm_source=related_read_bottom&utm_medium=article

https://www.oliverhayman.com/analytics/serverless-etl-google-cloud-dataflow-apache-beam

https://ithelp.ithome.com.tw/articles/10217719

https://medium.com/swlh/dataflow-and-apache-beam-the-result-of-a-learning-process-since-mapreduce-c591b2ab180e

https://beam.apache.org/documentation/transforms/python/elementwise/map/

https://www.infoq.cn/article/aly182jgm6mtitg7nl0r

https://www.infoq.cn/article/why-google-replace-beam-with-apache-mapreduce?utm_source=related_read_bottom&utm_medium=article

https://tech.hahow.in/%E6%B7%BA%E8%AB%87-cloud-dataflow-apache-beam-%E8%99%95%E7%90%86%E8%B3%87%E6%96%99%E6%B5%81-a1a73af87fe9

https://cloud.google.com/dataflow/docs/guides/templates/creating-templates

留言

這個網誌中的熱門文章

[解決方法] docker: permission denied

前言 當我們執行docker 指令時若出現以下錯誤訊息 docker: Got permission denied while trying to connect to the Docker daemon socket at unix:///var/run/docker.sock: Post http://%2Fvar%2Frun%2Fdocker.sock/v1.26/containers/create: dial unix /var/run/docker.sock: connect: permission denied. See 'docker run --help'. 表示目前的使用者身分沒有權限去存取docker engine, 因為docker的服務基本上都是以root的身分在執行的, 所以在指令前加sudo就能成功執行指令 但每次實行docker指令(就連docker ps)都還要加sudo實在有點麻煩, 正確的解法是 我們可以把目前使用者加到docker群組裡面, 當docker service 起來時, 會以這個群組的成員來初始化相關服務 sudo groupadd docker sudo usermod -aG docker $USER 需要退出重新登錄後才會生效 Workaround 因為問題是出在權限不足, 如果以上方法都不管用的話, 可以手動修改權限來解決這個問題 sudo chmod 777 /var/run/docker.sock https://docs.docker.com/install/linux/linux-postinstall/

[C#] Visual Studio, 如何在10分鐘內快速更改命名專案名稱

前言: 由於工作需要, 而且懶得再重寫類似的專案, 所以常常將之前寫的專案複製一份加料後, 再重新命名編譯 假設今天我有一個專案HolyUWP, 我想把它重新命名成 BestUWP 時該怎麼做? 以下是幾個簡單的的步驟 使用Visual Studio 2017 備份原來專案 更改Solution名稱 更改Assembly name, Default namespce 更改每支程式碼的Namespace 更改專案資料夾名稱 備份原來專案 由於怕改壞掉, 所以在改之前先備份 更改Solution名稱 更改sln的名稱, 這邊我改成BestUWP.sln 使用Visual Studio打開你的.sln, 右鍵點擊Solution後選擇Rename, 這邊我把它重新命名成BestUWP(跟檔案名稱一致) 必要的話可以順便修改Porject名稱 更改Assembly name, Default namespce 進入 Project > OOXX Properties    修改Assembly Name, Default namesapce 更改每支程式碼的Namespace 基本上隨便挑一支有用到預設Namesapce(HolyUWP)的程式碼來改就好了 重新命名後點擊Apply,  這個動作做完後所有用到舊Namespace的程式碼都會被改成新的 更改專案資料夾名稱 以上動作做完後, 基本上就可以把專案編譯出來測看看了~

[解決方法] mac 作業系統上無法使用 docker

  錯誤訊息 Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running? 原因 因為 docker 的設計是走 client-server 的架構,  如果少裝了 server 的部分就會出現以上的錯誤訊息 解決方法 因為 docker daemon 需要使用 linux kernel 上的某些功能, 所以若想要在 mac 的 OS X 上使用 docker 必須額外起一台 linux VM 給 docker daemon 用  Step 1. 安裝 virtual box $ brew install virtualbox --cask   Step 2. 安裝 docker machine $ brew install docker-machine --cask   Step 3. 設定 使用 docker-machine 建立 VM 跑容器 $docker-machine create --driver virtualbox default $docker-machine restart   輸出環境變數 $docker-machine env default 如果執行以上的指令出現錯誤訊息 Error checking TLS connection: ...  可以執行以下指令重新產生憑證 $docker-machine regenerate-certs 最後套用環境變數, 讓 docker 知道要怎麼去跟這台 VM 溝通  $eval $(docker-machine env default)   測試 若做完以上的步驟沒噴錯誤訊息的話, 可以跑個 hello-world 看看 docker daemon 有沒有起來 $docker run hello-world Unable to find image 'hello-world:latest' locally latest: Pulling from library/hello-world 0e03bdcc26d7: Pull complete Digest: sha256:95ddb6c31407e84e91a986b004aee40975cb0