よしだのブログ

サブタイトルはありません。

勉強会メモ - 第1回“Learning Spark”読書会

どうも!最近、いじり倒している Spark の読書会があるとことで、参加してきましたので、メモを公開!

http://readlearningspark.connpass.com/event/11846/

本はこちら!英語版のみですが、Spark の実質初の実践的な解説書です。

Learning Spark: Lightning-fast Big Data Analysis

Learning Spark: Lightning-fast Big Data Analysis

  • 作者: Holden Karau,Andy Kowinski,Matei Zaharia,Patrick Wendell
  • 出版社/メーカー: Oreilly & Associates Inc
  • 発売日: 2015/02/22
  • メディア: ペーパーバック
  • この商品を含むブログを見る

主催者の @data_sciesotist 様、会場を貸していただいた富士通ラーニングメディア様、ありがとうございました!引き続きよろしくお願いします。

読書会について

読書会の説明の後は、事前アンケート結果のご紹介から。2014 年から知っていると答える人が最も多く、2012年から知っているという人も1人いました!ただ、触ったことのある経験については、半分以上の人は触ったことがない、という結果でした。Spark 初心者向け、という体の勉強会ですね。これから Spark やりたい!ビックデータって何?という方にも良い勉強会だと思います。

第1章 "Introduction to Data Analysis with Spark” -Apache Sparkによるデータ分析への招待-

@data_sciesotist

Apache Spark のソフトウェアの概要の紹介です。ソフトウェアの概要と、Spark の売りポイントが色々とアピールされています。著者がコミッターだったりしますしね。

その中でも、最近個人的に魅力に感じているところを1つ紹介しますと、「一連の処理を Spark 上でできるので、データ分析パイプラインを作れる」という点です。Streaming / SQL / MLLib(機械学習) / グラフ処理 が、一連の処理としてひとつのプログラムの中で処理することができます。これまでは、Mahout など、それぞれのライブラリが個別のソフトウェアに分かれていて、開発者は組み合わせて作る必要がありました。場合によっては、稼働する Hadoop のバージョンが違うとか、Mesos でしか動かないとか、面倒なことが色々とありました。Spark の場合は、ひとつのプログラム、ひとつのライブラリのなかでこれらの処理を自由に組み合わせて作ることができるので、非常に開発が用意です。*1

以下に、先日の Developer Summit 2015 のDMMラボさんが発表していた、事例を貼ります。この事例が Spark を使ってデータ分析パイプラインを作った、最も端的に示している具体的な例だと思います。

DMM.com - DMMのビッグデータ分析のご紹介 ~Sparkによるリアルタイムレコメンド~ by DMM Tokyo-des on Prezi

第2章 "Downloading Spark and Getting Started” -Sparkを導入し、使ってみよう

@data_sciesotist

その他、懇親会など

  • Spark SQL は Hive などと連携できるほか、JDBC でつなぐことも可能なので、フロントはいつものツールを使うこともできるそうです。いわゆる、超エクセルツールとの連携もできるので、高速なアドホックの分析用途にも活用できそうです。これは是非試してみたいですね。

  • Mahout を Spark で動かすことができるそうです。ただし、" very early experimental stage ! “ http://mahout.apache.org/users/sparkbindings/play-with-shell.html

  • Spark の元の開発元である UCB の AMP Lab の公開した、分析処理機版の全体像。 https://amplab.cs.berkeley.edu/software/

  • ipython よさげ。matplotlib などと連携して、グラフを作ったりできるので、分析用途などとても便利。

  • Spark のパフォーマンスについて fastutil を普通のコレクションの代わりに使うと速くなったそうです。Koloboke なども速いという話なので、Spark で組み合わせてできるか、色々とやってみたいですね。

SmartNews小宮さんによるJavaでの統計・機械学習・自然言語処理ライブラリの紹介 | break the code!! | codebreak;

  • ec2-script について、1.1.0 で試した時は、うまく動かない *2 ということがあったのですが、1.2.1 では一応使えるようです。ただし、相変わらずエラー連発だったり、リージョンの指定をつけないとうまくいかない、--delete-groups のオプションをつけると destroy がうまくいく、ネットワークのご機嫌次第では失敗するなど、不安定みたいです。standalone クラスタにこだわらなければ、個人的には EMR おすすめです。以下、以前書いたエントリーですので、ご参考まで。

Spark 1.2.0 を Amazon EMRで動かす。 - よしだのブログ

次回について

3/28 (土) を予定しているとのことです。まだ、インストールしたところまでなので、ここからの飛び込み参加もいけますので是非一緒に参加しませんか? ちなみに、私は次回 LT に、勢いで立候補しました!EMR か、パフォーマンス・チューニングネタを話そうかしら。お楽しみに。

おまけw

*1:ただし、特に機械学習などは、使いたいアルゴリズムによっては組み合わせに制約が生じる場合があります。例えば、Streaming でリアルタイムに取り込んだデータを元に、モデルを更新したい場合、ALS は増分でモデルをアップデートできないので Streaming との組み合わせは現状意味がありません。

*2:起動時にタイムアウト時間を500秒に設定する必要があった、destroy が全く効かない、など

あなたのビジネスの指標をどうやって決めるのか? - 書評 -「LEAN ANALYTICS リーンアナリティクス」

どうも!翻訳者の角さんから献本いただきました。人生初。頂いてから随分たってしまいましたが、僭越ながら書評させていただければと思います。

f:id:yoshi0309:20150123113631j:plain

本書、LEAN ANALYTICS は、一連のリーンシリーズの最新作です。他のシリーズと同様に、主にいわゆるスタートアップ向けにどのように事業を起こし、経営していき、イノベーションを起こすかというガイドになっています。本書は、その中でもリーンの考え方で大事とされている、データ、すなわち立ち上げた事業の指標をどのように定め、測り、進むかを示しています。

アナリティクスとは即ち指標を定め、集計、結果をもとに行動を起こし、指標自信をチューニングする作業のことを指しています。アナリティクスとタイトルにあるので、統計とか、集計や可視化の手法の本のように見えるがそうではありませんのでご注意を。

まず、読み始めて最初に気がつくのは、ビジネスの指標とは、元から定められていて変えられないものではなく、自らの目指す方向に向けて決めていくものである、という点です。さらに、指標は常に固定ではなく、そのビジネスの規模や状況によって変化させなければいけず、立ち上げたばかりなのか、軌道に乗ってきた後なのかによって全く違い、適切なものを選択する必要があります。まだ、指標を定めるにあたって、指標が良いものか、イマイチなものかを示しており、一つだけ紹介すると、その結果を見て次の具体的な行動に繋がるものが良い指標である、としています。

さらに親切なことに、ビジネスの進み具合や、ビジネスのタイプ、ECなのか、SaaS なのか、無料モバイルアプリなのか、CGM なのか、種類ごとに指標の例を紹介していて、すぐに明日から使えるようになっています。何事もゼロから考えるのは大変。このように1を予め用意しておいてもらえると、本当の自分の指標を考えるベースとなり、やりやすいと思います。

私の個人的な感想としては、会社の中で働いていると指標は、目的にそって予め図るべきものが定められていて、それを図り大事なのは分析だと思っていましたが、そもそも適切な指標を選ぶ必要があり、選び方があるというのは勉強になりました。SIer づとめですが、日々プロジェクトの指標 *1 を計測して報告につかっていますが、より効果的な指標を探すために、本書のアイディアは使えそうだなと思いました。

というわけで、ベンチャーやスタートアップのみならず、企業の中でも新なサービスを起こそうとしている人、私のようにプロジェクト管理で効果的な指標が何か悩んでいる人にも、すぐに役に立つ本だと思います!

余談ですが、早速役に立ちそうな本だったので、頂いた本は悩んでいる後輩に譲って、私は買い直しましたw

Running Lean ―実践リーンスタートアップ (THE LEAN SERIES)

Running Lean ―実践リーンスタートアップ (THE LEAN SERIES)

  • 作者: アッシュ・マウリャ,渡辺千賀,エリック・リース,角征典
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2012/12/21
  • メディア: 単行本(ソフトカバー)
  • 購入: 3人 クリック: 14回
  • この商品を含むブログ (19件) を見る

*1:例えば、プログラムのライン数w とか?

Solr 5.0 リリース!

どうも! Solr がメジャーバージョンアップしましたね!いぇい。

というわけで、lucidworks 社のブログエントリーから今回のハイライトを見ていきたいと思います。

Apache Solr 5.0 Highlights - Lucidworks

所感としては、眼を見張るような大きな機能追加等は見受けられないものの、大規模運用時のクラスターの安定性の向上や、管理機能の充実によりより使いやすくなったように思います。また、地味ですが multi-value の data 型がレンジ検索できるようになったとか、結構嬉しいです。

というわけで、以下からどうぞ。

ユーザビリティの向上

  • 実装の詳細を覆い隠すかたちで、API の使いやすさを向上させている
  • 4.10 から導入された solr スクリプトでできることが大幅に増えた。コレクションの設定ファイルのセットをコピーしたり、ドキュメントのインデキシングやコレクションの削除などができるようになった。以下、その例
  • solrcloud モードで起動し、gettingstarted コレクションに lucidworks.com のインデックスを作成した後、ブラウザでコレクションの詳細を確認する。最後に、getingstarted コレクションを削除する。
bin/solr start -e cloud -noprompt
bin/post gettingstarted http://lucidworks.com
open http://localhost:8983/solr/gettingstarted/browse
bin/solr delete -c gettingstarted

管理機能

パラメータのセットを定義し、検索時に利用できるようになった。

Add/edit param sets and use them in Requests https://issues.apache.org/jira/browse/SOLR-6770

これまでは、faset や sort などの検索パラメータは、毎回クエリに条件として設定するか、あらかじめ solrconfig.xml の requesthandler の定義にデフォルト値として記載しておく必要がありました。5.0 では、起動中の Solr にリクエストで、そのパラメータのセットを設定し、検索時に利用ができるというものです。以下、上記のチケットより利用イメージの例です。

  • パラメータのセットを登録します。

以下の内容は、クエリのフォーマットに edismax を使って、デフォルトのクエリとして : を設定して、10件取ってきて・・というような一般的な内容です。繰り返し、再利用して使いたいパラメーターのセットを登録します。json 形式のデータを http で投げます。

curl http://localhost:8983/solr/collection1/config/params -H 'Content-type:application/json' -d '{
"set":{"query":{
  "defType":"edismax",
  "q.alt":"*:*",
  "rows":10,
  "fl":"*,score" },
 "facets":{
  "facet":"on",
  "facet.mincount": 1
 },
"velocity":{
 "wt": "velocity",
 "v.template":"browse",
 "v.layout": "layout"
}
}
}
  • 設定したパラメータセットを利用して検索する

次に、登録したパラメーターセットを利用します。userParams で利用したいパラメーターセットを指定します。この際に、パラメータセットを複数指定することも可能です。*1

http://localhost/solr/collection/select?useParams=query
  • 予めリクエストハンドラに設定しておく

また、あらかじめ定義したパラメータセットを利用するリクエストハンドラを用意することも可能です。これによって、以下の例のように、新たに定義した dump1 というリクエストハンドラーを呼び出すと、クライアントが毎回明示的に userParams をつけなくても、自動的に query というパラメータセットを利用することが出来ます。

<requestHandler name="/dump1" class="DumpRequestHandler" useParams="query"/>

この方式の嬉しい点は、はやり動かしながらパラメータセットを変更できる点でしょう。これまでのように再起動もしくはリロードが不要で、solrcinfig.xml というコアで重要なファイルをいじることなく、パラメーターセットをいじることができるのは管理がやりやすくなるように思います。

BALANCESLICEUNIQUE

SolrCloud では、自動的にリーダーノードがクラスタ中から選出されるようになっているが、サーバーのメンテナンス等、システム管理上の理由から特定のノードをリーダーに設定したい場合があります。5.0 から、これが可能になっています。

https://issues.apache.org/jira/browse/SOLR-6491

また、その機能追加の中で追加された API の1つに、BALANCESHARDUNIQUE というものが有ります。通常の設定方法では、1台ずつ設定していく手順になりますが、100台クラスのクラスターを構成していた場合、非常に面倒な作業になります。これを一括して行おうというのが、このAPI です。

その他の管理機能のエンハンス

  • スキーマAPIで、詳細を隠す形でのエンハンスが図られており、フィールドの追加などがやりやすくなっている

-トランザクションログの再実行時のステータスをログに出力できるようになった。スローリクエストのログもオプションでサポートした。

  • *nix 環境のサービスとしてインストールするためのスクリプトを含めるようにした

スケーラビリティ

Solr 5.0 では、クラスターの状態を保管する方法を変更することによって、大量のコレクションをスケールすることができるようになりました。5.0 よりも前では、クラスターの状態は1つのファイルに書かれており、全てのノードから変更をチェックされ、クラスターの状態に変更があった場合はこのファイルに書かれました。5.0 ではデフォルトでは、ノードごとにファイルをもたせるようにすることによって、全てのノードはクラスターの状態を確認するために、自身のファイルだけを見れば良くなりました。

安定性

  • レプリカ時の帯域幅を指定できるようになった
  • timeAllowed パラメータの値に従い、分散検索時にタイムアウト時間経過後も残っていたクエリを停止するようになった
  • ワイルドカードなどの重いクエリを投げられると、最悪クラスタ全て停止ということもあった

分散IDF

  • 分散IDFをついにサポート
  • 4つの実装が含まれる。デフォルトでは local のみを見るようになっているが、solrconfig.xml の設定変更でグローバルなIDFを見るように変更できる

統計コンポーネント

  • 統計コンポーネント(stats)を、ファンクションコンポーネントと組み合わて使うことができるようになった
stats.field={!func key=mean_rating mean=true}prod(user_rating,pow(editor_rating,2))

その他

  • DataRangeField の、multi-value フィールドのサポート
  • spacial field が、マイルとキロメートルをサポートしてより使いやすく
  • MoreLikeThis が、SolrCloudのサポート
  • Blob stage API でカスタムの jar をアップロードし、Config API でアップロードした jar を登録することで、容易にカスタムハンドラーを利用できるようになった
  • Collection API が SolrJ でサポート
  • Tika の 1.7 へのバージョンアップ (Outlook PST ファイル、MATLABファイルのサポート)

以上!

*1:複数のパラメータセット間で、同じパラメータを利用していた場合、どのような順番で値が採用されるかは確認が必要です。

勉強会メモ - 第8回elasticsearch勉強会

どうも!今日も勉強会に参加いたしました。

今日の勉強会は、初心者向けから上級者向け、ハイパフォーマンスから自然言語処理を活用したディープダイブまで、とかなり幅広く面白い勉強会でした。個人的に面白かったのは、はてなの事例 B!KUMA は Elasticsearch をバックエンドに作られている、という話や、最近リリースされたトピックのバックエンドに使われているという話です。Elasticsearch って、そんな機能まで持っているのね・・と驚きました。

というわけで、以下メモでございます。

Elasticsearch導入チェックリスト?

Elasticsearch株式会社 Jun Ohtani @johtani 大谷さん

導入、起動後に、次に何をするべきかのチェックリストだそうです。

  • 最新版の es を使う。
  • java も新しい物を使う。(java7も対応しているが、一部のバージョンに lucene のインデックスを壊すバグがあるので注意)

  • OS

    • ファイルディスクリプターを 32000 もしくは 64000 に増やす
    • ヒープを増やす、物理メモリの半分以下(ファイルキャッシュを活かすため)。最大32GBまで。MAXとMINを同じ値にする。
    • メモリのスワップを出来るだけさせない、swapオフ、swappiness、malockall
  • cluster.name

    • デフォルト値は elasticsearch、zendiscovery の機能で隣の人のマシンと勝手にクラスタを組む場合があるので、名前を必ず変える。
  • node.name

    • デフォルト:Marvel のヒーローの名前
    • 例えば、監視していた場合など、再起動後にノード名が変わるので監視できなくなる時がある
  • network.host

    • ネットワークアドレスの指定。複数 nic が刺さっている場合に、バインドしたい ip を指定する
  • discovery.zen.ping

    • Unicast にしましょう。明示的にホスト名を指定する。
    • さもないと勝手にクラスタにノードが追加されることがある。。
  • minimum_master_nodes

    • スプリットプレイン防止
    • 設定する値は (ノード数/2) + 1
    • クラスタは 3 台以上で組むことが推奨
  • その他パス

    • path.log ログディレクトリ
    • path.data データの保存先
    • path.plugins プラグインインストール先
    • バージョンアップ時などに、上書きしたり消したりしないように、elasticsearch 以下以外にする。

質問から補足

  • メモリについて
    • fielddata がメモリを食う。ソートやファセットなど、一度にデータをオンメモリに持つケースが有る。
    • docvalue を使うと、直接ファイルから読めるので最近は推奨している
    • ただし、クエリのパフォーマンスが必要なケースは fielddata のほうがいいケースも有る

Elasticsearch クエリとスキーマ定義のすごい細かい話

株式会社ドワンゴ 藤堂淳也 さん

  • niconico の全サービスを統合する検索基盤へ移行中
  • サービス側からの要件に柔軟に対応できる
    • フィールドの追加ができる
    • スキーマの変更が無停止でできるなど
    • ローリングリスタートで、再起動が必要な場合もサービスを停止せずに反映ができる

ElasticsearchとKibanaで実現する、30億req/dayのリアルタイム分析

株式会社サイバーエージェント 山田直行さん @satully

前回の勉強会の続き!以下、その時のエントリです。

勉強会メモ - 第6回elasticsearch勉強会 - よしだのブログ

  • 広告配信プラットフォームの分析・可視化基盤(Kibana+fluentd)で利用 (DSP)
  • Redshift + Tableau と併用
  • Elasticsearch はリアルタイム特化、Redshift は長期・集計済みのデータ可視化、という用途で使い分け
  • EBS から InstanceStore に変更することで、データノードを半分の数に減らしてコスト削減
  • グラフだけでなく、生ログをテーブルで見ている。生ログが見えるのは評判が良い。
  • 手がかからず安定して動作している
  • リアルタイム用途というふうに役割を絞ることでうまく使えていると思う

はてなのメディア面を支えるElasticsearch

株式会社はてな 山家雄介さん @yanbe

(参考記事:http://bookmark.hatenastaff.com/entry/2014/06/27/180000

  • B!KUMA は Elasticsearch を使って、同じDBの見せ方を変えている
  • 自動カテゴライズ編成機能
    • 恋愛と結婚などうまくカテゴライズして見せている
  • 管理画面で抽出条件を指定している
    • 一般的な検索条件以外にも、R15コンテンツの除外や、機械学習の結果などを使うオプションもある
  • はてなトピックでもりようしている
  • トピックの抽出に Significant Terms Aggregation で実現している (特徴語の抽出)。ネストするとその関連語も取れる

その他 Tips

  • URLをインデックスするときは、tokenize版、非tokeniez版、両方作るとベスト
    • 非tokenize フィールドで prefix query を使い高速に、tokenizeクエリに match フィールドでサブドメイン横断

おまけw

高速スケーラブル検索エンジン ElasticSearch Server

高速スケーラブル検索エンジン ElasticSearch Server

  • 作者: Rafal Kuc,Marek Rogozinski,株式会社リクルートテクノロジーズ,大岩達也,大谷純,兼山元太,水戸祐介,守谷純之介
  • 出版社/メーカー: KADOKAWA/アスキー・メディアワークス
  • 発売日: 2014/03/21
  • メディア: 大型本
  • この商品を含むブログ (3件) を見る

Spark / RDD のネストできない!

どうも!最近 Spark を触りたおしているよしだです。

先日、いつものように改修をしていたら、例外がでるようになってしまい、1日つぶしてしまったので皆様が同じ轍を踏むように、共有しようと思います(笑)

結論から言えば、Spark において、 RDD のネストはサポートされません。 具体的には、ある RDD を for ループや map などで処理しているブロックの中で、他のRDDは参照できません。参照しようとすると NullPointerException が出ます。なお、環境は、Apache Spark 1.2.0 です。

この例外ですが、一見すると、原因は参照しようとしているRDD中のデータが Null のように見えるのですが、もちろん実際には Null のデータが無くても発生しますので、トレースにかなり苦戦しました。一生懸命 println しても、発見できません。また、for の直前と、for に入ってすぐで比較すると、入ってすぐの場合に Exception が出るので、スコープの問題かのようにも見えます。 知っているかどうかが全てです!

以下、具体例です。

まず、参照しようとして NullPointerException を出している例です。sc.textFile で2つ RDD を作成し、一方を for ループして、その中でもう一方を参照しようとしています。

scala> val r1 = sc.textFile("s3n://abc-takumiyoshida/datasets/restaurants.csv")
r1: org.apache.spark.rdd.RDD[String] = s3n://abc-takumiyoshida/datasets/restaurants.csv MappedRDD[3] at textFile at <console>:12

scala> val r2 = sc.textFile("s3n://abc-takumiyoshida/datasets/ratings.csv")
r2: org.apache.spark.rdd.RDD[String] = s3n://abc-takumiyoshida/datasets/ratings.csv MappedRDD[5] at textFile at <console>:12

scala> for (r <- r1) {
     |
     | r2 take(5) foreach(println)
     | }

15/02/05 08:52:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1239)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
        at $line13.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:18)
        at $line13.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:765)

~以下略~

対策としては、以下のように collect を呼んでしまう、というのが一つの手になるかと思います。collect すれば、ただの Array になるので、その中で別の RDD を処理することも可能になります。

scala> val c1 = r1.collect
c1: Array[String] = Array(id,name,property,alphabet,name_kana,pref_id,area_id,station_id1,station_time1,station_distance1,station_id2,station_time2,station_distance2,station_id3,station_time3,station_distance3,category_id1,category_id2,category_id3,category_id4,category_id5,zip,address,north_latitude,east_longitude,description,purpose,open_morning,open_lunch,open_late,photo_count,special_count,menu_count,fan_count,access_count,created_on,modified_on,closed, 2,"ラ・マーレ・ド・茶屋","2F・3F","LA MAREE DE CHAYA","らまーれどちゃや",14,1013,2338,22,1789,2401,28,2240,2867,47,3755,201,0,0,0,0,240-0113,"三浦郡葉山町堀内24-3",35.16.53.566,139.34.20.129,"こちら2.3Fのレストランへのコメントになります。  『ラ・マーレ・ド・茶屋』1F(テラス&バー)へのコメントはそちらにお願いします。    駐車場15台(専用)    06/06/19 営業時間等更新(From東京グルメ)",,0,1,0,1,0,0,5,6535,"2000-09-10 11:22:02","2011-04-22 16...

scala> for (c <- c1) {
     | r2 take(5) foreach(println)
     | }
id,restaurant_id,user_id,total,food,service,atmosphere,cost_performance,title,body,purpose,created_on
156445,310595,ee02f26a,5,0,0,0,0,,"名前は忘れましたが、札幌で食べたお店よりも、全然こっちの方が美味しかったので、載せました。お店も綺麗(新規オープン・・)でランチは結構混んでいます。個人的にはゆったりと食事できるので夜の方がオススメです。  辛さが0倍から50倍まで選べるのもGOOD!、スープも2種類みたいで、友達は黄色がオススメと言っていましたが、自分は赤の方を食べました。かなり美味しかったです。店長も好感のもてるお兄さんでした。  駅近くなので一度お試しあれです!",0,"2006-10-07 05:06:09"

ただし、この方法の注意点は collect を呼ぶと、RDD のデータが driver に全て集約されるので、パフォーマンスが良くありません。データの分量を見比べて、少ない方を collect してループできれば、いい感じになるかと思います。

以上です!

Learning Spark: Lightning-fast Big Data Analysis

Learning Spark: Lightning-fast Big Data Analysis

  • 作者: Holden Karau,Andy Kowinski,Matei Zaharia,Patrick Wendell
  • 出版社/メーカー: Oreilly & Associates Inc
  • 発売日: 2015/02/22
  • メディア: ペーパーバック
  • この商品を含むブログを見る

Spark 1.2.0 を Amazon EMRで動かす。

どうも!今年の初エントリーです。今年もよろしくお願い致します。

今回は、Apache Spark 1.2.0 を Amazon EMR で動かしてみることに挑戦しました。Spark ではずっと遊んでいたんですが、MLLib をつかうのが目的だったので開発中はクラスタで動かすひつようもなく、ローカルで動かしていたためイマイチその速さがわかりませんでした。まあ、今後の本番のことを考えると、クラスターでの可動は検証しておく必要があったので、トライしてみました。Amazon EMR で動かしてみた理由は、Hadoop / YARN を立てるのがめんどくさかったので、YARN にも対応したとのことなので、こちらで動かしてみました。結果としては、かなりお手軽に動かすことができるので、おすすめです。

ちなみに、EMR で動かす前に、Spark 1.2.0 に付属している、Spark の スタンドアロンモードを EC2 上で機動する、EC2 スクリプトを試してみましたが、うまく動きませんでした。起動は timeout を500秒に設定することで成功しましたが、停止はうまく動かず手でインスタンスを全て停止する必要がありました。多分スクリプトのバグですw

作業は、以下の記事を参考にしました。詳細な手順はこちらをご参考にどうぞ。私のエントリでは、下記の記事でうまく行かなかったところやTIPSなどを載せたいと思います。

Spark on EMR(YARN対応)を動かす - Qiita

Spark on Amazon EMR の、そもそもの仕組み

まず、エントリで紹介されているそもそもの仕組みについて、ちょっとだけ補足。この辺りを先に知っておくと、作業がスムーズに進むと思います。

  • そもそもですが、EMR はそのまま起動しただけでは、Spark は使えません。

  • EMR で Apache Spark を動かす方法ですが、起動コマンドで指定する bootstrap-action が肝になっています。bootstrap-action で指定された URL 上にあるスクリプトを EMR はダウンロードしてきて、インスタンスの起動後に実行します。Spark の場合、install-spark というスクリプトが用意されておりこれが実行され、全インスタンスに Spark がインストールされます。

  • エントリでは作業用にインスタンスを1台立てています。IAMロールの指定など条件がありますが、条件が揃っているインスタンスならOKです。こちらでは、Spark は稼働しませんのでスペックは問いません。コマンド実行時に、MASTER と CORE のインスタンス数をそれぞれ指定しますが、さらにその台数分インスタンスが起動します。この上で Spark は稼働します。したがって、エントリ通りですと、1台+11台(MASTER 1台、CORE 10台)のインスタンスが全部で起動します。

  • インスタンスの種別に m1.large を指定していますが、特に問題はありませんでした。EC2 でお馴染みの、m3.large は指定できないなど、EC2 とは差異があります。*1

  • クラスタの正常起動後の Spark の使い方ですが、ssh で MASTER に指定したサーバーにログインし、そこで Spark を起動します。pyspark を起動している様は以下の通り。

[hadoop@ip-172-31-6-19 ~]$ cd spark/
[hadoop@ip-172-31-6-19 spark]$ ls
bin  CHANGES.txt  classpath  conf  ec2  examples  lib  LICENSE  NOTICE  python  README.md  RELEASE  sbin
[hadoop@ip-172-31-6-19 spark]$ bin/
beeline               run-example           spark-shell.cmd       utils.sh
compute-classpath.sh  spark-class           spark-sql
pyspark               spark-shell           spark-submit
[hadoop@ip-172-31-6-19 spark]$ bin/pyspark
  • 自作した Spark アプリを動かすには、クラスタ起動後に SSH で MASTER に転送する必要があります。MASTER のサーバーへのプログラムの配置は、emr ssh コマンドで行うのが正解だと思いますが、EC2 のセキュリティグループをいじって、手元のローカルから SCP 接続してファイルを転送することも可能です。セキュリティ上問題がある可能性もあるのでおすすめはしませんが。。。どのインスタンスがマスターかは、EMR の管理UIで確認することが出来ます。

コマンドラインについて

まずはじめに、エントリー通りの以下のコマンドを実行してみました。結果、インスタンスが起動するところまではうまくいくのですが、bootstrap 処理でエラーになり、クラスタが起動しませんでした。

aws emr create-cluster --region ap-northeast-1 --name SparkCluster --ami-version 3.3.1 --no-auto-terminate --service-role EMR_DefaultRole --instance-groups InstanceCount=1,BidPrice=0.03,Name=sparkmaster,InstanceGroupType=MASTER,InstanceType=m1.large InstanceCount=10,BidPrice=0.03,Name=sparkworker,InstanceGroupType=CORE,InstanceType=m1.large --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,KeyName=xxxxxxxxxx --applications Name=HIVE --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-v,1.2]

そこで、Spark 用の bootstrap-action が配布されている AWS Lab の Github レポジトリ *2 を確認したところ、バージョンの指定がどうやら違うことがわかりました。バージョンを 1.2 から 1.2.0.a に変更して再実行したところうまくいきました。(一番おしりのバージョン指定のみが異なります。)

aws emr create-cluster --region ap-northeast-1 --name SparkCluster --ami-version 3.3.1 --no-auto-terminate --service-role EMR_DefaultRole --instance-groups InstanceCount=1,BidPrice=0.03,Name=sparkmaster,InstanceGroupType=MASTER,InstanceType=m1.large InstanceCount=10,BidPrice=0.03,Name=sparkworker,InstanceGroupType=CORE,InstanceType=m1.large --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,KeyName=xxxxxxxxxx --applications Name=HIVE --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args=[-v,1.2.0.a]

上記のコマンド実行後は、MASTER に自作の Spark アプリなり、スクリプトなりを転送して実行するとクラスタで稼働します。spark-submit コマンドの --master 引数に yarn-client を指定することを忘れないようにしましょう。

アドバイス:Spark するなら Scala で! (もしくは Java...)

以下は、Python のスクリプトを ganglia でモニタリングしているところ。なんか一箇所だけ赤い。。

f:id:yoshi0309:20150129221239p:plain

ganglia でモニターしてわかったことですが、pyspark はクラスタ構成時の実行に問題があるようで、1つの WORKER に処理が集中していました。このことが原因で、すべて Scala で書き直しをするはめになったのですが pyspark の問題はそれだけではなく、処理の途中でフリーズすることもあるようです。同じ処理を Scala で書きなおしたものを実行したところ、こちらはフリーズせずに稼働しました。

参考。

Hadoop のおすすめ書籍はこちら。Kindle版 もあります。オライリーの Hadoop 本は厚すぎて引いた、まずは使ってみて手を動かしながら覚えたい、という方には特におすすめです。

Spark の話はありませんが、Spark を使う上での必須の知識が詰まっています。

Hadoop徹底入門 第2版

Hadoop徹底入門 第2版

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

Hadoop徹底入門 第2版 オープンソース分散処理環境の構築

*1:こちらに指定可能なインスタンスタイプが一覧されています。 http://aws.amazon.com/jp/elasticmapreduce/pricing/

*2:https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark