前言
在這個資訊爆炸的時代, 資料的產生速度飛快, 面對這些海量的資訊, 若以傳統的方法做資料分析或是分析前的資料處理(Extract Load Transform, ETL), 勢必會面臨到硬體資源不足(記憶體不夠)或是資料處理速度太慢的問題
記憶體不夠時, 將虛擬機做垂直擴展 scaling up 就能夠暫時解決, 但若要加快資料處理的速度, 就會需要一個分散式的資料處理引擎 Spark 或是 Cloud Dataflow, 透過動態的水平擴展 Scaling out 的方式來增加資料處理的 worker 去消化大量資訊
Apache Beam
2016 年時 Google 開源出一套分散式資料處理框架, 定義在 Spark, Flink 以及 Cloud Dataflow 等等分散式資料處理引擎的上層, 讓開發者可以輕鬆地設計資料處理的 pipeline 並運行在分散式的環境裡
Cloud Dataflow
Google 完全託管式的(Fully Managed) 分散式資料處理引擎, No-Ops 的服務特色強調不需要額外的維運人員來維護底層的基礎設施
Dataflow 的上層主要是運行 Apache Beam 的程式碼, 並支援在 Runtime 時動態的彈性擴充底層的運算資源, 因此使用 Dataflow 的開發人員可以更專注地開發資料處理的邏輯以及 pipeline 的設計
開始寫 Apache Beam 程式
Step 2. import 必須的套件
程式碼如下
Step 3. 定義 pipeline
程式碼如下
在 Jupyter Notebook 上顯示執行結果
程式碼如下
執行完後可以看到結果如下
在 Dataflow 上運行 Apache Beam 程式碼
$cat << EOF >> hello-world.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import re
class WordcountOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Path of the file to read from')
parser.add_argument(
'--output',
required=True,
help='Output file to write results to.')
pipeline_options = PipelineOptions(['--output', './result.txt'])
p = beam.Pipeline(options=pipeline_options,runner=InteractiveRunner())
wordcount_options = pipeline_options.view_as(WordcountOptions)
count = (p
| 'ReadCollection' >> beam.io.ReadFromText(wordcount_options.input)
| 'findWord' >> beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE))
| "lower" >> beam.Map(lambda word: word.lower())
| "lower_count" >> beam.combiners.Count.PerElement())
EOF
Step 2. 建立 Dataflow template
Apache Beam 的程式碼必須打包成 dataflow template 才可以被執行
$python -m hello-world \ --runner DataflowRunner \ --projectYOUR_PROJECT_ID \ --staging_location gs://YOUR_BUCKET_NAME /staging \ --temp_location gs://YOUR_BUCKET_NAME /temp \ --template_location gs://YOUR_BUCKET_NAME /templates/YOUR_TEMPLATE_NAME
指令執行完後會輸出 Dataflow template 到 --template_location 指定的路徑底下
Step 3. 執行 Dataflow template
最後用 glcoud 指令執行 template 跑資料處理的 pipeline
$gcloud dataflow jobs run hello-world-job \
--gcs-location gs://andy-bucket/templates/hello-world-job \
--region us-central1 \
--staging-location gs://my-tmp
Reference
https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python
https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development
https://www.infoq.cn/article/why-google-replace-beam-with-apache-mapreduce?utm_source=related_read_bottom&utm_medium=article
https://www.oliverhayman.com/analytics/serverless-etl-google-cloud-dataflow-apache-beam
https://ithelp.ithome.com.tw/articles/10217719
https://medium.com/swlh/dataflow-and-apache-beam-the-result-of-a-learning-process-since-mapreduce-c591b2ab180e
https://beam.apache.org/documentation/transforms/python/elementwise/map/
https://www.infoq.cn/article/aly182jgm6mtitg7nl0r
https://www.infoq.cn/article/why-google-replace-beam-with-apache-mapreduce?utm_source=related_read_bottom&utm_medium=article
https://tech.hahow.in/%E6%B7%BA%E8%AB%87-cloud-dataflow-apache-beam-%E8%99%95%E7%90%86%E8%B3%87%E6%96%99%E6%B5%81-a1a73af87fe9
https://cloud.google.com/dataflow/docs/guides/templates/creating-templates
留言
張貼留言