前面一篇关于Python协程的文章中,提到了python中协程的一些历史以及一些简单的用法,这些用法放在实际开发中可能不太够,我在后面的编写中就踩到了不少的坑,这里就回应上篇文章中末尾说的,再写一篇文章来梳理一下这些问题。时隔很久,终于还是磨磨唧唧的把第二篇写出来了。本来还准备有第三篇重生篇的,但是以防拖稿,合并到一起来写吧。
def easy_coroutine():
print("Start!")
x = yield
print("End! x: {}".format(x))
# lightless @ LL-DESKTOP in C:\Users\lightless\Desktop [20:40:27]
$ python -i .\ez_coroutine.py
>>> c = easy_coroutine()
>>> c
<generator object easy_coroutine at 0x0000016AD411E570>
>>> next(c)
Start!
>>> c.send(666)
End! x: 666
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
>>>
def prime_coroutine(func):
@functools.wraps(func)
def prime(*args, **kwargs):
t = func(*args, **kwargs)
next(t)
return t
return prime
def exception_handler_example():
print("Start!")
while True:
try:
x = yield
except TypeError:
print("Receive type error!!")
else:
print("Receive value: {}".format(x))
print("End!")
# lightless @ LL-DESKTOP in C:\Users\lightless\Desktop [21:43:06]
$ python -i .\ez_coroutine.py
>>> c = exception_handler_example()
>>> next(c) # 预激协程
Start!
>>> c.send(666)
Receive value: 666
>>> c.send(TypeError)
Receive value: <class 'TypeError'>
>>> c.send(777)
Receive value: 777
>>> c.close()
>>> c
<generator object exception_handler_example at 0x000001499EECEA40>
>>> from inspect import getgeneratorstate
>>> getgeneratorstate(c)
'GEN_CLOSED'
>>>
最后的End字符串始终没有打印出来。因为只有没有被捕获的异常会终止这个死循环,然而矛盾的是如果出现了未处理的异常,那么协程就挂了,所以不会输出最后的End字符串。
当调用c.close()
时,会使生成器的yield
表达式部分抛出一个GeneratorExit
异常。如果生成器本身没有处理这个异常,或者运行到了生成器的结尾(即抛出了StopIteration
)的情况下,调用方不会出现任何错误。另一方面,如果生成器收到了GeneratorExit
异常,那么生成器就无法生成值!
当send(TypeError)
时,并不会终止生成器的运行,这一点很容易理解。
def sum():
total = 0
nums = []
while True:
x = yield
if not x:
break
total += x
nums.append(x)
return total, nums
# lightless @ LL-DESKTOP in C:\Users\lightless\Desktop [21:58:28]
$ python -i .\ez_coroutine.py
>>> s = co_sum()
>>> next(s)
>>> s.send(666)
>>> s.send(233)
>>> s.send(1)
>>> s.send(None)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration: (900, [666, 233, 1])
>>>
# lightless @ LL-DESKTOP in C:\Users\lightless\Desktop [22:00:59]
$ python -i .\ez_coroutine.py
>>> s = co_sum()
>>> next(s)
>>> s.send(1)
>>> s.send(2)
>>> s.send(3)
>>> try:
... s.send(None)
... except StopIteration as e:
... result = e.value
...
>>> result
(6, [1, 2, 3])
>>>
委托生成器(delegating generator):包含yield from <iterable>
结构的生成器函数。
子生成器(subgenerator):从yield from
表达式中iteratable
部分获取的生成器。(有时候就叫iterator
,23333)
final_result = {}
def co_sum():
total = 0
nums = []
while True:
x = yield
print("co_sum receive: ", x)
if not x:
break
total += x
nums.append(x)
return total, nums
def middle(key):
idx = 0
while True:
print("middle idx: ", idx)
final_result[key] = yield from co_sum()
print("sub-generator co_sum() done.")
idx += 1
def main():
data_sets = {
"set_1": [233, 666, 123],
"set_2": [1, 2, 4, 5],
"set_3": [999, 999, 999],
}
for key, data_set in data_sets.items():
print("start key:", key)
m = middle(key)
next(m) # 预激middle协程
for value in data_set:
m.send(value) # 给协程传递每一组的值
m.send(None)
print("final_result:", final_result)
if __name__ == '__main__':
main()
# lightless @ LL-DESKTOP in C:\Users\lightless\Desktop [22:51:54]
$ python yield_from.py
start key: set_1
middle idx: 0
co_sum receive: 233
co_sum receive: 666
co_sum receive: 123
co_sum receive: None
sub-generator co_sum() done.
middle idx: 1
start key: set_2
middle idx: 0
co_sum receive: 1
co_sum receive: 2
co_sum receive: 4
co_sum receive: 5
co_sum receive: None
sub-generator co_sum() done.
middle idx: 1
start key: set_3
middle idx: 0
co_sum receive: 999
co_sum receive: 999
co_sum receive: 999
co_sum receive: None
sub-generator co_sum() done.
middle idx: 1
final_result: {'set_1': (1022, [233, 666, 123]), 'set_2': (12, [1, 2, 4, 5]), 'set_3': (2997, [999, 999, 999])}
main()
函数通过循环分别读取每一个集合的key
和一个存有多个数字的list
。然后生成一个middle()
协程并预激,将列表中的每一个数字通过send
方法发过去。发送完毕后调用send(None)
结束协程的运行。
middle(key)
函数其实是一个委托生成器,因为它含有一个yield from <iterator>
结构。这个循环每次遍历的时候,会生成一个新的co_sum()
实例,每个实例都是一个生成器,当然是作为协程使用的生成器对象。委托生成器会在yield from
的时候暂停执行,将控制权交给co_sum()
,并等待co_sum()
执行结束。
co_sum()
函数是一个子生成器,与上一个例子中的代码一样。
假设yield from
出现在委托生成器中,并且由调用方调用,同时也驱动着子生成器。
假设不支持.throw()
和.close()
方法。
假设生成器不会抛出异常,而是执行到代码最后,直到抛出StopIteration
异常为止。
_i:子生成器,同时也是一个迭代器
_y:子生成器生产的值
_r:yield from表达式最终的值
_s:调用方通过send()
发送的值
_e:异常对象
_i = iter(EXPR) # EXPR是一个可迭代对象,_i其实是子生成器;
try:
_y = next(_i) # 预激子生成器,把产出的第一个值存在_y中;
except StopIteration as _e:
_r = _e.value # 如果抛出了`StopIteration`异常,那么就将异常对象的`value`属性保存到_r,这是最简单的情况的返回值;
else:
while 1: # 尝试执行这个循环,委托生成器会阻塞;
_s = yield _y # 生产子生成器的值,等待调用方`send()`值,发送过来的值将保存在_s中;
try:
_y = _i.send(_s) # 转发_s,并且尝试向下执行;
except StopIteration as _e:
_r = _e.value # 如果子生成器抛出异常,那么就获取异常对象的`value`属性保存到_r,退出循环,恢复委托生成器的运行;
break
RESULT = _r # _r就是整个yield from表达式返回的值。
子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw()
和.close()
方法;
如果子生成器支持.throw()
和.close()
方法,但是在子生成器内部,这两个方法都会抛出异常;
调用方让子生成器自己抛出异常,没有原因,就是任性;
当调用方使用next()
或者.send(None)
时,都要在子生成器上调用next()
函数,当调用方使用.send()
发送非None值时,才调用子生成器的.send()
方法;
_i = iter(EXPR) # EXPR是一个可迭代对象,_i其实是子生成器;
try:
_y = next(_i) # 预激子生成器,把产出的第一个值存在_y中;
except StopIteration as _e:
_r = _e.value # 如果抛出了`StopIteration`异常,那么就将异常对象的`value`属性保存到_r,这是最简单的情况的返回值;
else:
while 1: # 尝试执行这个循环,委托生成器会阻塞;
try:
_s = yield _y # 生产子生成器的值,等待调用方`send()`值,发送过来的值将保存在_s中;
except GeneratorExit as _e:
try: # 这部分的作用是关闭子生成器和委托生成器,因为子生成器可能是个可迭代对象,所以要处理没有close()方法的情况;
_m = _i.close
except AttributeError:
pass
else:
_m()
raise _e
except BaseException as _e: # 这部分是处理通过.throw()方法传入的异常,如果子生成器是个迭代器,没有throw()方法的情况下,委托生成器会抛出一个异常
_x = sys.exc_info()
try:
_m = _i.throw
except AttributeError:
raise _e
else: # 子生成器有throw()的情况,那么就调用throw()方法,并且传入调用发提供的异常。这时,子生成器可能会处理异常并继续执行,叶可能会抛出`StopIteration`异常结束执行,也有可能没有处理并且向上"冒泡",抛出异常;
try:
_y = _m(*_x)
except StopIteration as _e:
_r = _e.value
break
else: # 这部分是当子生成器在执行时没有出现异常的情况
try:
if _s is None: # 如果调用方发送的是None,那么就在子生成器上调用next()函数;
_y = next(_i)
else: # 调用发发送的不是None,调用子生成器的.send()方法;
_y = _i.send(_s)
except StopIteration as _e: # 如果出现了StopIteration异常,那么就获取异常对象中value属性的值,并保存在返回值中,中断循环,让委托生成器继续运行
_r = _e.value
break
RESULT = _r # _r就是整个yield from表达式返回的值。
子生成器生产的值,都是直接传给调用方的;调用方通过.send()
发送的值都是直接传递给子生成器的;如果发送的是None,会调用子生成器的__next__()
方法,如果不是None,会调用子生成器的.send()
方法;
子生成器退出的时候,最后的return EXPR
,会触发一个StopIteration(EXPR)
异常;
yield from
表达式的值,是子生成器终止时,传递给StopIteration
异常的第一个参数;
如果调用的时候出现StopIteration
异常,委托生成器会恢复运行,同时其他的异常会向上"冒泡";
传入委托生成器的异常里,除了GeneratorExit
之外,其他的所有异常全部传递给子生成器的.throw()
方法;如果调用.throw()
的时候出现了StopIteration
异常,那么就恢复委托生成器的运行,其他的异常全部向上"冒泡";
如果在委托生成器上调用.close()
或传入GeneratorExit
异常,会调用子生成器的.close()
方法,没有的话就不调用。如果在调用.close()
的时候抛出了异常,那么就向上"冒泡",否则的话委托生成器会抛出GeneratorExit
异常。
# filename: spider_normal.py
import time
import requests
targets = [
"https://lightless.me/archives/python-coroutine-from-start-to-boom.html",
"https://github.com/aio-libs",
"https://www.python.org/dev/peps/pep-0380/",
"https://www.baidu.com/",
"https://www.zhihu.com/",
]
def spider():
results = {}
for url in targets:
r = requests.get(url)
length = len(r.content)
results[url] = length
return results
def show_results(results):
for url, length in results.items():
print("Length: {:^7d} URL: {}".format(length, url))
def main():
start_time = time.time()
results = spider()
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(results)
if __name__ == '__main__':
main()
# filename: spider_thread.py
import time
import threading
import requests
from spider_normal import targets, show_results
final_results = {}
def spider(url):
r = requests.get(url)
length = len(r.content)
final_results[url] = length
def main():
ts = []
start_time = time.time()
for url in targets:
t = threading.Thread(target=spider, args=(url, ))
ts.append(t)
t.start()
for t in ts:
t.join()
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)
if __name__ == '__main__':
main()
import time
from concurrent import futures
import requests
from spider_normal import targets, show_results
final_results = {}
def spider(url):
r = requests.get(url)
length = len(r.content)
final_results[url] = length
return True
def main():
start_time = time.time()
with futures.ThreadPoolExecutor(10) as executor:
res = executor.map(spider, targets)
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)
if __name__ == '__main__':
main()
import asyncio
import time
import aiohttp
from spider_normal import targets, show_results
final_results = {}
async def get_content(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
content = await resp.read()
return len(content)
async def spider(url):
length = await get_content(url)
final_results[url] = length
return True
def main():
loop = asyncio.get_event_loop()
cor = [spider(url) for url in targets]
start_time = time.time()
result = loop.run_until_complete(asyncio.gather(*cor))
print("Use time: {:.2f}s".format(time.time() - start_time))
show_results(final_results)
print("loop result: ", result)
if __name__ == '__main__':
main()
更多技术分享
请关注MLSRC微信公众号