Brief view of coroutine / asynchronous in Python [进行中]

我们先从 协程 开始,具体对术语的定义在此就不凑字数了,可以具体的参考一下别人的文章,相信比我写的更优秀。

本文不会去讨论当前流行的异步框架,如 tornado gevent asyncio 等的具体实现方式,因为工程框架会更优秀更严谨,我会在另外几篇文章中搞点大新闻(逃,Python到目前为止,涉及到 coroutine 的东西,其底层的物理载体是 generator , Python 的 generator 在语义上已经具有了 暂停函数并返回保存断点处函数的上下文 的含义,所以 generator 的行为和 coroutine 是非常类似的,必然是实现 coroutine 的最佳载体。

我们先来看一段有关 generator 的直观体验 (注意运行环境是 Python 2)

def coroutine():
    counter = 0

    print '[coroutine] enter'
    for i in range(1, 4):
        print '[coroutine] return val', i
        val = yield i
        print '[coroutine] received', val
    print '[coroutine] exit'

obj = coroutine()
print '[main] run coroutine'
first_ret = obj.next()
print '[main] coroutine return and reach first breakpoint', first_ret
print '[main] start resume coroutine'

for i in range(1, 4):
    print '[main] will send %d to coroutine' % i
    val = obj.send(i)
    print '[main] coroutine return', val

Output:

[main] run coroutine
[coroutine] enter
[coroutine] return val 1
[main] coroutine return and reach first breakpoint 1
[main] start resume coroutine
[main] will send 1 to coroutine
[coroutine] received 1
[coroutine] return val 2
[main] coroutine return 2
[main] will send 2 to coroutine
[coroutine] received 2
[coroutine] return val 3
[main] coroutine return 3
[main] will send 3 to coroutine
[coroutine] received 3
[coroutine] exit
Traceback (most recent call last):
  File "t.py", line 19, in <module>
    val = obj.send(i)
StopIteration

下面我会用 coroutine 的方式去说明如此调用 generator 的方法的含义。

  • L11: 初始化一个 coroutine 此时该 coroutine 的状态在 READY, 但是并没有执行
  • L13: 等于对线程调用了 start(), 此时 coroutine 的状态在 RUNNING,注意看输出的 L2, L3, 这时候执行流等于从主协程(这个术语可能不大准确)中切换到了 coroutine 中,开始执行其语句, 达到第一个 yield 的点.
  • L7: yield 语句等于一次 trap,并且执行流切换到 stack 的上一层, 即主协程,并且携带了一个值 i,(等于做了一次 mov EAX, i; JMP _main_thread),注意我并没有用 返回 这个词语,意味着此时 coroutine 并没有真正的返回,他的栈空间并没有释放,并且保留了 resume 时执行的语句位置.
  • 主协程继续执行,到达循环, 用 send 方法去 resume 一个 coroutine 并且向其传递一个值, 这个值是在 coroutine 中断点语句的返回值.
  • 来回反复,直到 coroutine 内的 for 循环 i == 3 时,此时外部主协程再次向 * coroutine* 发送 3coroutine 被激活,继续执行输出 [coroutine] received 3 完成以后,跳出 for 循环,输出 [ciroutine] exit 然后结束,注意我的术语是结束,并不是返回

为什么用 结束 而不是 返回 ?
这里的结束有两个含义
1. 协成的指令全部运行完成
2. 切到其他地方

有点类似于 分时系统的切换线程 的概念, 可怜的协程君并没有谁在期待他返回什么,甚至于你给他加一个return,会爆出这样的错误

  File "t.py", line 10
    return 0
SyntaxError: 'return' with argument inside generator

实在有点惨,为什么会这样?再看这样一段代码

def coroutine():
    print '[c] running'
    for i in range(1, 3):
        val = yield i
        print '[c]', val


c = coroutine()
c.next()


print '[m] yoooooo'
print '[m] I do lots of things here and leave the poor coroutine alone to death'
c.send(None)

Output:

[c] running
[m] yoooooo
[m] I do lots of things here and leave the poor coroutine alone to death
[c] None

这段代码反映的一个现象是:我想切换到你的时候切换到你,等你切出来的时候我不一定期待你马上再运行 所以,并没有谁在期待着谁的返回,所谓的 send 不过只是一个 trick 让你方便的传状态给它~
C1 表示 yield 前的 coroutine 的状态, 用 C2 表示从主协程切回去时 coroutine 的状态, C1≠C2, 区别在哪儿? C2的上下文中多了一个入参

那么, coroutine 在 python 中用 generator 来实现的大概是已经有个映像了,那么就涉及到一个问题:coroutine 是如何被调度的?意思就是如果把 coroutine 类比成 thread,怎么来回切换的,调度的策略又是什么呢?

我们先来引用一波 Wikipedia 上的段落[1]

One important difference between threads and coroutines is that threads are typically preemptively scheduled while coroutines are not. Because threads can be rescheduled at any instant and can execute concurrently, programs using threads must be careful about locking. In contrast, because coroutines can only be rescheduled at specific points in the program and do not execute concurrently, programs using coroutines can often avoid locking entirely. (This property is also cited as a benefit of event-driven or asynchronous programming.)

抽出几个关键点就是:

  1. coroutines is that threads are typically preemptively scheduled while coroutines are not – 协程要自己调度
  2. coroutines can only be rescheduled at specific points – 协程仅在一些特定的调度点进行调度
  3. programs using coroutines can often avoid locking entirely – 用协程可以完全的避免锁(因为调度点自己控制的)
  4. a benefit of event-driven or asynchronous programming – 适合搞事件驱动型的异步编程

这么短的一段话居然解释了协程好处都有啥(误,在这里感慨一下词条编辑者很6. 顺便说一句协程无锁化那都是扯,因为大多数时间用的都是框架而不是

那我们先从这些特点,在 Python 中怎么搞事情分别去做一些实验。

怎么做调度?

前文已经提到,Python的 generator 可以通过 yield 语法保存函数运行堆栈(类似 闭包 的概念),并且切换出和切换回,这个语法特性是 让出执行权切换上下文 的核心。

我们显然不是想做成函数调用的形式,链式一波流走完回到头,所以,我们要设计一个 Scheduler 来决定 谁可以下一个执行

那么我们就以一个事件驱动的调度模型来实际去搞点事情,引入 Future 概念,这个东西表达的是一个 值在未来确定 的对象,并且在返回值被设置的时候,调用callback函数,所以,一个 coroutine 可以在断点处返回一个 Future 对象,并且将这个 coroutine 存入该 Future 对象的 callback 属性。

future.py

class Future:

    def __init__(self):
        self.callback = None
        self._ret_val = None

    def set_callback(self, callback):
        self.callback = callback

    def set_return(self, val):
        self._ret_val = val

然后我们看看一个 coroutine 的例子,假设是设计某种 worker,你可以把它当作是 HTTPClient 一类的东西

def worker():
    future = Future()

    # 设计成传出 future 传入 future.ret_val
    in_data = yield future

    ret_val = in_data
    yield ret_val

下面就是 Scheduler 的实现了:

class Scheduler:

    def __init__(self):
        self.futures = []

    def add_future(self, future):
        self.futures.append(future)

    def loop(self):
        while 1:
            # pop_a_ready_future 弹出一个 future.ret_val 不为 None 的 future
            future = self.pop_a_ready_future()
            # 传入 future.ret_val 给 future 的下文
            ret = future.send(future.ret_val)

            # 如果返回值是个 Future 则继续丢进等待设值的futures中
            if isinstance(ret, Future):
                self.futures.append(ret)
            else:
                pass  # 忽略返回值

Final

为了避免坑没有填完,嘛,几句话概括就是

  1. 调度器用户态实现,一般不涉及抢占式调度
  2. 原语是yield和resume
  3. 状态是 suspend/running/dead
  4. 一般用在事件驱动开发

其他的什么保存现场都是琐碎的工作大概就和线程切换类似吧。

References

  1. Coroutine

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

w

Connecting to %s