こちらの記事はDatabricks Advent Calendar 2022の24日目の記事です。
はじめに
初めまして。カケハシでデータサイエンティストをしている赤池です。
弊社はフルリモートで業務できるため今年9月から地元の仙台市で業務していますが、本格的な冬の到来を前に戦々恐々しています。(寒い。雪。路面凍結。)
さて、あなたは「Pandas API on Spark」を知っていますか? これは「pandasと同じ書き方でSpark上で処理を実行できる」という代物で、pandasでは処理に時間がかかる or そもそも扱えないような大規模データを、ほとんどpandasと同じ感覚で処理できる というものです。 私はそう信じ、いつものpandasと同じ感覚でPandas API on SparkのDataFrameを操作してみたら・・・あら不思議。何度もつまずきました。
今回は私のTry & Errorを元に、普段pandasばかり使っている私が、Pandas API on Sparkをpandasと同じ感覚で使っていたらつまずいたポイントを原因や対処法とともにご共有します。 最後までお付き合いいただけると幸いです。
この記事の主な対象者
下記全てを満たす方(私の状況と似ている方)
- (業務・プライベート問わず)データ分析をしている
- pandasに習熟している
- Spark初心者
- 「Pandas API on Spark」を使おうと思っている or 使い始めたばかり or なんとなく興味がある
目次
- 【Case 1】どうやってテーブルを読み込めばいいの?
- 【Case 2】
groupby
の結果をイテレータとして使えない - 【Case 3】
assign
+cumcount
で連番を振れない - 【Case 4】
plot
メソッドにmatplotlibのaxisを設定できない - 【Case 5】
dtypes
の実行結果はPandas API on SparkのSeriesではない - 【Case 6】各種DataFrameの変換メソッド名に統一感がなく、混乱する
- 【Case 7】DataFrameの変換時にデータ型が変わるケースがある
【Case 1】どうやってテーブルを読み込めばいいの?
つまずき
テーブルをPandas API on SparkのDataFrameとして読み込みたいけど、どうすればいいんだろう・・・。 PySpark?Pandas API on Spark?はて・・・?
解決方法
pyspark.pandas.read_table()
の引数に文字列で「 カタログ名.DB名.テーブル名 」を指定すれば、Pandas API on SparkのDataFrameとして読み込めます。
【Case 2】groupby
の結果をイテレータとして使えない
つまずき
「いつものようにfor文にgroupby
の結果を指定して(in
の右側)、DataFrameの任意のカラムの区分値ごとに、いくつかのグラフを並べて描画しよう!」
そう思い、DataFrameをpandasからPandas API on Sparkに差し替えて実行。
import pyspark.pandas as ps psdf = ps.DataFrame({ 'col1': ['A', 'A', 'B'], 'col2': [3, 2, 1] }) for k, _psdf in psdf.groupby('col1'): print(k, ':', len(_psdf))
すると、以下のエラーが・・・1
KeyError: (0,)
情報少な・・・なにこれ・・・
原因
Pandas API on Sparkのgroupby()
の返り値には__iter__
が実装されていない ため、エラーになるようです。
解決方法
スマートかどうかは置いておいて・・・pandasのDataFrameに変換するか、もしくは同様の処理ができる他の方法を実装するなどして回避する必要がありそうです。
【pandasのDataFrameに変換する例】
import pyspark.pandas as ps psdf = ps.DataFrame({ 'col1': ['A', 'A', 'B'], 'col2': [3, 2, 1] }) pdf = psdf.to_pandas() for k, _pdf in pdf.groupby('col1'): print(k, ':', len(_pdf))
【同様の処理ができる他の方法の例】
import pyspark.pandas as ps psdf = ps.DataFrame({ 'col1': ['A', 'A', 'B'], 'col2': [3, 2, 1] }) col1_list = psdf['col1'].unique().tolist() for k in col1_list: cnt = psdf.query(f'col1 == "{k}"').shape[0] print(k, ':', cnt)
【Case 3】assign
+ cumcount
で連番を振れない
つまずき
「以下のデータに現在のソート順でcol1の値ごとに上から連番を振りたい・・・pandasと同じやり方で!」
そう思い、Pandas API on SparkのDataFrame(以下、変数psdf
)にデータを読み込み、以下の2パターンの方法を試してみました・・・が、いずれもうまくいかず・・・。
【元データ】
col1 | col2 |
---|---|
a | 3 |
a | 8 |
b | 4 |
c | 1 |
c | 4 |
c | 1 |
パターン1
- 【処理】
python psdf = psdf.assign(num = lambda x : x.groupby('col1').cumcount())
- 【結果】
- None (返り値はNoneだが、
psdf.columns
を実行するとnum
というカラム名は追加されていた)
- None (返り値はNoneだが、
パターン2
- 【処理】
python psdf['num'] = psdf.groupby('col1').cumcount()
- 【結果】
ValueError: Cannot combine the series or dataframe because it comes from a different dataframe. In order to allow this operation, enable 'compute.ops_on_diff_frames' option.
・・・ん? In order to allow this operation, enable 'compute.ops_on_diff_frames' option.
?
原因
異なるDataFrame間の結合は内部的にコストがかかるため、compute.ops_on_diff_frames
オプションで制御されている(デフォルトはfalse
)ことが、上記パターン2の直接的な原因だそうです。2
※パターン1はわかりません・・・ちなみに、compute.ops_on_diff_frames
オプションをtrue
に設定しても結果は変わりませんでした。
解決方法
現状、私には「ある区分値ごとに連番を振る」処理のベストプラクティスはわかりませんが・・・少なくとも以下の3つの方法で対処できました。
【対処法1】compute.ops_on_diff_frames
オプションを操作する
上記パターン2の実行前にcompute.ops_on_diff_frames
オプションをtrue
に設定するとうまくいきます。
ps.set_option('compute.ops_on_diff_frames', True) psdf['num'] = psdf.groupby('col1').cumcount()
【対処法2】連番を生成したSeriesやDataFrameをJOINする
元データから連番を生成したSeriesやDataFrameを作成し、そのDataFrameと元データをインデックスをキーにしてJOINすればうまくいきます。
psdf_num = psdf.groupby('col1').cumcount().rename('num') psdf = psdf.merge(psdf_num, left_index=True, right_index=True)
【対処法3】SQLのROW_NUMBER関数を使用する
pyspark.pandas.sql()
を使用し、Pandas API on SparkのDataFrameに対してSQLを実行することができます。
この機能を用い、WINDOW関数のROW_NUMBER()
を使用することで対処できました。
query = """ SELECT col1 , col2 , ROW_NUMBER() OVER(PARTITION BY col1 ORDER BY index) AS num FROM {psdf} """ # ROW_NUMBER()のORDER BY用のカラムを追加しておく pyspark.pandas.sql(query, psdf=psdf.reset_index().sort_values(by='index'))
【Case 4】plot
メソッドにmatplotlibのaxisを設定できない
つまずき
「下のデータをcol1、col2ごとに合計して、col1の値ごとに横並びで可視化しよう・・・pandasと同じようにplot
メソッドを使って、そこにmatplotlib
のadd_subplot
を指定して実行・・・っと!」
【元データ】
col1 | col2 | col3 |
---|---|---|
A | a | 3 |
A | a | 2 |
A | b | 1 |
B | a | 4 |
B | b | -2 |
B | a | 1 |
【実行した処理】
# 集計 _psdf = psdf.groupby(['col1', 'col2'], as_index=False)[['col3']].sum() # 描画領域の設定 fig = plt.figure(figsize=(12, 5)) ax1 = fig.add_subplot(1, 2, 1) ax2 = fig.add_subplot(1, 2, 2) # 可視化 _psdf.query(f'col1 == "A"').plot.bar(x='col2', y='col3', title='A', ax=ax1) _psdf.query(f'col1 == "B"').plot.bar(x='col2', y='col3', title='B', ax=ax2)
すると、こんなエラーが・・・ え、この引数、想定もされてない・・・?
TypeError: bar() got an unexpected keyword argument 'ax'
原因
Pandas API on SparkのDataFrameのplot
メソッドで呼ばれるのはPlotlyであり、matplotlibのsubplotは受け付けないようです。
解決方法
素直にPlotlyに入門してPlotlyの方法で可視化の処理を書くか、DataFrameの大きさ次第ではpandasに変換して上記処理を実行するのがよさそうです。
なお、上記のエラーで失敗した可視化は、Plotlyでは以下のように実装できます。3
from plotly.subplots import make_subplots # 集計 _psdf = psdf.groupby(['col1', 'col2'], as_index=False)[['col3']].sum() # 描画領域の設定 fig = make_subplots(subplot_titles=('A', 'B'), rows=1, cols=2) # 可視化 fig.add_trace(_psdf.query(f'col1 == "A"').plot.bar(x='col2', y='col3').data[0], row=1, col=1) fig.add_trace(_psdf.query(f'col1 == "B"').plot.bar(x='col2', y='col3').data[0], row=1, col=2)
【結果】
【Case 5】dtypes
の実行結果はPandas API on SparkのSeriesではない
つまずき
「Pandas API on Sparkのメソッドなどを実行した時に画面上SeriesやDataFrameの形で表示されてるのは、Pandas API on SparkのSeriesやDataFrame でしょ?」・・・そう思って実行したdtypes
。
ところが返り値の型を確認すると、 pandasのSeries でした。
「え、処理によってはpandasの場合もあるのかよ!?」動揺した私は、割と使う以下の処理について調べてみましたが、この中では dtypes
のみがpandas由来の型 でした。どうして・・・。
dtypes
describe()
corr()
集計関数()
groupby().集計関数()
nunique()
解決方法
上記処理については dtypes
のみpandas由来でそれ以外はPandas API on Spark由来の型 であることを理解した上で、出力結果がSeriesやDataFrameと思われる上記以外の処理については逐一型を確認するのがよさそうです。
【Case 6】各種DataFrameの変換メソッド名に統一感がなく、混乱する
ここまでPandas API on Sparkにフォーカスしてきましたが、これはpandasやPySparkと併用することができます。
余談ですが、学習初期の私は PySparkとPandas API on SparkのDataFrameが別物かどうかすら理解しておらず 、そのため、どちらのやり方でデータを読み込めばいいかわかりませんでした。 (PySparkとPandas API on SparkのDataFrameはオブジェクト上、別物 でした。)
「やれ」と言われたらやる気が失せる。「やめろ」と言われたらやりたくなる。「併用できる」と聞くと、これについては素直に併用したくなるのが人情。
・・・かどうかはさておき、例えば、PySparkならspark.sql()
の引数にクエリを文字列で指定すれば SQLの実行結果をPySparkのDataFrameとして取得 できたり、あまり大きくないデータを処理する際のPandas API on SparkのDataFrameの実行時間を節約したい場合は pandasに変換して素早く処理 することもできるため、それらの特長をいいとこ取りしようとすると結果的に併用することになるのではないでしょうか?
つまずき
ところが、 メソッド名が微妙に違う という驚きの統一感のなさによって、私には全然覚えられない・・・ 思わぬ敵が行く手を阻みます。
解決方法
変換元と変換先の型に対応するメソッド名を一覧化し、それを逐一参照しながら覚える・・・しかなさそうです(私は)。
以下、短気でおっちょこちょいな私用のチートシートを作ったのでご参考までに。
元データ | 変換後データ | 処理 |
---|---|---|
PySpark | Pandas API on Spark | psdf = sdf.pandas_api() |
PySpark | pandas | pdf = sdf.toPandas() |
Pandas API on Spark | PySpark | sdf = psdf.to_spark() |
Pandas API on Spark | pandas | pdf = psdf.to_pandas() |
pandas | PySpark | sdf = spark.createDataFrame(pdf) |
pandas | Pandas API on Spark | pdf = ps.from_pandas(pdf) |
※ DataFrameはそれぞれ以下のように対応
- pdf → pandas
- sdf → PySpark
- psdf → Pandas API on Spark
【Case 7】DataFrameの変換時にデータ型が変わるケースがある
つまずき
Case 6を通じ、3種類のDataFrameを自由に変換できるようになった私。
これを活かせば、 SQLの方が簡単に書ける処理はspark.sql()
で、pandas形式の方が簡単に書ける処理はPandas API on SparkのDataFrameで作成し、それぞれを結合することもできる ・・・よし、そのやり方で前処理してみよう!」
そう思い、Case 6の変換処理を活用して前処理していると、思わぬエラーが・・・え、カラムのデータ型が違うって?一体どこが?
原因
PySparkからPandas API on SparkのDataFrameに変換する際に、カラムが 意図せぬデータ型に変換されることがあります 。 例えば decimal型、date型はobject型に勝手に変換される ことに注意が必要です。4
解決方法
脚注3のリンク先に記載された各種DataFrameの変換時におけるカラムのデータ型の変化を参照し、対応関係を一つ一つ覚えるほかなさそうです。
おわりに
Pandas API on Sparkに関する私のつまづきを挙げてきましたが、これらがデータ分析のお役に立てば幸いです。
個人的に使っていて感じるのは、「地道にpandasとの違いを把握するのと同じく、扱っているDataFrameの型を見失わないように変数を 命名規則などで一目で区別 できるようにするのが重要」ということです。 「自社の業務や副業、プライベート、学習などを問わず、日々の分析ではPandas API on Sparkしか使わない」という場合は命名規則による区別は不要かもしれません。しかし、そうでない方にとっては「記法がほぼ同じだが処理が部分的に異なる」それぞれのDataFrameを明示的に区別しないと時に混乱を招くように思います。
長くなりましたが、ここまでお読みくださりありがとうございました!
- 実際は、このfor文を使ったやり方でPandas API on SparkのDataFrameに対してmatplotlibのaxisを使用して複数グラフを描画しようとすると、groupbyの箇所だけでなくplotのメソッドにてaxisを指定する箇所でもエラーが発生します。↩
- 「Pandas API on Spark」User Guide の「Operations on different DataFrames」より。↩
- 「Pandas API on Spark」API Referenceの「pyspark.pandas.DataFrame.plot.bar」より。↩
- 「Pandas API on Spark」User Guide の「Type Support in Pandas API on Spark」より。↩