読者です 読者をやめる 読者になる 読者になる

望月いちろうのREADME.md

書き溜めておいた技術記事や旅行記のバックアップです。

Apache Spark 入門 - ビックデータ解析のための分散処理フレームワーク - その実績も

Scala

Apache Sparkとは

ビックデータ解析のための分散処理フレームワーク

特徴

Hadoopなどと連携させてビックデータを解析することが得意

なぜSparkなのか?

有向非循環グラフを実行可能、複雑な処理を手軽に実行できる。

RやPythonではダメなの?

スケーラビリティに欠けるのでビッグデータには不向き

Scalaは関数型言語なので集計計算など並列処理が得意

RもPythonも関数型ではないので並列計算が苦手

RやPythonからSparkを利用することもできる。 またSparkからRやPythonを利用することも可能。

実績

国内

  • リクルート
  • NTTデータ

など

リクルートがSparkを利用して計算時間を大幅に改善した事例が以下の本に載っています。

初めてのSpark

初めてのSpark

  • 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia,Sky株式会社玉川竜司
  • 出版社/メーカー: オライリージャパン
  • 発売日: 2015/08/22
  • メディア: 大型本
  • この商品を含むブログ (4件) を見る

Sparkの基本的なモデル

複数のデータセットを利用する、通常はHadoopのような永続化されたストレージに格納されている。

RDD

RDD(耐障害性分散データセット)はデータを複数のマシンに分散させることで障害に強くなっている。

よくある誤解

SparkはHadoopを組み合わせて利用するもの

確かにSparkはHadoopとの相性がよいが、SparkはHadoopを使わなくても利用することができる。たとえばMySQLなどのRDBMS、CassandraやMongoDBなどのNoSQLを利用することもできるし、Amazon S3などのクラウドストレージ、さらにはローカルのデータをそのまま扱うこともできる。

Cassandraとの連携用

github.com

MongoDBの公式が提供しているmongo-spark

github.com

Macでの導入

brewを利用して簡単にインストールをすることができる。注意すべき点はsparkではなくapache-sparkであること。

brew install apache-spark

うまくいけば以下のような表示が出る

==> Using the sandbox
==> Downloading https://www.apache.org/dyn/closer.lua?path=spark/spark-2.0.1/spark-2.0.1-bin-hadoop2.7.tgz
==> Best Mirror http://ftp.jaist.ac.jp/pub/apache/spark/spark-2.0.1/spark-2.0.1-bin-hadoop2.7.tgz
######################################################################## 100.0%
🍺  /usr/local/Cellar/apache-spark/2.0.1: 1,206 files, 204.3M, built in 3 minutes 8 seconds

勉強のためにローカルでsparkを起動してみましょう。

spark-shell

するとこのようにSparkの対話的実行環境が起動します。

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/11/06 12:17:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/11/06 12:17:42 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://10.27.9.225:4040
Spark context available as 'sc' (master = local[*], app id = local-1478402261740).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

ローカルでWeb UIを見ることができます。http://10.27.9.225:4040にアクセスしてみましょう。 するとこのような画面がでてきます。

f:id:mochizuki_p:20161107201426p:plain

ここからタスクやクラスタについての概要が確認できます。

RDDの作成

前述した通りSparkでファイルを扱う単位はRDDです。まずはRDDを作成してその基本的な操作について実行してみることにしましょう。

RDDとしてテキストファイルを扱うにはsc.textFile()を利用します。ここではカレントディレクトリにあるtest.pyをRDDに上げてみたいと思います。

scala> val test = sc.textFile("test.py")
test: org.apache.spark.rdd.RDD[String] = test.py MapPartitionsRDD[1] at textFile at <console>:24

2行目にあるのorg.apache.spark.rddがSparkのRDDの基底クラスです。 テキストファイルは行ごとに扱われています。まずは行数を確認してみましょう

scala> test.count()

res0: Long = 10   

10行のファイルであることがわかります。

次に最初の行を確認してみましょう。

scala> test.count()
res0: Long = 10   

SparkContenx - Sparkでファイルを扱う基本単位

感の良い方ならRDDを作成するときに利用したscについて疑問に思われたかもしれません。これはSparkContenxのことです。SparkContextはRDDを管理して、操作を行うための基本単位です。scについて調べてみましょう

scala> sc
res3: org.apache.spark.SparkContext = org.apache.spark.SparkContext@38d4488c

例としてimportを含む行を抽出してみようと思います

val imports = test.filter(_.contains("import"))

imports: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:26

注意点としては対話的実行環境では自動的にSparkContextが構築される一方、実際のアプリケーションでSparkを運用する際には自分で適切な設定を行う必要がある点です。 sbtやmavenのようなプロジェクト管理ツールを利用している場合は設定ファイルに関連するパッケージを読み込むように追記する必要があります。

たとえばsbtであれば以下の形式でlibraryDependenciesを追記してsparkを読み込む必要があります。

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
)
val imports = test.filter(_.contains("import"))
imports: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:26

プロジェクトにsparkがインストールされている前提でsparkライブラリをインポートする例

import org.apache.spark.sparkConf

import org.apache.spark.sparkContext.

import org.apache.spark.sparkContext._

val conf = new SparkConf().setMaster() .setAppName("local")
val sc = new SparkContext(conf)

Sparkでの式評価について

SparkでのRDDの操作は原則的に遅延評価になっています。データに操作を加えるたびにデータを読み込んでいたのでは非常に不効率であるのは自明であることから、最終的な評価する時点まで実際の計算は行われません。

もしあるRDDをプログラム中で何度も処理する場合はpersist()を利用してストレージを永続化する必要があります。

SparkでのRDDの生成について

Sparkでは2つの方法でRDDを生成することができる。

  • HadoopあるいはCassandra、MongoDBといった外部のストレージまたはローカルファイルからデータをロードする
  • コレクションをプログラムで分散する

一般的には前者の方法が用いられる。 後者はプログラム中でparallelize()メソッドを用いることで利用出来る。

RDDの操作

RDDに対する操作は変換とアクションに2つに大別できる。 変換は元のRDDから新しくRDDを定義する操作で、有向非循環グラフとして定義することが可能。この関係は系統グラフとして表現でき、それはScala内部で保存され、障害が発生した際にはこのグラフを元にRDDの復元を試みる。変換は基本的に遅延評価で実際にデータが取り出される段階で評価される。

イテラブルなRDDに対する基本的な変換は以下のようになる。

変換 説明 例の説明
map() RDDの各要素に関数を適用してその結果を各要素と置き換えた結果を返す rdd.map(x => x*x) 各要素を2乗したRDDを返す
flatMap() RDDの各要素に関数を適用して、その関数が返すイテラブルな要素を結合したRDDを返す rdd.flatMap(_.split(" ")) RDDの各要素(英語の文章を)単語に分割する。
filter() RDDに各要素に真偽値を返す関数を適用して、trueを返した要素のみを取り出す。 lines.filter(_.contains("136.245.12.67") "136.245.12.67"を含んでいる行のみを返す
distinct() RDDの重複する要素を取り出す
sample() RDDから一部のデータのみランダムに抽出する

またアクションはRDDから特定の情報を取り出す操作で、プログラミング的な見方をすればRDDから別の型の値を取り出す操作と言える。

このことを確かめるためにサーバーのログファイルをRDDに取り込み、特定のIPのアクセスログを抽出する操作をしてみたいと思う。

val lines = sc.textFile("access.log")
val ips = lines.filter(_.contains("136.245.12.67")) //この時点では評価されない
ipp.foreach(println)//この時点で評価
 

Sparkと関数型プログラミング

SparkのRDDへの変換は基本的に副作用のないメソッドである。これは元のデータを破壊しないという点で安全な操作である。

RDDのデータの扱い

処理後のRDDはcollect()を利用してローカルに取り出すことができる。しかしこれはサイズの大きいRDDには不適切で、一般的にはHDFSなどの分散ストレージに保存する。またはsaveAsTestFile()saveAsSequenceFile()などを利用して保存することができる。

Sparkと集合演算

Sparkでは同じ形式のRDDに対して、各種集合演算を実行することができる。これはデータ型が集合として定義できない場合も使用できる。和集合や積集合、カルテシアン積などの生成である。

変換 説明
union() 2つのRDDの和集合を取る rdd.union(otherRdd)
intersection() 2つのRDDの積集合を取る rdd.union(otherRdd)
subtract() 2つのRDDの差分を取る rdd.subtract(otherRdd)
cartesian() 2つのRDDからカルテシアン積を作成する rdd.cartesian(otherRdd)

reduceとfold

RDDに対するアクションとしてもっとも重要なのは、RDDの隣接した2つの要素に対して同じ型を返すreduce()fold()です

しかし、この同じ型を返すという制約は、時に問題になることがあります。このような場合は、aggregate()関数を利用すると良いでしょう。