Python多进程

众所周知,Python号称“单核语言”。不管你开多少个线程,最后都会一核有难无数核围观(x
不过这并不是Python语言本身的问题,归根到底,还是CPython的实现中GIL(全局解释器锁)的存在,致使所有的线程都在竞争这个锁,因此同一时刻只会有一个线程在执行Python代码。
那么既然多线程解决不了问题,多进程或许也是一个很好的选择。而且这个架构已经广泛运用到Apache Httpd、Nginx等网站服务器里面了,技术上非常成熟。
今天我刚好在做某个项目的时候也面临了这个问题,因此我们来好好研究一下Python的多进程,也顺便研究一下消息队列 + Worker的结构。


多进程的相关注意事项

首先,Python已经给我们提供了一个多进程库: multiprocessing. 这个库的资料相当多,至于如何使用,这里不再细说。
在这里重点要说的是多进程的一些思路问题。

既然是多进程,就必须要考虑一个问题了:进程和进程之间的隔离性。
这里的意思是,一旦进程被创建(或者fork)之后,两个进程就将会独立执行。这不像线程那样可以用某个变量来传递信息,而是,其中的变量不管发生了怎样的修改,如果不是主动想办法通知,那么另一个进程中的这个变量是绝对不会发生变化的。
换句话说,大概就是一分为二,至于之后发生了什么,两者再无关联了。

多进程的另外一个问题是进程启动所消耗的资源。进程不能像线程那样频繁的启动、结束,否则将带来大量的资源开销,且非常致命。一般而言,多进程程序更适合创建很多常驻进程,例如一个主程序对应10个工作进程(Worker)。

此外还有一个问题是Windows平台下进程不支持fork(),这给Python的多进程创建流程带来了一些问题(例如不支持在Bootstrap阶段就开进程等)。因为这些原因,Python的代码需要多加一行multiprocessing.freeze_support(),否则freeze module将无法正常使用。后文中出现的代码中将会更详细的写清楚具体要如何操作。


消息队列 + Worker结构

之前说到了进程的隔离性,以及进程启动过程和销毁过程中消耗系统资源的问题。
而这个结构顾名思义,一个主进程附带很多个Worker进程,它们之间使用一个消息队列(基于管道的底层实现)互相通讯,传递事件,进而互相协作做出响应。
这种架构比起每次请求创建一个线程来响应,可以看到主进程只负责IO操作、进程管理、消息分发等,而真正的计算任务全部在Worker进程上,从而避免了CPU密集的计算出现在同一个GIL解释器上。这种架构我个人更喜欢叫它“Event Based”,而基于线程创建的则叫它“Thread Based”。
毫无疑问,这样的结构吞吐量和性能上要更大,因此也广泛用于商业服务项目当中(如Nginx等)。


代码实现

好了,废话不多说了,这里直接上代码说话吧。
为了便于使用,我封装了两个class,一个是主进程,一个是Worker进程。分别如下:
BaseProcessManager.py

#coding: utf-8

import time
import threading
import multiprocessing

# 下面的代码用于临时编写简单的log函数, 实现调试输出信息
def log(level, msg):
    print('[%s][%s] %s' % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), level, msg))

class BaseProcessManager():
    def __init__(self, processClass, workerNum):
        self.processClass = processClass
        self.processPool = []
        self.messageQueue = multiprocessing.Queue()
        self.replyQueue = multiprocessing.Queue()
        self.workerNum = workerNum
        self.stillRunning = False
    
    def spawn(self):
        for i in range(0, self.workerNum):
            try:
                log('INFO', 'Spawning process %d' % (i, ))
                process = self.processClass(messageQueue = self.messageQueue, replyQueue = self.replyQueue)
                process.start()
                self.processPool.append(process)
            except Exception as e:
                log('ERROR', 'Error while spawning process %d: %s' % (i, str(e)))

    def crashCheck(self):
        for i in range(0, len(self.processPool)):
            process = self.processPool[i]
            if self.stillRunning == True and process.is_alive() == False:
                log('WARNING', 'Detected process %d crash, restarting...' % (i, ))
                try:
                    process = self.processClass(messageQueue = self.messageQueue, replyQueue = self.replyQueue)
                    process.start()
                    self.processPool[i] = process
                except Exception as e:
                    log('ERROR', 'Error while restarting process %d: %s' % (i, str(e)))
    
    def end(self):
        log('INFO', 'Sending end process signal')
        self.stillRunning = False
        for i in range(0, len(self.processPool)):
            self.messageQueue.put(['end', ])
        # 防止callbackloop因为阻塞而死锁
        self.replyQueue.put(['end', ])
    
    def mainloop(self):
        # Override it by youself
        pass
    
    def mainloopTrap(self):
        try:
            self.mainloop()
        except Exception as e:
            log('ERROR', 'Error while executing main loop: %s' % (str(e), ))
        finally:
            self.end()
    
    def do_reply_end(self):
        # 这是为了防止callbackloop死锁的event, 退出时使用, 无实际功能
        pass
    
    def dispatchCallback(self, msg):
        event = msg[0]
        args = msg[1:]
        try:
            handler = getattr(self, "do_reply_%s" % (event, ), None)
            if not handler:
                raise NotImplementedError("Reply event [%s] handle not found" % event)
            handler(*args)
        except Exception as e:
            log('ERROR', 'Error while handling reply event [%s]: %s' % (event, str(e)))
        
    def callbackloop(self):
        while self.stillRunning:
            msg = self.replyQueue.get()
            self.dispatchCallback(msg)
    
    def crashChecker(self):
        while self.stillRunning:
            time.sleep(10)
            self.crashCheck()
    
    def start(self):
        self.stillRunning = True
        self.spawn()
        threading.Thread(target=self.crashChecker).start()
        threading.Thread(target=self.callbackloop).start()
        threading.Thread(target=self.mainloopTrap).start()

BaseProcessWorker.py

#coding: utf-8

import time
import multiprocessing

# 下面的代码用于临时编写简单的log函数, 实现调试输出信息
def log(level, msg):
    print('[%s][%s] %s' % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), level, msg))

class BaseProcessWorker(multiprocessing.Process):
    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self)
        self.args = args
        self.kwargs = kwargs
        self.messageQueue = self.kwargs['messageQueue']
        self.replyQueue = self.kwargs['replyQueue']
        self.stillRunning = False
    
    def sendBack(self, sendBackMsg):
        self.replyQueue.put(sendBackMsg)

    def do_end(self):
        self.stillRunning = False
    
    def dispatch(self, msg):
        event = msg[0]
        args = msg[1:]
        try:
            handler = getattr(self, "do_%s" % (event, ), None)
            if not handler:
                raise NotImplementedError("Event [%s] handle not found" % event)
            handler(*args)
        except Exception as e:
            log('ERROR', 'Error while handling event [%s]: %s' % (event, str(e)))
        
    def run(self):
        self.stillRunning = True
        while self.stillRunning:
            msg = self.messageQueue.get()
            self.dispatch(msg)

而使用方法也很简单, 看如下的例子:
main.py

#coding: utf-8

import multiprocessing
from BaseProcessManager import BaseProcessManager
from BaseProcessWorker import BaseProcessWorker

# 下面的代码用于临时编写简单的log函数, 实现调试输出信息
import time
def log(level, msg):
    print('[%s][%s] %s' % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), level, msg))

class ProcessManager(BaseProcessManager):
    def mainloop(self):
        self.number = 0
        log('INFO', 'Start 50000 event')
        for i in range(0, 50000):
            self.messageQueue.put(('test', i))
        log('INFO', 'Event sent')
        log('INFO', 'Got number: %d' % (self.number, ))
    
    def do_reply_test(self, i):
        self.number += i

class ProcessWorker(BaseProcessWorker):
    def do_test(self, i):
        self.replyQueue.put(('test', 1))

if __name__ == '__main__':
    multiprocessing.freeze_support()
    # 上面两行在windows上必须要有, 原因是windows平台不支持fork, 在py的bootstrap阶段无法创建新进程, 具体细节不多说
    pm = ProcessManager(ProcessWorker, 12)
    pm.start()

执行效果

在12核CPU上创建12个线程,执行效果如图。


其实,可以看到插入50000个请求到队列花费了整整3秒的时间,这并不是一个非常好的性能。至于原因,我看到Python的进程之间是使用pickle来序列化和反序列化数据,之后使用管道来互相通讯的。因此我猜可能更多的性能损耗是损耗在pickle上了。
但是这并不代表我们的程序架构有问题,在更多的场合往往是CPU密集远超过IO密集。在这种情况下,我们的这个架构依然还是可以保持良好的吞吐率和响应速度的。
当然,pickle那块位置是否可以优化呢? 或许可以,以后有兴趣再继续研究吧~