当前位置: 首页 > news >正文

Python-TCP编程-UDP编程-SocketServer-IO各种概念及多路复用-asyncio-学习笔记

欠4前年的一份笔记 ,献给今后的自己。

网络编程

Socket介绍
Socket套接字

Python中提供socket.py标准库,非常底层的接口库。
Socket是一种通用的网络编程接口,和网络层次没有一一对应的关系。
协议族

AF表示Address Family,用于socket()第一个参数
在这里插入图片描述

TCP编程

Socket编程,需要两端,一般来说需要一个服务端、一个客户端,服务端称为Server,客户端称为Client

TCP服务端编程

服务器端编程步骤

  • 创建Socket对象
  • 绑定IP地址Address和端口Port。bind()方法
    IPv4地址为一个二元组(IP地址字符串,Port)
  • 开始监听,将在指定的IP的端口上监听。listen()方法
  • 获取用于传送数据的Socket对象
    socket.accept() -> (socket object, address info)
    accept方法阻塞等待客户端建立连接,返回一个新的Socket对象和客户端地址的二元组
    地址是远程客户端的地址,IPv4中它是一个二元组(clientaddr, port)
    1、接收数据 : recv(bufsize[, flags]) 使用缓冲区接收数据
    2、发送数据 : send(bytes) 发送数据

在这里插入图片描述

问题

两次绑定同一个监听端口会怎么样?

import sockets = socket.socket()  # 创建socket对象
s.bind(('127.0.0.1', 9999))  # 一个二元组s.listen()  # 开始监听
# 开启一个连接s1, info = s.accept()  # 阻塞直到和客户端成功建立连接,返回一个socket对象和客户端地址# 使用缓冲区获取数据data = s1.recv(1024)print(data, info)s1.send(b'magedu.com')# 开启另外一个连接
s2, _ = s.accept()data = s2.recv(1024)s2.send(b'hello python')s.close()

上例accept和recv是阻塞的,主线程经常被阻塞住而不能工作。怎么办?

练习一一写一个群聊程序

需求分析

聊天工具是CS程序,C是每一个客户端,S是服务器端。
服务器应该具有的功能:
启动服务,包括绑定地址和端口,监听
建立连接,能和多个客户端建立连接
接收不同用户的信息
分发,将接收的某个用户的信息转发到已连接的所有客户端
停止服务
记录连接的客户端

代码实现

服务端应该对应一个类

class ChatServer:def __init__(self, ip, port):  # 启动服务self.sock = socket.socket()self.addr = (ip, port)def start(self):  # 启动监听passdef accept(self):  # 多人连接passdef recv(self):  # 接收客户端数据passdef stop(self):  # 停止服务pass

在此基础上,扩展完成

import logging
import socket
import threading
import datetimelogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {}  # 客户端def start(self):  # 启动监听self.sock.bind(self.addr)  # $Bself.sock.listen()  # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self):  # 多人连接while True:sock, client = self.sock.accept()  # 阻塞self.clients[client] = sock  # 添加到客户端字典# 准备接收数据,recv是阻塞的,开启新的线程threading.Thread(target=self.recv, args=(sock, client)).start()def recv(self, sock: socket.socket, client):  # 接收客户端数据while True:data = sock.recv(1024)  # 阻塞到数据到来msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())logging.info(msg)msg = msg.encode()for s in self.clients.values():s.send(msg)def stop(self):  # 停止服务for s in self.clients.values():s.close()self.sock.close()cs = ChatServer()
cs.start

在此基础上,扩展完成

import threading
import datetime
import logging
import socketlogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务logging.info('Connecting to Chat Server')self.sock = socket.socket()self.addr = (ip, port)self.clients = {}  # 客户端self.event = threading.Event()def start(self):  # 启动监听self.sock.bind(self.addr)  # 378self.sock.listen()  # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self):  # 多人连接while not self.event.is_set():sock, client = self.sock.accept()  # BE%self.clients[client] = sock  # 添加到客户端字典# 准备接收数据,recv是阻塞的,开启新的线程threading.Thread(target=self.recv, args=(sock, client)).start()def recv(self, sock: socket.socket, client):  # 接收客户端数据while not self.event.is_set():data = sock.recv(1024)  # 阻塞到数据到来msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())logging.info(msg)msg = msg.encode()for s in self.clients.values():s.send(msg)def stop(self):  # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()cs = ChatServer()
cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)break

基本功能完成,但是有问题。使用Event改进。

import logging
import socketimport threading
import datetimelogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {}  # 客户端self.event = threading.Event()def start(self):  # 启动监听self.sock.bind(self.addr)  #绑定 self.sock.listen()  # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self):  # 多人连接while not self.event.is_set():sock, client = self.sock.accept()  # BEself.clients[client] = sock  # 添加到客户端字典# 准备接收数据,recv是阻塞的,开启新的线程threading.Thread(target=self.recv, args=(sock, client)).start()def recv(self, sock: socket.socket, client):  # 接收客户端数据while not self.event.is_set():data = sock.recv(1024)  # 阻塞到数据到来msg = "{:%Y/%m/%d %H:%M:%S} {}: {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())logging.info(msg)msg = msg.encode()for s in self.clients.values():s.send(msg)def stop(self):  # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()cs = ChatServer()
cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)break

这一版基本能用了,测试通过。但是还有要完善的地方。

例如各种异常的判断,客户端断开连接后字典中的移除客户端数据等。

客户端主动断开带来的问题

服务端知道自己何时断开,如果客户端断开,服务器不知道。

所以,好的做法是,客户端断开发出特殊消息通知服务器端断开连接。但是,如果客户端主动断开,服务端主动发

送一个空消息,超时返回异常,捕获异常并清理连接。

即使 客户端提供了断开命令,也不能保证客户端会使用它断开连接。但是还是要增加这个退出功能。

增加客户端退出命令

import logging
import socketimport threading
import datetimelogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {}  # 客户端self.event = threading.Event()def start(self):  # 启动监听self.sock.bind(self.addr)  # 绑定self.sock.listen()  # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self):  # 多人连接while not self.event.is_set():sock, client = self.sock.accept()  # BEself.clients[client] = sock  # 添加到客户端字典# 准备接收数据,recv是阻塞的,开启新的线程threading.Thread(target=self.recv, args=(sock, client)).start()def recv(self, sock: socket.socket, client):  # 接收客户端数据while not self.event.is_set():data = sock.recv(1024)  # 阻塞到数据到来msg = data.decode().strip()# 客户端退出命令if msg == 'quit':self.clients.pop(client)sock.close()logging.info('{} quits'.format(client))breakmsg = "{:%Y/%m/%d %H:%M:%S} {}: {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())logging.info(msg)msg = msg.encode()for s in self.clients.values():s.send(msg)def stop(self):  # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()cs = ChatServer()
cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate())  # 用来观察断开后线程的变化

程序还有瑕疵,但是业务功能基本完成了
socket常用方法

在这里插入图片描述

MakeFile

socket-makefile(mode=‘r’, buffering=None, *, encoding=None, errors=None, newline=None)
创建一个与该套接字相关连的文件对象,将recv方法看做读方法,将send方法看做写方法。

# 使用makefile简单例子
import socketsockserver = socket.socket()ip = '127.0.0.1'port = 9999addr = (ip, port)sockserver.bind(addr)sockserver.listen()print(' - ' * 30)
s, _ = sockserver.accept()print(' - ' * 30)f = s.makefile(mode='rw')line = f.read(10)  # 阻塞等
print(' - ' * 30)
print(line)
f.write('Return your msg: {}'.format(line))
f.flush()

上例不能循环接收消息,修改一下

# 使用makefile简单例子
import socket
import threadingsockserver = socket.socket()ip = '127.0.0.1'port = 9999addr = (ip, port)sockserver.bind(addr)sockserver.listen()print(' - ' * 30)event = threading.Event()def accept(sock: socket.socket, e: threading.Event):s, _ = sock.accept()f = s.makefile(mode='rw')while True:line = f.readline()print(line)if line.strip() == "quit":  # 注意要发quit\nbreakf.write('Return your msg: (}'.format(line))f.flush()f.close()sock.close()e.wait(3)t = threading.Thread(target=accept, args=(sockserver, event))
t.start()
t.join()
print(sockserver)

makefile练习
使用makefile改写群聊类

import logging
import socket
import threading
import datetime
import socketlogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {}  # 客户端self.event = threading.Event()def start(self):  # 启动监听self.sock.bind(self.addr)  # 绑定self.sock.listen()  # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self):  # 多人连接while not self.event.is_set():sock, client = self.sock.accept()  # 阻塞# 准备接收数据,recv是阻塞的,开启新的线程f = sock.makefile('rw')  # 支持读写self.clients[client] = f  # 添加到客户端字典threading.Thread(target=self.recv, args=(f, client), name='recv').start()def recv(self, f, client):  # 接收客户端数据while not self.event.is_set():data = f.readline()  # 阻塞到换行符msg = data.strip()# 客户端退出命令if msg == 'quit':self.clients.pop(client)f.close()logging.info('{}  quits'.format(client))breakmsg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)logging.info(msg)for s in self.clients.values():s.write(msg)s.flush()def stop(self):  # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()cs = ChatServer()
cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate())  # 用来观察断开后线程的变化

上例完成了基本功能,但是,如果客户端主动断开,或者readline出现异常,就不会从clients中移除作废的
socket。可以使用异常处理解决这个问题。

ChatServer实验用完整代码

注意,这个代码为实验用,代码中瑕疵还有很多。Socket太底层了,实际开发中很少使用这么底层的接口。
增加一些异常处理。

import logging
import socket
import threading
import datetime
import socketlogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {}  # 客户端self.event = threading.Event()def start(self):  # 启动监听self.sock.bind(self.addr)  # 绑定self.sock.listen()  # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self):  # 多人连接while not self.event.is_set():sock, client = self.sock.accept()  # 阻塞# 准备接收数据,recv是阻塞的,开启新的线程f = sock.makefile('rw')  # 支持读写self.clients[client] = f  # 添加到客户端字典threading.Thread(target=self.recv, args=(f, client), name='recv').start()def recv(self, f, client):  # 接收客户端数据while not self.event.is_set():try:data = f.readline()  # 阻塞到换行符except Exception as e:logging.error(e)data = 'quit'msg = data.strip()# 客户端退出命令if msg == 'quit':self.clients.pop(client)f.close()logging.info('{}  quits'.format(client))breakmsg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)logging.info(msg)for s in self.clients.values():s.write(msg)s.flush()def stop(self):  # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()def main():cs = ChatServer()cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate())  # 用来观察断开后线程的变化if __name__ == '__main__':main()

TCP客户端编程

客户端编程步骤
  • 创建Socket对象
  • 连接到远端服务端的ip和port,connect()方法
  • 传输数据
    使用send、recv方法发送、接收数据
  • 关闭连接,释放资源
import socketclient = socket.socket()
ipaddr = ('127.0.0.1', 9999)
client.connect(ipaddr)  # 直接连接服务器
client.send(b'abcd\n')
data = client.recv(1024)  # 阻塞等待
print(data)
client.close()

开始编写客户端类

import logging
import socket
import threading
import datetime
import socketlogging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.clients = {}  # 客户端self.event = threading.Event()def start(self):  # 启动监听self.sock.bind(self.addr)  # 绑定self.sock.listen()  # 监听# accept会阻塞主线程,所以开一个新线程threading.Thread(target=self.accept).start()def accept(self):  # 多人连接while not self.event.is_set():sock, client = self.sock.accept()  # 阻塞# 准备接收数据,recv是阻塞的,开启新的线程f = sock.makefile('rw')  # 支持读写self.clients[client] = f  # 添加到客户端字典threading.Thread(target=self.recv, args=(f, client), name='recv').start()def recv(self, f, client):  # 接收客户端数据while not self.event.is_set():try:data = f.readline()  # 阻塞到换行符except Exception as e:logging.error(e)data = 'quit'msg = data.strip()# 客户端退出命令if msg == 'quit':self.clients.pop(client)f.close()logging.info('{}  quits'.format(client))breakmsg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)logging.info(msg)for s in self.clients.values():s.write(msg)s.flush()def stop(self):  # 停止服务for s in self.clients.values():s.close()self.sock.close()self.event.set()def main():cs = ChatServer()cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate())  # 用来观察断开后线程的变化if __name__ == '__main__':main()

在这里插入图片描述

在这里插入图片描述

同样,这样的客户端还是有些问题的,仅用于测试。

UDP编程

测试命令

> netstat -anp udp | find “9988” # windows查找udp是否启动端口
$ echo “123abc” | nc -u 127.0.0.1 9988 # linux下发给服务端数据\

UDP服务端编程

UDP服务端编程流程

在这里插入图片描述

  • 创建socket对象。socket.SOCK_DGRAM
  • 绑定IP和Port,bind0方法
  • 传输数据
    接收数据,socket.recvfrom(bufsize[, flagsl),获得一个二元组(string, address)
    发送数据,socket.sendto(string, address)发给某地址某信息
  • 释放资源
import socketserver = socket.socket(type=socket.SOCK_DGRAM)
server.bind(('0.0.0.0', 9999))  # 立即绑定一个udp端口
data = server.recv(1024)  # 阻塞等待数据
data = server.recvfrom(1024)  # 阻塞等待数据(value,(ip,port))
server.sendto(b'7', ('192.168.142.1', 10000))
server.close()

UDP客户端编程流程

  • 创建socket对象。 socket.SOCK_DGRAM
  • 发送数据,socket. sendto(string, address)发给某地址某信息
  • 接收数据,socket.recvfrom(bufsize[, flags]),获得一个二元组(string, address)
  • 释放资源
import socketclient = socket.socket(type=socket.SOCK_DGRAM)
raddr = ('192.168.142.1', 10000)
client.connect(raddr)
client.sendto(b'8', raddr)
client.send(b'9')
data = client.recvfrom(1024)  # 阻塞等待数据(value,(ip,port))
data = client.recv(1024)  # 阻塞等待数据
client.close() 

注意:UDP是无连接协议,所以可以只有任何一端,例如客户端数据发往服务端,服务端存在与否无所谓。

UDP编程中bind、connect、send、sendto、recv、recvfrom方法使用

UDP的socket对象创建后,是没有占用本地地址和端口的。

在这里插入图片描述

练习——UDP版群聊
UDP版群聊服务端代码

import socket# 服务端类的基本架构
class ChatUDPServer:def __init__(self, ip='127.0.0.1', port=9999):self.addr = (ip, port)self.sock = socket.socket(type=socket.SOCK_DGRAM)def start(self):self.sock.bind(self.addr)  # 立即绑定self.sock.recvfrom(1024)  # 阻塞接收数据def stop(self):self.sock.close()

在上面代码的基础之上扩充

import socket
import threading
import datetime
import logging
from tkinter.font import namesFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatUDPServer:def __init__(self, ip='127.0.0.1', port=9999):self.addr = (ip, port)self.sock = socket.socket(type=socket.SOCK_DGRAM)self.clients = set()  # 记录客户端self.event = threading.Event()def start(self):self.sock.bind(self.addr)  # 立即绑定# 启动线程threading.Thread(target=self.recv, name='recv').start()def recv(self):while not self.event.is_set():data, raddr = self.sock.recvfrom(1024)  # 阻塞接收数据if data.strip() == b'quit':# 有可能发来数据的不在clients中if raddr in self.clients:self.clients.remove(raddr)logging.info('{} leaving'.format(raddr))continueself.clients.add(raddr)msg = '{}. from {}: {}'.format(data.decode(), *raddr)logging.info(msg)msg = msg.encode()for c in self.clients:self.sock.sendto(msg, c)  # 不保证对方能够收到def stop(self):for c in self.clients:self.sock.sendto(b'bye', c)self.sock.close()self.event.set()def main():cs = ChatUDPServer()cs.start()while True:cmd = input(">>>")if cmd.strip() == 'quit':cs.stop()breaklogging.info(threading.enumerate())logging.info(cs.clients)if __name__ == '__main__':main()

UDP群聊客户端代码

import threading
import socket
import loggingFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message) s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatUdpClient:def __init__(self, rip='127.0.0.1', rport=9999):self.sock = socket.socket(type=socket.SOCK_DGRAM)self.raddr = (rip, rport)self.event = threading.Event()def start(self):self.sock.connect(self.raddr)  # 占用本地地址和端口,设置远端地址和端口threading.Thread(target=self.recv, name='recv').start()def recv(self):while not self.event.is_set():data, raddr = self.sock.recvfrom(1024)msg = '{}. from {}:{}'.format(data.decode(), *raddr)logging.info(msg)def send(self, msg: str):self.sock.sendto(msg.encode(), self.raddr)def stop(self):self.sock.close()self.event.set()def main():cc1 = ChatUdpClient()cc2 = ChatUdpClient()cc1.start()cc2.start()print(cc1.sock)print(cc2.sock)while True:cmd = input('Input your words >>')if cmd.strip() == 'quit':cc1.stop()cc2.stop()breakcc1.send(cmd)cc2.send(cmd)if __name__ == '__main__':main()

上面的例子并不完善,如果客户端断开了,服务端不知道。每一个服务端还需要对所有客户端发送数据,包括已经断开的客户端。

代码改进

服务端代码改进

加一个ack机制和心跳heartbeat。心跳,就是一端定时发往另一端的信息,一般每次数据越少越好。心跳时间间
隔约定好就行。ack即响应,一端收到另一端的消息后返回的信息。

心跳机制

1、一般来说是客户端定时发往服务端的,服务端并不需要ack回复客户端,只需要记录该客户端还活着就行了。
2、如果是服务端定时发往客户端的,一般需要客户端ack响应来表示活着,如果没有收到ack的客户端,服务端
移除其信息。这种实现较为复杂,用的较少。
3、也可以双向都发心跳的,用的更少。

在服务器端代码中使用第一种机制改进

import socket
import threading
import datetime
import logging
from tkinter.font import namesFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatUDPServer:def __init__(self, ip='127.0.0.1', port=9999,interval=10):self.addr = (ip, port)self.sock = socket.socket(type=socket.SOCK_DGRAM)self.clients = set()  # 记录客户端self.event = threading.Event()self.interval = interval  # 默认10秒,超时就要移除对应的客户端def start(self):self.sock.bind(self.addr)  # 立即绑定# 启动线程threading.Thread(target=self.recv, name='recv').start()def recv(self):while not self.event.is_set():localset = set() # 清理超时data, raddr = self.sock.recvfrom(1024)  # 阻塞接收数据current = datetime.datetime.now().timestamp()  # floatif data.strip() == b'^hb^':  # 心跳信息 ifself.clients[raddr] = currentcontinueelif data.strip() == b'quit':# 有可能发来数据的不在clients中self.clients.pop(raddr, None)logging.info('{} leaving'.format(raddr))continue# 有信息来就更新时间# 什么时候比较心跳时间呢?发送信息的时候,反正要遍历一遍self.clients[raddr] = currentmsg = '{}. from {}:{}'.format(data.decode(), *raddr)logging.info(msg)msg = msg.encode()for c, stamp in self.clients.items():if current - stamp > self.interval:localset.add(c)else:self.sock.sendto(msg, c)   # 不保证对方能够收到for c in localset:self.clients.pop(c)def stop(self):for c in self.clients:self.sock.sendto(b'bye', c)self.sock.close()self.event.set()def main():cs = ChatUDPServer()cs.start()while True:cmd = input(">>>")if cmd.strip() == 'quit':cs.stop()breaklogging.info(threading.enumerate())logging.info(cs.clients)if __name__ == '__main__':main()

客户端代码改进
增加定时发送心跳代码

import threading
import socket
import loggingfrom pydantic.v1 import parse_file_asFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatUdpClient:def __init__(self, rip='127.0.0.1', rport=9999):self.sock = socket.socket(type=socket.SOCK_DGRAM)self.raddr = (rip, rport)self.event = threading.Event()def start(self):self.sock.connect(self.raddr)  # 占用本地地址和端口,设置远端地址和端口threading.Thread(target=self._sendhb, name='heartbeat', daemon=True).start()threading.Thread(target=self.recv, name='recv').start()def _sendhb(self):# 心跳while not self.event.wait(5):self.send(' ^hb^')def recv(self):while not self.event.is_set():data, raddr = self.sock.recvfrom(1024)msg = '{}. from {}:{}'.format(data.decode(), *raddr)logging.info(msg)def send(self, msg: str):print(msg)self.sock.sendto(msg.encode(), self.raddr)def stop(self):self.send('quit')  # 通知服务端退出self.sock.close()self.event.set()def main():cc1 = ChatUdpClient()cc2 = ChatUdpClient()cc1.start()cc2.start()print(cc1.sock)print(cc2.sock)while True:cmd = input('Input your words >>')if cmd.strip() == 'quit':cc1.stop()cc2.stop()breakcc1.send(cmd)cc2.send(cmd)if __name__ == '__main__':main()

UDP协议应用

UDP是无连接协议,它基于以下假设:网络足够好 消息不会丢包 包不会乱序
但是,即使是在局域网,也不能保证不丢包,而且包的到达不一定有序。
应用场景 视频、音频传输,一般来说,丢些包,问题不大,最多丢些图像、听不清话语,可以重新发话语来解决。
海量采集数据,例如传感器发来的数据,丢几十、几百条数据也没有关系。DNS协议,数据内容小,一个包就能
查询到结果,不存在乱序,丟包,重新请求解析。
一般来说,UDP性能优于TCP,但是可靠性要求高的场合的还是要选择TCP协议。

SocketServer

socket编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对socket底层

AP进行封装,Python的封装就是——socketserver模块。它是网络服务编程框架,便于企业级快速开发。

类的继承关系

在这里插入图片描述

SocketServer简化了网络服务器的编写。

它有4个同步类:TCPServer ,UDPServer , UnixStreamServer,UnixDatagramServer。
2个Mixin类:ForkingMixIn 和 ThreadingMixIn类,用来支持异步。
class ForkingUDPServer(ForkingMixin, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServerThreadingMixIn, UDPServer): pass
class ThreadingTCPServerThreadingMixIn, TCPServer): pass
fork是创建多进程,thread是创建多线程

编程接口
socketserver.BaseServer(server_address, RequestHandlerClass)

需要提供服务器绑定的地址信息,和用于处理请求的RequestHandlerClass类。

RequestHandlerClass类必须是BaseRequestHandler类的子类,在BaseServer中代码如下:

import threading# BaseServer代码
class BaseServer:def __init__(self, server_address, RequestHandlerClass):"""Constructor. May be extended, do not override."""self.server_address = server_addressself.RequestHandlerClass = RequestHandlerClassself.__is_shut_down = threading.Event()self.__shutdown_request = Falsedef finish_request(self, request, client_address):  # 处理请求的方法"""Finish one request by instantiating RequestHandlerClass."""self.RequestHandlerClass(request, client_address, self)  # RequestHandlerClass*i*

BaseRequestHandler类

它是和用户连接的用户请求处理类的基类,定义为BaseRequestHandler(request, client_address, server)
服务端Server实例接收用户请求后,最后会实例化这个类。
它被初始化时,送入3个构造参数:request, client_address, server自身
以后就可以在BaseRequestHandler类的实例上使用以下属性:
self.request是和客户端的连接的socket对象
self.server是TCPServer本身
self.client_address是客户端地址

这个类在初始化的时候,它会依次调用3个方法。子类可以覆盖这些方法。

# BaseRequestHandler要子类覆盖的方法
class BaseRequestHandler:def __init__(self, request, client_address, server):self.request = requestself.client_address = client_addressself.server = serverself.setup()try:self.handle()finally:self.finish()def setup(self):  # 每一个连接初始化passdef handle(self):  # 每一次请求处理passdef finish(self):  # 每一个连接清理pass

测试代码

import threading
import socketserverclass MyHandler(socketserver.BaseRequestHandler):def handle(self):# super().handle()# 可以不调用,父类handle什么都没有做print('-' * 30)print(self.server)  # 服务print(self.request)  # 服务端负责客户端连接请求的socket对象print(self.client_address)  # 客户端地址print(self.__dict__)print(self.server.__dict__)  # 能看到负责accept的socketprint(threading.enumerate())print(threading.current_thread())print('-' * 30)addr = ('127.0.0.1', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever()  # 永久

测试结果说明,handle方法相当于socket的recv方法。每个不同的连接上的请求过来后,生成这个连接的socket对象即self.request,客户端地址是self.dlient_address。
问题

测试过程中,上面代码,连接后立即断开了,为什么?

怎样才能客户端和服务器端长时间连接?

import threading
import socketserver
import loggingFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)class MyHandler(socketserver.BaseRequestHandler):def handle(self):# super().handle()# 可以不调用,父类handle什么都没有做print('-' * 30)print(self.server)  # 服务print(self.request)  # 服务端负责客户端连接请求的socket对象print(self.Client_address)  # 客户端地址print(self.__dict__)print(self.server.__dict__)  # 能看到负责accept的print(threading.enumerate())print(threading.current_thread())print('-' * 30)for i in range(3):data = self.request.recv(1024)logging.info(data)logging.info('====end====')addr = ('127.0.0.1', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever()  # 永久

将ThreadingTCPServer换成TCPServer,同时连接2个客户端观察效果。

ThreadingTCPServer是异步的,可以同时处理多个连接。

TCPServer是同步的,一个连接处理完了,即一个连接的handle方法执行完了,才能处理另一个连接,且只有主线程。

总结

创建服务器需要几个步骤:
1.从BaseRequestHandler类派生出子类,并覆盖其handle(方法来创建请求处理程序类,此方法将处理
传入请求
2.实例化一个服务器类,传参服务器的地址和请求处理类
3.调用服务器实例的handle_request()或serve_forever()方法
4.调用server_close()关闭套接字

实现EchoServer

顾名思义,Echo,来什么消息回显什么消息
客户端发来什么信息,返回什么信息

import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sysclass EchoHandler(BaseRequestHandler):def setup(self):super().setup()self.event = threading.Event()  # 初始工作def finish(self):super().finish()self.event.set()def handle(self):super().handle()while not self.event.is_set():data = self.request.recv(1024).decode()msg = "{} {}".format(self.client_address, data).encode()self.request.send(msg)print('End')addr = ('127.0.0.1', 9999)
server = ThreadingTCPServer(addr, EchoHandler)server_thread = threading.Thread(target=server.serve_forever, name='EchoServer', daemon=True)
server_thread.start()try:while True:cmd = input('>>>')if cmd.strip() == 'quit':breakprint(threading.enumerate())
except Exception as e:print(e)
except KeyboardInterrupt:pass
finally:print('Exit')sys.exit(0)

练习一一改写ChatServer

使用ThreadingTCPServer改写ChatServer

import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
import loggingFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatHandler(BaseRequestHandler):clients = {}def setup(self):super().setup()self.event = threading.Event()  # 初始工作self.clients[self.client_address] = self.requestdef finish(self):super().finish()  # 清理工作self.clients.pop(self.client_address)  # 能执行到吗?self.event.set()def handle(self):super().handle()while not self.event.is_set():data = self.request.recv(1024).decode()if data == 'quit':breakmsg = "{} {}".format(self.client_address, data).encode()logging.info(msg)for c in self.clients.values():self.request.send(msg)print('End')addr = ('127.0.0.1', 9999)server = ThreadingTCPServer(addr, ChatHandler)server_thread = threading.Thread(target=server.serve_forever, name='ChatServer', daemon=True)server_thread.start()try:while True:cmd = input('>>>')if cmd.strip() == 'quit':breakprint(threading.enumerate())
except Exception as e:print(e)
except KeyboardInterrupt:pass
finally:print('Exit')sys.exit(0)

问题
上例 self.clients.pop(self.client_address) 能执行到吗?

如果连接的线程中handle方法中抛出异常,例如客户端主动断开导致的异常,线程崩溃,self.clients的pop方法还能执行吗?

当然能执行,基类源码保证了即使异常,也能执行finish方法。但不代表不应该不捕获客户端各种异常。

解决客户端主动连接断开问题

如果客户端主动断开,总是抛出一个异常。看看到底发生了什么,在handle方法中增加一些语句。

import threading
import logging
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sysclass ChatHandler(BaseRequestHandler):clients = {}def setup(self):super().setup()self.event = threading.Event()  # 初始工作self.clients[self.client_address] = self.requestdef finish(self):super().finish()  # 清理工作self.clients.pop(self.client_address)  # 能执行到吗?self.event.set()def handle(self):super().handle()while not self.event.is_set():data = self.request.recv(1024).decode()print(data, '~' * 30)  # 增加if data == 'quit':breakmsg = "{} {}".format(self.client_address, data).encode()logging.info(msg)for c in self.clients.values:print('+++++++++++++')  # 增加self.request.send(msg)print('End')

通过打印可以看到,客户端主动断开,会导致recv方法立即返回一个空bytes,并没有同时抛出异常。当循环回到recv这一句的时候就会抛出异常。所以,可以通过判断data数据是否为空来客户端是否断开。

import threading
import logging
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sysclass ChatHandler(BaseRequestHandler):clients = {}def setup(self):super().setup()self.event = threading.Event()  # 初始工作self.clients[self.client_address] = self.requestdef finish(self):super().finish()  # 清理工作self.Clients.pop(self.client_address)  # 能执行到吗?self.event.set()def handle(self):super().handle()while not self.event.is_set():data = self.request.recv(1024).decode()print(data, '~' * 30)if not data or data == 'quit':print('Broken pipe')breakmsg = "{} {}".format(self.client_address, data).encode()logging.info(msg)for c in self.clients.values:self.request.send(msg)print('End')

总结
为每一个连接提供RequestHandlerClass类实例,依次调用setup、handle、finish方法,且使用了try...finally结构保证finish方法一定能被调用。这些方法依次执行完成,如果想维持这个连接和客户端通信,就需要在handle函数中使用循环。socketserver模块提供的不同的类,但是编程接口是一样的,即使是多进程、多线程的类也是一样,大大减少了编程的难度。

异步编程

重要概念

同步、异步

函数或方法被调用的时候,调用者是否得到最终结果的。

直接得到最终结果结果的,就是同步调用;

不直接得到最终结果的,就是异步调用。

同步就是我让你打饭,你不打好给我不走开,直到你打饭给了我。
异步就是我让你打饭,你打着,我不等你,但是我会盯着你,你打完,我会过来拿走的。异步并不保证多长时间最终打完饭。

阻塞、非阻塞

函数或方法调用的时候,是否立刻返回。
立即返回就是非阻塞调用;
不立即返回就是阻塞调用。

区别

同步、异步,与阻塞、非阻塞不相关。
同步、异步强调的是,是否得到(最终的)结果;
阻塞、非阻塞强调是时间,是否等待。

同步与异步区别在于:调用者是否得到了想要的最终结果。

同步就是一直要执行到返回最终结果;

异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得到结果,还要通过被调用者,使用其它方式通知调用者,来取回最终结果。

阻塞与非阻塞的区别在于,调用者是否还能干其他事。

阻塞,调用者就只能干等;

非阻塞,调用者可以先去忙会别的,不用一直等。

联系

同步阻塞,我啥事不干,就等你打饭打给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞。
同步非阻塞,我等着你打饭给我,但我可以玩会手机、看看电视。打饭是结果,但是我不一直等
异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。例如,叫号
异步非阻塞,我要打饭,你说等叫号,并没有返回饭给我,我在旁边看电视、玩手机,饭打好了叫我。

同步IO、异步IO、IO 多路复用

IO 两个阶段

IO过程分两阶段:
1.数据准备阶段
2.内核空间复制回用户进程缓冲区阶段
发生I0的时候: 人的高薪职业学
1、内核从输入设备读、写数据(淘米,把米放饭锅里煮饭)
2、进程从内核复制数据(盛饭,从内核这个饭锅里面把饭装到碗里来)
系统调用一—read函数

IO模型

同步IO
同步IO 模型包括 阻塞IO 、非阻塞IO 、IO 多路复用
阻塞IO

在这里插入图片描述

进程等待(阻塞),直到读写完成。(全程等待)
read/write函数
非阻塞1O

在这里插入图片描述
进程调用read操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。
第一阶段数据没有准备好,就先忙别的,等会再来看看。检 数据是否准备好了的过程是非阻塞的。
第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。
淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭。
read/write

IO多路复用

所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了开始处理,提高了同时处理IO的能力。

select几乎所有操作系统平台都支持,poll是对的select的升级。

epoll, Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、Mac平台有
kqueue , Windows有iocp.

在这里插入图片描述

以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核"“监视"select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程。

select举例,食堂供应很多菜(众多的I0),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等,你只好等待。其中一样菜好了,大师傅叫你过来说你点的菜有好的了,你得自己找找看哪一样才好了,请服务员把做好的菜打给你。

epoll是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。

一般情况下,select最多能监听1024个fd(可以修改,但不建议改),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下。

epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高。

异步IO

在这里插入图片描述

进程发起异步IO请求,立即返回。内核完成1O的两个阶段,内核给进程发一个信号。

举例,来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都是异步的。

在整个过程中,进程都可以忙别的,等好了才过来。

举例,今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。

Linux的aio的系统调用,内核从版本2.6开始支持

Python 中IO多路复用

  • IO多路复用
    1、 大多数操作系统都支持select和poll
    2、 Linux 2.5+ 支持epoll
    3、 BSD、Mac支持kqueue
    4、 Windows JOCP

Python的select库

实现了select、poll系统调用,这个基本上操作系统都支持。部分实现了epoll

底层的I0多路复用模块。

开发中的选择
1、完全跨平台,使用select、poll。但是性能较差
2、针对不同操作系统自行选择支持的技术,这样做会提高10处理的性能

selectors库

3.4 版本提供这个库,高级 IO 复用库。

类层次结构:
BaseSelector
+-- SelectSelector 实现select
+-- PollSelector 实现po11
+-- EpollSelector 实现epol1
+-- DevpollSelector 实现devpo11
+-- KqueueSelector 实现kqueue

selectors.DefaultSelector返回当前平台最有效、性能最高的实现。

但是,由于没有实现Windows下的IOCP,所以,只能退化为select。

# 在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():DefaultSelector = PollSelector
else:DefaultSelector = SelectSelector 

abstractmethod register(fileobj, events, data=None)
为selector注册一个文件对象,监视它的IO事件。
fileobj被监视文件对象,例如socket对象
events 事件,该文件对象必须等待的事件
data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话ID,关联方法。通过这个

参数在关注的事件产生后让selector干什么事。
在这里插入图片描述

# 使用举例
import selectors
import threading
import socket
import loggingFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)# 回调函数,自己定义形参def accept(sock: socket.socket, mask):"""mask:事件掩码的或值"""conn, raddr = sock.accept()conn.setblocking(False)  # 不阻塞# 监视conn这个文件对象key = selector.register(conn, selectors.EVENT_READ, read)logging.info(key)# 回调函数
def read(conn: socket.socket, mask):data = conn.recv(1024)msg = "Your msg is {}.", format(data.decode())conn.send(msg.encode())# 构造缺省性能最优selector
selector = selectors.DefaultSelector()
# 创建Tcp Server
sock = socket.socket()
sock.bind(('127.0.0.1', 9999))sock.listen()
logging.info(sock)sock.setblocking(False)  # 非阻塞# 注册文件对象sock关注读事件,返回SelectorKey
# 将sock、关注事件、data都绑定到key实例属性上
key = selector.register(sock, selectors.EVENT_READ, accept)logging.info(key)
e = threading.Event()def select(e):while not e.is_set():# 开始监视,等到有文件对象监控事件产生,返回(key,mask)元组events = selector.select()print('-' * 30)for key, mask in events:logging.info(key)logging.info(mask)callback = key.data  # 回调函数callback(key.fileobj, mask)threading.Thread(target=select, args=(e,), name='select').start()def main():while not e.is_set():cmd = input('>>')if cmd.strip() == 'quit':e.set()fobjs = []logging.info('{}'.format(list(selector.get_map().items())))for fd, key in selector.get_map().items():  # 返回注册的项print(fd, key)print(key.fileobj)fobjs.append(key.fileobj)for fobj in fobjs:selector.unregister(fobj)fobj.close()  # 关闭socketselector.close()if __name__ == '__main__':main()

练习

将ChatServer改写成IO多路复用的方式
不需要启动多线程来执行socket的accept、recv方法了

import socket
import threading
import datetime
import logging
import selectorsFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务self.sock = socket.socket()self.addr = (ip, port)self.event = threading.Event()self.selector = selectors.DefaultSelector()  # 创建selectordef start(self):  # 启动监听self.sock.bind(self.addr)  # 绑定self.sock.listen()  # 监听self.sock.setblocking(False)  # 不阻塞# 注册self.selector.register(self.sock, selectors.EVENT_READ, self.accept)threading.Thread(target=self.select, name='selector', daemon=True).start()def select(self):  # 阻基while not self.event.is_set():# 开始监视,等到有文件对象监控事件产生,返回(key,mask)元组events = self.selector.select()print('-' * 30)for key, mask in events:logging.info(key)logging.info(mask)callback = key.data  # 回调函数callback(key.fileobj)def accept(self, sock: socket.socket):  # 多人连接conn, addr = sock.accept()  # BaZEconn.setblocking(False)  # 非阻塞# 注册,监视每一个连接的socket对象self.selector.register(conn, selectors.EVENT_READ, self.recv)def recv(self, sock: socket.socket):  # 接收客户端数据data = sock.recv(1024)  # 阻塞到数据到来if data == b'':  # 客户端主动断开,注销并关闭socketself.selector.unregister(sock)sock.close()returnmsg = "{:%Y/%m/%d %H:%M:%S} {}: {}\n{}\n".format(datetime.datetime.now(), *sock.getpeername(), data.decode())logging.info(msg)msg = msg.encode()# 群发for key in self.selector.get_map().values():if key.data == self.recv:  # #ißself.acceptkey.fileobj.send(msg)def stop(self):  # 停止服务self.event.set()fobjs = []for fd, key in self.selector.get_map().items():fobjs.append(key.fileobj)for fobj in fobjs:self.selector.unregister(fobj)fobj.close()self.selector.close()def main():cs = ChatServer()cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate())if __name__ == '__main__':main()

基本完成功能,但是退出机制、异常处理没有加,这个和以前的处理方式一样,请自行完成。

进阶
send是写操作,也可以让selector监听,如何监听?
self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)
注册语句,要监听selectors.EVENT_READ | selectors.EVENT_WRITE 读与写事件。

回调的时候,需要mask来判断究竟是读触发还是写触发了。所以,需要修改方法声明,增加mask。

def recv(self, sock, mask)但是由于recv 方法处理读和写事件,所以叫recv不太合适,改名为
def handle(self, sock, mask)
注意读和写是分离的,那么handle函数应该写成下面这样

def handle(self, sock:socket.socket,mask) #  接收客户端数据if mask & selectors. EVENT_READ:pass# 注意,这里是某一个socket的写操作if mask & selectors.EVENT_WRITE:# 写缓冲区准备好了,可以写入数据了pass

handle方法里面处理读、写,mask有可能是0b01、0b10、0b11。

问题是,假没读取到了客户端发来的数据后,如何写出去?

为每一个与客户端连接的socket对象增加对应的队列。

与每一个客户端连接的socket对象,自己维护一个队列,某一个客户端收到信息后,会遍历发给所有客户端的队

列。这里完成一对多,即一份数据放到了所有队列中。

与每一个客户端连接的socket对象,发现自己队列有数据,就发送给客户端。

import socket
import threading
import datetime
import logging
import selectors
from queue import QueueFORMAT = "%(asctime)s %(threadName)s %(thread)d %(message) s"logging.basicConfig(format=FORMAT, level=logging.INFO)class ChatServer:def __init__(self, ip='127.0.0.1', port=9999):self.sock = socket.socket()self.addr = (ip, port)self.clients = {}self.event = threading.Event()self.selector = selectors.DefaultSelector()  # 创建 selectordef start(self):  # 启动监听self.sock.bind(self.addr)  # 绑定self.sock.listen()  # 监听self.sock.setblocking(False)  # 不阻塞# 注册self.selector.register(self.sock, selectors.EVENT_READ, self.accept)threading.Thread(target=self.select, name='selector', daemon=True).start()def select(self):  # 阻塞while not self.event.is_set():# 开始监视,等到某文件对象被监控的事件产生,返回(key,mask)元组events = self.selector.select()  # 阻塞,直到eventsfor key, mask in events:if callable(key.data):callback = key.data  # key对象的data属性,回调callback(key.fileobj, mask)else:callback = key.data[0]callback(key, mask)def accept(self, sock: socket.socket, mask):  # 接收客户端连接conn, raddr = sock.accept()conn.setblocking(False)  # 非阻塞self.clients[raddr] = (self.handle, Queue())# 注册,监视每一个与客户端的连接的socket对象self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.clients[raddr])def handle(self, key: selectors.SelectorKey, mask):  # 接收客户端数据if mask & selectors.EVENT_READ:sock = key.fileobjraddr = sock.getpeername()data = sock.recv(1024)if not data or data == b'quit':self.selector.unregister(sock)sock.close()self.clients.pop(raddr)returnmsg = "{:%Y/%m/%d %H:%M:%S} {}: {}\n{}\n".format(datetime.datetime.now(), *raddr, data.decode())logging.info(msg)msg = msg.encode()for k in self.selector.get_map().values():logging.info(k)if isinstance(k.data, tuple):k.data[1].put(data)if mask & selectors.EVENT_WRITE:# 因为写一直就绪,mask为2,所以一直可以写,从而导致select()不断循环,如同不阻塞一样if not key.data[1].empty():key.fileobj.send(key.data[1].get())def stop(self):  # 停止服务self.event.set()fobjs = []for fd, key in self.selector.get_map().items():fobjs.append(key.fileobj)for fobj in fobjs:self.selector.unregister(fobj)fobj.close()self.selector.close()def main():cs = ChatServer()cs.start()while True:cmd = input('>>').strip()if cmd == 'quit':cs.stop()threading.Event().wait(3)breaklogging.info(threading.enumerate())logging.info('-' * 30)logging.info("{} {}".format(len(cs.clients), cs.clients))logging.info(list(map(lambda x: (x.fileobj.fileno(), x.data), cs.selector.get_map().values())))logging.info('-' * 30)if __name__ == '__main__':main()

这个程序最大的问题,在select0一直判断可写,几乎一直循环不停。所以对于写不频繁的情况下,就不要监听EVENT_WRITE。

对于Server来说,一般来说,更多的是等待对方发来数据后响应时才发出数据,而不是积极的等着发送数据。所以监听EVENT_READ,收到数据之后再发送就可以了。

本例只完成基本功能,其他功能如有需要,请自行完成。

asyncio

3.4版本加入标准库。
asyncio底层基于selectors实现,看似库,其实就是个框架,包含异步IO、事件循环、协程、任务等内容。

问题的引出

def a():for x in range(3):print(x)def b():for x in "abc":print(x)a()
b()输出:
0
1
2
a
b
c

这是一个串行的程序,单线程中根本没有做任何能否并行?

import threading
import timedef a():for x in range(3):time.sleep(0.001)  #print(x)def b():for x in "abc":time.sleep(0.001)print(x)threading.Thread(target=a, name='a').start()
threading.Thread(target=b, name='b').start()
# 运行结果
0
a
1
b
2
c
import multiprocessing
import timedef a():for x in range(3):time.sleep(0.001)print(x)def b():for x in "abc":time.sleep(0.001)print(x)if __name__ == "__main__":multiprocessing.Process(target=a, name='a').start()multiprocessing.Process(target=b, name='b').start()
输出:
a
0
b
1
c
2

生成器版本

def a():for x in range(3):print(x)yielddef b():for x in "abc":print(x)yieldx = a()
y = b()for i in range(3):next(x)next(y)
输出:
0
a
1
b
2
c

上例在一个线程内通过生成器完成了调度,让两个函数都有机会执行,这样的调度不是操作系统的进程、线程完成的,而是用户自己设计的。

这个程序编写:
需要使用yield来让出控制权

需要循环帮助交替执行

事件循环

事件循环是asyncio提供的核心运行机制。

在这里插入图片描述

协程

  • 协程不是进程、也不是线程,它是用户空间调度的完成并发处理的方式。
  • 进程、线程由操作系统完成调度,而协程是线程内完成调度。它不需要更多的线程,自然也没有多线程切换
    带来的开销。
  • 协程是非抢占式调度,只有一个协程主动让出控制权,另一个协程才会被调度。
  • 协程也不需要使用锁机制,因为是在同一个线程中执行。
  • 多CPU下,可以使用多进程和协程配合,既能进程并发又能发挥协程在单线程中的优势。
  • Python中协程是基于生成器的。

协程的使用
3.4引|入asyncio,使用装饰器

import asyncio@asyncio.coroutine
def sleep(x):  # 协程函数for i in range(3):print('sleep {}'.format(i))yield from asyncio.sleep(x)loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()

将生成器函数转换成协程函数,就可以在事件循环中执行了。

3.5版本开始,Python提供关键字async、await,在语言上原生支持协程。

import asyncioasync def sleep(x):for i in range(3):print('sleep (}'.format(i))await asyncio.sleep(x)loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()

async def用来定义协程函数,iscoroutinefunction() 返回True。协程函数中可以不包含await、 async关键字,但不能使用yield关键字。

如同生成器函数调用返回生成器对象一样,协程函数调用也会返回一个对象称为协程对象,iscoroutine()返回True.

再来做个例子

import asyncio
import threadingasync def sleep(x):for i in range(3):print('sleep {}'.format(i))await asyncio.sleep(x)async def showthread(x):for i in range(3):print(threading.enumerate())await asyncio.sleep(2)loop = asyncio.get_event_loop()tasks = [sleep(3), showthread(3)]loop.run_until_complete(asyncio.wait(tasks))loop.close()
# 协程版本
import asyncio
import threading@asyncio.coroutine
def a():for x in range(3):print('a.x', x)yield@asyncio.coroutine
def b():for x in 'abc':print('b.x', x)yieldprint(asyncio.iscoroutinefunction(a))
print(asyncio.iscoroutinefunction(b))# 大循环
loop = asyncio.get_event_loop()
tasks = [a(), b()]loop.run_until_complete(asyncio.wait(tasks))loop.close()

TCP Echo Server 举例

import asyncio# TCP Echo Server 举例
async def handle(reader, writer):while True:data = await reader.read(1024)print(dir(reader))print(dir(writer))client = writer.get_extra_info('peername')message = "{} Your msg (}".format(client, data.decode()).encode()writer.write(message)await writer.drain()loop = asyncio.get_event_loop()
ip = '127.0.0.1'
port = 9999
crt = asyncio.start_server(handle, ip, port, loop=loop)server = loop.run_until_complete(crt)print(server)  # server是监听的socket对象try:loop.run_forever()
except KeyboardInterrupt:pass
finally:server.close()loop.close()

aiohttp库

安装
$ pip install aiohttp
开发

HTTP Server

from aiohttp import webasync def indexhandle(request: web.Request):return web.Response(text=request.path, status=201)async def handle(request: web.Request):print(request.match_info)print(request.query_string)  # http://127.0.0.1:8080/1?name=12301return web.Response(text=request.match_info.get('id', '0000'), status=200)app = web.Application()app.router.add_get("/", indexhandle)  # http://127.0.0.1:8080/app.router.add_get("/{id}", handle)  # http://127.0.0.1:8080/12301web.run_app(app, host='0.0.0.0', port=9977)

HTTP Client

import asyncio
from aiohttp import ClientSessionasync def get_html(url: str):async with ClientSession() as session:async with session.get(url) as res:print(res.status)print(await res.text())url = 'http://127.0.0.1/ziroom-web/'loop = asyncio.get_event_loop()
loop.run_until_complete(get_html(url))loop.close()
http://www.dtcms.com/a/283875.html

相关文章:

  • 从0开始学习R语言--Day49--Lasso-Cox 回归
  • 在UniApp中防止页面上下拖动的方法
  • git@github.com: Permission denied (publickey).
  • 算法竞赛备赛——【图论】求最短路径——Dijkstra
  • 排序算法—交换排序(冒泡、快速)(动图演示)
  • uniapp问题总结
  • 并发事务~
  • 一种融合人工智能与图像处理的发票OCR技术,将人力从繁琐的票据处理中解放
  • 视频安全新思路:VRM视频分片错序加密技术
  • 小架构step系列17:getter-setter-toString
  • 智能视频分析:多行业安全防控的“AI之眼”
  • 嵌入式学习-PyTorch(7)-day23
  • Flutter Android打包学习指南
  • 如何下载视频 (pc端任何视频均可下载)
  • 英伟达Cosmos研究团队开源DiffusionRenderer (Cosmos): 神经逆向与正向渲染与视频扩散模型
  • 视频码率是什么?视频流分辨率 2688x1520_25fps采用 h264格式压缩,其码率为
  • Web攻防-PHP反序列化Phar文件类CLI框架类PHPGGC生成器TPYiiLaravel
  • blender 导入的fbx模型位置错乱
  • 【3D大比拼第一集】--max,maya,c4d,blender的命令搜索功能
  • iOS App 电池消耗管理与优化 提升用户体验的完整指南
  • 【力扣 中等 C】97. 交错字符串
  • 量化环节:Cont‘d
  • 题解:CF1829H Don‘t Blame Me
  • 相位中心偏置天线的SAR动目标检测
  • 代码随想录算法训练营第二十三天
  • Apache SeaTunnel配置使用案例
  • 【Leetcode】栈和队列算法题(逆波兰表达式、二叉树层序遍历、最小栈、栈的压入弹出序列)
  • 贪心算法(排序)
  • 如何通过ATS/HTTPS数据防篡改来加密视频?
  • 部署-k8s和docker的区别和联系