python之协程

发布时间:2019-09-22 07:40:18编辑:auto阅读(1746)

      协程,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程。

      所谓用户态就是说协程是由用户来控制的,CPU不认识协程,协程是跑在线程中的。

      协程拥有自己的寄存器上下文栈。协程调试切换时,将寄存器上下文栈保存到其他地方,在切回来时,恢复先前保存的寄存器上下文栈。

      因此,协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,也就是进入上一次离开时所处逻辑流的位置。

      线程切换时会将上下文和栈保存到CPU的寄存器中。


      协程的标准定义,即符合以下所有条件就能称之为协程

      1.在单线程里实现并发

      2.修改共享数据不需要加锁

      3.用户程序里自己保存多个控制流的上下文栈

      4.一个协程遇到IO操作自动切换到其它协程


      协程的好处:

      无需线程上下文切换的开销

      无需原子操作锁定及同步的开销

        原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束

      方便切换控制流,简化编程模型

      高并发+高扩展性+低成本:一个CPU支持上万的协程都行,很适合用于高并发处理

      协程的缺点:

      无法利用多核资源:

        协程的本质是个单线程,它不能同时将单个CPU的多个核用上

        协程需要和进程配合才能运行在多CPU上。

      进行阻塞(Blocking)操作(如IO)时会阻塞掉整个程序


      使用yield实现协程的例子:

    #!/usr/bin/python
    #Author:sean
    
    import time
    
    def consumer(name):
        print("--->start eating baozi...")
        while True:
            new_baozi = yield
            print("[%s] is eating baozi %s"% (name,new_baozi))
            # time.sleep(2)
    def producter():
        r = tom.__next__()
        r = jerry.__next__()
        n = 0
        while n < 5:
            n += 1
            tom.send(n)
            jerry.send(n)
            print("\033[32;1m[producter]\033[0m is making baozi %s"% n)
    
    if __name__ == '__main__':
        tom = consumer("tom")
        jerry = consumer("jerry")
        p = producter()


      如何在单线程下实现并发效果?

      答案是遇到IO操作就切换,因为IO操作耗时比较长


      协程之所以能处理高并发,其实就是把IO操作给干掉了,就是一遇到IO操作就切换。

      这样的话整个程序就变成了只有CPU在运算。

      一遇到IO操作就切换,那么到底什么时候再切回去呢?

      答案是当IO操作结束后就切回去。

      那么问题又来了,python怎么来监测IO操作是否结束呢?带着这个问题先来看看几个例子


      greenlet模块:

        greenlet是一个封装好的协程,通过switch方法手动进行切换

    #!/usr/bin/python
    #Author:sean
    
    from greenlet import greenlet
    
    def func1():
        print("haha11")
        gr2.switch()
        print("haha22")
        gr2.switch()
    
    def func2():
        print("haha33")
        gr1.switch()
        print("haha44")
    
    gr1 = greenlet(func1)
    gr2 = greenlet(func2)
    gr1.switch()

      gevent模块:

        gevent是一个第三方库,可以轻松实现并发同步或异步编程。

        在gevent中用到的主要是greenlet,它是以C扩展模式形式接入python的轻量级协程。

        greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度

        gevent能够自动进行IO切换

    #!/usr/bin/python
    #Author:sean
    
    import gevent
    
    def foo():
        print("Running in foo")
        gevent.sleep(0) #模仿IO操作
        print('Explicit context switch to foo again')
    
    def bar():
        print('Explicit context to bar')
        gevent.sleep(0) #模仿IO操作
        print('Implicit context switch back to bar')
    
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar)
    ])

      同步与异步的区别:

    #!/usr/bin/python
    #Author:sean
    
    import gevent
    
    def task(pid):
        """
        Some non-deterministic task
        """
        gevent.sleep(0.5)
        print('Task %s done' % pid)
    
    def synchronous():
        for i in range(1, 10):
            task(i)
    
    def asynchronous():
        threads = [gevent.spawn(task, i) for i in range(10)]
        gevent.joinall(threads)
    
    print('Synchronous:')
    synchronous()
    
    print('Asynchronous:')
    asynchronous()

      用协程并发爬虫爬取网站:

    #!/usr/bin/python
    #Author:sean
    
    from urllib import request
    import gevent
    #默认情况下,gevent并不知道urllib或者socket什么时候进行了IO操作
    #默认情况下,gevent和urllib以及socket并没有任何关联,当然就无法提高效率,因为其实质上还是串行操作
    #要想让gevent知道urllib或socket正在进行IO操作,需要给gevent打个补丁
    from gevent import monkey
    monkey.patch_all()  #把当前程序的所有IO操作单独做上标记
    
    def f(url):
        print('GET: %s'% url)
        resp = request.urlopen(url)
        data = resp.read()
        # f = open("url.html","wb")
        # f.write(data)
        # f.close()
        print('%d bytes received from %s.'% (len(data),url))
    
    gevent.joinall([
        gevent.spawn(f,'https://www.python.org'),
        gevent.spawn(f,'https://yahoo.com'),
        gevent.spawn(f,'https://github.com')
    ])

      用gevent协程写一个单线程高并发的socket:

      服务端:

    #!/usr/bin/python
    #Author:sean
    
    import sys
    import socket
    import time
    import gevent
    
    from gevent import socket,monkey
    monkey.patch_all()  #把当前程序的所有IO操作单独做上标记
    
    def server(host,port):
        s = socket.socket()
        s.bind((host,port))
        s.listen(500)
        while True:
            cli,addr = s.accept()
            gevent.spawn(handle_request,cli)
    
    def handle_request(conn):
        try:
            while True:
                data = conn.recv(1024)
                print("recv: ",data)
                conn.send(data)
                if not data:
                    conn.shutdown(socket.SHUT_WR)
        except Exception as e:
            print(e)
        finally:
            conn.close()
    
    if __name__ == '__main__':
        server('0.0.0.0',8001)

      客户端:

    #!/usr/bin/python
    #Author:sean
    
    import socket
    
    HOST = 'localhost'  #The remote host
    PORT = 8001 #The same port as used by the server
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.connect((HOST,PORT))
    while True:
        msg = bytes(input(">>:"),encoding="utf-8")
        s.sendall(msg)
        data = s.recv(1024)
        print('Received',repr(data))
    s.close()

     并发100个sock连接:

    #!/usr/bin/python
    #Author:sean
    
    import socket
    import threading
    
    def sock_conn():
        client = socket.socket()
        client.connect(("localhost",8001))
        count = 0
        while True:
            #msg = input(">>:").strip()
            #if len(msg) == 0:continue
            client.send( ("hello %s" %count).encode("utf-8"))
            data = client.recv(1024)
            print("[%s]recv from server:" % threading.get_ident(),data.decode()) #结果
            count +=1
        client.close()
    
    for i in range(100):
        t = threading.Thread(target=sock_conn)
        t.start()


      事件驱动与异步IO请往这走


      现在我们可以来回答下这个问题了,python如何监测IO操作是否结束?

      IO操作是由操作系统进行处理的,当遇到IO操作时就切换

      等IO操作完以后让其调用回调函数,回调函数会通知协程说这个IO操作完成了

关键字