カテゴリー
Uncategorized

twisted で非同期で実行される関数の前後にイベントを設置する

twisted はネットワーク環境を中心とした Python の非同期イベントフレームワークな訳ですが、ユーザーが登録したコードが実行される前と後に setup と teardown みたいなのを仕込めれば何かいいなというか私はなんでこんな汚い真似ばかりするの?

class Filter(list):
def __call__(self):
for filter in self:
filter()
begin_filters = Filter()
end_filters = Filter()
def addFilter(begin, end):
if callable(begin):
begin_filters.append(begin)
if callable(end):
end_filters.append(end)
def removeFilter(begin, end):
if callable(begin):
begin_filters.remove(begin)
if callable(end):
end_filters.remove(end)
def doFilter(function, *args, **kwargs):
begin_filters()
try:
return function(*args, **kwargs)
finally:
end_filters()
from heapq import heappop
from twisted.internet import base
from twisted.internet.selectreactor import SelectReactor
class FilteredFunction(object):
def __init__(self, func):
self.func = func
def __call__(self, *args, **kwargs):
return doFilter(self.func, *args, **kwargs)
def newheappop(heap):
result = heappop(heap)
try:
if isinstance(result, base.DelayedCall) and not isinstance(result.func, FilteredFunction):
result.func = FilteredFunction(result.func)
finally:
return result
old_doReadOrWrite = SelectReactor._doReadOrWrite
def new_doReadOrWrite(self, selectable, method, dict):
doFilter(old_doReadOrWrite, self, selectable, method, dict)
def install():
setattr(base, 'heappop', newheappop)
setattr(SelectReactor, '_doReadOrWrite', new_doReadOrWrite)
install()

intervention.py とか intercept.py とか filter.py とかにして import すればいいですよ。

Twisted のコードを追った結果 twisted.internet.base.ReactorBase.runUntilCurrent がキューに溜まったタスクを消化してました。その内部で heapq.heappop がいい感じに使われてるんでそれを書き換え。

非同期で呼び出された関数内でスタックダンプしてみると twisted.internet.selectreactor.SelectReactor._doReadOrWrite がネットワークのやりとりをするとこみたいで twisted.web.client.getPage とかするとこの辺りで介在コード挟むのがちょうどいいみたいなのでこれもフック。

まあなんとか動くレベルとしか言えません。

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です