UTALI

書き溜めておいた技術記事や旅行記のバックアップです。

ScalaのRPCフレームワーク - Finagle入門

元記事

FinagleはTwitterのRPCシステムです。このブログではその説明とコアデザインを説明していきたいとおもいます。FinagleのREADMEにはより詳しい詳細が書いてあります。Finagleはクライアントとサーバーの強固な関係を簡単に構築できるように設計されています。

FinagleフレンドリーなREPL

これからはScalaの標準的なコードとはかけ離れたコードを利用します。もし学習用にREPLを利用する場合は、TwitterのFinagleと依存関係をどのように設定すべきか困惑するかもしれません。

FInagleのソースコードをfinagleという名のディレクトリに格納している場合は、コンソールに以下のコードを入力することでFinagleを利用できるようになるはずです。

$ cd finagle
$ ./sbt "project finagle-http" console
 ...build output...
scala>

Futures

Finagleはcom.twitter.util.Futureを遅延操作をエンコードするために使用します。Future型とは将来利用可能になる場合があるが現時点では利用できない値を扱うための型です。FinagleはFuture型を非同期に取得するAPIの返り値を扱うために使用します。同期APIは結果が返ってくるまで処理を待ちますが、非同期APIはそうではありません。いくつかのサービスのHTTPリクエストは結果が返ってくるまで0.5秒以上もかかることは珍しくありません。このように長時間にわたって処理をブロックすることはあまり望ましいことではありませんので、結果が返ってくるのが遅いAPIはFuture型を利用すべきです。

val myFuture = MySlowService(request) // 値はすぐに返却されるので、この間に別のことをする。
val serviceResult = Await.result(myFuture) // 結果が返ってくるまで処理が中断される。

実際にはAwait.result()のようなリクエストが返ってきてから呼ばれる関数は利用すべきではありません。これからFuture型組み込みの値が利用可能になってから呼ばれるコールバックを登録するメソッドを紹介します。

もし別の非同期APIを利用している場合、コールバックという言葉にうんざりしているかもしれません。それは関数を実際に利用される場所から遠い場所で定義する、非常に可読性の低いコードを書くことを強要すると考えているからかもしれません。

しかしFuture型はScalaの第1級関数が提供する非常に読みやすいコードフローを利用するので、関数が発火する場所でシンプルな関数を定義することができるのです。

例として、リクエストを送って返り値を処理する関数を定義してみましょう

val future = dispatch(req) // returns immediately, but future is "empty"
future onSuccess { reply => // when the future gets "filled", use its va
  println(reply)
}

REPLでFuture型の威力を体験してみてください。これは実運用として使うには陳腐なものですが、APIの機能を理解するには十分でしょう。もしREPLを使用している場合は、Promise型が便利なはずです。これはFuture型の抽象化された強固なサブクラスで、現在は何の値も入っていないFuture型を作成することができます。

scala> import com.twitter.util.{Await, Future,Promise}
import com.twitter.util.{Await, Future, Promise}

scala> val f6 = Future.value(6) // create already-resolved future
f6: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@c63a8af

scala> Await.result(f6)
res0: Int = 6

scala> val fex = Future.exception(new Exception) // create resolved sad future
fex: com.twitter.util.Future[Nothing] = com.twitter.util.ConstFuture@38ddab20

scala> Await.result(fex)
java.lang.Exception
  ... stack trace ...

scala> val pr7 = new Promise[Int] // create unresolved future
pr7: com.twitter.util.Promise[Int] = Promise@1994943491(...)

scala> Await.result(pr7)
  ...console hangs, waiting for future to resolve...
Ctrl-C
Execution interrupted by signal.

scala> pr7.setValue(7)

scala> Await.result(pr7)
res1: Int = 7

scala>

実運用でFuture型を利用する場合は,Await.result()を利用すべきではありません。代わりにコールバックを設定してください。あくまでREPLで簡単なデモを実行するためにAwait.result()を利用しているのです。

シーケンシャルの統合

Future型は mapやflatMapに似た集計関数を実装しています。コレクション・コンビネーターと聞くとこのようなことを思い出すはずです。「整数のリストがあってこれをすべて2乗したい、map を利用してみよう」これはスマートなやり方です。コンビネーター関数を別の関数と組み合わせることで効果的に別の関数を定義できます。Futureコンビネーターを簡単に説明すると「ここにはもしかしたら整数型の数値が入るかもしれないFuture型の変数と2乗する関数がある、これをmapしたい」

非同期APIを実装したい場合にリクエストの値を参照してAPIがレスポンスの値をFuture型にラッピングして渡すことになります。これらの入力値と関数をFuture型に変換するコンビネーターは極めて有用で、一度定義して非同期APIをほかの非同期APIで簡単に利用できるようになるからです。

極めて優れたFutureコンビネーターはflatMapです。

def Future[A].flatMapB: Future[B]

flatMapは二つのFuture型のシーケンスです。つまり、Future型と非同期関数を利用して別のFuture型を返すものです。このメソッドの記法はストーリーを伝えてくれます。Future型の前のメソッドでうまく処理が完了した変数を考えましょう。関数fは次のFuture型を返します。flatMapは入力値のFuture型が正常に前のメソッドで返却されたときに、自動的にfを呼び出します。この操作の結果値はこれらのFuture値の両方が正常に処理されたときのみに返却されます。もしどちらかのFuture値が失敗したときには、このFuture型も同様に失敗します。この暗黙に織り込まれたエラーは構文的に重要な場合はのみエラーを返すことになります。flatMapはこれらの構文を含むコンビネーターの標準的な名前です。

もし、あるFuture型を非同期APIに適用した場合はflatMap型を利用してください。たとえばUser型が格納されたFuture型Future[User]を受け取り、そのユーザーがブロックされていることをあらわすFuture[Boolean]を返却したいとします。これをユーザーのブロック判定に使うisBannedAPIとして定義します。しかし、これは非同期であるので、flatMapを利用することができます。

scala> import com.twitter.util.{Await, Future,Promise}
import com.twitter.util.{Await, Future, Promise}

scala> class User(n: String) { val name = n }
defined class User

scala> def isBanned(u: User) = { Future.value(false) }
isBanned: (u: User)com.twitter.util.Future[Boolean]

scala> val pru = new Promise[User]
pru: com.twitter.util.Promise[User] = Promise@897588993(...)

scala> val futBan = pru flatMap isBanned // apply isBanned to future
futBan: com.twitter.util.Future[Boolean] = Promise@1733189548(...)

scala> Await.result(futBan)
  ...REPL hangs, futBan not resolved yet...
Ctrl-C
Execution interrupted by signal.

scala> pru.setValue(new User("prudence"))

scala> Await.result(futBan)
res45: Boolean = false

scala>

同様に、同期関数をあるFuture型に適用してみることにしましょう。このときはmapを利用します。たとえば、Future[RawCredentials]があり、これからFuture[Credentials]を返却したいとします。ここで同期で処理を行うnormalize関数を定義して、RawCredentialsをCredentialsに変換するとしましょう。

scala> class RawCredentials(u: String, pw: String) {
     |   val username = u
     |   val password = pw
     | }
defined class RawCredentials

scala> class Credentials(u: String, pw: String) {
     |   val username = u
     |   val password = pw
     | }
defined class Credentials

scala> def normalize(raw: RawCredentials) = {
     |   new Credentials(raw.username.toLowerCase(), raw.password)
     | }
normalize: (raw: RawCredentials)Credentials

scala> val praw = new Promise[RawCredentials]
praw: com.twitter.util.Promise[RawCredentials] = Promise@1341283926(...)

scala> val fcred = praw map normalize // apply normalize to future
fcred: com.twitter.util.Future[Credentials] = Promise@1309582018(...)

scala> Await.result(fcred)
   ...REPL hangs, fcred doesn't have a value yet...
Ctrl-C
Execution interrupted by signal.

scala> praw.setValue(new RawCredentials("Florence", "nightingale"))

scala> Await.result(fcred).username
res48: String = florence

scala>

ScalaにはflatMapを呼び出すための短縮構文があります。それはfor構文のことです。 その威力を試すためにこのような条件を考えてみて下さい。非同期APIでログイン認証を実装したいとします。同時にアカウントが凍結されていないかを非同期APIで検証したいとします。for構文を用いるとこのように書くことができます。

scala> def authenticate(req: LoginRequest) = {
     |   // TODO: we should check the password
     |   Future.value(new User(req.username))
     | }
authenticate: (req: LoginRequest)com.twitter.util.Future[User]

scala> val f = for {
     |  u <- authenticate(request)
     |  b <- isBanned(u)
     | } yield (u, b)
f: com.twitter.util.Future[(User, Boolean)] = Promise@35785606(...)

scala>

これでuserオブジェクトとアカウントが凍結されているかを示すBoolean型からFuture型のf:Future[(User,Boolean)]を生成することができます。ここで注意すべきなのはシーケンシャルの混合が必要になっているという点です。isBannedはauthenticateの出力値を引数として取る必要があります。

同時に返り値を取る場合の対応

もしかしたら一度に複数のサービスから情報を取得する必要があるかもしれません。たとえば、コンテンツと広告を同時に表示したい場合です。ここではコンテンツとは別々に広告を表示するとします。しかし両方のサービスにリクエストを送って結果を受けとるロジックを構築するのは難しいかもしれません。もしこの種のサービスを作った経験があればトリッキーな手法が必要なことがお分かりでしょう。しかしコンカレント・コンビネーターがあれば簡単です。

Future型はいくつかのコンカレント・コンビネーターを実装しています。一般的に、これはFutureのシーケンスを、少々違った方法でこれは別のFutureのシーケンスに変換するものです。これはいくつかのFuture型を1つのFuture型にまとめることができるという点で優れたやり方です。

object Future {
  …
  def collect[A](fs: Seq[Future[A]]): Future[Seq[A]]
  def join(fs: Seq[Future[_]]): Future[Unit]
  def select(fs: Seq[Future[A]]) : Future[(Try[A], Seq[Future[A]])]
}

collectが同じ型を持つFuture型の集合を引数にして、その同じ型のシーケンスを持つFuture型を返します。このFuture型は引数にとるすべてのFuture型が正常に格納された場合にのみ呼び出され、そのうち1つでも失敗すると呼び出されることがありません。返り値のシーケンスの順番は引数のシーケンスの順番と同じになります。

scala> val f2 = Future.value(2)
f2: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@13ecdec0

scala> val f3 = Future.value(3)
f3: com.twitter.util.Future[Int] = com.twitter.util.ConstFuture@263bb672

scala> val f23 = Future.collect(Seq(f2, f3))
f23: com.twitter.util.Future[Seq[Int]] = Promise@635209178(...)

scala> val f5 = f23 map (_.sum)
f5: com.twitter.util.Future[Int] = Promise@1954478838(...)

scala> Await.result(f5)
res9: Int = 5

joinは複数の型を持つFuture型のシーケンスを引数にとります。返り値のFuture[Unit]は、やはり引数のFuture型の中身が正常に格納されたときにだけ呼び出されます。これは色々な関数がすべて正常に実行されたことを確認するのに便利な機能です。そしてこれは最初に挙げたコンテンツと広告の問題を解決するのに有効でしょう。

scala> val ready = Future.join(Seq(f2, f3))
ready: com.twitter.util.Future[Unit] = Promise@699347471(...)

scala> Await.result(ready) // doesn't ret value, but I know my futures are done

scala>

select は引数の複数のFuture型のシーケンスの一番最初の変数が正常に格納されたときにFuture型を返却します。この返却されたFuture型は残りの未処理のFuture型を要素に含みます。残りのFuture型の処理が中断された場合も関係ありません。

scala> val pr7 = new Promise[Int] // unresolved future
pr7: com.twitter.util.Promise[Int] = Promise@1608532943(...)

scala> val sel = Future.select(Seq(f2, pr7)) // select from 2 futs, one resolved
sel: com.twitter.util.Future[...] = Promise@1003382737(...)

scala> val(complete, stragglers) = Await.result(sel)
complete: com.twitter.util.Try[Int] = Return(2)
stragglers: Seq[...] = List(...)

scala> Await.result(complete)
res110: Int = 2

scala> Await.result(stragglers(0)) // our list of not-yet-finished futures has one item
  ...Await.result() hangs the REPL because this straggling future is not finished...
Ctrl-C
Execution interrupted by signal.

scala> pr7.setValue(7)

scala> Await.result(stragglers(0))
res113: Int = 7

scala>

コンポジションの例:キャッシュの同時接続制限

これらのコンビネーターはネットワークサービスの典型的な処理を行う場合に役立つものです。この例は同時接続数の制限を行うもので、あるユーザーの一定範囲の時間内のリクエストをバックエンドに引き渡す数を制限するものです。

// Find out if user is rate-limited. This can be slow; we have to ask
// the remote server that keeps track of who is rate-limited.
def isRateLimited(u: User): Future[Boolean] = {
  ...
}

// Notice how you can swap this implementation out now with something that might
// implement a different, more restrictive policy.

// Check the cache to find out if user is rate-limited. This cache
// implementation is just a Map, and can return a value right way. But we
// return a Future anyhow in case we need to use a slower implementation later.
def isLimitedByCache(u: User): Future[Boolean] =  Future.value(limitCache(u))

// Update the cache
def setIsLimitedInCache(user: User, v: Boolean) { limitCache(user) = v }

// Get a timeline of tweets... unless the user is rate-limited (then throw
// an exception instead)
def getTimeline(cred: Credentials): Future[Timeline] =
  isLimitedByCache(cred.user) flatMap {
    case true => Future.exception(new Exception("rate limited"))
    case false =>

      // First we get auth'd user then we get timeline.
      // Sequential composition of asynchronous APIs: use flatMap
      val timeline = auth(cred) flatMap(getTimeline)
      val limited = isRateLimited(cred.user) onSuccess(
                                       setIsLimitedInCache(cred.user, _))

      // 'join' concurrently combines differently-typed futures
      // 'flatMap' sequentially combines, specifies what to do next
      timeline join limited flatMap {
        case (_, true) => Future.exception(new Exception("rate limited"))
        case (timeline, _) => Future.value(timeline)
      }
  }
}

この例はシーケンスとコンカレント・コンポジションを組み合わせるものです。驚くべきなのは同時接続の制限に対するレスポンスを除いてエラー処理を全く行っていないということです。もし引数のFutureが1つでも失敗すれば、自動的に返り値のFuture型に反映されるのです。

コンポジションの例:クローラー

Future型にコンビネーターを利用する方法が理解できたと思います。しかし、もっと使用例を見たいと考える方が多いでしょう。シンプルなHTMLサイトと画像があるとします。そのページは画像と別のページへのリンクが複数あります。あなたはそのリンク先からデータを取得したいとします。しかしAPIは非同期です。この「ニセ」のAPIは「取得可能」なリソースです。

 import com.twitter.util.{Try,Future,Promise}

// a fetchable thing
trait Resource {
  def imageLinks(): Seq[String]
  def links(): Seq[String]
}

// HTML pages can link to Imgs and to other HTML pages.
class HTMLPage(val i: Seq[String], val l: Seq[String]) extends Resource {
  def imageLinks() = i
  def links = l
}

// IMGs don't actually link to anything else
class Img() extends Resource {
  def imageLinks() = Seq()
  def links() = Seq()
}

// profile.html links to gallery.html and has an image link to portrait.jpg
val profile = new HTMLPage(Seq("portrait.jpg"), Seq("gallery.html"))
val portrait = new Img

// gallery.html links to profile.html and two images
val gallery = new HTMLPage(Seq("kitten.jpg", "puppy.jpg"), Seq("profile.html"))
val kitten = new Img
val puppy = new Img

val internet = Map(
  "profile.html" -> profile,
  "gallery.html" -> gallery,
  "portrait.jpg" -> portrait,
  "kitten.jpg" -> kitten,
  "puppy.jpg" -> puppy
)

// fetch(url) attempts to fetch a resource from our fake internet.
// Its returned Future might contain a Resource or an exception
def fetch(url: String) = { new Promise(Try(internet(url))) }

シーケンシャル・コンポジション

ここで最初のページの最初の画像を取得したいとします。もしかしたらユーザーがお気に入りのページのURLをシェアできるようなサイトかもしれません。この場合別のユーザーにそのリンクが本当に価値のあるものか教えるにはどうすればいいのでしょうか?たとえばページの最初の画像をサムネイルとして表示することにしましょう。

def getThumbnail(url: String): Future[Resource]={
  val returnVal = new Promise[Resource]

  fetch(url) onSuccess { page => // callback for successful page fetch
    fetch(page.imageLinks()(0)) onSuccess { p => // callback for successful img fetch
      returnVal.setValue(p)
    } onFailure { exc => // callback for failed img fetch
      returnVal.setException(exc)
    }
  } onFailure { exc => // callback for failed page fetch
    returnVal.setException(exc)
  }
  returnVal
}

残念ながらこのクローラは実用的ではありません。なぜならいつクローリングを停止するかわからないからです。

サービス

Finagleのサービス(Service)はRPCを取り扱うサービスを提供します。リクエストを取得してリスポンスを返却するのです。Service の定義はReq => Future[Rep] で1つないしは複数のリクエストとリスポンスの型を指定する必要があります。

Finagleではクライアントとサーバーの関係をServiceを軸に構築します。

Finagleのクライアントはネットワーク上からサービスを読み込みます。概念としてはFinagleのクライアントには2つの役割があります。

  • Serviceを利用する関数:Reqを送信して、返り値のFuture[Rep]を処理する
  • リクエストの送信の調整:例としてapi.twitter.comへのHTTPリクエスト

同様にFinagleのサーバーはServiceをネットワーク上に書き出します。もちろん2つの役割があり、

  • Serviceを実装する関数:Reqを受け取りFuture[Rep]を返却する
  • どのようにReqを待ち受けるかを設定:たとえばHTTPリクエストをポート80番で待ち受けるか?

このようにデータフローがネットワークをどのように流れるか?をServiceのロジックから分離することができます。

クライアント(Client)

FinagleのクライアントはServiceを読み込みます。それはネットワーク上でどのようにデータを送信するかを設定するものです。シンプルなHTTPクライアントは以下のようなものです。

import org.jboss.netty.handler.codec.http.{DefaultHttpRequest, HttpRequest, HttpResponse, HttpVersion, HttpMethod}
import com.twitter.finagle.Service
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.http.Http

// Don't worry, we discuss this magic "ClientBuilder" later
val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
  .codec(Http())
  .hosts("twitter.com:80") // If >1 host, client does simple load-balancing
  .hostConnectionLimit(1)
  .build()

val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")

val f = client(req) // Client, send the request

// Handle the response:
f onSuccess { res =>
  println("got response", res)
} onFailure { exc =>
  println("failed :-(", exc)
}

サーバー(Server)

サーバーはServiceとネットワークからのリクエストをどのように待ち受けるかの設定によって実装されます。シンプルなHTTPサーバーは以下のようになります。

import com.twitter.finagle.Service
import com.twitter.finagle.http.Http
import com.twitter.util.Future
import org.jboss.netty.handler.codec.http.{DefaultHttpResponse, HttpVersion, HttpResponseStatus, HttpRequest, HttpResponse}
import java.net.{SocketAddress, InetSocketAddress}
import com.twitter.finagle.builder.{Server, ServerBuilder}

// Define our service: OK response for root, 404 for other paths
val rootService = new Service[HttpRequest, HttpResponse] {
  def apply(request: HttpRequest) = {
    val r = request.getUri match {
      case "/" => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
      case _ => new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND)
    }
    Future.value(r)
  }
}

// Serve our service on a port
val address: SocketAddress = new InetSocketAddress(10000)
val server: Server = ServerBuilder()
  .codec(Http())
  .bindTo(address)
  .name("HttpServer")
  .build(rootService)

Filter

Filterはサービスを変換します。これはサービスジェネリックの機能性を提供するものです。たとえば、同時接続制限を行うサービスを実装したいとします。この場合は一度、同時接続制限のフィルターを実装すれば、それをすべてのサービスに適応することができます。Filterはサービスを複数の層に分解するような設計に最適です。

class MyService(client: Service[..]) extends Service[HttpRequest, HttpResponse]
{
  def apply(request: HttpRequest) = {
    client(rewriteReq(request)) map { res =>
      rewriteRes(res)
    }
  }
}

ここれrewriteReqとrewriteResはプロトコルの書き換えを行っています。

abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
  extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])

この型は図にするとわかりやすいでしょう。

    ((ReqIn, Service[ReqOut, RepIn])
         => Future[RepOut])


           (*   Service   *)
[ReqIn -> (ReqOut -> RepIn) -> RepOut]

ここでサービスのタイムアウト機能を提供するfilterです。

class TimeoutFilter[Req, Rep](
  timeout: Duration,
  exception: RequestTimeoutException,
  timer: Timer)
  extends Filter[Req, Rep, Req, Rep]
{
  def this(timeout: Duration, timer: Timer) =
    this(timeout, new IndividualRequestTimeoutException(timeout), timer)

  def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
    val res = service(request)

    res.within(timer, timeout) rescue {
      case _: java.util.concurrent.TimeoutException =>
        res.cancel()
        Trace.record(TimeoutFilter.TimeoutAnnotation)
        Future.exception(exception)     }
  }
}

この例は認証を実装する方法を示すものです。このコードはフィルターを利用してHttpReqをAuthHttpReqに変換するものです。次に

Service[HttpReq, HttpRep]型を持つサービスの代わりに、 Service[AuthHttpReq, HttpRep]を変換します。つまり、Serviceは認証済みのものとしてリクエストを扱うのです。

class RequireAuthentication(authService: AuthService)
  extends Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] {
  def apply(
    req: HttpReq,
    service: Service[AuthHttpReq, HttpRep]
  ) = {
    authService.auth(req) flatMap {
      case AuthResult(AuthResultCode.OK, Some(passport), _) =>
        service(AuthHttpReq(req, passport))
      case ar: AuthResult =>
        Future.exception(
          new RequestUnauthenticated(ar.resultCode))
    }
  }
}

フィルターをこのように利用することはいくつかの利点があります。第一に認証ロジックを一箇所に集約することができるということと、認証済みのリクエストに対して分離された型はプログラムのセキュリティの観点から望ましいものだからです。

Filter型はandThenメソッドと統合することができます。これであるServiceの返り値をandThenに渡して別のServiceを作ることができます。

val authFilter: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep]
val timeoutfilter[Req, Rep]: Filter[Req, Rep, Req, Rep]
val serviceRequiringAuth: Service[AuthHttpReq, HttpRep]

val authenticateAndTimedOut: Filter[HttpReq, HttpRep, AuthHttpReq, HttpRep] =
  authFilter andThen timeoutFilter

val authenticatedTimedOutService: Service[HttpReq, HttpRep] =
  authenticateAndTimedOut andThen serviceRequiringAuth

ビルダー(Builders)

Builderですべての機能を統合することができます。clientBuilderはいくつかのパラメータの集合を前提にServiceのインスタンスを作成することができ、ServerBuilderはServiceのインスタンスを取り、リクエストに対して返答することができます。Serviceの型を決定するためにCodecを提供する必要があります。Codecはプロトコルの実装の詳細(HTTP,thrift,memcachedなど)を記載したものです。どちらのビルダーも複数のパラメータを取ることができるので。ほんのすこしだけの追加で済みます。

val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
  .codec(Http)
  .hosts("host1.twitter.com:10000,host2.twitter.com:10001,host3.twitter.com:10003")
  .hostConnectionLimit(1)
  .tcpConnectTimeout(1.second)
  .retries(2)
  .reportTo(new OstrichStatsReceiver)
  .build()

これはポート上にサービスへのリクエストを送信するThriftサーバーを提供します。hostConnectionMaxLifeTimeの行をコメントアウトした場合、最大5分までコネクションが維持されます。もし readTimeoutの行をコメントアウトした場合、2分以内にリクエストが送信される必要があります。 ServerBuilder に最低限必要なのはname, bindTo と codecです。

Licensed under the Apache License v2.0.