読者です 読者をやめる 読者になる 読者になる

望月いちろうのREADME.md

書き溜めておいた技術記事や旅行記のバックアップです。

Spark から MongoDB が使える? Mongo Sparkが面白い

Scala MongoDB

github.com

Scalaの分散処理フレームワークSparkからMongoDBが使えるという触れ込みのこのライブラリ Spark 1.6以上に対応しているそうです。

初期設定

SparkContextを作成する前にMongoConnectorの準備を完了する必要があります。 最低限必要な設定はmongodb.input.urimongodb.output.uriです。

まずテストのためにspark-shellから呼び出してみることにしましょう。 この場合は読み込む元も書き込む先も同じローカルで動作するMongoDBインスタンスのtestコレクションなので以下のオプションを指定します。

spark-shell --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.coll?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.coll" --packages org.mongodb.spark:mongo-spark-connector_2.10:2.0.0-rc0

もしjava.net.BindException: Can't assign requested addressが表示された場合はSparkのシェルがローカルで動作しているか確認してください。 またはexport SPARK_LOCAL_IP=127.0.0.1を設定するか、--driver-java-options "-Djava.net.preferIPv4Stack=true"をspark-shellの起動時のオプションに追加してください。

MongoDBとRDDの連携

MongoDBとRDDの通信は実際にMongoDBに読み書きが実行される際に行われる。 SparkとMongoDBとの連携を有効かするには以下のパッケージを読み込む必要がある。

import com.mongodb.spark._

MongoDBからRDDへのデータのロード

sc.loadFromMongoDBを利用して任意のMongoDBコレクションをロードして RDDへ変換することができる。ロード元のMongoDBのアドレスは名前空間spark.mongodb.inputに指定する。 この設定を変更するにはReadConfigを利用してsc.loadFromMongoDBと組み合わせたりする必要がある。

import com.mongodb.spark.config._

val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd = MongoSpark.load(sc, readConfig)
println(customRdd.count)
println(customRdd.first.toJson)

同様に暗黙のヘルパー関数としてSparkContextのメソッドとして利用することもできる。

sc.loadFromMongoDB() // SparkConfのメソッド
sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // ReadConfigを読み込む

集計

SparkのRDDでの基本操作は変換とアクションに大別される。MapやFilterのような変換処理は実際にアクションが実行されるときに始めて評価されることに注意しよう。 とくにMongoDBとRDDの連携の際にはどのデータがSparkに読み出されるかをみるのは大事である。以下のFilter処理はtestコレクションが値が5よりも大きいものだけが読み込まれる

val filteredRdd = rdd.filter(doc => doc.getInteger("test") > 5)
println(filteredRdd.count)
println(filteredRdd.first.toJson)

もしjava.lang.NullPointerExceptionを受け取ったときは誤ったテーブルを参照している。aggregation pipelineを利用することでこのリスクを低減することができる。 この操作によってMongoDBから必要なデータだけをSparkに呼び出すことが可能になる。MongoRDDはaggregation pipelineに渡すことで実際にデータをSparkに呼び出す前にFilter処理を可能にするものである。 以下の例はtestフィールドが5よりも大きな値をもつテーブルだけを抽出するものである。

val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { test : { $gt : 5 } } }")))
println(aggregatedRdd.count)
println(aggregatedRdd.first.toJson)
val rdd = MongoSpark.load(sc)
println(rdd.count)
println(rdd.first.toJson)

RDDからMongoDBへのデータの保存

RDDからMongoDBにデータが保存される際には必ずMongoDBのデータ形式であるBSONへの変換を行わないといけない。この場合は各データをDocument (または BsonDocument, DBObject)へのmap関数として処理を行う。 またScalaのList型には対応していないのでimport scala.collection.JavaConverters._を読み込んだのち asjava()メソッドを使ってJavaの型に変換する必要がある。

import org.bson.Document
val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))

MongoSpark.save(documents) // SparkConfを事前に設定しておく

コレクションの選択

MongoDBへの保存されるコレクションを個別に選択するには WriteConfigを利用する。

import com.mongodb.spark.config._

val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(sc))
val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))
MongoSpark.save(sparkDocuments, writeConfig)

RDDのメソッドとして以下のような形式も指定できる。

rdd.saveToMongoDB() //  SparkConfを事前に設定しておく
rdd.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://example.com/database.collection"))) //WriteConfigを利用する

もし、細かい設定を行いたいときはMongoSparkのbuilder()メソッドを利用して Mongo Spark Connectorの細かい制御をすべきである。