mg娱乐电子4355_mg娱乐电子游戏平台
做最好的网站

第10天,线程、协程、IO多路复用、socketserver

时间:2019-11-03 00:42来源:计算机编程
我们大多数的时候使用多线程,以及多进程,但是python中由于GIL全局解释器锁的原因,python的多线程并没有真的实现 目录 一、开启线程的两种方式 1.1 直接利用利用threading.Thread()类实例

    我们大多数的时候使用多线程,以及多进程,但是python中由于GIL全局解释器锁的原因,python的多线程并没有真的实现

目录

一、开启线程的两种方式
    1.1 直接利用利用threading.Thread()类实例化
    1.2 创建一个类,并继承Thread类
    1.3 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别
        1.3.1 谁的开启速度更快?
        1.3.2 看看PID的不同
        1.3.3 练习
        1.3.4 线程的join与setDaemon
        1.3.5 线程相关的其他方法补充

二、 Python GIL
    2.1 什么是全局解释器锁GIL
    2.2 全局解释器锁GIL设计理念与限制

三、 Python多进程与多线程对比
四、锁
    4.1 同步锁
    GIL vs Lock
    4.2 死锁与递归锁
    4.3 信号量Semaphore
    4.4 事件Event
    4.5 定时器timer
    4.6 线程队列queue

五、协程
    5.1 yield实现协程
    5.2 greenlet实现协程
    5.3 gevent实现协程

六、IO多路复用

七、socketserver实现并发
    7.1 ThreadingTCPServer

八、基于UDP的套接字

      实际上,python在执行多线程的时候,是通过GIL锁,进行上下文切换线程执行,每次真实只有一个线程在运行。所以上边才说,没有真的实现多现程。

一、开启线程的两种方式

在python中开启线程要导入threading,它与开启进程所需要导入的模块multiprocessing在使用上,有很大的相似性。在接下来的使用中,就可以发现。

同开启进程的两种方式相同:

      那么python的多线程就没有什么用了吗?

1.1 直接利用利用threading.Thread()类实例化

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()

    print('主线程')

              不是这个样子的,python多线程一般用于IO密集型的程序,那么什么叫做IO密集型呢,举个例子,比如说带有阻塞的。当前线程阻塞等待其它线程执行。

1.2 创建一个类,并继承Thread类

from threading import Thread
import time
calss Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        time.sleep(2)
        print("%s say hello" %self.name)

if __name__ == "__main__":
    t = Sayhi("egon")
    t.start()
    print("主线程")

      即然说到适合python多线程的,那么什么样的不适合用python多线程呢?

1.3 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

              答案是CPU密集型的,那么什么样的是CPU密集型的呢?百度一下你就知道。

1.3.1 谁的开启速度更快?

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello')

if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    hello
    主线程/主进程
    '''

    #在主进程下开启子进程
    t=Process(target=work)
    t.start()
    print('主线程/主进程')
    '''
    打印结果:
    主线程/主进程
    hello
    '''

结论:由于创建子进程是将主进程完全拷贝一份,而线程不需要,所以线程的创建速度更快。

      

1.3.2 看看PID的不同

from threading import Thread
from multiprocessing import Process
import os

def work():
    print('hello',os.getpid())

if __name__ == '__main__':
    #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
    t1=Thread(target=work)
    t2=Thread(target=work)
    t1.start()
    t2.start()
    print('主线程/主进程pid',os.getpid())

    #part2:开多个进程,每个进程都有不同的pid
    p1=Process(target=work)
    p2=Process(target=work)
    p1.start()
    p2.start()
    print('主线程/主进程pid',os.getpid())


'''
hello 13552
hello 13552
主线程pid: 13552
主线程pid: 13552
hello 1608
hello 6324
'''

总结:可以看出,主进程下开启多个线程,每个线程的PID都跟主进程的PID一样;而开多个进程,每个进程都有不同的PID。

       现在有这样一项任务:需要从200W个url中获取数据?

1.3.3 练习

练习一:利用多线程,实现socket 并发连接
服务端:

from threading import Thread
from socket import *
import os

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
tcpsock.bind(("127.0.0.1",60000))
tcpsock.listen(5)

def work(conn,addr):
    while True:
        try:
            data = conn.recv(1024)
            print(os.getpid(),addr,data.decode("utf-8"))
            conn.send(data.upper())
        except Exception:
            break

if __name__ == '__main__':
    while True:
        conn,addr = tcpsock.accept()
        t = Thread(target=work,args=(conn,addr))
        t.start()

"""
开启了4个客户端
服务器端输出:
13800 ('127.0.0.1', 63164) asdf
13800 ('127.0.0.1', 63149) asdf
13800 ('127.0.0.1', 63154) adsf
13800 ('127.0.0.1', 63159) asdf

可以看出每个线程的PID都是一样的。
""

客户端:

from socket import *

tcpsock = socket(AF_INET,SOCK_STREAM)
tcpsock.connect(("127.0.0.1",60000))

while True:
    msg = input(">>: ").strip()
    if not msg:continue
    tcpsock.send(msg.encode("utf-8"))
    data = tcpsock.recv(1024)
    print(data.decode("utf-8"))

练习二:有三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件。

from threading import Thread

recv_l = []
format_l = []

def Recv():
    while True:
        inp = input(">>: ").strip()
        if not inp:continue
        recv_l.append(inp)

def Format():
    while True:
        if recv_l:
            res = recv_l.pop()
            format_l.append(res.upper())

def Save(filename):
    while True:
        if format_l:
            with open(filename,"a",encoding="utf-8") as f:
                res = format_l.pop()
                f.write("%sn" %res)

if __name__ == '__main__':
    t1 = Thread(target=Recv)
    t2 = Thread(target=Format)
    t3 = Thread(target=Save,args=("db.txt",))
    t1.start()
    t2.start()
    t3.start()

       那么我们真心不能用多线程,上下文切换是需要时间的,数据量太大,无法接受。这里我们就要用到多进程+协程

1.3.4 线程的join与setDaemon

与进程的方法都是类似的,其实multiprocessing模块是模仿threading模块的接口;

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #设置为守护线程,主线程结束,子线程也跟着线束。
    t.start()
    t.join()  #主线程等待子线程运行结束
    print('主线程')
    print(t.is_alive())

      那么什么是协程呢?

1.3.5 线程相关的其他方法补充

Thread实例对象的方法:

  • isAlive():返回纯种是否是活跃的;
  • getName():返回线程名;
  • setName():设置线程名。

threading模块提供的一些方法:

  • threading.currentThread():返回当前的线程变量
  • threading.enumerate():返回一个包含正在运行的线程的列表。正在运行指线程启动后、结束前,不包括启动前和终止后。
  • threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同结果。
from threading import Thread
import threading
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主进程下开启线程
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName()) #获取当前线程名
    print(threading.current_thread()) #主线程
    print(threading.enumerate()) #连同主线程在内有两个运行的线程,返回的是活跃的线程列表
    print(threading.active_count())  #活跃的线程个数
    print('主线程/主进程')

    '''
    打印结果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    2
    主线程/主进程
    Thread-1
    '''

      协程,又称微线程,纤程。英文名Coroutine。

二、 Python GIL

GIL全称Global Interpreter Lock,即全局解释器锁。首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

      协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。

2.1 什么是全局解释器锁GIL

Python代码的执行由Python 虚拟机(也叫解释器主循环,CPython版本)来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
在多线程环境中,Python 虚拟机按以下方式执行:

  1. 设置GIL
  2. 切换到一个线程去运行
  3. 运行:
    a. 指定数量的字节码指令,或者
    b. 线程主动让出控制(可以调用time.sleep(0))
  4. 把线程设置为睡眠状态
  5. 解锁GIL
  6. 再次重复以上所有步骤

在调用外部代码(如C/C++扩展函数)的时候,GIL 将会被锁定,直到这个函数结束为止(由于在这期间没有Python 的字节码被运行,所以不会做线程切换)。

      协程有什么好处呢,协程只在单线程中执行,不需要cpu进行上下文切换,协程自动完成子程序切换。

2.2 全局解释器锁GIL设计理念与限制

GIL的设计简化了CPython的实现,使得对象模型,包括关键的内建类型如字典,都是隐含可以并发访问的。锁住全局解释器使得比较容易的实现对多线程的支持,但也损失了多处理器主机的并行计算能力。
但是,不论标准的,还是第三方的扩展模块,都被设计成在进行密集计算任务是,释放GIL。
还有,就是在做I/O操作时,GIL总是会被释放。对所有面向I/O 的(会调用内建的操作系统C 代码的)程序来说,GIL 会在这个I/O 调用之前被释放,以允许其它的线程在这个线程等待I/O 的时候运行。如果是纯计算的程序,没有 I/O 操作,解释器会每隔 100 次操作就释放这把锁,让别的线程有机会执行(这个次数可以通过 sys.setcheckinterval 来调整)如果某线程并未使用很多I/O 操作,它会在自己的时间片内一直占用处理器(和GIL)。也就是说,I/O 密集型的Python 程序比计算密集型的程序更能充分利用多线程环境的好处。

下面是Python 2.7.9手册中对GIL的简单介绍:
The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.
However, some extension modules, either standard or third-party, are designed so as to release the GIL when doing computationally-intensive tasks such as compression or hashing. Also, the GIL is always released when doing I/O.
Past efforts to create a “free-threaded” interpreter (one which locks shared data at a much finer granularity) have not been successful because performance suffered in the common single-processor case. It is believed that overcoming this performance issue would make the implementation much more complicated and therefore costlier to maintain.

从上文中可以看到,针对GIL的问题做的很多改进,如使用更细粒度的锁机制,在单处理器环境下反而导致了性能的下降。普遍认为,克服这个性能问题会导致CPython实现更加复杂,因此维护成本更加高昂。

      这里没有使用yield协程,这个python自带的并不是很完善,至于为什么有待于你去研究了。

三、 Python多进程与多线程对比

有了GIL的存在,同一时刻同一进程中只有一个线程被执行?这里也许人有一个疑问:多进程可以利用多核,但是开销大,而Python多线程开销小,但却无法利用多核的优势?要解决这个问题,我们需要在以下几点上达成共识:

  • CPU是用来计算的!
  • 多核CPU,意味着可以有多个核并行完成计算,所以多核提升的是计算性能;
  • 每个CPU一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处。

当然,对于一个程序来说,不会是纯计算或者纯I/O,我们只能相对的去看一个程序到底是计算密集型,还是I/O密集型。从而进一步分析Python的多线程有无用武之地。

分析:

我们有四个任务需要处理,处理访求肯定是要有并发的效果,解决方案可以是:

  • 方案一:开启四个进程;
  • 方案二:一个进程下,开启四个进程。

单核情况下,分析结果:

  • 如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜;
  • 如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜。

多核情况下,分析结果:

  • 如果四个任务是密集型,多核意味着并行 计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜;
  • 如果四个任务是I/O密集型,再多的核 也解决不了I/O问题,方案二胜。

结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至 不如串行(没有大量切换),但是,对于I/O密集型的任务效率还是有显著提升的。

代码实现对比

计算密集型:

#计算密集型
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
    res=0
    for i in range(1000000):
        res+=i

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(100):
        # t=Thread(target=work) #我的机器4核cpu,多线程大概15秒
        t=Process(target=work) #我的机器4核cpu,多进程大概10秒
        t_l.append(t)
        t.start()

    for i in t_l:
        i.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))
    print('主线程')

I/O密集型:

#I/O密集型
from threading import Thread
from multiprocessing import Process
import time
import os
def work():
    time.sleep(2) #模拟I/O操作,可以打开一个文件来测试I/O,与sleep是一个效果
    print(os.getpid())

if __name__ == '__main__':
    t_l=[]
    start_time=time.time()
    for i in range(500):
        # t=Thread(target=work) #run time is 2.195
        t=Process(target=work) #耗时大概为37秒,创建进程的开销远高于线程,而且对于I/O密集型,多cpu根本不管用
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()
    stop_time=time.time()
    print('run time is %s' %(stop_time-start_time))

总结:
应用场景:
多线程用于I/O密集型,如socket、爬虫、web
多进程用于计算密集型,如金融分析

      这里使用比较完善的第三方协程包gevent

四、锁

      pip  install    gevent

4.1 同步锁

需求:对一个全局变量,开启100个线程,每个线程都对该全局变量做减1操作;

不加锁,代码如下:

import time
import threading

num = 100  #设定一个共享变量
def addNum():
    global num #在每个线程中都获取这个全局变量
    #num-=1

    temp=num
    time.sleep(0.1)
    num =temp-1  # 对此公共变量进行-1操作

thread_list = []

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
    t.join()

print('Result: ', num)

分析:以上程序开启100线程并不能把全局变量num减为0,第一个线程执行addNum遇到I/O阻塞后迅速切换到下一个线程执行addNum,由于CPU执行切换的速度非常快,在0.1秒内就切换完成了,这就造成了第一个线程在拿到num变量后,在time.sleep(0.1)时,其他的线程也都拿到了num变量,所有线程拿到的num值都是100,所以最后减1操作后,就是99。加锁实现。

加锁,代码如下:

import time
import threading

num = 100   #设定一个共享变量
def addNum():
    with lock:
        global num
        temp = num
        time.sleep(0.1)
        num = temp-1    #对此公共变量进行-1操作

thread_list = []

if __name__ == '__main__':
    lock = threading.Lock()   #由于同一个进程内的线程共享此进程的资源,所以不需要给每个线程传这把锁就可以直接用。
    for i in range(100):
        t = threading.Thread(target=addNum)
        t.start()
        thread_list.append(t)

    for t in thread_list:  #等待所有线程执行完毕
        t.join()

    print("result: ",num)

加锁后,第一个线程拿到锁后开始操作,第二个线程必须等待第一个线程操作完成后将锁释放后,再与其它线程竞争锁,拿到锁的线程才有权操作。这样就保障了数据的安全,但是拖慢了执行速度。
注意:with locklock.acquire()(加锁)与lock.release()(释放锁)的简写。

import threading

R=threading.Lock()

R.acquire()
'''
对公共数据的操作
'''
R.release()

编辑:计算机编程 本文来源:第10天,线程、协程、IO多路复用、socketserver

关键词: