fbpx

CL LAB

HOME > CL LAB > Spark > 普通のPythonスクリプトをSpark化してお手軽並列処理する #spark

普通のPythonスクリプトをSpark化してお手軽並列処理する #spark

 ★ 1

こんにちは。木内です。

Apache Sparkはいわゆる「スケーラブルな汎用分散処理エンジン」なのですが、実際にはユーザの利用形態はSQLに関する処理や、機械学習などのデータ分析関連に偏っているように思えます。"汎用"というからにはデータ分析に限らずおおよそ並列処理できるようなユースケースにも使用できると Apache Spark の用途の幅が広がるのではないかなと思います。

そこで今回はデータ分析とは全く関係のないような処理をApache Sparkで並列化してみます。

そもそもどんなところで並列処理は使用されているのか

一般的に並列処理が使用されているのはいわゆるスパコンの分野です。私はスパコンのことはよくわからないのですが、線形代数、数値解析といった用途に利用されているようです。例えば Abaqus というソフトウェアは有限要素解析を行うことができるソフトウェアですが、応用分野としては弾性解析(どれくらいの圧力をあたえるとどの程度物体が歪むか)などがあり、自動車の車体の強度(例えばハンドルを切った時にどの程度車体が"よれる"か、など)や、タイヤの材料の解析(例えばブレーキをかけた時にどの程度路面との間に摩擦が生じるか、など)などに利用されています。

身近なところではゲームの3次元グラフィックスにも並列処理が活用されています。人間が目で見ることのできる画面が、実際にはどの色で塗りつぶされるべきか1ピクセルづつ計算しながら高速に描画されています。画面上の全てのピクセルを高速に塗りつぶすために、GPUを使用した並列計算が活用されています。

今回はレイトレーシングという3次元グラフィックスを描画する計算を Apache Spark を使用して並列化してみます。レイトレーシングは上記に述べた3次元グラフィックスを描画する方法の一つで、比較的並列処理に向いているので今回使用してみます。

Spark化する前のレイトレーシングプログラム

今回はこちら( https://gist.github.com/rossant/6046463 )のプログラムを改変前のプログラムとして使用します。実行してみると以下のような画像が生成されます。

このプログラムは特に並列処理されておらず、シングルコアを使用した1プロセスで実行されます。実際には画像の左上から右下まで1ピクセルづつ全く同じ処理を繰り返しています。つまり同時に同じ処理を複数のピクセルに対して実行できれば、並列化による高速化が期待できます。

Apache Sparkでの並列化

Apache SparkではRDD(Resilient Distributed Dataset)が分散処理のためのデータの単位になります。RDDはざっくり言うと、配列を任意の単位で区切ったもので、RDD単位で別のプロセス、別のノードで処理を行うことができます。処理された結果もまた別のRDDに格納され、後段の処理にまわすことができます。

引用: http://datastrophic.io/core-concepts-architecture-and-internals-of-apache-spark/

今回は1ピクセルを処理するためのデータをRDD化し、一律に同じ処理を実行し、結果を結合するようにします。

Spark化 Step-1: 全ての処理を関数化

元のプログラムを見ると、いくつかグローバル変数が適されており、複数の関数から参照されています。Apache Sparkでは基本的には全ての関数から共有できるグローバル変数を許しておらず、変数はそれぞれの関数の中で完結していることが必要です。(broadcast変数という例外もありますが、ここでは触れません)

そこで関数 main() という関数を作り、変数と処理を移動します。変更したものが以下のコードになります。

https://github.com/m-kiuchi/cllab181217-spark/blob/master/raytracing-2.py

Spark化 Step-2: 変数を引数としてそれぞれの関数に渡す

ただしStep-1の変更によってエラーが出るようになってしまいました。

$ python3 raytracing-2.py
0.00%
Traceback (most recent call last):
  File "raytracing-2.py", line 185, in <module>
    main()
  File "raytracing-2.py", line 171, in main
    traced = trace_ray(rayO, rayD)
  File "raytracing-2.py", line 82, in trace_ray
    for i, obj in enumerate(scene):
NameError: name 'scene' is not defined

いままでグローバル変数として使用していた変数 scene が関数の中のローカル変数となったことにより、別の関数から参照できなくなってしまったことが原因です。ここではグローバル変数を参照するのではなく、関数を呼び出す時に引数として変数の内容を渡すようにします。

変更したものが以下のコードになります。

https://github.com/m-kiuchi/cllab181217-spark/blob/master/raytracing-3.py

Spark化 Step-3: 並列化したい部分を関数化する

Apache SparkではRDD毎に同じ関数を実行する map() 関数を定義することができます。今回はこの map() 関数を使用して並列化するため、呼び出し先の関数を定義します。

先ほどこのレイトレーシングプログラムは1ピクセルづつ同じ処理を実行していると書きました。その部分が以下の内容です。

while depth &lt; depth_max:
    in_param = {
        ()
    }
    traced = trace_ray(in_param)
    if not traced:
        break
    obj, M, N, col_ray = traced
    # Reflection: create a new ray.
    rayO, rayD = M + N * .0001, normalize(rayD - 2 * np.dot(rayD, N) * N)
    depth += 1
    col += reflection * col_ray
    reflection *= obj.get('reflection', 1.)

上記の部分を関数化して、新しい関数 genpix() を作りました。変更したものが以下のソースコードになります。

https://github.com/m-kiuchi/cllab181217-spark/blob/master/raytracing-4.py

Spark化 Step-4: 入力値をRDD化し、並列実行する

プログラム側で並列化の準備ができました。最後に入力データをRDD化します。入力値は以下の部分で作成されています。

for i, x in enumerate(np.linspace(S[0], S[2], w)):
    if i % 10 == 0:
        print("{0:.2f}%".format(i / float(w) * 100))
    for j, y in enumerate(np.linspace(S[1], S[3], h)):
        col = 0
        Q[:2] = (x, y)
        D = normalize(Q - O)
        depth = 0
        rayO, rayD = O, D
        reflection = 1.
        in_param = {
            ()
        }
        # Loop through initial and secondary rays.
        ret = genpix(in_param, depth, depth_max)

RDDは配列から簡単に作成することができます。そこで上記の2段の for ループから処理する関数を呼び出す代わりに、入力値の配列を作成するようにします。以下のように変更しました。

tar = []
for i, x in enumerate(np.linspace(S[0], S[2], w)):
    if i % 10 == 0:
        print("{0:.2f}%".format(i / float(w) * 100))
    for j, y in enumerate(np.linspace(S[1], S[3], h)):
        ()
        in_param = {
            ()
        }
        # Loop through initial and secondary rays.
        tar.append(in_param)

入力値の配列ができたらRDD化します。 配列をRDD化するには SparkContext の parallelize() を呼び出します。オプションで numSlices を指定していますが、必須ではありません。

from pyspark import SparkContext
from pyspark.sql import SparkSession
def main():
    ()
    sc = SparkContext()
    inrdd = sc.parallelize(tar, numSlices=100)

作成したRDDに対して map() 関数を実行し、結果を Python の配列として取得します。

    inrdd = sc.parallelize(tar, numSlices=100)
    retar = inrdd.map(genpix).collect()

変更したものが以下のものになります。

https://github.com/m-kiuchi/cllab181217-spark/blob/master/raytracing-5.py

Spark化による並列化の速度向上を検証する

さて、並列化によって速度はどの程度向上したでしょうか。実行時間を比較してみます。

[変更前]
$ time python3 ./raytracing.py
()
real    0m21.130s
user    0m20.941s
sys 0m0.332s
[Spark版]
$ time ~/spark/bin/spark-submit ./raytracing-5.py
()
real    0m20.398s
user    0m19.444s
sys 0m1.464s

・・・なんかあまり速くなった気がしません。Sparkのオーバーヘッドがそれなりにあるので並列化による恩恵を余り受けられていないのかもしれません。出力画像サイズを 幅400px x 高300px から、HDサイズの 幅1920px x 高1080px に変更してもう一度試してみます。

[変更前]
$ time python3 ./raytracing.py
()
real    5m38.236s
user    5m36.490s
sys 0m0.518s
[Spark版]
$ time ~/spark/bin/spark-submit ./raytracing-5.py
()
real    4m9.103s
user    1m21.582s
sys 0m5.487s

30%ほど高速化できました。上記はCPU4コア、メモリ8GBのパソコンでやった結果です。CPUコア数やメモリ量を増やして変化を見てみました。以下の結果は CPU16コア, 16GB のパソコンで実行した結果です。

[変更前]
$ time python3 ./raytracing.py
()
real    5m22.154s
user    5m21.189s
sys 0m1.137s
[Spark版]
$ time ~/spark/bin/spark-submit --conf spark.driver.memory=11G ./raytracing-5.py
()
real    1m26.337s
user    0m59.485s
sys 0m5.899s

強烈に高速化できました。最高ですね!

さいごに

今回使用したコードは https://github.com/m-kiuchi/cllab181217-spark にまとめていますので参考になれば幸いです。

データ分析に限らない Apache Spark の「スケーラブルな汎用分散処理エンジン」の側面がなんとなく伝わればうれしいです! では!

Related post