博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python学习之day9
阅读量:4658 次
发布时间:2019-06-09

本文共 10084 字,大约阅读时间需要 33 分钟。

队列queue

队列是线程安全的,它保证多线程间的数据交互的一致性。

先进先出队列Queue

import queueq = queue.Queue(maxsize=3) #maxsize为队列最大长度,不设置则为无限长q.put(1) #往队列放数据q.put(2,timeout=1) #设置超时时间,超过时间抛异常q.put_nowait(4) #如果队列已满,直接抛异常print(q.qsize()) #获取队列长度print(q.get()) # 从队列中获取数据,如果没有会一直等待print(q.get(timeout=1)) # 设置超时时间,超过时间会抛异常print(q.get_nowait()) # 如果没有数据直接抛异常

执行结果:

3124

后进先出队列LifoQueue

import queueq = queue.LifoQueue(maxsize=3) #maxsize为队列最大长度,不设置则为无限长q.put(1) #往队列放数据q.put(2,timeout=1) #设置超时时间,超过时间抛异常q.put_nowait(4) #如果队列已满,直接抛异常print(q.qsize()) #获取队列长度print(q.get()) # 从队列中获取数据,如果没有会一直等待print(q.get(timeout=1)) # 设置超时时间,超过时间会抛异常print(q.get_nowait()) # 如果没有数据直接抛异常

执行结果:

3421

设定优先级PriorityQueue

import queueq = queue.PriorityQueue(maxsize=3)q.put((10,[1,2,3]))  # 优先级和数据必须以元组的形式存在q.put((2,555))q.put_nowait((5,"abc"))print(q.qsize())print(q.get())print(q.get(timeout=1))print(q.get_nowait())

执行结果:

3(2, 555)(5, 'abc')(10, [1, 2, 3])

 生产者消费者模型:

#!/usr/bin/env python# -*- coding:utf-8 -*-# 生产者生产包子,消费者吃包子,服务员(队列)传递包子import queue,timeimport threadingq = queue.Queue(maxsize=3) #设定队列(服务员)最大长度(队列中最多3个包子)def producer(name):    baozi = 0    while True:        time.sleep(1)        # if q.qsize() < 3:        q.put(baozi)        print("\033[32;1mproducer {} produce a baozi {}\033[0m".format(name,baozi))        baozi += 1        q.join() #join方法等待队列为空告诉生产者继续生产def consumer(name):    while True:        if q.qsize() > 0:            bz = q.get()            print("\033[35;1mconsumer {} eat a baozi {}\033[0m".format(name,bz))            time.sleep(1)            q.task_done() #每次吃完包子告诉队列(服务员)if __name__ == "__main__":    c1 = threading.Thread(target=consumer,args=("A",),)    c2 = threading.Thread(target=consumer,args=("B",),)    c3 = threading.Thread(target=consumer,args=("C",),)    p1 = threading.Thread(target=producer,args=("akon",),)    p2 = threading.Thread(target=producer,args=("alex",),)    p3 = threading.Thread(target=producer,args=("cloris",),)    c1.start()    c2.start()    c3.start()    p1.start()    p2.start()    p3.start()

执行结果:

producer akon produce a baozi 0consumer B eat a baozi 0producer cloris produce a baozi 0producer alex produce a baozi 0consumer C eat a baozi 0consumer A eat a baozi 0producer alex produce a baozi 1consumer A eat a baozi 1producer cloris produce a baozi 1producer akon produce a baozi 1consumer B eat a baozi 1consumer C eat a baozi 1......

 


 

 

协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程在切换的时候都是在一个线程间进行切换,协程本身就是一个单线程

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

IO操作都是操作系统级别的

Gevent

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

一个简单的模拟异步IO操作:

#!/usr/bin/env python# -*- coding:utf-8 -*-import geventdef foo():    print("\033[31;1mruning in foo\033[0m")    gevent.sleep(1)    print("\033[31;1mruning switch back\033[0m")    def second():    print("\033[32;1mruning in second\033[0m")    gevent.sleep(1)    print("\033[32;1msecond switch back\033[0m")def third():    print("\033[33;1mruning in third\033[0m")    gevent.sleep(1)    print("\033[33;1mthird switch back\033[0m")gevent.joinall([    gevent.spawn(foo),    gevent.spawn(second),    gevent.spawn(third)])

执行结果:

runing in fooruning in secondruning in thirdruning switch backthird switch backsecond switch back

 

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import gevent 4  5 def task(pid): 6     """ 7     Some non-deterministic task 8     """ 9     gevent.sleep(0.5)10     print('Task %s done' % pid)11 12 def synchronous():13     for i in range(10):14         task(i)15 16 def asynchronous():17     threads = [gevent.spawn(task, i) for i in range(10)]18     gevent.joinall(threads)19 20 print('Synchronous:')21 synchronous()22 23 print('Asynchronous:')24 asynchronous()
同步与异步的性能区别

 

1 from gevent import monkey; monkey.patch_all() 2 import gevent 3 from  urllib.request import urlopen 4   5 def f(url): 6     print('GET: %s' % url) 7     resp = urlopen(url) 8     data = resp.read() 9     print('%d bytes received from %s.' % (len(data), url))10  11 gevent.joinall([12         gevent.spawn(f, 'https://www.python.org/'),13         gevent.spawn(f, 'https://www.yahoo.com/'),14         gevent.spawn(f, 'https://github.com/'),15 ])
遇到IO阻塞会自动切换任务

执行结果:

GET: https://www.python.org/GET: https://www.yahoo.com/GET: https://github.com/24132 bytes received from https://github.com/.46958 bytes received from https://www.python.org/.458329 bytes received from https://www.yahoo.com/.

通过gevent实现单线程下多socket并发

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import gevent 4 from gevent import socket,monkey 5 monkey.patch_all() 6 def server(port): 7     s = socket.socket() 8     s.bind(("",port)) 9     s.listen(500)10     while True:11         cli,addr = s.accept()12         gevent.spawn(handle_request,cli)13 14 def handle_request(s):15     try:16         while True:17             data = s.recv(1024)18             print("recv:",data.decode("utf-8"))19             s.send(data)20             if not data:21                 s.shutdown(socket.SHUT_WR)22     except Exception as ex:23         print(ex)24     finally:25         s.close()26 if __name__ == "__main__":27     server(8888)
server
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3  4 import socket 5  6 host = "127.0.0.1" 7 port = 8888 8 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) 9 s.connect((host,port))10 while True:11     msg = input(">>:").strip()12     if len(msg) == 0:continue13     if msg == "q":break14     msg = bytes(msg,encoding="utf-8")15     s.sendall(msg)16     data = s.recv(1024)17     print("received:",data.decode("utf-8"))18 s.close()
client

 

select

Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3  4 import select,socket,sys,queue 5  6 server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)   #创建socket通信实例 7 server.setblocking(False)    #设置socket不阻塞 8  9 server_address = ("localhost",9999) # 声明IP和端口号10 print(sys.stderr,"starting up on %s port %s"%server_address)11 server.bind(server_address)     #绑定IP和端口12 server.listen(5)    #最大链接数5个13 14 inputs = [server]   #定义一个列表inputs并把server实例作为第一个元素,如果客户端连接请求过来,就把这个客户端连接存到这个列表15 outputs = []    #定义一个空列表,如果服务器端需要发送数据给客户端,就把客户端连接存到这个列表16 message_queues = {} #定义一个空字典,用来存储要发送给客户端的数据,键(key)是客户端连接,值(value)是数据17 18 while inputs:   #server作为第一个元素保证了程序可以往下走,并一直循环19     print("\nwaiting for the next event")20     readable,writable,exceptional = select.select(inputs,outputs,inputs)    #select阻塞等待IO变化,三个通信列表分别监控所有外部发送过来的数据、所有要发出去的数据和错误信息21     for s in readable:  #循环readable,相当于循环inputs列表22         if s is server:     #如果s是server就会用accept方法接收新的连接23             connection,client_address = s.accept()24             print("new connection from",client_address)25             connection.setblocking(False)26             inputs.append(connection)   #新的连接进来就把它放进inputs列表里27             message_queues[connection] = queue.Queue()  #生成一个以连接为键,以队列为值的元素放进字典message_queues里28         else:   #如果s不是server,就开始接受数据29             data = s.recv(1024)30             if data:31                 print(sys.stderr,"received '%s' from %s "%(data,s.getpeername()))32                 message_queues[s].put(data) #如果收到数据,就把数据先存起来33                 if s not in outputs:    #如果outputs里面还没有这个连接,就把连接加入outputs中34                     outputs.append(s)35             else:   #如果没收到数据,说明连接已经断开,以下是把这个连接的所有数据删除36                 print("closing",client_address,"after read no data")37                 if s in outputs:38                     outputs.remove(s)39                 inputs.remove(s)40                 s.close()41                 message_queues.pop(s)42     for s in writable:  #writable对应了outputs,这里循环把需要发出去的数据发送43         try:44             next_msg = message_queues[s].get_nowait().decode("gbk")   #获取数据45         except queue.Empty: #如果队列为空说明没有数据46             print("output queue for",s.getpeername(),"is empty")47             outputs.remove(s)   #把连接从outputs中移除,避免下一次循环重复检测48         else:49             print("sending '%s' to %s"%(next_msg,s.getpeername()))50             s.send(bytes(next_msg,encoding="gbk"))    #发送数据给客户端51     for s in exceptional:   #exceptional负责把出错的连接删除52         print("handling exceptional condition for",s.getpeername())53         inputs.remove(s)54         if s in outputs:55             outputs.remove(s)56         s.close()57         message_queues.pop(s)
select_server
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 import socket,sys 4  5 messages = ["this is a test message", 6             "please don't reply", 7             "don't reply",] 8  9 server_address = ("localhost",9999)10 socks = [11     socket.socket(socket.AF_INET,socket.SOCK_STREAM),12     socket.socket(socket.AF_INET,socket.SOCK_STREAM),13     # socket.socket(socket.AF_INET,socket.SOCK_STREAM),14     # socket.socket(socket.AF_INET,socket.SOCK_STREAM)15 ]16 17 print(sys.stderr,"connecting to %s port %s"%server_address)18 19 for s in socks:20     s.connect(server_address)21 for message in messages:22     for s in socks:23         # print(sys.stderr,"%s:sending '%s'"%(s.getsockname(),message))24         print("%s:sending '%s'"%(s.getsockname(),message))25         s.send(bytes(message,encoding="gbk"))26 27     for s in socks:28         data = s.recv(1024)29         # print(sys.stderr,"%s: received '%s'"%(s.getsockname(),data.decode("gbk")))30         print("%s: received '%s'"%(s.getsockname(),data.decode("gbk")))31         if not data:32             print("closing socket",s.getsockname())33             s.close()
select_client

 

 

数据库mysql基础

http://www.cnblogs.com/wupeiqi/articles/5095821.html

转载于:https://www.cnblogs.com/akon0207/p/5300413.html

你可能感兴趣的文章
docker file
查看>>
总结一些常见的国际标准化组织
查看>>
webpack 使用经验记录
查看>>
sanic连接mongo
查看>>
Set集合HashSet,TreeSet
查看>>
对象实例化过程分析
查看>>
EasyUI基础知识
查看>>
陀螺仪信号解调
查看>>
App响应式布局
查看>>
安卓手机安装虚拟定位的方法Xposed安装器+模拟位置(Xposed模块)
查看>>
WCF Behaviors(行为)
查看>>
hiho_1057_performance_log
查看>>
ES6
查看>>
字符串时间20180210转化为2018-02-10
查看>>
CDN(内容分发网络)技术原理
查看>>
Spring集成MyBatis
查看>>
1.设计模式 - Singleton模式(单件模式)
查看>>
caioj:1348: [NOIP普及组2012]质因数分解 C++
查看>>
利用 postMessage 进行数据传递 (iframe 及web worker)及问题
查看>>
C# 异步操作 async await 的用法
查看>>