トップ «前の日記(2009-02-28) 最新 次の日記(2009-03-02)» 編集

日々の破片

Subscribe with livedoor Reader
著作一覧

2009-03-01

_ IOU設計パターン

というわけで(昨日の続き)、DDJJ 1996年10月号の『非同期設計パターン』だが、.NET Frameworkではそれらしきものが採用されているものの、そんなに広く使われているわけでもなさそうだ。

IOUパターンは、非同期IOを同期IOモデルのように、オブジェクトの利用者に見せかけるためのパターンで、非同期IOの結果をまるで同期IOのように、呼び出し側に返す。しかし、同期IOではなく、すぐに返す。

そのため、呼び出し側はお話にならないエラーはすぐに検出できる(これは同期IOでも同様。たとえばクローズ済みIOに対してメソッドを呼んだ場合)。

そうではない、たぶん、実行されるであろうIOについても、すぐに戻る。ただし、結果として返されるオブジェクトは、同期IOの場合と異なり、IOUオブジェクトだ。このオブジェクトは、筆者のAllan Vermeulenによって以下のように定義されている。訳は上林靖氏。

IOUとは、サプライヤ(呼び出された関数)からのアグリーメントであり、約束したオブジェクトを提供することによって最終的に閉じられる。いったんIOUが閉じられてしまえば、サプライヤが提供したオブジェクトでIOUを償還すればよいのである。それまでの間プログラムコードは、他の有用な作業を続けることができるのだ。
IOUの概念は、2つの理由により有効である。まず単純であること。IOUをリターンする関数の呼び出しは、そうでない関数の呼びだしと全く同じに書くことができる。第2にIOUは、サプライヤが使用する非同期機構(もしあればだが)から完全に独立していることである。

2番目の有効性については、IAsyncResultがまあ証明していると言える。

以下に、Rubyで実装したIOUの例を示す。ここではIOUはHTTP 1.1の送信と受信をカプセル化する。チャンクの処理はいい加減だがやっつけ仕事の意味のハックなのでしょうがない。

#!/usr/local/bin/ruby -Ku
# coding: utf-8
 
require 'socket'
 
class AsyncSocket < TCPSocket
  class IOU
    def initialize(socket, queue, data = nil)
      @socket = socket
      @queue = queue
      @trns = data
      @complete = nil
      @callback = nil
      queue << self
      do_io
    end
 
    ##
    # true if this IOU was closed
    #
    def close?
      @complete ? true : false
    end
 
    ##
    # wait until redeem is ready (IOU was closed)
    #
    def stand_by
      Thread.pass until @complete
    end
    
    ##
    # return the data or waiting it
    #
    def redeem
      stand_by
      @complete
    end
    
    ##
    # register user method called after closing this IOU
    #
    def add_callback(&f)
      p "add_callback(#{f})" if $DEBUG
      if @complete
        f.call(@complete) 
      else  
        @callback = f
      end  
    end
    
    def inspect
      "#<IOU:#{hash}, @complete=#{@complete}, @callback=#{@callback}>"
    end
    
    protected
 
    def do_io
      return unless @queue.empty? || @queue[0] == self
      Thread.start do
        p "start thread by #{self}" if $DEBUG
        begin
          if @trns
            @complete = @socket.write(@trns)
          else
            r = ''
            begin
              r << @socket.read(1)
            end until r[-4..-1] == "\r\n\r\n"
            m = r.match(/^content-length\s*:\s*(\d+)\s*$/i)
            if m
              r << @socket.read(m[1].strip.to_i)
            else
              r << read_chunk
            end  
            @complete = r
          end
        rescue
          @complete = $!  
        end
        @queue.shift
        if @queue.length > 0
          @queue[0].do_io
        end
        @callback.call(@complete) if @callback
        p "exit thread by #{self}, current queue = #{@queue}" if $DEBUG        
      end  
    end  
    def read_chunk
      r = ''
      loop do
        sz = ''
        begin
          sz << @socket.read(1)
        end until sz[-2..-1] == "\r\n"
        p "chunk #{sz}" if $DEBUG
        r << sz
        len = sz[0..-3].to_i(16)
        break if len == 0 
        r << @socket.read(len)
        begin
          r << @socket.read(1)
        end until r[-2..-1] == "\r\n"
      end
      r
    end
  end
  
  def initialize(host, service)
    super(host, service)
    @queue = []
  end
 
  def async_read
    IOU.new(self, @queue)
  end
  
  def async_write(s)
    IOU.new(self, @queue, s)
  end
end
 
if $0 == __FILE__
  sock = AsyncSocket.new("www.google.com", 80)
  ious = []
  ['iou', 'java', 'async'].each do |word|
    puts "search #{word}"
    iou = sock.async_write("GET /search?q=#{word} HTTP/1.1\r\nHost: www.google.com\r\n\r\n")
    iou.add_callback do |result|
      puts "wrote #{result} bytes"
    end  
    ious << iou    
    iou = sock.async_read
    iou.add_callback do |result|
      printf "%.80s\n", result
    end
    ious << iou
  end
=begin  すべてのIOUの完了を待つ場合
  loop do
    break if ious.map {|iou| iou.close? }.uniq == [true]
    Thread.pass
  end
=end    最後のIOUの完了を待つ。この例では設定したasync_callbackも実行される
  printf "%.80s\n", ious.last.redeem
  sock.shutdown
end

ここでは、見掛け上は同時に複数のクェリーをGoogleに対して実行する。ただし、実際には1つのコネクションを利用して、一問一答形式で行われる。

実行例を示す。

C:\home\test>ruby ..\doc\books\rbasic\samples\3\iou.rb
search iou
search java
search async
wrote 52 bytes
HTTP/1.1 200 OK
Cache-Control: private, max-age=0
Date: Sat, 28 Feb 2009 15:41
wrote 53 bytes
 
HTTP/1.1 200 OK
Cache-Control: private, max-age=0
Date: Sat, 28 Feb 2009 15:
wrote 54 bytes
 
HTTP/1.1 200 OK
Cache-Control: private, max-age=0
Date: Sat, 28 Feb 2009 15:
 
HTTP/1.1 200 OK
Cache-Control: private, max-age=0
Date: Sat, 28 Feb 2009 15:

4回結果を表示しているのは、最後のIOUに対してredeemメソッドの呼び出しとasync_callbackの呼び出しが行われているからだ。通常はどちらか一方を利用することになる。

このプログラムは1.8.7、1.9.1の両方で実行できるが、1.9.1では標準出力をスレッド間でシェアしているためか、改行が他のスレッドの出力に食われて正しく処理されなかったりするようだが、そういうものなのだろう。

ここで示したIOUの実装例では、実際のIOはスレッドを利用して同期的に行っている。しかし、仮に非同期IOを利用するように変えたとしても、それは呼びだし側にとってはIOUによって隠されているため、呼び出し側のコードには影響しない。

さて、これでこのDDJJを捨てられる。(他にはそれほど興味深い記事はなくもないけど、まあないとする)

本日のツッコミ(全2件) [ツッコミを入れる]
_ yojik (2009-03-01 17:47)

Doug LeaのFutureパターンと動機と実装がほぼ同じ感じですね。

_ arton (2009-03-01 19:33)

詳細を隠す(IOUの第2の目的)という意味では似ている(実装含め)と思いますが、IOUの第1の目的は、非同期IOを同期IO並みに呼び出し側にとって単純にする(呼び出し側は非同期IOを意識する必要があるので、戻り値としてIOUを返す。Futureは単に意識させないのだと思います)で、むしろそこが肝だと思うので、僕にはそれほど似ているようには思えなかったです。


2003|06|07|08|09|10|11|12|
2004|01|02|03|04|05|06|07|08|09|10|11|12|
2005|01|02|03|04|05|06|07|08|09|10|11|12|
2006|01|02|03|04|05|06|07|08|09|10|11|12|
2007|01|02|03|04|05|06|07|08|09|10|11|12|
2008|01|02|03|04|05|06|07|08|09|10|11|12|
2009|01|02|03|04|05|06|07|08|09|10|11|12|
2010|01|02|03|04|05|06|07|08|09|10|11|12|
2011|01|02|03|04|05|06|07|08|09|10|11|12|
2012|01|02|03|04|05|06|07|08|09|10|11|12|
2013|01|02|03|04|05|06|07|08|09|10|11|12|
2014|01|02|03|04|05|06|07|08|09|10|11|12|
2015|01|02|03|04|05|06|07|08|09|10|11|12|
2016|01|02|03|04|05|06|07|08|09|10|11|12|
2017|01|02|03|04|05|06|07|08|09|10|11|12|
2018|01|02|03|04|05|06|07|

ジェズイットを見習え