最近這幾年Big Data(大數據)這個詞相當的流行,媒體新聞也會常常用到這個字眼。
當然,我們可以輕易的從網路上取得Big Data的定義,不過還是簡單的講一下,Big Data的Big(大)通常指的是資料的數量、速度以及類型(volume, velocity, variety),根據情況還有其他的特性。基本上當這些要素符合的時候,一般的單一電腦系統往往無法負荷。
因此,我們會利用所謂的分散式運算(distributed computing),簡單來說將資料拆成很多塊,並分給很多電腦來同時處理:一種團結力量大的概念。這些連在一起的大大小小的電腦形成一個所謂的Cluster。
實際上,一般來說自己會碰到Big Data的機會可能不多,大概也不會需要用到Cluster。但是這裡仍然希望給大家作一個參考,如果將來在工作上有遇到這個需求,或許可以提供一個方向😀。
今天要跟大家一起看的是在這種大數據概念下產生的一個工具(或是架構),叫做Spark(全名Apache Spark),或是更精準一點(因為我們用的是Python)-- 我們要來介紹PySpark(Python + Spark)。
我們這次主要想跟大家一起看看pyspark的一些syntax以及它的使用方式(相信這比談論一些理論來的容易懂)。話說,如果想試試看Cluster的感覺,可以參考像是Databricks,它提供的雲端平台讓我們可以快速的建立一個Cluster(記得使用Community Edition的免費版本喔)並在上面做執行Spark的程式。
PySpark
PySpark是Python用來跟Spark溝通的工具。首先,我們會需要pyspark這個library(注意:要在自己的local電腦上跑Spark的話,還需要安裝Spark本身跟做一些調整,完整安裝可以參考這裡)。
pip install pyspark
接下來的例子,我們會在Jupyter Notebook上操作。
首先第一個步驟,是建立SparkSession的物件。下圖可以看到,在SparkSession產生的同時你會看到一個叫做SparkContext的東西。這個SparkContext是Spark主要的核心,負責連結Spark Cluster。
一旦SparkContext建立後,可以利用瀏覽器進到localhost:4040去看到Spark的使用者介面。會看到像這樣👇的頁面,裡面記錄著你的Spark App的使用狀況:
話說關於處理效能,.defaultParallelism告訴你現在是用幾個CPU core在同時處理你的Spark job。
Cool!現在我們的Spark App已經在Local跑起來了。
RDD
接下來,我們來看一下Spark的核心資料結構 -- RDD(全名為Resilient Distributed Dataset)。
透過將資料轉成RDD,Spark能夠將要處理的資料拆解成許多小塊(partitioning)並分發到cluster中的各個電腦(或稱之為node)去處理。
舉個例子,這裡用簡單的Python的list來當作我們資料來源:
my_big_list是一個python的list,含有0~999999,一百萬個值在裡面。
我們利用.sparkContext.parallelize將list轉成RDD的物件。
現在我們有了RDD,重點是該如何處理它呢?跟普通的Python list有什麼不同呢?
一般在處理像list這樣的集合時,可能動不動就會想要用for loop是吧?
但是在RDD的情況下,則通常會利用下面介紹的作業功能(operations):
map
假設,今天我想要將剛剛定義的rdd裡面的每一個數字都給他加倍,我可以利用.map這個method,範例如下:
map需要的argument是一個function物件。
lambda可以讓我們快速做出一個不需要名字的function(anonymous function),當我們只需要簡單的計算時,lambda是一個常用的方式。
map會回傳一個新的RDD物件。這裡的mapped_rdd就是一個RDD的物件。
.take可以給你指定數量的資料(以list的方式)
filter
接著,假設我想要挑出mapped_rdd裡面可以被4整除的數字,可以怎麼做呢?
我們會利用.flter這個功能:
filter也是需要function的物件作為argument,但是請注意這個function所回傳的值需要是一個boolean(True or False)值。
filter也是回傳一個新的RDD物件。因此這裡的filtered_rdd還是一個RDD物件。
reduce
再接下來,如果我想要算出這個filter_rdd的數值總和,可以怎麼做呢?
我可以利用.reduce這個功能:
reduce也是需要function的物件,但是注意這個function必須要接受兩個argument,並回傳單一數值。
reduce最後只會回傳一個數值。
map、filter、reduce在各個程式語言其實都存在。習慣這些技巧在Spark的作業中是相當重要的💪。
讀取外部資料來源
剛剛我們利用原生的list來生成RDD,但是其實大部分的情況會讀取來自外部的資料。Spark當然也有提供方便的API讓我們可以做到這一點。
譬如說,當需要的讀取的資料來源是一個text檔案的時候,我們可以利用.sparkContext.TextFile這個功能,檔案範例如下:
下面的例子我們同時用.filter來截取得文件中每一行中含有spark的句子。
當然除了lambda,我們也可以用一般def的方式來定義,下面的例子可以取得每一行的字數:
Spark DataFrame
接下來,我們來看一下Spark另一種資料類型,叫做DataFrame,這可能算是最常用的Spark資料類型。Spark DataFrame就像pandas的DataFrame一樣是table格式的概念,但是想像它可以拆解成許多小塊來被同時處理。
下面的例子,我們讀進一個csv的檔案並生成一個Spark DataFrame:
操作上,跟pandas的DataFrame略有不同,但是概念大同小異。
譬如說,如果你想要看某個Column的時候,會用.select(column_name)。要將DataFrame秀出來的話,我們會用.show這個功能。
然後如果你想要篩選data的時候,可以用.filter(此filter跟之前看的rdd.filter是不一樣的喔)這個method,使用方法如下:
再者,如果你對SQL比較熟悉的話,DataFrame還可以讓你利用SQL的Query寫法來選擇資料喔:
關於dataframe的更多API可以參考官方文件。
小結
今天我們快速介紹了Spark以及pyspark,並帶大家看了一些簡單的例子。希望對Spark好奇的你有一點幫助。想要了解更多細節或是例子的話可以參考Spark的官方文件。
實作上,我們會用到Spark是因為有需要處理龐大的資料量。因此如果說資料本身不大的話其實是不需要用到Spark,但是理解這個工具的存在目的可以讓我們有更多的選項。尤其如果你已經使用過pandas的話,相信應該在概念上及操作方式上有許多相似之處,也會降低學習門檻。
當然,今天看到的只是Spark的冰山一角。Spark提供了資料分析處理的諸多元素,包括SparkSQL(處理table格式資料)、Spark Streaming(處理即時資料)、MLlib(機器學習)、Graph X(圖形資料),來因應當今科技(state of the art)的各種需求。希望透過這篇文章給有興趣的朋友一點參考喔~😊
Comments