Python高级
一、内存管理 1 内部结构 1.1 环状双向链表
环状双向链表(refchain
)。在python程序中创建的任何对象都会放在refchain
链表中。
static PyObject refchain = {&refchain, &refchain, ...}
假设我name = "Bob"
,内部的结构体会存上:上一对象、下一对象、类型、引用个数等内容。如果此时我再new = name
,那么引用个数则会增加1个。
对于所有对象来说,内部结构体都会存上上一对象、下一对象、类型、引用个数,而不同的数据类型还会存储一切额外内容。比如创建int
还会将int
值存入结构体,创建list
还会将列表,列表长度存入结构体。
综上:每个对象都有PyObject
结构体,由多个元素组成的对象则是:PyObject
+ ob_size
#define Pyobject_HEAD Pyobject ob_base; #define Pyobject_VAR_HEAD PyVarobject ob_base; #define _Pyobject_HEAD_EXTRA \ struct _object *_ob_next; \ struct _object *_ob_prev; typedef struct _object { _PyObject_HEAD_EXTRA Py_ssize_t ob_refcnt; struct _typeobject *ob_type ; } PyObject; typedef struct { PyObject ob_base; Py_ssize_t ob_size; } PyVarObject;
1.2 具体类型结构体
假设设置了data=3.14
,那么内部会创建:
_ob_prev = refchain 中上一个对象
_ob_next = refchain 中下一个对象
ob_refcnt = 1
ob_type = float
ob_fval = 3.14
typedef struct { PyObject_HEAD double ob_fval; } PyFloatObject; struct _longobject { PyObject_HEAD digit ob_digit[1 ]; } typedef struct _longobject PyLongObject ; typedef struct { PyObject_VAR_HEAD PyObject **ob_item; Py_ssize_t allocated; } PyListObject; typedef struct { PyObject_VAR_HEAD PyObject **ob_item[1 ]; } PyTupleObject; typedef struct { PyObject_HEAD Py_ssize_t ma_used; PyDictKeysObject *ma_keys; PyObject **ma_values; } PyDictObject;
1.3 引用计数器
当python程序运行时,会根据数据类型的不同找到其对应的结构体,根据结构体中的字段来进行创建相关的数据。然后将对象添加到refchain双向链表中。
在C源码中有两个关键的结构体:PyObject
、PyVarObject
每个对象中的ob_refcnt
就是引用计数器,值默认为1,当有其他变量引用对象时,引用计数器就会发生变化。
当一个对象的引用计数器为0时,意味着没有人再使用这个对象了,这个对象就是垃圾,此时进行垃圾回收。
垃圾回收简单理解步骤(实际不止,可以看后续):
对象从refchain
链表移除。
将对象销毁,内存归还。
""" 注意,下列a、b对应的对象实际上是同一个 """ a = 99999 b = a del bdel a
1.4 循环引用问题
只使用引用计数器来管理内存看似完美,实际上会存在循环引用问题。
v1 = [11 , 22 , 33 ] v2 = [44 , 55 , 66 ] v1.append(v2) v2.append(v1) del v1del v2""" 此时v1,v2对象引用计数器还有1,那么这两个列表此时就会常驻在内存中 """
2 标记清除
目的:为解决引用计数器循环引用的不足。
实现:在python的底层再维护一个链表,链表中专门放那些可能存在循环引用的对象(list / tuple / dict / set)
。
在Python内部某种情况下触发,会去扫描可能存在循环应用的链表中的每个元素,检查是否有循环引用,如果有则让双方的引用计数器 -1。如果是0则垃圾回收。
3 分代回收
标记清除问题:
什么时候扫描?
可能存在循环引用的链表扫描代价大,每次扫描耗时久。
于是python
引入了分代回收机制,在python
内存管理系统将内存分为不同的世代,新创建的对象首先放在第0代。经过多次垃圾回收周期,如果对象依然存活,则会被提升至老一代,以此类推。
标记清除维护的列表就是指的是分代回收的三个列表,它包含了可能存在循环引用的对象,通过定期扫描这个列表来处理可能的循环引用问题。
将可能存在循环应用的对象维护成3个链表:
0代:0代中对象个数达到700个则扫描一次。
1代:0代扫描10次,则1代扫描一次。
2代:1代扫描10次,则2代扫描一次。
4 缓存机制 4.1 总结
在python中维护了一个refchain
的双向环状链表,这个链表中存储程序创建的所有对象,每种类型的对象中都有一个ob_refcnt
引用计数器的值,引用个数+1、-1,最后当引用计数器变为0时会进行垃圾回收(对象销毁、refchain
中移除)。
但是,在python中对于那些可以有多个元素组成的对象可能会存在循环引用的问题,为了解决这个问题python又引入了标记清除和分代回收,在其内部为了4个链表。
在源码内部当达到各自的阈值时,就会触发扫描链表进行标记清除的动作(有循环则各自-1)。
4.2 池
作用于int / str
类型。
为了避免重复创建和销毁一些常见对象,于是会维护一个池。
在启动解释器时,对于int
类型,python内部会帮我们创建:[-5, 256]
。对于str
类型,会维护一个unicode_latin1[256]
的链表,内部存所有的ascii
字符,之后使用就不会再重复创建。
字符串驻留机制:Python会对长度为0到20个字符的由英文字符,数字,下划线构成的字符串进行驻留,下次再创建时,不会新开辟内存,通过id()
可以发现值相等。
v1 = 7 v2 = 9 v3 = 9 print (id (v2), id (v3))
4.3 free_list
作用于:float / list / tuple / dict
当一个对象的引用计数器为0时,按理说应该回收,但是内部实际上不会直接回收,而是将对象添加到free_list
链表中当缓存。以后再去创建对象时,不会重新开辟内存,而是直接使用free_list
。
free_list
是由上限的,当free_list
没满时引用计数器为0的对象才会加到free_list
中,满了的话则会直接销毁对象。
v1 = 3.14 del v1v9 = 999.99
5 源码剖析 5.1 float 类型 创建
val = 3.14
int
类型和float
很相似,只是int
类型还会先去小数据池里找,没有才会创建。
static PyFloatObject *free_list = NULL ;static int numfree = 0 ;PyObject *PyFloat_FromDouble (double fval) { PyFloatobject *op = free_list; if (op != NULL ) { free_list = (PyFloatobject *) Py_TYPE(op); numfree--; } else { op = (PyFloatobject *) PyObject_MALLOC(sizeof (PyFloatobject)); if (!op) return PyErr_NoMemory(); } (void )PyObject_INIT(op, &PyFloat_Type); op->ob_fval = fval; return (PyObject *)op; }
#define PyObject_INIT(op, typeobj) (Py_TYPE(op) = (typeobj), _Py_NewReference((PyObject *)(op)), (op))
static PyObejct refchain = {&refchain, &refchain};void _Py_AddToAllObjects(PyObject *op, int force){ if (force || op->ob_prev == NULL ){ op->_ob_next = refchain._ob_next; op->_ob_prev = &refchain; refchain._ob_next->_ob_prev = op; refchain._ob_next = op; } } void _Py_NewReference(PyObject *op){ _Py_INC_REFTOTAL; op->ob_refcnt = 1 ; _Py_AddToAllObjects(op, 1 ); _Py_INC_TPALLOCS(op); }
引用
val = 3.14
data = val
项目中这样的引用关系会使原对象的引用计数器+1。
static inline void _Py_INCREF(PyObject *op){ _Py_INC_REFTOTAL; op->ob_refcnt++; } #define Py_INCREF(op) _Py_INCREF(_PyObject_CAST(op))
销毁
val = 3.14
del 3.14
static inline void _Py_DECREF(const char *filename, int lineno, PyObject *op){ (void )filename; (void )lineno; _Py_DEC_REFTOTAL; if (--op->ob_refcnt != 0 ) { #ifdef Py_REF_DEBUG if (op->ob_refcnt < 0 ) { _Py_NegativeRefcount(filename, lineno, op); } #endif } else { _Py_Dealloc(op); } } #define Py_DECREF(op) _Py_DECREF(__FILE__, __LINE__, _PyObject_CAST(op))
void _Py_Dealloc(PyObject *op){ destructor dealloc = Py_TYPE(op)->tp_dealloc; _Py_ForgetReference(op); (*dealloc)(op); } void _Py_ForgetReference(PyObject *op){ op->_ob_next->_ob_prev = op->_ob_prev; op->_ob_prev->_ob_next = op->_ob_next; op->_ob_next = op->_ob_prev = NULL ; _Py_INC_TPFREES(op); }
#define PyFloat_MAXFREELIST 100 static int numfree = 0 ;static PyFloatObject *free_list = NULL ;PyTypeObject PyFloat_Type = { PyVarObject_HEAD_INIT(&PyType_Type, 0 ) "float" , sizeof (PyFloatObject), 0 , (destructor) float_dealloc, 0 , }; static void float_dealloc (PyFloatObject *op) { if (PyFloat_CheckExact(op)) { if (numfree >= PyFloat_MAXFREELIST) { PyObject_FREE(op); return ; } numfree++; Py_TYPE(op) = (struct _typeobject *) free_list; free_list = op; } else { Py_TYPE(op)->tp_free((PyObject *) op); } }
5.2 list 类型 创建
v =[11, 22, 33]
二、多线程 1 进程和线程 1.1 定义
线程:是计算机中可以被cpu调度的最小单元(真正在工作)。
进程:是计算机资源分配的最小单元(进程为线程提供资源)。
一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源。
通过进程和线程都可以将串行的程序变成并发。
Python的多线程主要是在用户态中创建线程,在内核态中也会创建线程,并且多个用户线程可以映射到多个内核线程上,形成多对多的映射关系。
1.2 多线程
在同一个进程中开启多个线程执行任务。
子线程开始后,主线程会一直向下走,不会停止。
import timeimport requestsimport threadingurl_list = { ("1.mp4" , "https://www.xxx.com/?id=1" ), ("2.mp4" , "https://www.xxx.com/?id=2" ), ("3.mp4" , "https://www.xxx.com/?id=3" ), } def task (file_name, video_url ): res = requests.get(video_url) with open (file_name, mode='wb' ) as f: f.write(res.content) print (time.time()) for name, url in url_list: t = threading.Thread(target=task, args=(name, url)) t.start()
1.3 多进程
开启不同的进程执行任务(不同的进程中会自动创建一个线程)。
注:Linux系统使用fork创建进程,Windows使用spawn,Mac支持fork和spawn。在python3.8版本后默认使用spawn创建线程,所以程序会要求创建线程代码必须在if __name__ == '__main__':
下,否则会异常。
mac中可以修改创建线程使用的方式来避免改变创建方式。
import timeimport requestsimport multiprocessingurl_list = { ("1.mp4" , "https://www.xxx.com/?id=1" ), ("2.mp4" , "https://www.xxx.com/?id=2" ), ("3.mp4" , "https://www.xxx.com/?id=3" ), } def task (file_name, video_url ): res = requests.get(video_url) with open (file_name, mode='wb' ) as f: f.write(res.content) print (time.time()) if __name__ == '__main__' : for name, url in url_list: t = multiprocessing.Process(target=task, args=(name, url)) t.start()
1.4 GIL锁
GIL,全局解释器锁(Global Interpreter Lock),是CPython解释器特有的东西,让一个进程中同一时刻只有一个线程可以被CPU调度。
如果程序想利用计算机的多核优势,让CPU同时处理一些任务,适合用多进程开发(即使资源开销大)
如果程序不需要计算机的多核优势,适合用多线程开发,比如网络下载文件,多用网卡而非CPU。
常见的程序开发中,计算操作需要使用CPU多核优势,IO操作不需要利用CPU的多核优势:
计算密集型,用多进程,例如∶大量的数据计算【累加计算示例】。
IO密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】。
import timeimport multiprocessingdef task (start, end, queue ): res = 0 for i in range (start, end): res += 1 queue.put(res) if __name__ == '__main__' : queue = multiprocessing.Queue() start_time = time.time() p1 = multiprocessing.Process(target = task, args=(0 , 50000000 , queue)) p1.start() p2 = multiprocessing.Process(target = task, args=(50000000 , 100000000 , queue)) p2.start() v1 = queue.get(block=True ) v2 = queue.get(block=True ) print (v1 + v2) end_time = time.time() print (end_time - start_time) import timeimport requestsimport multiprocessingurl_list = { ("1.mp4" , "https://www.xxx.com/?id=1" ), ("2.mp4" , "https://www.xxx.com/?id=2" ), ("3.mp4" , "https://www.xxx.com/?id=3" ), } def task (file_name, video_url ): res = requests.get(video_url) with open (file_name, mode='wb' ) as f: f.write(res.content) print (time.time()) if __name__ == '__main__' : for name, url in url_list: t = multiprocessing.Process(target=task, args=(name, url)) t.start() import multiprocessingimport threadingdef thread_task (): print ("Thread task executed" ) def task (start, end ): t1 = threading.Thread(target=thread_task) t1.start() t2 = threading.Thread(target=thread_task) t2.start() t3 = threading.Thread(target=thread_task) t3.start() if __name__ == "__main__" : p1 = multiprocessing.Process(target=task, args=(0 , 5000000 )) p1.start() p2 = multiprocessing.Process(target=task, args=(5000000 , 10000000 )) p2.start()
2 多线程开发 2.1 常见API
IO密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】
import threadingt = threading.Thread(target=task, args=('xxx' , )) t.start() t.join() t.daemon = True t.daemon = False t.name = "thread_one" name = threading.current_thread().name
class MyThread (threading.Thread): def run (self ): print ("执行此线程" , self._args) t = MyThread(args=(100 , )) t.start()
2.2 API使用
在t.join()
案例中,最终print
的结果是随机的,因为存在CPU调度算法,所以一个线程不一定会完全执行完毕,可能在执行过程中就切换了。所以在开启t1
,t2
线程后,有可能加了一部分数,t1
就切换成了t2
,于是join
就过去了,t2
同理,所以最后的number
就不会是0。
import threadingdef task (arg ): pass t.threading.Thread(target=task, args=('xxx' , )) t.start() print ("继续执行..." )
import threadingloop = 1000000 number = 0 def _add (count ): global number for i in range (loop): number += 1 def _sub (count ): global number for i in range (loop): number -= 1 t1 = threading.Thread(target=_add, args=(loop, )) t2 = threading.Thread(target=_sub, args=(loop, )) t1.start() t2.start() t1.join() t2.join() print (number)
import requestsimport threadingclass DownloadThread (threading.Thread): def run (self ): file_name, video_url = self._args res = requests.get(video_url) with open (file_name, mode="wb" ) as f: f.write(res.content) url_list = { ("1.mp4" , "https://www.xxx.com/?id=1" ), ("2.mp4" , "https://www.xxx.com/?id=2" ), ("3.mp4" , "https://www.xxx.com/?id=3" ), } for item in url_list: t = DownloadThread(args=(item[0 ], item[1 ])) t.start()
2.3 线程安全
因为CPU在执行的过程中,可能会轮流地执行线程。
因此可以给线程加锁,这样就可以保证一个线程被执行完才会执行另一个线程。
不同线程使用的锁必须是同一把锁,不然就没有加锁的意义了。
在开发的过程中要注意有些操作默认都是线程安全的(内部集成了锁的机制),我们在使用的时无需再通过锁再处理,比如L.append(x)
等。
官网:official::Python.org ,搜:atomic
根据官网,常见线程安全的操作:
L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()
常见线程不安全的操作:
i = i+1
L.append(L[-1])
L[i] = L[j]
D[x] = D[x] + 1
lock_object = threading.RLock() lock_object.acquire() lock_object.release()
import threadinglock_object = threading.RLock() loop = 1000000 number = 0 def _add (count ): lock_object.acquire() global number for i in range (loop): number += 1 lock_object.release() def _sub (count ): with lock_object: global number for i in range (loop): number -= 1 t1 = threading.Thread(target=_add, args=(loop, )) t2 = threading.Thread(target=_sub, args=(loop, )) t1.start() t2.start() t1.join() t2.join() print (number)
import threadingdata_list = [] def task (): print ("开始" ) for i in range (100000 ): data_list.append(i) print (len (data_list)) for i in range (2 ): t = threading.Thread(target=task) t.start()
2.4 线程锁
在python中,锁有Lock
和RLock
。它们功能几乎一致,但是在嵌套使用上存在差异。Lock
是同步锁,RLock
是嵌套锁,也就是说Lock
是不支持锁嵌套的,但是RLock
可以进行锁的嵌套。
import threadingnum = 0 lock_object = threading.Lock() def task (): print ("开始" ) lock_object.acquire() print (threading.current_thread().name) global num for i in range (100000 ): num += 1 lock_object.release() lock_object.acquire() print (threading.current_thread().name) for i in range (100000 ): num += 1 lock_object.release() print (num) for i in range (2 ): t = threading.Thread(target=task) t.start() def task (): print ("开始" ) lock_object.acquire() print (threading.current_thread().name) lock_object.acquire() print (threading.current_thread().name) global num for i in range (100000 ): num += 1 lock_object.release() lock_object.release()
import threadingnum = 0 lock_object = threading.RLock() def task (): print ("开始" ) lock_object.acquire() lock_object.acquire() print (threading.current_thread().name) lock_object.release() lock_object.release() for i in range (3 ): t = threading.Thread(target=task) t.start() import threadinglock = threading.RLock() def func (): with lock: pass def run (): print ("其他功能" ) func() print ("其他功能" ) def process (): with lock: print ("其他功能" ) func() print ("其他功能" )
2.5 死锁
程序因为锁无法向下进行称为死锁。
死锁的情况:
使用Lock两次加锁会导致死锁。因为Lock本身不可以进行嵌套,所以嵌套使用Lock会导致程序卡死。
使用多把锁时,不同线程互相持有对方需要的锁,互相等待对方释放锁,从而造成死锁
import threadingnum = 0 lock_object = threading.Lock() def task (): print ("开始" ) lock_object.acquire() print (threading.current_thread().name) lock_object.acquire() print (threading.current_thread().name) global num for i in range (100000 ): num += 1 lock_object.release() lock_object.release() for i in range (2 ): t = threading.Thread(target=task) t.start()
import threadingimport timelock_1 = threading.Lock() lock_2 = threading.Lock() def task1 (): lock_1.acquire() print (1 ) time.sleep(1 ) lock_2.acquire() print (11 ) lock_2.release() print (111 ) lock_1.release() print (1111 ) def task2 (): lock_2.acquire() print (2 ) time.sleep(1 ) lock_1.acquire() print (22 ) lock_1.release() print (222 ) lock_2.release() print (2222 ) t1 = threading.Thread(target=task1) t1.start() t2 = threading.Thread(target=task2) t2.start()
2.6 线程池
Python3中官方才正式提供线程池。
线程不是开的越多越好,开的多了可能会导致系统的性能更低了。因为线程的上下文切换也需要耗费时间和资源。
不建议:无限制的创建线程。
建议:使用线程池。
在循环中向线程池提交任务时,循环会很快的结束,任务会全部加入到线程池中,但是不是所有任务都会被很快地被执行,而是要看线程池的调度。
案例:
线程池的使用:会立刻输出END,然后再依次执行子线程任务。因为线程池使用时,线程会立刻被创建完成,然后交给线程池进行调度。
主线程等待线程池工作完毕:加上pool.shutdown(True)
会等待子线程执行完再再向下执行。
执行完任务后,再额外干点别的:加上future.add_done_callback(done)
,会在子线程做完操作后再做额外操作。在线程池中,回调是由子线程做的。
最后统一获得结果:将回调结果加入列表中,最后统一来执行回调结果。
import threadingdef task (video_url ): pass url_list = ["www.xxxx-{}.com" .format (i) for i in range (30000 )] for url in url_list: t = threading.Thread(target=task, args=(url, )) t.start()
import timefrom concurrent.futures import ThreadPoolExecutorpool = ThreadPoolExecutor(n) future = pool.submit(func, para_1, para_2, ... ) pool.shutdown(True ) future.add_done_callback(func_done)
""" 线程池示例1:线程池的使用 """ import timefrom concurrent.futures import ThreadPoolExecutordef task (video_url, num ): print ("开始执行任务" , video_url) time.sleep(5 ) pool = ThreadPoolExecutor(10 ) url_list = ["www.xxx{}.com" .format (i) for i in range (300 )] for url in url_list: pool.submit(task, url, 2 ) print ("END" )""" 线程池案例2:主线程等待线程池工作完毕 """ import timefrom concurrent.futures import ThreadPoolExecutordef task (video_url, num ): print ("开始执行任务" , video_url) time.sleep(0.5 ) pool = ThreadPoolExecutor(10 ) url_list = ["www.xxx{}.com" .format (i) for i in range (300 )] for url in url_list: pool.submit(task, url, 2 ) print ("Wait..." )pool.shutdown(True ) print ("Next..." )""" 线程池案例3:执行完任务后,再额外干点别的""" import threadingimport timeimport randomfrom concurrent.futures import ThreadPoolExecutordef task (video_url ): print ("开始执行任务" , video_url) print ("执行任务的线程id:" , threading.current_thread().ident) time.sleep(2 ) return random.randint(0 , 10 ) def done (response ): print ("任务执行后的返回值" , response.result()) print ("回调函数的线程id:" , threading.current_thread().ident) pool = ThreadPoolExecutor(10 ) url_list = ["www.xxx{}.com" .format (i) for i in range (100 )] for url in url_list: future = pool.submit(task, url) future.add_done_callback(done) """ 线程池案例4:最后统一获得结果 """ import timeimport randomfrom concurrent.futures import ThreadPoolExecutordef task (video_url ): print ("开始执行任务" , video_url) time.sleep(2 ) return random.randint(0 , 10 ) pool = ThreadPoolExecutor(10 ) future_list = [] url_list = ["www.xxx{}.com" .format (i) for i in range (20 )] for url in url_list: future = pool.submit(task, url) future_list.append(future) pool.shutdown(True ) for fu in future_list: print (fu.result())
2.7 单例模式
多线程单例模式下,因为线程之间的交换,所以可能会导致单例模式创造出来的不是一个单例。
为了解决问题则需要加锁。
import threadingimport timeclass Singleton : instance = None def __init__ (self, name ): self.name = name def __new__ (cls, *args, **kwargs ): if cls.instance: return cls.instance time.sleep(0.1 ) cls.instance = object .__new__(cls) return cls.instance def task (): obj = Singleton('x' ) print (obj) for i in range (10 ): t = threading.Thread(target=task) t.start()
import threadingimport timeclass Singleton : instance = None lock = threading.RLock() def __init__ (self, name ): self.name = name def __new__ (cls, *args, **kwargs ): if cls.instance: return cls.instance with cls.lock: if cls.instance: return cls.instance cls.instance = object .__new__(cls) return cls.instance def task (): obj = Singleton('x' ) print (obj) for i in range (10 ): t = threading.Thread(target=task) t.start()
3 多进程开发 3.1 进程定义
进程是计算机中资源分配的最小单元。一个进程中可以有多个线程,同一个进程中的线程共享资源。
进程与进程之间则是相互隔离。
Python中通过多进程可以利用CPU的多核优势,
多进程适用于计算密集型,例如∶大量的数据计算【累加计算示例】。
创建进程模式:
fork
:”拷贝”几乎所有的资源,支持文件对象 / 线程锁传参,用于unix,可以在代码中任意位置开始,操作较快。
spawn
:相当于在内部线创建一个python解释器,然后让解释器执行代码,不支持文件对象 / 线程锁传参,用于unix、win,必须从main代码块开始,操作较慢。
forkserver
:在程序开始前会把多进程部分当做模版加载,然后在启动时会找到模板,拷贝一份执行。不支持文件对象 / 线程锁传参,用于部分unix,必须从main代码块开始。
注:Linux系统使用fork创建进程,Windows使用spawn,Mac支持fork和spawn。在python3.8版本后默认使用spawn创建线程,所以程序会要求创建线程代码必须在if __name__ == '__main__':
下,否则会异常。
mac中可以修改创建线程使用的方式来避免改变创建方式。
import timeimport requestsimport multiprocessingurl_list = { ("1.mp4" , "https://www.xxx.com/?id=1" ), ("2.mp4" , "https://www.xxx.com/?id=2" ), ("3.mp4" , "https://www.xxx.com/?id=3" ), } def task (file_name, video_url ): res = requests.get(video_url) with open (file_name, mode='wb' ) as f: f.write(res.content) print (time.time()) if __name__ == '__main__' : for name, url in url_list: t = multiprocessing.Process(target=task, args=(name, url)) t.start()
""" fork案例(仅限unix) """ import multiprocessingimport timedef task (): print (name) name.append(123 ) if __name__ == '__main__' : multiprocessing.set_start_method("fork" ) name = [] p1 = multiprocessing.Process(target=task) p1.start() time.sleep(2 ) print (name) def task (): print (name) if __name__ == '__main__' : multiprocessing.set_start_method("fork" ) name = [] name.append(123 ) p1 = multiprocessing.Process(target=task) p1.start() def task (): print (name) if __name__ == '__main__' : multiprocessing.set_start_method("fork" ) name = [] p1 = multiprocessing.Process(target=task) p1.start() name.append(123 )
""" spawn案例 """ import multiprocessingimport timedef task (): print (name) if __name__ == '__main__' : multiprocessing.set_start_method("spawn" ) name = [] p1 = multiprocessing.Process(target=task) p1.start() def task (data ): print (data) data.append(999 ) if __name__ == '__main__' : multiprocessing.set_start_method("spawn" ) name = [] p1 = multiprocessing.Process(target=task, args=(name,)) p1.start() time.sleep(2 ) print (name)
""" fork和spwan传递资源的区别 """ import multiprocessingimport timedef task (fb ): print (fb, lock) if __name__ == '__main__' : multiprocessing.set_start_method("fork" ) name = [] file_object = open ("1.txt" , mode="a+" , encoding="utf-8" ) lock = threading.RLock() p1 = multiprocessing.Process(target=task, args=(file_object,)) p1.start() def task (fb, lk ): print (fb, lk) if __name__ == '__main__' : multiprocessing.set_start_method("fork" ) name = [] file_object = open ("1.txt" , mode="a+" , encoding="utf-8" ) lock = threading.RLock() p1 = multiprocessing.Process(target=task, args=(file_object, lock, )) p1.start()
3.2 进程案例
案例1:
操作:主进程和子进程都进行文件操作。
结果:武沛齐 alex 武沛齐
原因:子进程拷贝主进程,所以一开时子进程有武沛齐,然后又写入了alex,之后flush()
一下,此时文件中就有了武沛齐和alex。又因为主进程再等着子进程,主进程的武沛齐一开始没有写入到文件中,当子进程结束后,主进程结束,此时武沛齐才写入到文件中,所以会出现上述结果。前两个单词是子进程写的,后一个单词是主进程写的。
案例2:
操作:主进程在案例1的基础上先flush()
一下。
结果:武沛齐 alex
原因:主进程写入后因为直接进行了flush()
,所以直接就先把武沛齐写到文件中了,此时传给子进程的缓存内容是空,所以子进程只会再额外写入alex。此时前一个单词是主进程写的,后一个单词是子进程写的。
案例3:
操作:主进程加锁后传递给子进程。
结果:<locked ... >
666
原因:主进程加锁后传递给子进程,子进程拿到锁后,一样是锁的状态,只不过在子进程的锁的对象是子进程中的主线程,而主进程的锁的对象是主进程中的主线程。因为此时锁是RLock()
,所以可以进行嵌套锁,所以666会被输出在屏幕上。
案例4:
操作:子进程中创建子线程。
结果:先输出十个”来了”,等2秒后依次输出666。
原因:由案例3可知,因为一开始锁就是锁的状态,然后子进程中锁的作用对象是主线程,所以一开始子进程中的子线程全都会卡主,当子进程的主线程的锁释放后,子进程的子线程才会依次开始执行。
import multiprocessingdef task (): print (name) file_object.write("alex\n" ) file_object.flush() if __name__ == "__main__" : multiprocessing.set_start_method("fork" ) name = [] file_object = open ('x1.txt' , mode='a+' , encoding='utf-8' ) file_object.write("武沛齐\n" ) p1 = multiprocessing.Process(target=task) p1.start() import multiprocessingdef task (): print (name) file_object.write("alex\n" ) file_object.flush() if __name__ == "__main__" : multiprocessing.set_start_method("fork" ) name = [] file_object = open ('x1.txt' , mode='a+' , encoding='utf-8' ) file_object.write("武沛齐\n" ) file_object.flush() p1 = multiprocessing.Process(target=task) p1.start() import multiprocessingimport threadingdef task (): print (lock) lock.acquire() print (666 ) if __name__ == "__main__" : multiprocessing.set_start_method("fork" ) name = [] lock = threading.RLock() lock.acquire() p1 = multiprocessing.Process(target=task) p1.start() def func (): print ("来了" ) with lock: print (666 ) def task (): for i in range (10 ): t = threading.Thread(target=func) t.start() time.sleep(2 ) lock.release()
3.3 进程API import multiprocessingmultiprocessing.set_start_method("fork" ) p = multiprocessing.Process(target=task, args=("xxx" , )) p.start() p.join() p.daemon = True p.daemon = False p.name = "进程1" multiprocessing.current_process().name multiprocessing.cpu_count() """ 补充知识 """ import osos.getpid() os.getppid() import threadinglst = threading.enumerate ()
import multiprocessingclass MyProcess (multiprocessing.Process): def run (self ): print ("执行次线程" , self._args) if __name__ == "__main__" : multiprocessing.set_start_method("spawn" ) p = MyProcess(args=("xxx" , )) p.start() print ("继续执行..." )
3.4 数据共享
默认情况下进程之间的资源是独立的,不进行共享。
如果要让他们之间进行共享,需要借助特殊方式实现。
特殊方式:
使用Value
和Array
。通过这两者创建的数据可以进行共享。但是因为是比较底层,所以使用较少。
使用Manager()
。通过Manager()
创建的数据类型,可以进行共享。因为可以用python语法,使用较舒服。
使用multiprocessing.Queue()
。就是一个队列,只不过这个队列可以实现资源共享,且不会数据混乱。使用较多。
使用multiprocessing.Pipe()
。双端队列,可以双向通信,资源共享,且不会数据混乱。使用较多。
上述都是Python内部提供的进程之间数据共享和交换的机制,作为了解即可,在项目开发中很少使用,后期项目中一般会借助第三方的来做资源的共享,例如:MySQL,Redis等。
value参数
代表的类型
c
ctypes.c_char
b
ctypes.c_byte
h
ctypes.c_short
i
ctypes.c_int
l
ctypes.c_long
f
ctypes.c_float
u
ctypes.c_wchar
B
ctypes.c_ubyte
H
ctypes.c_ushort
I
ctypes.c_uint
L
ctypes.c_ulong
d
ctypes.c_double
""" Value和Array方式 """ from multiprocessing import Process, Value, Arraydef func (n, m1, m2 ): n.value = 888 m1.value = 'a' .encode('utf-8' ) m2.value = '武' if __name__ == '__main__' : num = Value('i' , 666 ) v1 = Value('c' , b' ' ) v2 = Value('u' , ' ' ) p = Process(target=func, args=(num, v1, v2)) p.start() p.join() print (num.value) print (v1.value) print (v2.value) def f (data_array ): data_array[0 ] = 666 if __name__ == '__main__' : arr = Array('i' , [11 , 22 , 33 , 44 ]) p = Process(target=f, args=(arr,)) p.start() p.join() print (arr[:])
""" Manager()方式 """ from multiprocessing import Process, Managerdef f (d, l ): d[1 ] = '1' d['2' ] = 2 d[0.25 ] = None l.append(666 ) if __name__ == "__main__" : with Manager() as manager: d = manager.dict () l = manager.list () p = Process(target=f, args=(d, l)) p.start() p.join() print (d) print (l)
""" Queue()方式 """ import multiprocessingdef task (q ): for i in range (10 ): q.put(i) if __name__ == "__main__" : queue = multiprocessing.Queue() p = multiprocessing.Process(target=task, args=(queue,)) p.start() p.join() print ("主进程开始获取数据:" ) for _ in range (10 ): print (queue.get())
""" Pipe()方式 """ import timeimport multiprocessingdef task (child_conn ): time.sleep(1 ) child_conn.send([111 , 22 , 33 , 44 ]) data = child_conn.recv() print ("子进程接收:" , data) time.sleep(2 ) if __name__ == "__main__" : parent_conn, child_conn = multiprocessing.Pipe() p = multiprocessing.Process(target=task, args=(child_conn,)) p.start() info = parent_conn.recv() print ("主进程接收:" , info) parent_conn.send(666 ) p.join()
3.5 进程锁
进程锁:使用类似于线程锁,但是线程锁不能作为参数传递到子进程中,而进程锁是可以传递到子进程中的。
spawn模式下,在主进程的结尾处需要做一些特殊的处理,不然可能会报错:
使用time.sleep(7)
等待7秒。
使用p.join()
,主进程等待所有子进程完成后再向下。
import timeimport multiprocessingdef task (lock ): print ("开始" ) lock.acquire() with open ('f1.txt' , mode='r' , encoding="utf-8" ) as f: current_num = int (f.read()) print ("排队抢票了" ) time.sleep(0.5 ) current_num -= 1 with open ('f1.txt' , mode='w' , encoding="utf-8" ) as f: f.write(str (current_num)) lock.release() if __name__ == '__main__' : multiprocessing.set_start_method("spawn" ) lock = multiprocessing.RLock() for i in range (10 ): p = multiprocessing.Process(target=task, args=(lock,)) p.start() for p in process_list: p.join() time.sleep(7 ) if __name__ == '__main__' : multiprocessing.set_start_method("spawn" ) lock = multiprocessing.RLock() process_list = [] for i in range (10 ): p = multiprocessing.Process(target=task, args=(lock,)) p.start() process_list.append(p) for item in process_list: item.join()
3.6 进程池
案例:
线程池的使用:会立刻输出1,然后再依次执行子进程任务。因为进程池使用时,进程会立刻被创建完成,然后交给进程池进行调度。
主线程等待线程池工作完毕:加上pool.shutdown(True)
,会等待子进程执行完再再向下执行。
执行完任务后,再额外干点别的:加上fur.add_done_callback(done)
,会在子进程做完操作后再做额外操作,但是此处和线程池有区别。进程池中的回调都是由主进程进行操作。
在子进程中使用进程锁:在进程池中加锁,需要用到Manager()
中的Lock()
和RLock()
,不能使用自带的进程锁。因为在multiprocessing
模块中,每个进程都有自己独立的内存空间,因此无法直接共享普通的线程锁,而Manager()
中的Lock()
和RLock()
是专门为进程间通信设计的锁,所以应该使用Manager()
中的进程锁。
import timefrom concurrent.futures import ProcessPoolExecutorpool = ProcessPoolExecutor(n) future = pool.submit(func, para_1, para_2, ... ) pool.shutdown(True ) future.add_done_callback(func_done) def outer (info, file_name ): def done (res, *args, **kwargs ): info[file_name] = res.result() return done fur.add_done_callback(outer(info, file_name))
""" 线程池示例1:线程池的使用 """ import timefrom concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutordef task (num ): print ("执行" , num) time.sleep(2 ) if __name__ == '__main__' : pool = ProcessPoolExecutor(4 ) for i in range (10 ): pool.submit(task, i) print (1 ) """ 线程池案例2:主线程等待线程池工作完毕 """ import timefrom concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutordef task (num ): print ("执行" , num) time.sleep(2 ) if __name__ == '__main__' : pool = ProcessPoolExecutor(4 ) for i in range (10 ): pool.submit(task, i) pool.shutdown(True ) print (1 ) """ 线程池案例3:执行完任务后,再额外干点别的""" import timefrom concurrent.futures import ProcessPoolExecutorimport multiprocessingdef task (num ): print ("执行" , num) time.sleep(2 ) return num def done (res ): print (multiprocessing.current_process()) time.sleep(1 ) print (res.result()) time.sleep(1 ) if __name__ == '__main__' : pool = ProcessPoolExecutor(4 ) for i in range (50 ): fur = pool.submit(task, i) fur.add_done_callback(done) print (multiprocessing.current_process()) pool.shutdown(True ) """ 线程池案例4:在子进程中使用进程锁 """ import timeimport multiprocessingfrom concurrent.futures import ProcessPoolExecutordef task (lock ): print ("开始" ) with lock: with open ('f1.txt' , mode='r' , encoding='utf-8' ) as f: current_num = int (f.read()) print ("排队抢票了" ) time.sleep(1 ) current_num -= 1 with open ('f1.txt' , mode='w' , encoding='utf-8' ) as f: f.write(str (current_num)) if __name__ == '__main__' : pool = ProcessPoolExecutor() manager = multiprocessing.Manager() lock_object = manager.RLock() for _ in range (10 ): pool.submit(task, lock_object)
3.7 多进程案例
要求:统计日志文件中的数据量和ip种数。
数据存在files
文件夹下,格式:127.0.0.1 - - [21/Mar/2021] "GET" ......
import osimport timefrom concurrent.futures import ProcessPoolExecutordef task (file_name ): ip_set = set () total_count = ip_count = 0 file_path = os.path.join("files" , file_name) with open (file_path, mode='r' , encoding='utf-8' ) as file_object: for line in file_object: if not line.strip(): continue user_ip = line.split(" - -" , maxsplit=1 )[0 ].split(", " )[0 ] total_count += 1 if user_ip in ip_set: continue ip_count += 1 ip_set.add(user_ip) time.sleep(1 ) return {"total" : total_count, 'ip' : ip_count} def outer (info, file_name ): def done (res, *args, **kwargs ): info[file_name] = res.result() return done def run (): info = {} pool = ProcessPoolExecutor(4 ) for file_name in os.listdir("files" ): fur = pool.submit(task, file_name) fur.add_done_callback(outer(info, file_name)) pool.shutdown(True ) for k, v in info.items(): print (k, v) if __name__ == '__main__' : run()
4 协程 4.1 定义
了解为主。
计算机中提供了:线程、进程用于实现并发编程(真实存在)。
协程(Coroutine),是程序员通过代码搞出来的一个东西(非真实存在)。
协程也可以被称为微线程,是一种用户态内的上下文切换技术。
简而言之,其实就是通过一个线程实现代码块相互切换执行(来回跳着执行)。
下列greenlet和yield可以模拟协程,但是一般不这么使用。
from greenlet import greenletdef func1 (): print (1 ) gr2.switch() print (2 ) gr2.switch() def func2 (): print (3 ) gr1.switch() print (4 ) gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch()
def func1 (): yield 1 yield from func2() yield 2 def func2 (): yield 3 yield 4 f1 = func1() for item in f1: print (item)
4.2 asyncio
协程如何才能更有意义呢:不要让用户手动去切换,而是遇到IO操作时能自动切换。
Python在3.4之后推出了asyncio
模块+ Python3.5推出async
、async
语法,内部基于协程并且遇到IO请求自动化切换。
async def func ():await func();asyncio.sleep(n) asyncio.ensure_future(func1()) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop = asyncio.get_event_loop() asyncio.wait(tasks) loop.run_until_complete(asyncio.wait(tasks))
import asyncioasync def func1 (): print (1 ) await asyncio.sleep(2 ) print (2 ) async def func2 (): print (3 ) await asyncio.sleep(2 ) print (4 ) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
4.3 aiohttp案例
利用协程来爬取图片。
需要先安装aiohttp
。
实际使用中,一般不在协程中做数据处理,而是只负责获取数据,获取的数据直接放到队列、文件或数据库里。
后续学习:asyncio异步编程,你搞懂了吗? - 知乎 (zhihu.com)
后续学习:asyncio到底是个啥?【python async await】_哔哩哔哩_bilibili
import aiohttpimport asyncioimport urllibasync def download_image (session, url ): print (f"开始下载: {url} " ) async with session.get(url, verify_ssl=False ) as response: content = await response.content.read() return content async def save_image (content, filename ): print (f"开始保存: {filename} " ) with open (filename, mode='wb' ) as file_object: file_object.write(content) async def fetch_and_save (session, url ): content = await download_image(session, url) parsed_url = urllib.parse.urlparse(url) path_parts = parsed_url.path.split('/' ) file_name = path_parts[-1 ] if '.' not in file_name: file_name += ".jpg" await save_image(content, file_name) async def main (): async with aiohttp.ClientSession() as session: url_list = [ 'https://gitcode.net/qq_44112897/images/-/raw/master/comic/25.jpg' , 'https://gitcode.net/qq_44112897/images/-/raw/master/comic/26.jpg' , 'https://gitcode.net/qq_44112897/images/-/raw/master/comic/27.jpg' ] tasks = [asyncio.create_task(fetch_and_save(session, url)) for url in url_list] await asyncio.wait(tasks) if __name__ == '__main__' : loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main())