2014年9月8日月曜日

Programming in Scala, First Edition: Chapter 30

30. Actors and Concurrency
  Javaでもマルチスレッドプログラミングはサポートされている。
    一通り必要な機能はそろっている。
    大規模な複雑なシステムを上手く作るのは困難。
  ScalaではActorライブラリを用意している。
    Javaのモデルより使いやすい。
  例としてChapter 18の回路シュミレーションのマルチスレッド版を扱う。

30.1 Trouble in paradise
  Javaの組み込みのスレッドは共有メモリとロックをベースにしている。
    各オブジェクトをmonitorに関連付ける。
    共有データをsynchronizedになっているコード部分からアクセスする。
    ロックを使ってsynchronizedのコードが他スレッドから割り込まれない様にする。
  大規模で複雑になると共有メモリとロックでは安全に作るのが困難。
    どのデータが他のスレッドからアクセスされるのか把握している必要がある。
    どのロックを取るべきかを把握している必要がある。
    デッドロックしない様にする必要がある。
    机上チェックのみでプログラムを正しく作るしかない。
      コンパイル時に間違いをチェック出来ない。
      テストもタイミングに依存して問題を摘出出来ない場合がある。
    ロックを過不足なしに実装する必要がある。
      過剰にロックしても問題になる。ロック不足でも問題になる。
      安全のためにロックを多めにするという戦略も使えない。
    Java 5のjava.util.concurrentライブラリより使いやすくはなっている。
      それでもロックと共有メモリベースなので本質的は難しい。
  Scalaではメッセージパッシングモデルを使えるようにした。
    共有メモリやロックより使いやすい。
      デッドロックや競合が発生しにくい。

30.2 Actors and message passing
  Actorとは?
    スレッドの様なentity
    メッセージを受信するためのメールボックスを持っている。
  一番簡単なActorの例
    scala.actors.Actorをextendして使う。
      act()メソッドを実装する。
      start()メソッドでスレッドを生成、実行する。
    import scala.actors._
    object SillyActor extends Actor {
      def act() { for (i <- 1 to 5) {
        println("I'm acting!"); Thread.sleep(1000) }}}
    SillyActor.start()
  actor関数を使えばstart()メソッドを使わずにいきなりThreadを生成、実行出来る。
    scala> val a = actor { Thread.sleep(1000); println("hoge") }
    a: scala.actors.Actor = scala.actors.Actor$$anon$1@1d638bc
  ! メソッドでactorにメッセージを送る
    scala> SillyActor ! "hi there"
  receiveメソッドでメッセージを受け取る
    val echoActor = actor { while (true) { receive {
      case msg => println("received message: "+ msg) }}}
    receiveはpartial function(Section 15.7)にメッセージを渡す。
      caseにマッチするメッセージのみが渡される。
      マッチしないものは単に無視される。
  メッセージを送るときにブロックはされない。
    何時でもメッセージを送ることが出来る。
    ?? blockingにしてタイミングを合わせる事は出来ない ??
  メッセージを受信しても割り込まれない。
    receiveが呼ばれるまでメッセージはキューの中に溜まっている。
    ?? キューが一杯になったら? キューの大きさは制限出来ない ??
  ?? actorを外から強制的に止める方法がない? Java threadのstop()は使えない??

30.3 Treating native threads as actors
  actorの中ではJavaのnative threadを一つか沿い令嬢使っている。
    actorを普通に使っている限りでは、どうマップされているか意識する必要はない。
  元々Javaのnative threadとして使われているthreadをactorで扱う事が出来る。
    Thread.currentを直接は使用できない。
      actorとして必要な情報が足りないから。
    Actor.selfを使う。
      これはshellからactorのデバッグをする時にも便利。
      shellが動いているthreadをactorとして使える。
        scala> self ! "hello"
        scala> self.receive { case x => x }
        res6: Any = hello
      shellを永久にblockするのを避けるのにreceiveWithinでタイムアウト指定する。
        scala> self.receiveWithin(1000) { case x => x } // wait a sec!
        res7: Any = TIMEOUT

30.4 Better performance through thread reuse
  actorはJavaのthread上に実装されている。
  Javaのthreadは名前に似合わずに「重い」
    沢山メモリを消費する。
      数約万のobjectを作れる様なVMでも数千のthreadしか作れない。
    threadの切り替えに時間が掛かる。
      数百サイクルのプロセッサ時間を消費する。
  性能を上げるためにはthreadの生成と切り替えをなるべく少なくしたい。
  reactと言うメソッドを使えばこれが出来る。
    receiveと同様に部分関数を引数にとる。
    receiveと違ってNothingがreturn type。
      メッセージハンドラを評価するのみで関数から戻らないということ。
    実行結果は他のactorにメッセージを送るなどして受け渡す。
  reactはThreadの生成や切り替えをしない実装になっている。
    関数から戻った後処理が無いので呼ぶ前までのコールスタックが不要。
    ?? クロージャのみ保持しておけOKと言うこと ??
    あるメッセージを処理したら同じthreadで別のactorを処理出来る。
    receiveの替りにreactを使えば原理的にはsingle threadでOK。
      マルチコアの場合はそれに見合うだけのthreadを使うように実装されている。
  極力reactを使うようにすべき。
  reactから戻れないのでメッセージを受信した後の処理も全てreactから行う必要あり。
    通常はact自身のようなtop levelの処理をメッセージ受信処理後に呼び出す。
      ?? 要するにループにしておくということ??
    Actor.loopを使えばblock内にreactがあってもループ処理を実現できる。
      ?? 抜けるときはどうする? exceptionで抜ける?
  return typeがNothingだと言う事は関数から戻らないと言う事だが、exceptionでは抜けられる。
    reactについてもこれは成り立つ。
  reactの実装について。下記ほど単純ではないが、概念的にはそうなっている。
    actorのstartを呼ぶと、threadが割り当てられてactが呼ばれる。
    actからreactを呼び出すと処理できるメッセージがあるかを調べる。
    ?? あったらメッセージを後で処理する準備をしてexceptionを投げる。??
    無かったら自身を"cold storage"に保管した後にexceptionを投げる。
      後でメッセージが来たら復活出来るように。
    どちらの場合でもreact->actはexceptionで無理やり完了させられる。
      actを呼んだthreadはexceptionを捕捉して、次の処理に移る。
        actorを呼んだと言う事は忘れてしまう。
    複数のメッセージは処理するためには部分関数内からactを再び呼ぶ必要がある。

30.5 Good actors style
  actorの良いところはactors styleでマルチスレッドプログラムが出来る事。
    良いactors styleで書くためのガイドラインを本節で示す。
  Actors should not block
    actorはメッセージ待ち以外でblockすべきではない。
    blockしている最中にメッセージを処理できないから。
      三竦みになってdeadlockする可能性がある。
    blockする代わりに起床用のメッセージを送るhelperのactorを使う。
      sleepの代わりに一定時間後にメッセージを送るhelperを使う。
      このhelper自体はメッセージ待ちしないのでsleepしても問題ない。
    ** UNIX系でも待ちをselect等で一本化すべきなのと同じ。
  Communicate with actors only via messages
    メッセージのみで通信すべきで、共有データを使うべきでない。
      共有データを使うと結局ロックも必要となってactorの良さが生かせない。
    ただし、scalaでは共有データ、ロックとの併用も許してはいる。
      ここはerlangよりも自由度がある。
      信頼できるライブラリで共有データ、ロックが隠ぺいされているなら使っても良いかも。
      actorで作り直すことも出来るが、どちらが簡単で安全か?
    If you're considering shared data and locks
      ?? ダーティーハリーの"Do I feel lucky?"の一節が参考になる。??
  Prefer immutable messages
    actorのactメソッドの中でもthread-safeとかを気にしなくて良い。
      非同期、mutalbeなオブジェクトも使える。
      "share-nothing model"と呼ばれている。
    メッセージに入れて送るオブジェクトだけは例外。
      thread-safeか意識する必要がある。
    一番良いのはimmutableにする事。
      case classにするのが簡単。
      Scalaのimmutableライブラリや自作のものでもOK。
      安全な上にリソースをあまり消費しないで済む。
      並列計算をする場合は特に有効。
        actorを使わない場合でも有効。
    mutableにする場合はどのactorにアクセス権限があるのか明確にする必要がある。
      mutableなオブジェクトを送ったらそれ以降はアクセスしないなら安全。
        その後、他人が修正する時に間違うリスクはあるが。。。
      アクセス権限をメッセージでやり取りする方法もある。
      複数でアクセスする必要があるならコピーすべき。
        単にコピーするよりimmutableに変換してコピーするほうが望ましい。
  Make messages self-contained
    通常のメソッド呼び出しではリターン後に呼び出し元は何をすべきか分かっている。
    actorの場合はreceive後にどうするかを決めるのが難しい場合がある。
      send後にreceiveするのはしばらく後の場合もある。
      send時の状況を思い出す必要があるため。
    メッセージに冗長な内容を含めてこれを保管する。
      requestの内容をresponseにそのまま含める。
        requestがimmutableならあまりリソースを消費しない。
      selfを含める。
        メッセージが誰から来たのか分かりやすくなる。
      case classにしてwrapした方がコードが分かりやすくなる。

30.6 A longer example: Parallel discrete event simulation
  Chapter 18の離散事象シミュレーションをactorで並列実行可能にする。
    並列化すればマルチコア等を使って高速化できる。
    シミュレーションの各々の要素をactorにする。
  Overall Design
    Chapter 18を大きく変えずにactorで並列化出来る。
      各々の要素をactorにしてeventをメッセージにすれば良い。
      各要素で共通している部分はtrait Simulantとして各要素でextendsする。
    各要素の同期方法
     clock actorを用意して各要素をクロック同期させる。
      clock actorから各要素に各周期の開始をpingメッセージで伝える。
      各要素は処理が終了したらpongメッセージでclockに知らせる。
      ping/pongにはパラメータは本来不要だが、分かりやすくするためにtickを入れる。
        debugの時に特に有効。
    各要素を直接通信させずにclockを経由させる。
      各要素が直接通信すると、各周期の終わりが何時なのか判断できない。
        他の要素からさらにメッセージが来るかも知れないから。
      各要素は次の周期のためのイベントをclockに送る。
      clockはそれをまとめてagendaとして各要素に送る。
      各要素はpingを受けたらagendaを処理する。
    シミュレーションの開始。
      全ての要素を立ち上げて、受信可能状態になったらclockを動かせば良い。
  Implementing the simulation framework
    メッセージをcase classで定義する。
      case class Ping(time: Int)
      case class Pong(time: Int, from: Actor)
      case class WorkItem(time: Int, msg: Any, target: Actor)
      case class AfterDelay(delay: Int, msg: Any, target: Actor)
      case object Start
      case object Stop
    core frameworkへの追加要素
      Clock class と Simulant trait
    class Clock extends Actor {
      // running = falseで上がってStartメッセージが来たらtrueになる。
      // falseの間はclockからメッセージを出さないので要素は動作しない。
      // これにより初期化中はメッセージを使わずに普通に関数を呼んでOK。
      private var running = false
      private var currentTime = 0
      private var agenda: List[WorkItem] = List()
      // 存在する全ての要素
      private var allSimulants: List[Actor] = List()
      // current time tickの処理がまだ終わっていない(Pongを返していない)要素
      // busySimulantsがemptyになったら次のtickに進める。
      // tickを進める時にallSimulantsをbusySimulantsにコピーする。
      private var busySimulants: Set[Actor] = Set.empty
      // Startメッセージ来るまで何もしないので、先にactorとして起動しても安全。
      start()
      // 要素をClockに登録する。
      def add(sim: Simulant) {
        allSimulants = sim :: allSimulants
      }
      // tickを進める処理とtick中のメッセージ受信に処理を分ける。
      def act() {
        loop {
          if (running && busySimulants.isEmpty) advance()
          reactToOneMessage()
        }
      }
      // tickを進める処理。
      def advance() {
        if (agenda.isEmpty && currentTime > 0) {
          println("** Agenda empty.  Clock exiting at time "+ currentTime+".")
          self ! Stop
          return
        }
        currentTime += 1
        println("Advancing to time "+currentTime)
        // Ping前にagendaを要素に送っている。
        // Ping前はClockは受信しないので、要素からメッセージを送っても影響なし。
        // 要素はPing受信直後にPongを返すことになる。
        processCurrentEvents()
        for (sim <- allSimulants) sim ! Ping(currentTime)
        busySimulants = Set.empty ++ allSimulants
      }
      private def processCurrentEvents() {
        // currentTimeより前の物は無いはずだが念のために<=にしている。
        // currentTime>_.time が見つかったらその時点でループを止めても良い。
        val todoNow = agenda.takeWhile(_.time <= currentTime)
        // agendaはinsert時にtickでsortされているので前から順番に捨てられる。
        agenda = agenda.drop(todoNow.length)
        for (WorkItem(time, msg, target) <- todoNow) {
          // currentTimeより前の物は前tickで既に処理されて存在しないはず。
          assert(time == currentTime)
          target ! msg
        }
      }
      def reactToOneMessage() {
        react {
          case AfterDelay(delay, msg, target) =>
            val item = WorkItem(currentTime + delay, msg, target)
            // insertはListing 18.8と同じ。tickでsortされる様に挿入する。
            agenda = insert(agenda, item)
          case Pong(time, sim) =>
            assert(time == currentTime)
            assert(busySimulants contains sim)
            busySimulants -= sim
          // Startは単にrunning = trueのみでOK。
          // reactを抜けた後loop=>advanceで動作が始まる。
          // Start直後のtick=1だけ特別扱い
          //   Ping受信後にagendaがSimulantから送られてくるように細工してある。
          case Start => running = true
          case Stop =>
            for (sim <- allSimulants)
              sim ! Stop
            exit()
        }
      }
    }
    // 各要素の共通部分
    trait Simulant extends Actor {
      val clock: Clock
      // 共通メッセージ(Ping/Stop)以外の処理。subclassで定義。
      def handleSimMessage(msg: Any)
      // Start直後のtick=1のPing受信後のみに実行される立ち上げ処理。subclass。
      def simStarting() { }
      def act() {
        loop {
          react {
            case Stop => exit()
            case Ping(time) =>
              // Start直後のtick=1だけ特別扱い。
              // Ping受信後に最初のagendaをClockに送る等立ち上げ処理する。
              // それ以外ではClockからのagenda受信処理の中でClockに送り返す。
              if (time == 1) simStarting()
              // Start直後のtick=1以外はPongを返すのみ。
              clock ! Pong(time, self)
            case msg => handleSimMessage(msg)
          }
        }
      }
      // インスタンス化されるタイミングactorとしてはstartさせる。便利だから。
      // メッセージを受けるまで何もしないので安全。
      start()
    }
    Chapter 18の並列化前のコードと同様に驚くほど少ないコードで実現出来た。
  Implementing a circuit simulation
    シミュレーションする回路(Class Circuit)の実装。
      wireとgateとclockからなる。
      wireはtrue/falseを保持するのみ。
      gateはwireを結合する。入力wireの値から出力wareの状態を変更する。
      wire、gate等はClass Circuit内のみで使うので、sub classにした方が良い。
  class Circuit {
    val clock = new Clock
    // simulation messages
    // gateからwireの状態をセットするためのメッセージ
    case class SetSignal(sig: Boolean)
    // wireからgateにwireの状態の変化を知らせるためのメッセージ
    case class SignalChanged(wire: Wire, sig: Boolean)
    // delay constants
    // 必要に応じてdelayを変更できるように定数に定義しておく。
    // ** C言語でよくある#defineで定数を定義するのと同じ。コード変更必要。
    val WireDelay = 1
    val InverterDelay = 2
    val OrGateDelay = 3
    val AndGateDelay = 3
    // Wire classes and methods
    class Wire(name: String, init: Boolean) extends Simulant {
      def this(name: String) { this(name, false) }
      def this() { this("unnamed") }
      // Wireをnewする時にはouterのCircuitがnewされていて、その中のclockを使う。
      val clock = Circuit.this.clock
      // Wireがnewされる時に呼ばれる。
      clock.add(this)
      private var sigVal = init
      // WireをInputとしているGateのリスト
      private var observers: List[Actor] = List()
      // SetSignalのメッセージを受け取ったらobserversにSignalChangedを知らせる。
      def handleSimMessage(msg: Any) {
        msg match {
          case SetSignal(s) =>
            if (s != sigVal) {
              sigVal = s
              signalObservers()
            }
        }
      }
      def signalObservers() {
        for (obs <- observers)
          clock ! AfterDelay(
            WireDelay,
            SignalChanged(this, sigVal),
            obs)
      }
      // 信号の初期状態をobserversに知らせる。
      override def simStarting() { signalObservers() }
      // WireをInputとしているGateの追加。初期化時のみなので関数コールでOK。
      def addObserver(obs: Actor) {
        observers = obs :: observers
      }
      override def toString = "Wire("+ name +")"
    }
    // Gate classes and methods
    // and/or/invertの3種類のGateを実装する。
    // 共通分はabstract classにする。
    // 共通化のためにintertも2入力に統一して、片側を無視する様にする。
    // 無視する入力は常にfalseのDummyWireと接続しておく。
    private object DummyWire extends Wire("dummy")
    // Gateの共通部分
    abstract class Gate(in1: Wire, in2: Wire, out: Wire) extends Simulant {
      def computeOutput(s1: Boolean, s2: Boolean): Boolean
      // defでも定義出来るが、valにした方が定数であることを表しやすい。
      val delay: Int
      val clock = Circuit.this.clock
      clock.add(this)
      in1.addObserver(this)
      in2.addObserver(this)
      // 入力のWireの状態のキャッシュ。
      // 変更時しか通知されないため保持する必要あり。
      var s1, s2 = false
      // 入力が変更されたらそれをキャッシュして更新後の出力を通知する。
      def handleSimMessage(msg: Any) {
        msg match {
          case SignalChanged(w, sig) =>
            if (w == in1)
              s1 = sig
            if (w == in2)
              s2 = sig
            clock ! AfterDelay(delay,
                SetSignal(computeOutput(s1, s2)),
                out)
        }
      }
    }
    // 各Gateはutilityメソッドの副作用としてする。
    // delayの設定と演算の定義を行う。
    // 抽象クラスGateをanonymousクラス経由で直接newする。
    def orGate(in1: Wire, in2: Wire, output: Wire) =
      new Gate(in1, in2, output) {
        val delay = OrGateDelay
        def computeOutput(s1: Boolean, s2: Boolean) = s1 || s2
      }
    def andGate(in1: Wire, in2: Wire, output: Wire) =
      new Gate(in1, in2, output) {
        val delay = AndGateDelay
        def computeOutput(s1: Boolean, s2: Boolean) = s1 && s2
      }
    // inveter自体は1入力だが内部でGateを生成する際にDummyWireに置き換える。
    def inverter(input: Wire, output: Wire) =
      new Gate(input, DummyWire, output) {
        val delay = InverterDelay
        def computeOutput(s1: Boolean, ignored: Boolean) = !s1
      }
    // misc. utility methods
    // Wireの状態を表示するutilityメソッド。
    // 要素の一つとして登録するだけでOK。
    def probe(wire: Wire) = new Simulant {
      val clock = Circuit.this.clock
      clock.add(this)
      wire.addObserver(this)
      def handleSimMessage(msg: Any) {
        msg match {
          case SignalChanged(w, s) =>
             println("signal "+ w +" changed to "+ s)
        }
      }
    }
    // clockにStartメッセージを送ってシミュレーションを開始する。
    def start() { clock ! Start }
  }
  Circuit classの使い方。
    Circuitをnewする。
    Wireをnewする。
    Gateをutility関数を呼び出して作成する。
    気になるWireをprobeする。
    start()を呼び出す。
  Circuitの拡張。
    Circuitをextendsしたtraitを作れば要素を追加できる。
      Circuitのメンバに自由にアクセスする事が出来る。
    加算器の追加。
    trait Adders extends Circuit {
      def halfAdder(a: Wire, b: Wire, s: Wire, c: Wire) {
        val d, e = new Wire
        orGate(a, b, d)
        andGate(a, b, c)
        inverter(c, e)
        andGate(d, e, s)
      }
      def fullAdder(a: Wire, b: Wire, cin: Wire,
          sum: Wire, cout: Wire) {
        val s, c1, c2 = new Wire
        halfAdder(a, cin, s, c1)
        halfAdder(b, s, sum, c2)
        orGate(c1, c2, cout)
      }
    }
    使うときは下記。
      val circuit = new Circuit with Adders
    classの代わりにtraitを沢山用意すれば必要な部分をmixして使える。
      val circuit = new Circuit with Adders with Multiplexers with FlipFlops ..
  Trying it all out
    実際のシミュレーションの例。
    object Demo {
      def main(args: Array[String]) {
        val circuit = new Circuit with Adders
        //直後にimport circuit._する事でcircuitのメンバに簡単アクセスできる。
        import circuit._
        val ain = new Wire("ain", true)
        val bin = new Wire("bin", false)
        val cin = new Wire("cin", true)
        val sout = new Wire("sout")
        val cout = new Wire("cout")
        probe(ain)
        probe(bin)
        probe(cin)
        probe(sout)
        probe(cout)
        fullAdder(ain, bin, cin, sout, cout)
        circuit.start()
      }
    }

30.7 Conclusion
  コンカレントプログラムは素晴らしい。
    コードを簡単に出来る。
    マルチコアを活用して性能を上げられる。
  コンカレントプログラムは地雷原。
    thread, lock, monitorで正しくdeadlockや競合を避けるのは大変。
  actorを使えば地雷原から脱する事が出来る。
    reactを使えば性能も改善できる。

0 件のコメント:

コメントを投稿