经过锁和进度池的利用,进度间通讯

 

多进程

进程之间是相互独立的,python是运营进度的时候,是开发银行的是原生进程。进度是未曾GIL锁的,而且不设有锁的概念,进程之间的数据式不能够共享的,而线程是足以的。

当四线程创立完成之后,start并未了当时运营,依然要求和其余线程抢CPU的身价,只是
时间极短。
经过之间的通讯分为三种,queue和pipe

亚洲必赢官网 ,写一个game 循环
game
loop是每一个游戏的大旨.它不停的收获用户输入,更新游戏状态,渲染游戏结果到荧屏上.网络电游分为客户端和服务端两部分.两边的loop通过互联网连接起来.经常状态下,客户端获取用户输入,发送到服务端,服务端处理总结数据,更新游戏者状态,发送结果个客户端.比如游戏者只怕游戏物体的地点.卓殊重大的是,不要把客户端和服务端的功用混淆了,若是没有充足的说辞的话.
假若在客户端做游戏总结,那么不一致的客户端卓殊轻巧就不联合了.

进程

一、进度的概念

用muliprocessing这些包中的Process来定义多进度,跟定义八线程类似

from multiprocessing import Process   # 导入进程模块
import time


def run(name):
    time.sleep(2)
    print("hello", name)

if __name__ == "__main__":
    p_obj_list = list()    # 存放进程对象
    for i in range(10):    # 启动10个进程
        p = Process(target=run, args=("QQ{0}".format(i),))  # 产生一个进程实例
        p.start()   # 启动进程
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()   # 等待进程结果
 1 import multiprocessing
 2 def foo(q):
 3     q.put([1,'hello',True])
 4 if __name__=='__main__':
 5     q=multiprocessing.Queue()#通过multiprocessing建立一个队列
 6     p=multiprocessing.Process(target=foo,args=(q,))
 7   #用multiprocessing在调用Process,建立一个子进程,定义函数名,将q作为参数传到foo函数,
 8     #foo函数就可以通过这个参数来与主进程做交互了。
 9     p.start()#激活这个子进程
10     print(q.get())#主进程

A game loop iteration is often called a tick. Tick is an event
meaning that current game loop iteration is over and the data for the
next frame(s) is ready.

1.含义:Computer中的程序关于某数码集合上的三次运转活动,是系统开展能源分配和调度的为主单位。说白了正是3个先后的试行实例。

实施八个先后正是三个历程,比如您展开浏览器看到本身的博客,浏览器本身是三个软件程序,你此时张开的浏览器就是贰个进度。

 

贰、进程中参加线程

from multiprocessing import Process
import time,threading


def thread_run(name):   # 定义线程执行的方法
    print("{0}:{1}".format(name, threading.get_ident()))  # thread.get_ident ()返回当前线程的标识符,标识符是一个非零整数


def run(name):
    time.sleep(2)
    print("hello", name)
    t = threading.Thread(target=thread_run, args=(name,))   # 嵌入线程
    t.start()   # 执行线程


if __name__ == "__main__":
    p_obj_list = list()
    for i in range(10):
        p = Process(target=run, args=("QQ{0}".format(i),))
        p.start()
        p_obj_list.append(p)

    for p in p_obj_list:
        p.join()

上边函数通过multiprocessing的queue来实现进度间通讯。

在下二个事例中,大家写一个客户端,这么些客户端通过WebSocket连接服务器,同时运维八个简约的loop,接受输入发送给服务器,回显新闻.Client
source code is located
here.

2.经过的表征

  • ### 2个进度里能够有四个子进度

  • ### 新的长河的创制是一心拷贝整个主进度

  • ### 进度里能够包括线程

  • ### 进度之间(包括主进度和子进程)不存在多少共享,互相通讯(浏览器和python之间的多少不能够互通的),要通讯则要借助队列,管道之类的

经过锁和进度池的利用,进度间通讯。 

3、老爹和儿子过程

各种子进程都以由2个父进度运维的,每一种程序也是有一个父进度

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # 获得父进程ID
    print('process id:', os.getpid())  # 获得子进程ID
    print("\n\n")


def f(name):
    info('\033[31;1m function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1m main process line\033[0m')
    p = Process(target=f, args=('QQ',))
    p.start()
    p.join()

  

 

 

 1 from multiprocessing import  Pipe,Process
 2 def foo(sk):
 3     sk.send('hello world')#通过管道sk发送内容
 4     print(sk.recv())#打印接收到的内容
 5 if __name__ == '__main__':
 6     sock,conn=Pipe()#定义一个管道的两头
 7     p=Process(target=foo,args=(sock,))#由于上面已经通过multiprocessing导入了Process,
 8     # 所以这里直接就可以创建一个子进程,并将sock(管道的一头)作为参数给foo函数
 9     p.start()#激活这个进程
10     print(conn.recv())#打印接收到的内容,conn是管道的另一头
11     conn.send('hi son')#通过管道发送内容

3.1

Example 3.1 source
code

咱们接纳aiohttp来创设3个game
server.那些库能够创制asyncio的client和server.那个库的利润是还要辅助http请求和websocket.所以服务器就不需求把结果处理成html了.
来看一下server如何运维:

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")
    return ws

async def game_loop(app):
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        await asyncio.sleep(2)


app = web.Application()
app["sockets"] = []

asyncio.ensure_future(game_loop(app))

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

那些代码就不翻译了,

三.历程和线程之间的界别

  • ### 线程共享地址空间,而经过之间有相互独立的空中

  • ### 线程之间数据互通,相互操作,而经过不得以

  • ### 新的线程比新的进度创建轻易,比开进度的花费小诸多

  • ### 主线程能够影响子线程,而主进度不能够影响子进度

 

 

进程间数据交互与共享

知道不相同进程之间内部存款和储蓄器是不共享的,要想完结五个进度间的通信需求选取multiprocessing库中的queue(队列)模块,那一个multiprocessing库中的queue模块跟单纯的queue库是不平等的。进度导入前者(那里的queue是特地为经过之间的通讯设计的)不不可信赖,导入后者(那里的queue首假若线程间数据交互)出错。

地点代码通过Pipe来贯彻多个经过间的通信。

3.二 有请求才先导loop

地方的例子,server是不停的loop.以往改成有请求才loop.
同时,server上也许存在五个room.二个player创设了一个session(一场较量依然3个副本?),其余的player能够加入.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


async def wshandler(request):
    app = request.app
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    app["sockets"].append(ws)

    if app["game_is_running"] == False:
        asyncio.ensure_future(game_loop(app))
    while 1:
        msg = await ws.receive()
        if msg.tp == web.MsgType.text:
            print("Got message %s" % msg.data)
            ws.send_str("Pressed key code: {}".format(msg.data))
        elif msg.tp == web.MsgType.close or\
             msg.tp == web.MsgType.error:
            break

    app["sockets"].remove(ws)
    print("Closed connection")

    return ws

async def game_loop(app):
    app["game_is_running"] = True
    while 1:
        for ws in app["sockets"]:
            ws.send_str("game loop says: tick")
        if len(app["sockets"]) == 0:
            break
        await asyncio.sleep(2)
    app["game_is_running"] = False


app = web.Application()

app["sockets"] = []
app["game_is_running"] = False

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

四.在python中,进度与线程的用法就只是名字区别,使用的不二等秘书诀也是没多大分裂

一、线程访问queue

import queue,threading


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = threading.Thread(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']
 1 from multiprocessing import  Manager,Process
 2 def foo(l,i):#收到参数,l是Mlist,i是循环的i
 3     l.append(i*i)#将i平方添加到Mlist
 4 if __name__=='__main__':
 5     manager=Manager()
 6     Mlist=manager.list([11,22,33])#定义一个列表
 7 
 8     l=[]
 9     for i in range(5):#创建5个子进程
10         p=Process(target=foo,args=(Mlist,i))#定义一个进程,将Mlist和i作为参数传到foo
11         p.start()#激活这个进程,执行foo函数
12         l.append(p)#将5个进程添加到l这个列表
13     for i in l:
14         i.join()#循环这个列表,然后将每个进程join
15     print(Mlist)#当所有的子进程都结束,运行主进程

3.3 管理task

直白操作task对象.未有人的时候,能够cancel掉task.
注意!:
This cancel()
call tells scheduler not to pass execution to this coroutine anymore and
sets its state tocancelled
which then can be checked by cancelled()
method. And here is one caveat worth to mention: when you have external
references to a task object and exception happens in this task, this
exception will not be raised. Instead, an exception is set to this task
and may be checked by exception()
method. Such silent fails are not useful when debugging a code. Thus,
you may want to raise all exceptions instead. To do so you need to call
result()
method of unfinished task explicitly. This can be done in a callback:

壹旦想要cancel掉,也不想触发exception,那么就检查一下canceled状态.
app["game_loop"].add_done_callback(lambda t: t.result() if not t.cancelled() else None)

伍.简短实例

1)创造一个简便的多进度:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(name):
    time.sleep(1)
    print('hello',name,time.ctime())

ml = []
for i in range(3):
    p = multiprocessing.Process(target=func,args=('yang',))
    p.start()
    ml.append(p)

for i in ml:
    i.join() #注意这里,进程必须加join方法,不然会导致僵尸进程

  

运营结果:

亚洲必赢官网 1

 

不管怎么说,反正报错了,同样的代码,在python自带的IDLE里探索:

亚洲必赢官网 2

从没其余东西就结束了。好的,那里要说下了,依据本身个人的领会,当您用pycharm或然IDLE时,pycharm只怕IDLE在你的微处理器里本人也是3个历程,并且暗中认可是主进程。所以在pycharm会报错,而在IDLE里运营就是空荡荡,个人明白,对不对暂时不谈,早先时期学到子进度时再说。

 

消除办法正是,别的的不改变,加1个if __name == ‘__main__’推断就行:

亚洲必赢官网 3

 

那般就一蹴而就了,好的,你将来能够体会到那句话了,进程与线程的用法就只是名字分裂,使用的措施也是没多大分别。不多说,自行体会。而运转结果来看的时刻是一同的,那么那进度才是真正意义上的竞相运转。

 

2)自定义类式进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

class myprocess(multiprocessing.Process):
    def __init__(self,name):
        super(myprocess,self).__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print('hello',self.name,time.ctime())

if __name__ == '__main__':
    ml = []
    for i in range(3):
        p = myprocess('yang')
        p.start()
        ml.append(p)

    for j in ml:
        j.join()

  

运作结果:

亚洲必赢官网 4

 

 

接下来setDaemon之类的不二秘诀和线程也是完全壹致的。

 

三)每三个经过都有根进程,换句话,每二个经过都有父进度

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time,os

def info():
    print('mudule name:',__name__)
    print('parent process:',os.getppid()) #父进程号
    print('son process:',os.getpid())     #子进程号

if __name__ == '__main__':
    info()
    print('-----')
    p = multiprocessing.Process(target=info,args=[])
    p.start()
    p.join()

  

运营结果:

 

亚洲必赢官网 5

 

而查看自身本机的进度:

亚洲必赢官网 6

 

能够掌握,620四正是pycharm,就是那时的根进程,而主进程正是作者这一个py文件(由__main__可见),接着再往下的子进程等等等的。

 

2、进度访问queue

from multiprocessing import Process
import queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = queue.Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
Traceback (most recent call last):
  File "C:/Users/dell/PycharmProjects/untitled/process/进程的定义.py", line 77, in <module>
    p.start()
  File "C:\Python36\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\Python36\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python36\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python36\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

上面代码通过Manger达成子进度间的通讯。

3.四 守候几个事件

Example 3.4 source
code
在多数景观下,必要在服务器处理客户端的handler中,
等待多个事件.除了等候客户端的音讯,或许还索要拭目以俟区别的音讯发生.比如,
游戏一局的年华到了,需求3个timer的确定性信号.或然,须求其余进度的音讯,大概其余server的音信.(使用分布式消息系统).
上面那么些例子使用了Condition.那里不保留全体的socket,而是在每一次循环结束通过Condition.notify_all来文告.那么些应用pub/sub情势实现.
为了在2个handler中,等待三个事件,首先我们使用ensure_future来包装一下.

if not recv_task: 
  recv_task = asyncio.ensure_future(ws.receive())
if not tick_task: 
  await tick.acquire() 
  tick_task = asyncio.ensure_future(tick.wait())```

在我们调用Condition.call之前,我们需要获取一下锁.这个锁在调用了tick.wait之后就释放掉.这样其他的协程也可以用了.但是当我们得到一个notification, 会重新获取锁.所以我们在收到notification之后要release一下.

done, pending = await asyncio.wait( [recv_task, tick_task],
return_when=asyncio.FIRST_COMPLETED)“`
本条会阻塞住直到有二个职分达成,那一年会再次来到七个列表,完结的和仍旧在运营的.假若task
is done,大家再安装为None,那样下3个循环里会再一次创制.

import asyncio
from aiohttp import web

async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)



tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

async def game_loop():
    while 1:
        await tick.acquire()
        tick.notify_all()
        tick.release()
        await asyncio.sleep(1)

asyncio.ensure_future(game_loop())

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

(那个首尽管asyncio.Condition的用法)

陆.多进度间的通讯和数据共享

先是大家都早就领会进度之间是独自的,不可能互通,并且数据交互独立,而在实际支付中,一定会蒙受需求进度间通讯的景色供给,那么大家怎么搞呢

有三种艺术:

  • pipe
  • queue

1)使用queue通信

在八线程那里已经学过queue了,创设queue的办法,q =
queue.Queue(),那种创造是创办的线程queue,并不是经过queue。创立进度queue的主意是:

亚洲必赢官网 7

 

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age): #这里必须要把q对象作为参数传入才能实现进程之间通信
    q.put({'name':name,'age':age})

if __name__ == '__main__':
    q = multiprocessing.Queue() #创建进程queue对象
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get()) #获取queue信息
    print(q.get()) 
    print(q.get())
    for i in ml:
        i.join()

  

运营结果:

亚洲必赢官网 8

 

好的,已经经过queue达成通讯,那么细心的爱人大概会想,此时的queue到底是同一个吧依然copy的啊?起头测试,码如下:

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(q,name,age):
    q.put({'name':name,'age':age})
    print('id:',id(q))
if __name__ == '__main__':
    q = multiprocessing.Queue()
    ml = []
    print('id:',id(q))
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(q,'yang',21))
        p.start()
        ml.append(p)
    print(q.get())
    print(q.get())
    print(q.get())
    for i in ml:
        i.join()

  

在Windows平台运营结果:

亚洲必赢官网 9

 

Linux的ubuntu下是那般的:

亚洲必赢官网 10

 

那就倒霉怎么说了,小编个人的驾驭,线程和经过那类与计算机硬件(CPU,RAM)等有关系的都有不分明因素,姑且感觉在Windows平台里queue是copy的,在Linux里是同2个啊,并且据经验职员表示,在macbook上也是同四个。

 

还有个难题, 假使使用的queue是线程式的呢?

代码其余都没变,只改了此地:

亚洲必赢官网 11

 

结果:

亚洲必赢官网 12

 

尽管报错了,可是却有1个关键点,指示的是不能够pickle线程锁对象,也正是说刚才大家运用的queue是进度对象,所以能够pickle,注意了,那里正是关键点,使用了pickle,那么约等于说,在Windows平台里是copy的,倘使不是copy,就不供给存在pickle对啊?直接拿来用正是啊,干嘛要pickle之后取的时候再反pickle呢对吗?

 

再看Linux下吧,由于Linux暗中同意是python2,所以模块包名稍微有点分裂

亚洲必赢官网 13

结果阻塞住了,然而后边的要么出来了,看到的id果然照旧一样的。

 

那里就有叁点需求专注:(个人精通,如有误望指正)

一.进度里的确无法使用线程式queue

2.Windows平台的进度式queue是copy的

三.Linux平台的线程式和进程式都以同2个,不过1旦在经过里使用线程式queue会阻塞住

但本身个人以为copy更有安全性

 

2)使用pipe通信

 

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())  #接受数据,不能加参数1024之类的
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    p = multiprocessing.Process(target=func,args=(son_conn,))
    p.start()
    print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
    parent_conn.send('不约')    #发送数据
    p.join()                   #join方法是进程特有

 

  

运营结果:

亚洲必赢官网 14

 

那般就联络上了,相信你发觉了,宗旨和前边的socket大致,然则唯1的区别是recv()方法不可能加参数,不信的话,你加来试试看

回想线程通讯,相信你会认为进度比线程更便宜

 

当然pipe也足以有三个:

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing,time

def func(conn):
    conn.send('约吗?')  #子进程发送数据
    print(conn.recv())
    conn.close()        #子进程关闭连接
if __name__ == '__main__':
    parent_conn,son_conn = multiprocessing.Pipe() #创建pipe对象,父进程,子进程
    ml = []
    for i in range(3):
        p = multiprocessing.Process(target=func,args=(son_conn,))
        p.start()
        ml.append(p)
        print(parent_conn.recv())  #父进程接受数据,不能加参数1024之类的
        parent_conn.send('不约')
    for i in ml:
        i.join()

  

运维结果:

亚洲必赢官网 15

 

七.进度之间数据共享——manager

相比简单,就接纳了经过里的manager对象下的依次数据类型,别的的很轻巧的,我就不注释了

#!usr/bin/env python
#-*- coding:utf-8 -*-

# author:yangva

import multiprocessing

def func(l,d,num):
    l.append(num)
    d[num] = num

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        l = manager.list()
        d = manager.dict()
        ml = []
        for i in range(6):
            p = multiprocessing.Process(target=func,args=(l,d,i))
            p.start()
            ml.append(p)
        for i in ml:
            i.join()
        print('d:',d)
        print('l:',l)

  

运行结果:

亚洲必赢官网 16

 

那般是或不是就兑现了数据共享了?

 

好的,进度也剖析完了

 

叁、进程访问`multiprocessing库中的Queue模块`

from multiprocessing import Process,Queue


def f(q):
    q.put([66, None, 'hello word'])

if __name__ == '__main__':
    q = Queue()   # 把这个q传给了子线程
    p = Process(target=f, args=(q,))   # 子线程访问父线程的q
    p.start()
    print(q.get())
    p.join()

#执行结果
[66, None, 'hello word']

父进程约等于克隆叁个Q,把本身的Q克隆了一份交给子进程,子进度今年往Q里面放了1份数据,然后父进程又能实际的取获得。可是你克隆了1份是或不是就和父进度未有提到了,为啥仍是能够联络在一同呢?可是实际:等于那四个Q里面包车型地铁数码又把它体系化了,系列化到三个中间的地方,类似于翻译,然后反种类化给那几个父进度那边来了,其实那五个Q正是通过pickle来连串化的,不是多少个真正的Q。

总括:七个线程之间能够修改一个数目,不加锁,可能就会出错。未来进度中的Queue,是贯彻了数额的传递,不是在改动同1份数据,只是完成贰个经过的数目传给了别的2个历程。

亚洲必赢官网 17

三.5 和线程一同行使

Example 3.5 source
code

本条例子,大家把asyncio的loop放到其余多个单身线程中.上面也说过了,因为python的GIL的设计,不或者同时运营四个code.所以使用四线程来拍卖总括瓶颈的难点,并不是3个好主意.然后还有其它二个行使线程原因就是:
假使部分函数大概库不支持asyncio,那么就会阻塞住主线程的运维.那种境况下唯壹的措施正是位于其余1个线程中.

要专注asyncio本人不是threadsafe的,然而提供了多个函数.call_soon_threadsafe和run_coroutine_threadsafe.
当您运营这几个例子的时候,你晤面到notify的线程id就是主线程的id,那是因为notify协程运维在主线程中,sleep运行在其它二个线程,所以不会阻塞住主线程.

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor
import threading
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    print("Game loop thread id {}".format(threading.get_ident()))
    # a coroutine to run in main thread
    async def notify():
        print("Notify thread id {}".format(threading.get_ident()))
        await tick.acquire()
        tick.notify_all()
        tick.release()

    while 1:
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        # blocking the thread
        sleep(1)
        # make sure the task has finished
        task.result()

print("Main thread id {}".format(threading.get_ident()))

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

 

 

叁.六 多进度和扩充scaling up

1个线程的server能够干活了,可是那个server只有1个cpu可用.
为了扩展,大家必要周转多少个进程,种种进程包涵自身的eventloop.
所以大家必要经过之间通信的格局.同时在戏耍领域,平常会有多量的盘算(寻路什么的).那几个职责平日不会火速到位(1个tick内).
在协程中开展大气耗费时间的计量未有趣,因为会阻塞住音信循环本人.所以在那种情形下,把大批量的持筹握算交给其余的进度就很有须求了
最简易的法子即是开行多少个单线程的server.然后,能够应用haproxy那样的load
balancer,来把客户端的接连分散到分裂的长河上去.进城之间的通讯有繁多方法.一种是依据互联网连接,也足以扩张到多个server.以往早就有无数兑现了音讯和储存系统的框架(基于asyncio).
比如:

aiomcache
for memcached client
aiozmq
for zeroMQ
aioredis
for Redis storage and pub/sub

还有其余的有的乱柒捌糟,在git上,大部分是aio打头.
动用互连网新闻,能够十二分有效的蕴藏数据,或许沟通消息.可是倘若要拍卖大量实时的数量,而且有大气进度通讯的情状,就卓殊了.在那种处境下,三个更合适的不二等秘书籍是选拔正式的unix
pipe.asyncio has support for pipes and there is a very low-level
example of the server which uses
pipes
inaiohttp repository.
在这么些事例中,大家选取python的高层次的multiprocessing库来触发一个新的历程来张开总结,通过multiprocessing.Queue来拓展进度间通讯.不幸的是,如今的multiprocessing完结并不协理asyncio.所以阻塞的调用就会卡住住event
loop.
那正是利用线程的最好案例.因为我们在其它一个线程运维multiprocessing的代码.看代码

import asyncio
from aiohttp import web

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import Queue, Process
import os
from time import sleep


async def handle(request):
    index = open("index.html", 'rb')
    content = index.read()
    return web.Response(body=content)


tick = asyncio.Condition()

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    recv_task = None
    tick_task = None
    while 1:
        if not recv_task:
            recv_task = asyncio.ensure_future(ws.receive())
        if not tick_task:
            await tick.acquire()
            tick_task = asyncio.ensure_future(tick.wait())

        done, pending = await asyncio.wait(
            [recv_task,
             tick_task],
            return_when=asyncio.FIRST_COMPLETED)

        if recv_task in done:
            msg = recv_task.result()
            if msg.tp == web.MsgType.text:
                print("Got message %s" % msg.data)
                ws.send_str("Pressed key code: {}".format(msg.data))
            elif msg.tp == web.MsgType.close or\
                 msg.tp == web.MsgType.error:
                break
            recv_task = None

        if tick_task in done:
            ws.send_str("game loop ticks")
            tick.release()
            tick_task = None

    return ws

def game_loop(asyncio_loop):
    # coroutine to run in main thread
    async def notify():
        await tick.acquire()
        tick.notify_all()
        tick.release()

    queue = Queue()

    # function to run in a different process
    def worker():
        while 1:
            print("doing heavy calculation in process {}".format(os.getpid()))
            sleep(1)
            queue.put("calculation result")

    Process(target=worker).start()

    while 1:
        # blocks this thread but not main thread with event loop
        result = queue.get()
        print("getting {} in process {}".format(result, os.getpid()))
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        task.result()

asyncio_loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
asyncio_loop.run_in_executor(executor, game_loop, asyncio_loop)

app = web.Application()

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

worker()在此外多个历程中运营.包括了一部分耗费时间计算,把结果放在queue中.获得结果之后,文告主线程的主eventloop那二个等待的client.那一个事例13分简陋,进度未有适合结束,同时worker或者须要此外贰个queue来输入数据.

Important! If you are going to run anotherasyncio
event loop in a different thread or sub-process created from main
thread/process, you need to create a loop explicitly, using
asyncio.new_event_loop()
, otherwise, it will not work.

4、通过Pipe()达成进度间的数量交互,manger完毕数量共享

地点的事例是通过进度中的Queue,来实行多中国少年共产党享的,其实还有1种办法贯彻数量共享,那正是管道,pipe,以及数额共享manger。

4.1、Pipe()函数

管道函数会回去由管道双方连日来的一组连接对象,该管道暗许是双向的(双向的)。

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # 接收子进程的消息
    p.join()

四.二、接受反复和出殡和埋葬多次

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([66, None, 'hello,word'])  # 发送消息给父进程
    conn.send("QQ")  # 发送消息给父进程
    print(conn.recv())   # 接收父进程的消息
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 管道生成返回两个实例,是双向的,这边把第1个作为父连接,第2个作为子连接。也可以,两者角色调换一下
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())  # 接收两次
    parent_conn.send("微信")   # 发送给子进程
    p.join()

4.3、manger

manger能够达成多少间的共享。

from multiprocessing import Process, Manager
import os


def f(d, l):
    d[os.getpid()] = os.getpid()
    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()   # 声明一个字典,这个字典是用manger声明的,不是用dict()声明的
        # manger.dict()是用专门的语法生产一个可在多进程之间进行传递和共享的一个字典
        l = manager.list(range(5))  # 同样声明一个列表
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

线程修改同一份数据的时候须求加锁,进程修改数据吧:不用加锁,因为那一个manger已经帮您加锁了,它就暗许不允许八个进程同时修改一份数据。八个经过没有章程同时修改1份数据,过程之间是单身的,它自个儿也要加锁,因为它把团结的事物还要copy好几份,跟刚刚的极度Queue同样,copy十个字典最终合成八个字典

 

 

 

协程
协程,又叫微线程,实际上就是单线程,通过python语法,或模块来兑现产出。
真相上正是三个进程2个线程。

进程锁和进程池的施用

亚洲必赢官网 18

1、进程锁

经过multiprocessing中的Lock模块来落到实处进程锁

from multiprocessing import Process,Lock   # 导入进程锁


def f(l, i):
    l.acquire()    # 加锁
    try:
        print("hello word", i)
    finally:
        l.release()   # 释放锁

if __name__ == "__main__":
    lock = Lock()     # 定义锁
    for num in range(10):
        Process(target=f, args=(lock, num,)).start()  # 把锁传入进程中

经过中不是并行独立的吗?为何还要加锁:即使各类进度都以单独运营的,可是难点来了,它们共享1块显示屏。那几个锁存在的意义正是荧屏共享。假如经过壹想着打字与印刷数据,而经过二想也想打印数据的气象,就有不小希望乱套了,然后通过那些锁来支配,去打字与印刷的时候,这一个显示屏唯有自己独占,导致荧屏不会乱。

亚洲必赢官网 19

2、进程池apply和apply_saync

2.1、appley

一路实行,也便是串行施行的

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply(func=foo, args=(i,))   # 同步执行挂起进程
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

2.2、apply_saync

异步实行,也正是并行实施。

from multiprocessing import Pool  # 导入进程池模块pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印进程号


if __name__ == "__main__":
    pool = Pool(processes=5)   # 设置进程池个数为5,也可以写成pool = Pool(5),允许进程池同时放入5个进程,并且这5个进程交给cpu去运行
    for i in range(10):
        pool.apply_async(func=foo, args=(i,))   # 采用异步方式执行foo函数
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

2.3、异步下回调函数

程序实行完成之后,再回调过来实行这些Bar函数。

from multiprocessing import Process,Pool
import time,os


def foo(i):
    time.sleep(2)
    print("in process", os.getpid())  # 打印子进程的进程号


def bar(arg):
    print('-->exec done:', arg, os.getpid())   # 打印进程号

if __name__ == "__main__":
    pool = Pool(processes=2)
    print("主进程", os.getpid())   # 主进程的进程号
    for i in range(3):
        pool.apply_async(func=foo, args=(i,), callback=bar)   # 执行回调函数callback=Bar
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

#执行结果
主进程 752
end
in process 2348
-->exec done: None 752
in process 8364
-->exec done: None 752
in process 2348
-->exec done: None 752

注:

  1. 回调函数表明fun=Foo干不完就不进行bar函数,等Foo试行完就去实行Bar
  2. 本条回调函数是主进度去调用的,而不是种种子进程去调用的。
  3. 回调函数的用途:

      比如说你从种种机器上备份实现,在回调函数中活动写一个剧本,说备份实现

  4. 回调函数是主进程调用的因由?

      假若是子进度去调用这几个回调函数,有多少个子进度就有多少个延续,假如是主进度的话,只需求2遍长连接就足以了,这么些功效就高了

  

 

上图是用yield完成了三个七个函数逇并发处理。

 1 from greenlet import greenlet#导入这个模块
 2 def foo():#定义一个函数
 3     print('ok1')#打印
 4     gr2.switch()#将程序切换到下面一个函数,按照名字切
 5     print('ok3')#打印
 6     gr2.switch()#将程序切换到下面一个函数,按照名字切
 7 def bar():
 8     print('ok2')#打印
 9     gr1.switch()#切到上面foo函数
10     print('ok4')
11 gr1=greenlet(foo)#实例化这个函数
12 gr2=greenlet(bar)
13 gr1.switch()#在外面写这个就执行了这个函数

由此greenlet模块的switch来贯彻协程的切换,greenlet模块须求手动去pycharm下载

 1 import gevent#导入这个模块
 2 def foo():
 3     print('running in foo')
 4     gevent.sleep(2)#打印之后睡一秒,模拟io操作
 5     print('switch to foo again')
 6 def bar():
 7     print('switch  to bar')
 8     gevent.sleep(1)#打印之后睡一秒,模拟io操作
 9     print('switch to bar again')
10 gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])
11 '''
12 这个程序的运行过程是,先执行foo函数,
13 打印之后遇到了IO操作,然后自动切换到下一个函数执行,
14 打印之后又遇到了IO操作,然后切回foo函数发现IO2秒还没有结束,
15 然后又切到了bar函数发现IO结束,打印,再切回foo函数打印
16 '''

地方代码通过gevent模块来兑现写成的IO时期机动切换完毕产出的主次。
gevent需要从pycharm下载。

亚洲必赢官网 20

 

网站地图xml地图