asyncio与异步协程爬虫

async

关于异步协程

异步协程往往对IO密集型任务非常有效,如网络IO、磁盘IO,典型的应用场景是提升爬虫效率。

不适用任何并行或者异步的爬虫往往因为服务器响应过慢而将大量的时间花费在IO等待上。

当需要获取多个url的内容时,怎么url内容获取的累计时间?这里主要有两个思路。一是多进程并行,这种方法的效率主要取决于机器CPU的核数,一般情况下CPU核数有限,而且单个CPU在等待响应的过程中仍然处于空闲状态。二是异步协程,其设计思想为:在执行某个子程序如果遇到阻塞就将该子程序挂起转而执行其它的子程序到其完成或者挂起为止,这样在多个子程序间进行切换,从而充分利用处理器。在爬虫设计中,多个url的内容获取实际上是相互独立的,当程序处理一个url遇到阻塞时可以将其挂起等待服务器响应转而处理其它的url,通过队列访问的方式不断循环处理任务。当被挂起的子程序处理完成后自动添加到任务队列末尾等待轮执;如果所有任务都处于挂起状态则程序进入阻塞状态直到新的可执行任务出现。

Python3.5以后添加了async/await关键字用以实现异步协程,使用它们需要导入包asyncio:

  • async:定义一个异步函数(协程)或者异步生成器
  • await:当子程序阻塞时挂起子程序(任务)

一些基本概念:

  • 阻塞:程序在等待某个操作完成而不能去干别的事情,即程序处于挂起状态,称该程序处于 阻塞状态。常见的阻塞形式有:等待网络I/O,等待磁盘I/O,等待用户输入,CPU切换上下文等;
  • 非阻塞:非阻塞是相对于阻塞而言的:如果程序包含多个独立的子程序,当其中一个子程序完成或者阻塞时可以转而执行其它的子程序,从而避免因为毫无作为的等待而造成的效率低下;
  • 同步:为了完成某个任务,不同程序间需要相互协商、相互影响,则这些程序是同步的;同步通常通过“锁”来实现
  • 异步:如果在完成某个任务时,程序之间不需要相互通信而各自独立运行,则这些程序是异步的

异步协程中的基本概念:

  • 事件循环(event loop):我们需要向系统申请一个事件循环对象运行我们自己的协程对象,事件循环帮助我们自动执行、挂起协程子程序。

    1
    2
    3
    loop = asyncio.get_event_loop()
    loop
    # <_WindowsSelectorEventLoop running=False closed=False debug=False>
  • 协程(Coroutine):协程对象类型,需要被注册到事件循环中才能被循环调用,通过async关键字定义,直接调用不会被立即执行,而是返回一个协程对象

  • 任务(Task):任务是对协程对象的封装,包含了更多的信息。

  • future

异步协程爬虫

下面以一个例子说明异步协程爬虫的基本语法。

模拟慢速服务器

1
2
3
4
5
6
7
8
9
10
11
12
from flask import Flask
import time

app = Flask(__name__)

@app.route('/')
def index():
time.sleep(3)
return 'Hello!'

if __name__ == '__main__':
app.run(threaded=True)

这里我们定义了一个 Flask 服务,主入口是 index() 方法,方法里面先调用了 sleep() 方法休眠 3 秒,然后接着再返回结果,也就是说,每次请求这个接口至少要耗时 3 秒,这样我们就模拟了一个慢速的服务接口。

注意这里服务启动的时候,run() 方法加了一个参数 threaded,这表明 Flask 启动了多线程模式,不然默认是只有一个线程的。如果不开启多线程模式,同一时刻遇到多个请求的时候,只能顺次处理,这样即使我们使用协程异步请求了这个服务,也只能一个一个排队等待,瓶颈就会出现在服务端。所以,多线程模式是有必要打开的。

启动之后,Flask 应该默认会在 127.0.0.1:5000 上运行,运行之后控制台输出结果如下:

1
Running on http:*//127.0.0.1:5000/ (Press CTRL+C to quit)*

定义协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio # 异步I/O
import aiohttp # 实现了异步的网络通信工具,替换requests

async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
result = await response.text()
session.close()
return result

async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
result = await get(url)
print('Get response from %s Result: %s'%(url, result))

asyncio中实现了定义和执行协程的关键字和方法,aiohttp中实现了支持异步操作的网络通信,requests不支持异步操作。

异步函数request()定义了一个协程,如果事件循环执行到本协程的await get(url)语句时遇到阻塞就会将此协程挂起转而执行任务队列中的其它任务。当本协程挂起时实际上是在等待get(url)的结果,get(url)本身也是一个协程,它返回一个可等待对象(Awaitable object)。

get(url)同样是一个协程,如果事件循环执行到await session.get(url)时遇到阻塞,该任务被挂起等待服务器响应,当下次轮执此协程时会再次遇到await response.text(),如果这个子程序没有遇到执行阻塞,就不会挂起继续向下执行。

总结一下:

  • async定义协程,await当任务阻塞时挂起任务,await只能用于async定义的协程里面
  • await关键字修饰的必须是可等待对象哪些是可等待对象
    • 异步函数定义的原生协程对象
    • 异步生成器
    • 实现了__await__()方法的迭代器

任务包装并注册到事件循环

协程对象包装为任务对象

1
tasks = [asyncio.ensure_future(request()) for _ in range(5)]

获取事件循环对象

1
loop = asyncio.get_event_loop()

将任务注册到事件循环

1
2
3
4
5
## 单个任务
loop.run_until_complete(task)

## 多个任务
loop.run_until_complete(asyncio.wait(tasks))

获取运行结果

1
results = [task.result() for task in tasks]

参考资料

非原创声明:本笔记非原创,主要内容参考博客 https://cuiqingcai.com/6160.html

>>>>>>>>>>>>> 转载请注明出处 <<<<<<<<<<<<<
0%