Python高级  
一、内存管理 1 内部结构 1.1 环状双向链表 
环状双向链表(refchain)。在python程序中创建的任何对象都会放在refchain链表中。
static PyObject refchain = {&refchain, &refchain, ...}
假设我name = "Bob",内部的结构体会存上:上一对象、下一对象、类型、引用个数等内容。如果此时我再new = name,那么引用个数则会增加1个。
对于所有对象来说,内部结构体都会存上上一对象、下一对象、类型、引用个数,而不同的数据类型还会存储一切额外内容。比如创建int还会将int值存入结构体,创建list还会将列表,列表长度存入结构体。
综上:每个对象都有PyObject结构体,由多个元素组成的对象则是:PyObject + ob_size
 
#define  Pyobject_HEAD Pyobject ob_base; #define  Pyobject_VAR_HEAD PyVarobject ob_base; #define  _Pyobject_HEAD_EXTRA  \     struct _object *_ob_next;  \     struct _object *_ob_prev; typedef  struct  _object  {    _PyObject_HEAD_EXTRA       Py_ssize_t ob_refcnt;       struct  _typeobject  *ob_type ;    } PyObject; typedef  struct  {    PyObject ob_base;       Py_ssize_t ob_size;   } PyVarObject; 
 
1.2 具体类型结构体 
假设设置了data=3.14,那么内部会创建:
_ob_prev = refchain 中上一个对象 
_ob_next = refchain 中下一个对象 
ob_refcnt = 1 
ob_type = float 
ob_fval = 3.14 
 
 
typedef  struct  {    PyObject_HEAD     double  ob_fval;  } PyFloatObject; struct  _longobject  {    PyObject_HEAD     digit ob_digit[1 ]; } typedef  struct  _longobject  PyLongObject ;  typedef  struct  {    PyObject_VAR_HEAD     PyObject **ob_item;     Py_ssize_t allocated; } PyListObject; typedef  struct  {    PyObject_VAR_HEAD     PyObject **ob_item[1 ]; } PyTupleObject; typedef  struct  {    PyObject_HEAD      Py_ssize_t ma_used;     PyDictKeysObject *ma_keys;     PyObject **ma_values; } PyDictObject; 
 
1.3 引用计数器 
当python程序运行时,会根据数据类型的不同找到其对应的结构体,根据结构体中的字段来进行创建相关的数据。然后将对象添加到refchain双向链表中。
在C源码中有两个关键的结构体:PyObject、PyVarObject
每个对象中的ob_refcnt就是引用计数器,值默认为1,当有其他变量引用对象时,引用计数器就会发生变化。
当一个对象的引用计数器为0时,意味着没有人再使用这个对象了,这个对象就是垃圾,此时进行垃圾回收。
垃圾回收简单理解步骤(实际不止,可以看后续):
对象从refchain链表移除。 
将对象销毁,内存归还。 
 
 
""" 注意,下列a、b对应的对象实际上是同一个 """ a = 99999  b = a del  bdel  a
 
1.4 循环引用问题 
只使用引用计数器来管理内存看似完美,实际上会存在循环引用问题。
 
v1 = [11 , 22 , 33 ]   v2 = [44 , 55 , 66 ]   v1.append(v2) v2.append(v1) del  v1del  v2""" 此时v1,v2对象引用计数器还有1,那么这两个列表此时就会常驻在内存中 """ 
 
2 标记清除 
目的:为解决引用计数器循环引用的不足。
实现:在python的底层再维护一个链表,链表中专门放那些可能存在循环引用的对象(list / tuple / dict / set)。
在Python内部某种情况下触发,会去扫描可能存在循环应用的链表中的每个元素,检查是否有循环引用,如果有则让双方的引用计数器 -1。如果是0则垃圾回收。
 
3 分代回收 
标记清除问题:
什么时候扫描? 
可能存在循环引用的链表扫描代价大,每次扫描耗时久。 
 
于是python引入了分代回收机制,在python内存管理系统将内存分为不同的世代,新创建的对象首先放在第0代。经过多次垃圾回收周期,如果对象依然存活,则会被提升至老一代,以此类推。
标记清除维护的列表就是指的是分代回收的三个列表,它包含了可能存在循环引用的对象,通过定期扫描这个列表来处理可能的循环引用问题。
将可能存在循环应用的对象维护成3个链表:
0代:0代中对象个数达到700个则扫描一次。 
1代:0代扫描10次,则1代扫描一次。 
2代:1代扫描10次,则2代扫描一次。 
 
 
4 缓存机制 4.1 总结 
在python中维护了一个refchain的双向环状链表,这个链表中存储程序创建的所有对象,每种类型的对象中都有一个ob_refcnt引用计数器的值,引用个数+1、-1,最后当引用计数器变为0时会进行垃圾回收(对象销毁、refchain中移除)。
但是,在python中对于那些可以有多个元素组成的对象可能会存在循环引用的问题,为了解决这个问题python又引入了标记清除和分代回收,在其内部为了4个链表。
在源码内部当达到各自的阈值时,就会触发扫描链表进行标记清除的动作(有循环则各自-1)。
 
4.2 池 
作用于int / str类型。
为了避免重复创建和销毁一些常见对象,于是会维护一个池。
在启动解释器时,对于int类型,python内部会帮我们创建:[-5, 256]。对于str类型,会维护一个unicode_latin1[256]的链表,内部存所有的ascii字符,之后使用就不会再重复创建。
字符串驻留机制:Python会对长度为0到20个字符的由英文字符,数字,下划线构成的字符串进行驻留,下次再创建时,不会新开辟内存,通过id()可以发现值相等。
 
v1 = 7    v2 = 9    v3 = 9    print (id (v2), id (v3))
 
4.3 free_list 
作用于:float / list / tuple / dict
当一个对象的引用计数器为0时,按理说应该回收,但是内部实际上不会直接回收,而是将对象添加到free_list链表中当缓存。以后再去创建对象时,不会重新开辟内存,而是直接使用free_list。
free_list是由上限的,当free_list没满时引用计数器为0的对象才会加到free_list中,满了的话则会直接销毁对象。
 
v1 = 3.14  del  v1v9 = 999.99  
 
5 源码剖析 5.1 float 类型 创建 
val = 3.14
int类型和float很相似,只是int类型还会先去小数据池里找,没有才会创建。
 
static  PyFloatObject *free_list = NULL ;static  int  numfree = 0 ;PyObject *PyFloat_FromDouble (double  fval)  {          PyFloatobject *op = free_list;     if  (op != NULL ) {         free_list = (PyFloatobject *) Py_TYPE(op);         numfree--;     } else  {                  op = (PyFloatobject *) PyObject_MALLOC(sizeof (PyFloatobject));         if  (!op)             return  PyErr_NoMemory();     }               (void )PyObject_INIT(op, &PyFloat_Type);          op->ob_fval = fval;     return  (PyObject *)op; } 
 
#define  PyObject_INIT(op, typeobj) (Py_TYPE(op) = (typeobj), _Py_NewReference((PyObject *)(op)), (op)) 
 
static  PyObejct refchain = {&refchain, &refchain};void  _Py_AddToAllObjects(PyObject *op, int  force){     if (force || op->ob_prev == NULL ){         op->_ob_next = refchain._ob_next;         op->_ob_prev = &refchain;         refchain._ob_next->_ob_prev = op;         refchain._ob_next = op;     } } void  _Py_NewReference(PyObject *op){     _Py_INC_REFTOTAL;               op->ob_refcnt = 1 ;               _Py_AddToAllObjects(op, 1 );          _Py_INC_TPALLOCS(op); } 
 
引用 
val = 3.14
data = val
项目中这样的引用关系会使原对象的引用计数器+1。
 
static  inline  void  _Py_INCREF(PyObject *op){     _Py_INC_REFTOTAL;          op->ob_refcnt++; } #define  Py_INCREF(op) _Py_INCREF(_PyObject_CAST(op)) 
 
销毁 
val = 3.14
del 3.14
 
static  inline  void  _Py_DECREF(const  char  *filename, int  lineno, PyObject *op){     (void )filename;      (void )lineno;      _Py_DEC_REFTOTAL;          if  (--op->ob_refcnt != 0 ) {         #ifdef  Py_REF_DEBUG          if  (op->ob_refcnt < 0 ) {             _Py_NegativeRefcount(filename, lineno, op);         }         #endif       } else  {         _Py_Dealloc(op);     } } #define  Py_DECREF(op) _Py_DECREF(__FILE__, __LINE__, _PyObject_CAST(op)) 
 
void  _Py_Dealloc(PyObject *op){          destructor dealloc = Py_TYPE(op)->tp_dealloc;          _Py_ForgetReference(op);          (*dealloc)(op); } void  _Py_ForgetReference(PyObject *op){          op->_ob_next->_ob_prev = op->_ob_prev;     op->_ob_prev->_ob_next = op->_ob_next;     op->_ob_next = op->_ob_prev = NULL ;     _Py_INC_TPFREES(op); } 
 
#define  PyFloat_MAXFREELIST 100 static  int  numfree = 0 ;static  PyFloatObject *free_list = NULL ;PyTypeObject PyFloat_Type = {     PyVarObject_HEAD_INIT(&PyType_Type, 0 )     "float" ,     sizeof (PyFloatObject),     0 ,          (destructor) float_dealloc,           0 ,                                }; static  void  float_dealloc (PyFloatObject *op) {          if  (PyFloat_CheckExact(op))     {                  if  (numfree >= PyFloat_MAXFREELIST)         {             PyObject_FREE(op);             return ;         }                  numfree++;         Py_TYPE(op) = (struct  _typeobject *) free_list;         free_list = op;     }     else      {                  Py_TYPE(op)->tp_free((PyObject *) op);     } } 
 
5.2 list 类型 创建 
v =[11, 22, 33]
 
 
二、多线程 1 进程和线程 1.1 定义 
线程:是计算机中可以被cpu调度的最小单元(真正在工作)。
进程:是计算机资源分配的最小单元(进程为线程提供资源)。
一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源。
通过进程和线程都可以将串行的程序变成并发。
Python的多线程主要是在用户态中创建线程,在内核态中也会创建线程,并且多个用户线程可以映射到多个内核线程上,形成多对多的映射关系。
 
1.2 多线程 
在同一个进程中开启多个线程执行任务。
子线程开始后,主线程会一直向下走,不会停止。
 
import  timeimport  requestsimport  threadingurl_list = {     ("1.mp4" , "https://www.xxx.com/?id=1" ),     ("2.mp4" , "https://www.xxx.com/?id=2" ),     ("3.mp4" , "https://www.xxx.com/?id=3" ), } def  task (file_name, video_url ):    res = requests.get(video_url)     with  open (file_name, mode='wb' ) as  f:         f.write(res.content)     print (time.time())      for  name, url in  url_list:         t = threading.Thread(target=task, args=(name, url))          t.start() 
 
1.3 多进程 
开启不同的进程执行任务(不同的进程中会自动创建一个线程)。
注:Linux系统使用fork创建进程,Windows使用spawn,Mac支持fork和spawn。在python3.8版本后默认使用spawn创建线程,所以程序会要求创建线程代码必须在if __name__ == '__main__':下,否则会异常。
mac中可以修改创建线程使用的方式来避免改变创建方式。
 
import  timeimport  requestsimport  multiprocessingurl_list = {     ("1.mp4" , "https://www.xxx.com/?id=1" ),     ("2.mp4" , "https://www.xxx.com/?id=2" ),     ("3.mp4" , "https://www.xxx.com/?id=3" ), } def  task (file_name, video_url ):    res = requests.get(video_url)     with  open (file_name, mode='wb' ) as  f:         f.write(res.content)     print (time.time()) if  __name__ == '__main__' :    for  name, url in  url_list:         t = multiprocessing.Process(target=task, args=(name, url))         t.start() 
 
1.4 GIL锁 
GIL,全局解释器锁(Global Interpreter Lock),是CPython解释器特有的东西,让一个进程中同一时刻只有一个线程可以被CPU调度。
如果程序想利用计算机的多核优势,让CPU同时处理一些任务,适合用多进程开发(即使资源开销大)
如果程序不需要计算机的多核优势,适合用多线程开发,比如网络下载文件,多用网卡而非CPU。
常见的程序开发中,计算操作需要使用CPU多核优势,IO操作不需要利用CPU的多核优势:
计算密集型,用多进程,例如∶大量的数据计算【累加计算示例】。 
IO密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】。 
 
 
import  timeimport  multiprocessingdef  task (start, end, queue ):    res = 0      for  i in  range (start, end):         res += 1      queue.put(res) if  __name__ == '__main__' :    queue = multiprocessing.Queue()     start_time = time.time()          p1 = multiprocessing.Process(target = task, args=(0 , 50000000 , queue))     p1.start()          p2 = multiprocessing.Process(target = task, args=(50000000 , 100000000 , queue))     p2.start()          v1 = queue.get(block=True )     v2 = queue.get(block=True )          print (v1 + v2)          end_time = time.time()     print (end_time - start_time)      import  timeimport  requestsimport  multiprocessingurl_list = {     ("1.mp4" , "https://www.xxx.com/?id=1" ),     ("2.mp4" , "https://www.xxx.com/?id=2" ),     ("3.mp4" , "https://www.xxx.com/?id=3" ), } def  task (file_name, video_url ):    res = requests.get(video_url)     with  open (file_name, mode='wb' ) as  f:         f.write(res.content)     print (time.time()) if  __name__ == '__main__' :    for  name, url in  url_list:         t = multiprocessing.Process(target=task, args=(name, url))         t.start()           import  multiprocessingimport  threadingdef  thread_task ():    print ("Thread task executed" ) def  task (start, end ):    t1 = threading.Thread(target=thread_task)     t1.start()     t2 = threading.Thread(target=thread_task)     t2.start()     t3 = threading.Thread(target=thread_task)     t3.start() if  __name__ == "__main__" :    p1 = multiprocessing.Process(target=task, args=(0 , 5000000 ))     p1.start()     p2 = multiprocessing.Process(target=task, args=(5000000 , 10000000 ))     p2.start() 
 
2 多线程开发 2.1 常见API 
IO密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】
 
import  threadingt = threading.Thread(target=task, args=('xxx' , )) t.start() t.join() t.daemon = True    t.daemon = False    t.name = "thread_one"  name = threading.current_thread().name 
 
class  MyThread (threading.Thread):         def  run (self ):                  print ("执行此线程" , self._args)          t = MyThread(args=(100 , )) t.start() 
 
2.2 API使用 
在t.join()案例中,最终print的结果是随机的,因为存在CPU调度算法,所以一个线程不一定会完全执行完毕,可能在执行过程中就切换了。所以在开启t1,t2线程后,有可能加了一部分数,t1就切换成了t2,于是join就过去了,t2同理,所以最后的number就不会是0。
 
import  threadingdef  task (arg ):    pass  t.threading.Thread(target=task, args=('xxx' , )) t.start() print ("继续执行..." )
 
import  threadingloop = 1000000  number = 0  def  _add (count ):    global  number     for  i in  range (loop):         number += 1  def  _sub (count ):    global  number     for  i in  range (loop):         number -= 1  t1 = threading.Thread(target=_add, args=(loop, )) t2 = threading.Thread(target=_sub, args=(loop, )) t1.start() t2.start() t1.join()   t2.join()   print (number)
 
import  requestsimport  threadingclass  DownloadThread (threading.Thread):    def  run (self ):         file_name, video_url = self._args         res = requests.get(video_url)         with  open (file_name, mode="wb" ) as  f:             f.write(res.content) url_list = {     ("1.mp4" , "https://www.xxx.com/?id=1" ),     ("2.mp4" , "https://www.xxx.com/?id=2" ),     ("3.mp4" , "https://www.xxx.com/?id=3" ), } for  item in  url_list:    t = DownloadThread(args=(item[0 ], item[1 ]))     t.start() 
 
2.3 线程安全 
因为CPU在执行的过程中,可能会轮流地执行线程。
因此可以给线程加锁,这样就可以保证一个线程被执行完才会执行另一个线程。
不同线程使用的锁必须是同一把锁,不然就没有加锁的意义了。
在开发的过程中要注意有些操作默认都是线程安全的(内部集成了锁的机制),我们在使用的时无需再通过锁再处理,比如L.append(x)等。
官网:official::Python.org ,搜:atomic
根据官网,常见线程安全的操作:
L.append(x) 
L1.extend(L2) 
x = L[i] 
x = L.pop() 
L1[i:j] = L2 
L.sort() 
x = y 
x.field = y 
D[x] = y 
D1.update(D2) 
D.keys() 
 
常见线程不安全的操作:
i = i+1 
L.append(L[-1]) 
L[i] = L[j] 
D[x] = D[x] + 1 
 
 
lock_object = threading.RLock() lock_object.acquire() lock_object.release() 
 
import  threadinglock_object = threading.RLock() loop = 1000000  number = 0  def  _add (count ):         lock_object.acquire()     global  number     for  i in  range (loop):         number += 1           lock_object.release() def  _sub (count ):         with  lock_object:         global  number         for  i in  range (loop):             number -= 1  t1 = threading.Thread(target=_add, args=(loop, )) t2 = threading.Thread(target=_sub, args=(loop, )) t1.start() t2.start() t1.join()   t2.join()   print (number)  
 
import  threadingdata_list = [] def  task ():    print ("开始" )     for  i in  range (100000 ):         data_list.append(i)     print (len (data_list)) for  i in  range (2 ):    t = threading.Thread(target=task)     t.start() 
 
2.4 线程锁 
在python中,锁有Lock和RLock。它们功能几乎一致,但是在嵌套使用上存在差异。Lock是同步锁,RLock是嵌套锁,也就是说Lock是不支持锁嵌套的,但是RLock可以进行锁的嵌套。
 
import  threadingnum = 0  lock_object = threading.Lock() def  task ():    print ("开始" )          lock_object.acquire()     print (threading.current_thread().name)     global  num     for  i in  range (100000 ):         num += 1      lock_object.release()     lock_object.acquire()     print (threading.current_thread().name)     for  i in  range (100000 ):         num += 1      lock_object.release()     print (num) for  i in  range (2 ):    t = threading.Thread(target=task)     t.start()      def  task ():    print ("开始" )          lock_object.acquire()     print (threading.current_thread().name)     lock_object.acquire()     print (threading.current_thread().name)     global  num     for  i in  range (100000 ):         num += 1      lock_object.release()     lock_object.release() 
 
import  threadingnum = 0  lock_object = threading.RLock() def  task ():    print ("开始" )          lock_object.acquire()     lock_object.acquire()     print (threading.current_thread().name)     lock_object.release()     lock_object.release() for  i in  range (3 ):    t = threading.Thread(target=task)     t.start() import  threadinglock = threading.RLock() def  func ():    with  lock:         pass  def  run ():    print ("其他功能" )     func()      print ("其他功能" ) def  process ():    with  lock:         print ("其他功能" )         func()          print ("其他功能" ) 
 
2.5 死锁 
程序因为锁无法向下进行称为死锁。
死锁的情况:
使用Lock两次加锁会导致死锁。因为Lock本身不可以进行嵌套,所以嵌套使用Lock会导致程序卡死。 
使用多把锁时,不同线程互相持有对方需要的锁,互相等待对方释放锁,从而造成死锁 
 
 
import  threadingnum = 0  lock_object = threading.Lock() def  task ():    print ("开始" )          lock_object.acquire()     print (threading.current_thread().name)     lock_object.acquire()     print (threading.current_thread().name)     global  num     for  i in  range (100000 ):         num += 1      lock_object.release()     lock_object.release()      for  i in  range (2 ):    t = threading.Thread(target=task)     t.start() 
 
import  threadingimport  timelock_1 = threading.Lock() lock_2 = threading.Lock() def  task1 ():    lock_1.acquire()     print (1 )     time.sleep(1 )     lock_2.acquire()     print (11 )     lock_2.release()     print (111 )     lock_1.release()     print (1111 ) def  task2 ():    lock_2.acquire()     print (2 )     time.sleep(1 )     lock_1.acquire()     print (22 )     lock_1.release()     print (222 )     lock_2.release()     print (2222 ) t1 = threading.Thread(target=task1) t1.start() t2 = threading.Thread(target=task2) t2.start() 
 
2.6 线程池 
Python3中官方才正式提供线程池。
线程不是开的越多越好,开的多了可能会导致系统的性能更低了。因为线程的上下文切换也需要耗费时间和资源。
不建议:无限制的创建线程。
建议:使用线程池。
在循环中向线程池提交任务时,循环会很快的结束,任务会全部加入到线程池中,但是不是所有任务都会被很快地被执行,而是要看线程池的调度。
案例:
线程池的使用:会立刻输出END,然后再依次执行子线程任务。因为线程池使用时,线程会立刻被创建完成,然后交给线程池进行调度。
 
主线程等待线程池工作完毕:加上pool.shutdown(True)会等待子线程执行完再再向下执行。
 
执行完任务后,再额外干点别的:加上future.add_done_callback(done),会在子线程做完操作后再做额外操作。在线程池中,回调是由子线程做的。 
最后统一获得结果:将回调结果加入列表中,最后统一来执行回调结果。 
 
 
import  threadingdef  task (video_url ):    pass  url_list = ["www.xxxx-{}.com" .format (i) for  i in  range (30000 )] for  url in  url_list:    t = threading.Thread(target=task, args=(url, ))     t.start()      
 
import  timefrom  concurrent.futures import  ThreadPoolExecutorpool = ThreadPoolExecutor(n) future = pool.submit(func, para_1, para_2, ... ) pool.shutdown(True ) future.add_done_callback(func_done) 
 
""" 线程池示例1:线程池的使用 """ import  timefrom  concurrent.futures import  ThreadPoolExecutordef  task (video_url, num ):    print ("开始执行任务" , video_url)     time.sleep(5 ) pool = ThreadPoolExecutor(10 ) url_list = ["www.xxx{}.com" .format (i) for  i in  range (300 )] for  url in  url_list:         pool.submit(task, url, 2 ) print ("END" )""" 线程池案例2:主线程等待线程池工作完毕 """ import  timefrom  concurrent.futures import  ThreadPoolExecutordef  task (video_url, num ):    print ("开始执行任务" , video_url)     time.sleep(0.5 ) pool = ThreadPoolExecutor(10 ) url_list = ["www.xxx{}.com" .format (i) for  i in  range (300 )] for  url in  url_list:         pool.submit(task, url, 2 ) print ("Wait..." )pool.shutdown(True ) print ("Next..." )""" 线程池案例3:执行完任务后,再额外干点别的""" import  threadingimport  timeimport  randomfrom  concurrent.futures import  ThreadPoolExecutordef  task (video_url ):    print ("开始执行任务" , video_url)          print ("执行任务的线程id:" , threading.current_thread().ident)     time.sleep(2 )     return  random.randint(0 , 10 ) def  done (response ):         print ("任务执行后的返回值" , response.result())          print ("回调函数的线程id:" , threading.current_thread().ident) pool = ThreadPoolExecutor(10 ) url_list = ["www.xxx{}.com" .format (i) for  i in  range (100 )] for  url in  url_list:    future = pool.submit(task, url)     future.add_done_callback(done)      """ 线程池案例4:最后统一获得结果 """ import  timeimport  randomfrom  concurrent.futures import  ThreadPoolExecutordef  task (video_url ):    print ("开始执行任务" , video_url)     time.sleep(2 )     return  random.randint(0 , 10 ) pool = ThreadPoolExecutor(10 ) future_list = [] url_list = ["www.xxx{}.com" .format (i) for  i in  range (20 )] for  url in  url_list:    future = pool.submit(task, url)     future_list.append(future) pool.shutdown(True ) for  fu in  future_list:    print (fu.result()) 
 
2.7 单例模式 
多线程单例模式下,因为线程之间的交换,所以可能会导致单例模式创造出来的不是一个单例。
为了解决问题则需要加锁。
 
import  threadingimport  timeclass  Singleton :    instance = None      def  __init__ (self, name ):         self.name = name     def  __new__ (cls, *args, **kwargs ):         if  cls.instance:             return  cls.instance                  time.sleep(0.1 )         cls.instance = object .__new__(cls)         return  cls.instance def  task ():    obj = Singleton('x' )     print (obj) for  i in  range (10 ):    t = threading.Thread(target=task)     t.start() 
 
import  threadingimport  timeclass  Singleton :    instance = None      lock = threading.RLock()     def  __init__ (self, name ):         self.name = name     def  __new__ (cls, *args, **kwargs ):         if  cls.instance:             return  cls.instance                  with  cls.lock:             if  cls.instance:                 return  cls.instance             cls.instance = object .__new__(cls)             return  cls.instance def  task ():    obj = Singleton('x' )     print (obj) for  i in  range (10 ):    t = threading.Thread(target=task)     t.start() 
 
3 多进程开发 3.1 进程定义 
进程是计算机中资源分配的最小单元。一个进程中可以有多个线程,同一个进程中的线程共享资源。
进程与进程之间则是相互隔离。
Python中通过多进程可以利用CPU的多核优势,
多进程适用于计算密集型,例如∶大量的数据计算【累加计算示例】。
创建进程模式:
fork:”拷贝”几乎所有的资源,支持文件对象 / 线程锁传参,用于unix,可以在代码中任意位置开始,操作较快。 
spawn:相当于在内部线创建一个python解释器,然后让解释器执行代码,不支持文件对象 / 线程锁传参,用于unix、win,必须从main代码块开始,操作较慢。 
forkserver:在程序开始前会把多进程部分当做模版加载,然后在启动时会找到模板,拷贝一份执行。不支持文件对象 / 线程锁传参,用于部分unix,必须从main代码块开始。 
 
注:Linux系统使用fork创建进程,Windows使用spawn,Mac支持fork和spawn。在python3.8版本后默认使用spawn创建线程,所以程序会要求创建线程代码必须在if __name__ == '__main__':下,否则会异常。
mac中可以修改创建线程使用的方式来避免改变创建方式。
 
import  timeimport  requestsimport  multiprocessingurl_list = {     ("1.mp4" , "https://www.xxx.com/?id=1" ),     ("2.mp4" , "https://www.xxx.com/?id=2" ),     ("3.mp4" , "https://www.xxx.com/?id=3" ), } def  task (file_name, video_url ):    res = requests.get(video_url)     with  open (file_name, mode='wb' ) as  f:         f.write(res.content)     print (time.time()) if  __name__ == '__main__' :    for  name, url in  url_list:         t = multiprocessing.Process(target=task, args=(name, url))         t.start() 
 
""" fork案例(仅限unix) """ import  multiprocessingimport  timedef  task ():         print (name)     name.append(123 )      if  __name__ == '__main__' :    multiprocessing.set_start_method("fork" )     name = []          p1 = multiprocessing.Process(target=task)     p1.start()          time.sleep(2 )          print (name)      def  task ():         print (name)      if  __name__ == '__main__' :    multiprocessing.set_start_method("fork" )     name = []     name.append(123 )          p1 = multiprocessing.Process(target=task)     p1.start()      def  task ():         print (name)      if  __name__ == '__main__' :    multiprocessing.set_start_method("fork" )     name = []          p1 = multiprocessing.Process(target=task)     p1.start()          name.append(123 ) 
 
""" spawn案例 """ import  multiprocessingimport  timedef  task ():         print (name) if  __name__ == '__main__' :    multiprocessing.set_start_method("spawn" )     name = []             p1 = multiprocessing.Process(target=task)     p1.start()      def  task (data ):         print (data)     data.append(999 )      if  __name__ == '__main__' :    multiprocessing.set_start_method("spawn" )     name = []             p1 = multiprocessing.Process(target=task, args=(name,))     p1.start()          time.sleep(2 )          print (name) 
 
""" fork和spwan传递资源的区别 """ import  multiprocessingimport  timedef  task (fb ):         print (fb, lock) if  __name__ == '__main__' :    multiprocessing.set_start_method("fork" )          name = []     file_object = open ("1.txt" , mode="a+" , encoding="utf-8" )     lock = threading.RLock()          p1 = multiprocessing.Process(target=task, args=(file_object,))     p1.start()      def  task (fb, lk ):         print (fb, lk)                     if  __name__ == '__main__' :    multiprocessing.set_start_method("fork" )          name = []     file_object = open ("1.txt" , mode="a+" , encoding="utf-8" )     lock = threading.RLock()          p1 = multiprocessing.Process(target=task, args=(file_object, lock, ))     p1.start() 
 
3.2 进程案例 
案例1:
操作:主进程和子进程都进行文件操作。 
结果:武沛齐 alex 武沛齐 
原因:子进程拷贝主进程,所以一开时子进程有武沛齐,然后又写入了alex,之后flush()一下,此时文件中就有了武沛齐和alex。又因为主进程再等着子进程,主进程的武沛齐一开始没有写入到文件中,当子进程结束后,主进程结束,此时武沛齐才写入到文件中,所以会出现上述结果。前两个单词是子进程写的,后一个单词是主进程写的。 
 
案例2:
操作:主进程在案例1的基础上先flush()一下。 
结果:武沛齐 alex 
原因:主进程写入后因为直接进行了flush(),所以直接就先把武沛齐写到文件中了,此时传给子进程的缓存内容是空,所以子进程只会再额外写入alex。此时前一个单词是主进程写的,后一个单词是子进程写的。 
 
案例3:
操作:主进程加锁后传递给子进程。 
结果:<locked ... >666 
原因:主进程加锁后传递给子进程,子进程拿到锁后,一样是锁的状态,只不过在子进程的锁的对象是子进程中的主线程,而主进程的锁的对象是主进程中的主线程。因为此时锁是RLock(),所以可以进行嵌套锁,所以666会被输出在屏幕上。 
 
案例4:
操作:子进程中创建子线程。 
结果:先输出十个”来了”,等2秒后依次输出666。 
原因:由案例3可知,因为一开始锁就是锁的状态,然后子进程中锁的作用对象是主线程,所以一开始子进程中的子线程全都会卡主,当子进程的主线程的锁释放后,子进程的子线程才会依次开始执行。 
 
 
import  multiprocessingdef  task ():    print (name)     file_object.write("alex\n" )     file_object.flush() if  __name__ == "__main__" :    multiprocessing.set_start_method("fork" )          name = []     file_object = open ('x1.txt' , mode='a+' , encoding='utf-8' )     file_object.write("武沛齐\n" )          p1 = multiprocessing.Process(target=task)     p1.start()        import  multiprocessingdef  task ():    print (name)     file_object.write("alex\n" )     file_object.flush() if  __name__ == "__main__" :    multiprocessing.set_start_method("fork" )          name = []     file_object = open ('x1.txt' , mode='a+' , encoding='utf-8' )     file_object.write("武沛齐\n" )          file_object.flush()          p1 = multiprocessing.Process(target=task)     p1.start()      import  multiprocessingimport  threadingdef  task ():    print (lock)     lock.acquire()     print (666 )      if  __name__ == "__main__" :    multiprocessing.set_start_method("fork" )     name = []     lock = threading.RLock()     lock.acquire()          p1 = multiprocessing.Process(target=task)     p1.start()        def  func ():    print ("来了" ) 	with  lock:         print (666 )      def  task ():    for  i in  range (10 ):         t = threading.Thread(target=func)         t.start()     time.sleep(2 )     lock.release() 
 
3.3 进程API import  multiprocessingmultiprocessing.set_start_method("fork" ) p = multiprocessing.Process(target=task, args=("xxx" , )) p.start() p.join() p.daemon = True  p.daemon = False  p.name = "进程1"  multiprocessing.current_process().name multiprocessing.cpu_count() """ 补充知识 """ import  osos.getpid() os.getppid() import  threadinglst = threading.enumerate () 
 
import  multiprocessingclass  MyProcess (multiprocessing.Process):    def  run (self ):                  print ("执行次线程" , self._args)          if  __name__ == "__main__" :    multiprocessing.set_start_method("spawn" )     p = MyProcess(args=("xxx" , ))     p.start()     print ("继续执行..." ) 
 
3.4 数据共享 
默认情况下进程之间的资源是独立的,不进行共享。
如果要让他们之间进行共享,需要借助特殊方式实现。
特殊方式:
使用Value和Array。通过这两者创建的数据可以进行共享。但是因为是比较底层,所以使用较少。 
使用Manager()。通过Manager()创建的数据类型,可以进行共享。因为可以用python语法,使用较舒服。 
使用multiprocessing.Queue()。就是一个队列,只不过这个队列可以实现资源共享,且不会数据混乱。使用较多。 
使用multiprocessing.Pipe()。双端队列,可以双向通信,资源共享,且不会数据混乱。使用较多。 
 
上述都是Python内部提供的进程之间数据共享和交换的机制,作为了解即可,在项目开发中很少使用,后期项目中一般会借助第三方的来做资源的共享,例如:MySQL,Redis等。
 
value参数 
代表的类型 
 
 
c 
ctypes.c_char 
 
b 
ctypes.c_byte 
 
h 
ctypes.c_short 
 
i 
ctypes.c_int 
 
l 
ctypes.c_long 
 
f 
ctypes.c_float 
 
u 
ctypes.c_wchar 
 
B 
ctypes.c_ubyte 
 
H 
ctypes.c_ushort 
 
I 
ctypes.c_uint 
 
L 
ctypes.c_ulong 
 
d 
ctypes.c_double 
 
 
 
""" Value和Array方式 """ from  multiprocessing import  Process, Value, Arraydef  func (n, m1, m2 ):    n.value = 888      m1.value = 'a' .encode('utf-8' )     m2.value = '武'  if  __name__ == '__main__' :         num = Value('i' , 666 )     v1 = Value('c' , b' ' )     v2 = Value('u' , ' ' )     p = Process(target=func, args=(num, v1, v2))     p.start()     p.join()     print (num.value)       print (v1.value)       print (v2.value)          def  f (data_array ):    data_array[0 ] = 666       if  __name__ == '__main__' :         arr = Array('i' , [11 , 22 , 33 , 44 ])          p = Process(target=f, args=(arr,))     p.start()     p.join()          print (arr[:]) 
 
""" Manager()方式 """ from  multiprocessing import  Process, Managerdef  f (d, l ):    d[1 ] = '1'      d['2' ] = 2      d[0.25 ] = None      l.append(666 ) if  __name__ == "__main__" :    with  Manager() as  manager:         d = manager.dict ()         l = manager.list ()         p = Process(target=f, args=(d, l))         p.start()         p.join()         print (d)         print (l) 
 
""" Queue()方式 """ import  multiprocessingdef  task (q ):    for  i in  range (10 ):         q.put(i) if  __name__ == "__main__" :         queue = multiprocessing.Queue()          p = multiprocessing.Process(target=task, args=(queue,))     p.start()          p.join()          print ("主进程开始获取数据:" )     for  _ in  range (10 ):         print (queue.get()) 
 
""" Pipe()方式 """ import  timeimport  multiprocessingdef  task (child_conn ):    time.sleep(1 )     child_conn.send([111 , 22 , 33 , 44 ])       data = child_conn.recv()       print ("子进程接收:" , data)     time.sleep(2 ) if  __name__ == "__main__" :         parent_conn, child_conn = multiprocessing.Pipe()          p = multiprocessing.Process(target=task, args=(child_conn,))     p.start()          info = parent_conn.recv()     print ("主进程接收:" , info)          parent_conn.send(666 )          p.join() 
 
3.5 进程锁 
进程锁:使用类似于线程锁,但是线程锁不能作为参数传递到子进程中,而进程锁是可以传递到子进程中的。
spawn模式下,在主进程的结尾处需要做一些特殊的处理,不然可能会报错:
使用time.sleep(7)等待7秒。 
使用p.join(),主进程等待所有子进程完成后再向下。 
 
 
import  timeimport  multiprocessingdef  task (lock ):    print ("开始" )     lock.acquire()          with  open ('f1.txt' , mode='r' , encoding="utf-8" ) as  f:         current_num = int (f.read())     print ("排队抢票了" )     time.sleep(0.5 )     current_num -= 1           with  open ('f1.txt' , mode='w' , encoding="utf-8" ) as  f:         f.write(str (current_num))     lock.release() if  __name__ == '__main__' :    multiprocessing.set_start_method("spawn" )          lock = multiprocessing.RLock()     for  i in  range (10 ):         p = multiprocessing.Process(target=task, args=(lock,))         p.start()     for  p in  process_list:         p.join() 	     time.sleep(7 )      if  __name__ == '__main__' :    multiprocessing.set_start_method("spawn" )          lock = multiprocessing.RLock()     process_list = []     for  i in  range (10 ):         p = multiprocessing.Process(target=task, args=(lock,))         p.start()         process_list.append(p)      	for  item in  process_list:         item.join() 
 
3.6 进程池 
案例:
线程池的使用:会立刻输出1,然后再依次执行子进程任务。因为进程池使用时,进程会立刻被创建完成,然后交给进程池进行调度。
 
主线程等待线程池工作完毕:加上pool.shutdown(True),会等待子进程执行完再再向下执行。
 
执行完任务后,再额外干点别的:加上fur.add_done_callback(done),会在子进程做完操作后再做额外操作,但是此处和线程池有区别。进程池中的回调都是由主进程进行操作。 
在子进程中使用进程锁:在进程池中加锁,需要用到Manager()中的Lock()和RLock(),不能使用自带的进程锁。因为在multiprocessing模块中,每个进程都有自己独立的内存空间,因此无法直接共享普通的线程锁,而Manager()中的Lock()和RLock()是专门为进程间通信设计的锁,所以应该使用Manager()中的进程锁。 
 
 
import  timefrom  concurrent.futures import  ProcessPoolExecutorpool = ProcessPoolExecutor(n) future = pool.submit(func, para_1, para_2, ... ) pool.shutdown(True ) future.add_done_callback(func_done) def  outer (info, file_name ):    def  done (res, *args, **kwargs ):         info[file_name] = res.result()     return  done fur.add_done_callback(outer(info, file_name)) 
 
""" 线程池示例1:线程池的使用 """ import  timefrom  concurrent.futures import  ProcessPoolExecutor, ThreadPoolExecutordef  task (num ):    print ("执行" , num)     time.sleep(2 ) if  __name__ == '__main__' :    pool = ProcessPoolExecutor(4 )     for  i in  range (10 ):         pool.submit(task, i)     print (1 ) """ 线程池案例2:主线程等待线程池工作完毕 """ import  timefrom  concurrent.futures import  ProcessPoolExecutor, ThreadPoolExecutordef  task (num ):    print ("执行" , num)     time.sleep(2 ) if  __name__ == '__main__' :    pool = ProcessPoolExecutor(4 )     for  i in  range (10 ):         pool.submit(task, i)     pool.shutdown(True )     print (1 )                               """ 线程池案例3:执行完任务后,再额外干点别的""" import  timefrom  concurrent.futures import  ProcessPoolExecutorimport  multiprocessingdef  task (num ):    print ("执行" , num)     time.sleep(2 )     return  num def  done (res ):         print (multiprocessing.current_process())     time.sleep(1 )          print (res.result())     time.sleep(1 ) if  __name__ == '__main__' :    pool = ProcessPoolExecutor(4 )     for  i in  range (50 ):         fur = pool.submit(task, i)         fur.add_done_callback(done)       print (multiprocessing.current_process())     pool.shutdown(True )      """ 线程池案例4:在子进程中使用进程锁 """ import  timeimport  multiprocessingfrom  concurrent.futures import  ProcessPoolExecutordef  task (lock ):    print ("开始" )                              with  lock:                  with  open ('f1.txt' , mode='r' , encoding='utf-8' ) as  f:             current_num = int (f.read())         print ("排队抢票了" )         time.sleep(1 )         current_num -= 1                            with  open ('f1.txt' , mode='w' , encoding='utf-8' ) as  f:             f.write(str (current_num)) if  __name__ == '__main__' :    pool = ProcessPoolExecutor()               manager = multiprocessing.Manager()     lock_object = manager.RLock()     for  _ in  range (10 ):         pool.submit(task, lock_object) 
 
3.7 多进程案例 
要求:统计日志文件中的数据量和ip种数。
数据存在files文件夹下,格式:127.0.0.1 - - [21/Mar/2021] "GET" ......
 
import  osimport  timefrom  concurrent.futures import  ProcessPoolExecutordef  task (file_name ):    ip_set = set ()     total_count = ip_count = 0      file_path = os.path.join("files" , file_name)     with  open (file_path, mode='r' , encoding='utf-8' ) as  file_object:         for  line in  file_object:             if  not  line.strip():                 continue              user_ip = line.split(" - -" , maxsplit=1 )[0 ].split(", " )[0 ]             total_count += 1              if  user_ip in  ip_set:                 continue              ip_count += 1              ip_set.add(user_ip)             time.sleep(1 )     return  {"total" : total_count, 'ip' : ip_count} def  outer (info, file_name ):    def  done (res, *args, **kwargs ):         info[file_name] = res.result()     return  done def  run ():         info = {}     pool = ProcessPoolExecutor(4 )     for  file_name in  os.listdir("files" ):         fur = pool.submit(task, file_name)         fur.add_done_callback(outer(info, file_name))     pool.shutdown(True )     for  k, v in  info.items():         print (k, v) if  __name__ == '__main__' :    run() 
 
4 协程 4.1 定义 
了解为主。
计算机中提供了:线程、进程用于实现并发编程(真实存在)。
协程(Coroutine),是程序员通过代码搞出来的一个东西(非真实存在)。
协程也可以被称为微线程,是一种用户态内的上下文切换技术。
简而言之,其实就是通过一个线程实现代码块相互切换执行(来回跳着执行)。
下列greenlet和yield可以模拟协程,但是一般不这么使用。
 
from  greenlet import  greenletdef  func1 ():    print (1 )       gr2.switch()       print (2 )       gr2.switch()   def  func2 ():    print (3 )       gr1.switch()       print (4 )   gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch()   
 
def  func1 ():    yield  1      yield  from  func2()     yield  2  def  func2 ():    yield  3      yield  4  f1 = func1() for  item in  f1:    print (item) 
 
4.2 asyncio 
协程如何才能更有意义呢:不要让用户手动去切换,而是遇到IO操作时能自动切换。
Python在3.4之后推出了asyncio模块+ Python3.5推出async、async语法,内部基于协程并且遇到IO请求自动化切换。
 
async  def  func ():await  func();asyncio.sleep(n) asyncio.ensure_future(func1()) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop = asyncio.get_event_loop() asyncio.wait(tasks) loop.run_until_complete(asyncio.wait(tasks)) 
 
import  asyncioasync  def  func1 ():    print (1 )     await  asyncio.sleep(2 )     print (2 ) async  def  func2 ():    print (3 )     await  asyncio.sleep(2 )     print (4 ) tasks = [     asyncio.ensure_future(func1()),     asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) 
 
4.3 aiohttp案例 
利用协程来爬取图片。
需要先安装aiohttp。
实际使用中,一般不在协程中做数据处理,而是只负责获取数据,获取的数据直接放到队列、文件或数据库里。
后续学习:asyncio异步编程,你搞懂了吗? - 知乎 (zhihu.com) 
后续学习:asyncio到底是个啥?【python async await】_哔哩哔哩_bilibili 
 
import  aiohttpimport  asyncioimport  urllibasync  def  download_image (session, url ):    print (f"开始下载: {url} " )     async  with  session.get(url, verify_ssl=False ) as  response:         content = await  response.content.read()         return  content async  def  save_image (content, filename ):    print (f"开始保存: {filename} " )     with  open (filename, mode='wb' ) as  file_object:         file_object.write(content) async  def  fetch_and_save (session, url ):         content = await  download_image(session, url)          parsed_url = urllib.parse.urlparse(url)     path_parts = parsed_url.path.split('/' )     file_name = path_parts[-1 ]          if  '.'  not  in  file_name:         file_name += ".jpg"           await  save_image(content, file_name) async  def  main ():    async  with  aiohttp.ClientSession() as  session:         url_list = [             'https://gitcode.net/qq_44112897/images/-/raw/master/comic/25.jpg' ,             'https://gitcode.net/qq_44112897/images/-/raw/master/comic/26.jpg' ,             'https://gitcode.net/qq_44112897/images/-/raw/master/comic/27.jpg'          ]         tasks = [asyncio.create_task(fetch_and_save(session, url)) for  url in  url_list]         await  asyncio.wait(tasks) if  __name__ == '__main__' :    loop = asyncio.new_event_loop()     asyncio.set_event_loop(loop)     loop.run_until_complete(main())