当前位置: 首页 > news >正文

python全栈-并发和网络通信

python全栈-并发和网络通信

文章目录

  • python全栈-并发和网络通信
  • 线程
    • 创建线程的方式
    • 等待线程join
    • 守护线程daemon=true/false
    • 互斥锁lock
    • 信号量semaphore
    • 事件event
  • 协程coroutines
    • 异步io/asyncio
  • 计算机网络通信
    • IP
    • OSI网络协议
    • TCP/UDP区别
    • TCP三次握手四次挥手
  • socket编程
    • 内置方法和属性
    • UDP通信
    • TCP通信

  • 并发
  • 线程thread
  • 进程Process
  • 协程/纤程coroutine

进程>线程>协程

常用的生成模式:

  • 多线程
  • 多进程
  • 多线程+多进程

线程

使用threading库的Thread模块

  • 参数:
    • target后面写方法名
    • args给方法的参数,元组的形式
  • start 使用这个启动线程

创建线程的方式

  1. 直接使用threading模块,去执行对应的函数
# coding='utf-8'
import threading
import time


def addd(name):
    for i in range(3):
        print(f"addd{name}--{i}")
        time.sleep(1) 

threading.Thread(target=addd, args=(1,)).start()
threading.Thread(target=addd, args=(2,)).start()
  1. 使用类方法去继承thread,然后重定义run方法
# coding='utf-8'
import time
from threading import Thread

class mythread(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        for i in range(3):
            print(self.name,i)
            time.sleep(1)

mythread('th1').start()
mythread('th2').start()

等待线程join

由于某种原因,我们在创建子线程的时候,不希望子线程可以脱离主线程自己跑

就是想要把子线程包在主线程里面

需要使用join方法

# coding='utf-8'
import time
from threading import Thread

class mythread(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        for i in range(3):
            print(self.name,i)
            time.sleep(1)

if __name__ == '__main__':
    print('1')
    th1 = mythread('th1')
    th2 = mythread('th2')
    th1.start()
    th2.start()
    th1.join()
    th2.join()

    print('2')

就像start方法一样,把线程使用join方法就ok了,主线程就会一直等待子线程执行完所有程序再执行后面的

守护线程daemon=true/false

如果我们在多线程模式下,既有子线程又有主线程,在不使用join的方法时,主线程可能会比子线程跑的更快。我们希望在主线程结束的时候,子线程也能结束,就需要使用守护线程daemon

# coding='utf-8'
import time
from threading import Thread

class mythread(Thread):
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        for i in range(3):
            print(self.name,i)
            time.sleep(1)

if __name__ == '__main__':
    print('1')
    th1 = mythread('th1')
    th1.daemon = True  需要在start之前
    th1.start()
    print('2')

当主线程的print(2)输出之后,子线程立即结束

最典型的应用是垃圾收集器,程序结束的时候所有子任务结束

互斥锁lock

  • 加锁 lock.acquice()
  • 解锁 lock.release()
# coding='utf-8'
import threading
from time import sleep

class person():
    def __init__(self, name):
        self.name = name
        self.num = 100

def consume(number,person):
    if person.num <= number:
        print('ERROR')
    else:
        sleep(1)
        person.num -= number
        print(person.num)


if __name__ == '__main__':
    teacher = person('Teacher')
    threading.Thread(target=consume, args=(80,teacher)).start()
    threading.Thread(target=consume, args=(80,teacher)).start()

上面是互斥锁的应用场景,发现我们在判断账户可以操作的时候,会有程序阻塞,导致数据无法实时刷新,出现了-60

因此我们需要卡线程的数量,一次只允许一个线程去操作,就有了锁,一次进去一个线程

# coding='utf-8'
import threading
from time import sleep

class person():
    def __init__(self, name):
        self.name = name
        self.num = 100

def consume(number,person):
    lock.acquire()
    if person.num <= number:
        print('ERROR')
    else:
        sleep(1)
        person.num -= number
        print(person.num)
    lock.release()


if __name__ == '__main__':
    teacher = person('Teacher')
    lock = threading.Lock()
    threading.Thread(target=consume, args=(80,teacher)).start()
    threading.Thread(target=consume, args=(80,teacher)).start()

我们只需要把阻塞的地方锁起来,先加锁,再解锁。就可以保证程序正常运行了

  • 死锁:一个程序里面同时含有两个及以上的锁,由于线程的差异,导致线程之间互相等待。只要不同时使用两个及以上的锁就行。

信号量semaphore

就是锁的升级版,因为锁一次只能进一个线程,对于某些场景来说,可以允许两个线程同时操作

就有了信号量的概念

信号量,在初始化的时候,可以定义线程的最大数量,这个信号量就允许同时几个线程运行

和锁的用法一样

当信号量为1的时候,就是普通的锁了

# coding='utf-8'
import threading
from threading import Semaphore
from time import sleep

class person():
    def __init__(self, name):
        self.name = name
        self.num = 1000

def consume(number,person):
    se.acquire()
    if person.num <= number:
        print('ERROR')
    else:
        sleep(1)
        person.num -= number
        print(person.num)
    se.release()

if __name__ == '__main__':
    teacher = person('Teacher')
    se = Semaphore(2)  同时进两个线程
    for i in range(10):
        threading.Thread(target=consume, args=(80,teacher)).start()

事件event

设置一个对象,专门用来传递信号的,就像过红绿灯一样。所有人到达红绿灯,如果是红灯都要等着。

一旦变成绿灯,就过马路。正常行驶。

  • 等红绿灯 event.wait(timeout=time) 可以设置最大等待时间,比如红绿灯损坏,人不能一直等着
  • 红绿灯变绿了 event.set()
  • 初始化红绿灯 threading.Event()
  • 红绿灯变红灯 event.clear() 把有wait的线程阻塞
  • 判断当前红绿灯的状态 event.is_set() 看看什么状态
# coding='utf-8'
import threading
import time
from threading import Semaphore
from time import sleep

class person():
    def __init__(self, name):
        self.name = name
        self.num = 1000

def consume(number,person,name):
    print(name,11)
    ev.wait()
    print(name,22)
    if person.num <= number:
        print('ERROR')
    else:
        sleep(1)
        person.num -= number
        print(person.num)

if __name__ == '__main__':
    teacher = person('Teacher')
    ev = threading.Event()
    for i in range(5):
        threading.Thread(target=consume, args=(80,teacher,i)).start()
    time.sleep(10)
    ev.set() 启动所有线程

协程coroutines

这里的知识,工作用的时候少,面试的时候问的多

除了cpu密集型应用

异步io/asyncio

  • async 声明异步函数
  • await 声明程序挂起
  • gather把异步的函数传入
  • run方法去运行主函数

底层逻辑是事件循环

import asyncio

async def fun():
    for i in range(5):
        await asyncio.sleep(1)
        print(f'fun-{i}')
    print('fun-ok')
async def fun2():
    for i in range(5):
        await asyncio.sleep(1)
        print(f'fun2-{i}')
    print('fun2-ok')

async def main():
    await asyncio.gather(fun(), fun2()) 添加异步方法

if __name__ == '__main__':
    asyncio.run(main())  执行异步函数

大概分成3部分:

  1. 使用run去执行主函数
  2. 使用gather去添加异步任务
  3. 编写异步方法

计算机网络通信

IP

ip地址,是计算机的身份证

ipv4是旧版的ip地址,32位

ipv6是新版的,128位

本机ip地址是127.0.0.1

在命令行使用ipconfig获取网卡信息

使用ping可以查看网络连接:

  • ping + 网址 查看能不能上网
  • ping + ip 看看和该计算机在不在一个局域网
  • ping + 127.0.0.1 查看本机网卡能不能用

OSI网络协议

网络之间数据传输的层数:

  1. 应用层
  2. 表示层
  3. 会话层
  4. 传输层
  5. 网络层
  6. 数据链路层
  7. 物理层

每一层都有自己的协议

TCP/IP协议模型:

  1. 应用层 —1~3—socket
  2. 传输层 —4 – TCP/UDP
  3. 网络层 ----5
  4. 物理+数据链路层6~7

TCP/UDP区别

TCPUDP
一对一一对多
可靠不可靠/不稳定
需要连接不需要连接
类似打电话,需要专门连接类似广播,不需要连接,能接收信息就行
需要很多的验证信息,20-60字节几乎不需要验证信息,8字节
文件传输直播,视频会议

TCP三次握手四次挥手

就是使用TCP进行网络通信的时候,计算机之间需要进行三次通话,才能开始发送真正的数据

在结束TCP的时候,需要进行四次通话,才能完美的结束通话

  • 三次握手
    1. 发送端发送SYN数据包,SYN=1,seq=x
    2. 接受端发送SYN/ACK数据包,SYN=1,ACK=1,seq=y,ack=x+1
    3. 发送端发送ACK数据包,ACK=1,seq=x+1,ack=y+1

三次握手结束后,开始发送数据

因为UDP不需要验证,不需要建立连接,没有三次握手

  • 四次挥手
    1. 发送端FIN=1,发出TCP停止请求
    2. 接收端ACK=1
    3. 接受端FIN=1,提出关闭请求
    4. 发送端ACK=1,结束

为了断开连接

通信传输的数据单位—数据包

数据包含有—包,帧,数据包,段,消息

socket编程

  • socket.socket([family[,type[,proto]]]) 表示打开一个网络连接
  • family是套接字家族:AF_UNIX(本地)/AF_INET(ipv4)
  • type是套接字类型,TCP的类型是SOCK_STREAM,UDP是SOCK_DGRAM
  • proto指定协议不写,默认是0

内置方法和属性

  • 服务器server
    • bind 绑定地址
    • lisen TCP监听
    • accept 接收客户端连接
  • 客户端client
    • connect 主动初始化TCP连接
    • connect_ex
  • 套接方法
    • recv接收TCP,以字符串的形式返回
    • send发送TCP数据
    • sendall发送完整TCP数据
    • recvfrom接收UDP数据
    • sendto发送UDP数据
    • close关闭
    • getpeername
    • getsockname
    • getsockopt

UDP通信

只需要知道IP和端口号,就可以直接发送数据包

对于UDP来说,客户端和服务端基本类似

from socket import *

# 初始化socket
sk = socket(AF_INET, SOCK_DGRAM) # UDP
sk.bind(('127.0.0.1', 8080)) # 绑定端口号
print('正在监听')
while True:
    txt = sk.recvfrom(1024) # 允许单次接收的最大字节数
    print(txt[0].decode('gbk'),txt[1])
    if txt[0].decode('gbk')=='quit':
        break

sk.close()

注意解码方式,我们发送信息的时候需要encode加编码方式gbk,接收的时候就需要decode解码

from socket import *

# 初始化socket
sk = socket(AF_INET, SOCK_DGRAM) # UDP
sk.bind(('127.0.0.1', 8888)) # 绑定端口号
while True:
    data = input('>>>')
    sk.sendto(data.encode('gbk'), ('127.0.0.1', 8080))  
    if data == 'quit':
        break
sk.close()
  • 基于UDP的双向通信
import threading
from socket import *

def fasong():
    while True:
        data = input('>>>')
        sk.sendto(data.encode('gbk'), ('127.0.0.1', 8888))
        if data == 'quit':
            break
    sk.close()

def jieshou():
    while True:
        txt = sk.recvfrom(1024)  # 允许单次接收的最大字节数
        print(txt[0].decode('gbk'), txt[1])
        print('>>>',end='')
        if txt[0].decode('gbk') == 'quit':
            break
    sk.close()

if __name__ == '__main__':
    # 初始化socket
    sk = socket(AF_INET, SOCK_DGRAM)  # UDP
    sk.bind(('127.0.0.1', 8080))  # 绑定端口号
    th1 = threading.Thread(target=fasong)
    th2 = threading.Thread(target=jieshou)
    th1.start()
    th2.start()
    th1.join()
    th2.join() 

客户端

import threading
from socket import *

def fasong():
    while True:
        data = input('>>>')
        sk.sendto(data.encode('gbk'), ('127.0.0.1', 8080))
        if data == 'quit':
            break
    sk.close()

def jieshou():
    while True:
        txt = sk.recvfrom(1024)  # 允许单次接收的最大字节数
        print(txt[0].decode('gbk'), txt[1])
        print('>>>',end='')
        if txt[0].decode('gbk') == 'quit':
            break
    sk.close()

if __name__ == '__main__':
    # 初始化socket
    sk = socket(AF_INET, SOCK_DGRAM)  # UDP
    sk.bind(('127.0.0.1', 8888))  # 绑定端口号
    th1 = threading.Thread(target=fasong)
    th2 = threading.Thread(target=jieshou)
    th1.start()
    th2.start()
    th1.join()
    th2.join()

不难发现,基于UDP的双向通信,服务器和客户端一模一样,除了端口号不同,没有区别

TCP通信

  • 单次通信TCP

服务器

from socket import *

sk = socket(AF_INET, SOCK_STREAM) # TCP连接
sk.bind(('127.0.0.1', 8080)) # 绑定端口号
sk.listen(5) # 最大监听数量,就是同时接收客户端的数量,一般为5就够了
print('等待连接')
client_socket,client_info = sk.accept() # 端口阻塞,等待客户端连接
recv_data = client_socket.recv(1024) # 最大接收字节
print(recv_data.decode('utf-8'))  客户端信息
print(client_info)  客户端地址
# 断开连接
client_socket.close() 先关闭客户端端口
sk.close() 再关闭服务器端口

客户端

from socket import *

sk = socket(AF_INET, SOCK_STREAM) # TCP协议
sk.connect(('127.0.0.1', 8080)) # 连接服务器端口,客户端不需要绑定端口
msg = input('>>>')
sk.send(msg.encode('utf-8')) # 向服务器发送信息
sk.close() # 关闭客户端
  • TCP多次通信,双向多次通信

客户端

from socket import *

sk = socket(AF_INET, SOCK_STREAM)
sk.connect(('127.0.0.1', 8080))
while True:
    msg = input('>>>')
    sk.send(msg.encode('utf-8'))
    if msg == 'exit':
        break
    server_recv = sk.recv(1024)  接收服务器信息
    print(server_recv.decode('utf-8'))

sk.close()

服务器

from socket import *

sk = socket(AF_INET, SOCK_STREAM) # TCP连接
sk.bind(('127.0.0.1', 8080))
sk.listen(5) # 最大监听数量,就是同时接收客户端的数量,一般为5就够了
print('等待连接')
client_socket,client_info = sk.accept()
while True:
    recv_data = client_socket.recv(1024) # 最大接收字节
    print(recv_data.decode('utf-8'))
    if recv_data.decode('utf-8') == 'exit':
        break
    client_socket.send(f'已接收到信息:{recv_data.decode("utf-8")}'.encode('utf-8'))发送给客户端提示
# 断开连接
client_socket.close()
sk.close()
  • 基于TCP实现多次通话

就是把上面单次通话的操作部分,拆开,分成两个函数,然后使用多线程去调用

把初始化,和结束部分放在主函数里面

import threading
from socket import *

def fasong():
    while True:
        msg = input('>>>')
        client_socket.send(msg.encode('utf-8'))
        if msg == 'exit':
            print('结束通话')
            break
def jieshou():
    while True:
        recv_data = client_socket.recv(1024)  # 最大接收字节
        print(recv_data.decode('utf-8'))
        if recv_data.decode('utf-8') == 'exit':
            print('结束接收')
            break

if __name__ == '__main__':
    sk = socket(AF_INET, SOCK_STREAM)  # TCP连接
    sk.bind(('127.0.0.1', 8080))
    sk.listen(5)  # 最大监听数量,就是同时接收客户端的数量,一般为5就够了
    print('等待连接')
    client_socket, client_info = sk.accept()
    print('连接成功')

    t1 = threading.Thread(target=fasong)
    t2 = threading.Thread(target=jieshou)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # 断开连接
    client_socket.close()
    sk.close()

客户端

import threading
from socket import *

def fasong():
    while True:
        msg = input('>>>')
        sk.send(msg.encode('utf-8'))
        if msg == 'exit':
            print('结束通话')
            break

def jieshou():
    while True:
        server_recv = sk.recv(1024)
        print(server_recv.decode('utf-8'))
        if server_recv.decode('utf-8') == 'exit':
            print('结束接收')
            break
if __name__ == '__main__':
    sk = socket(AF_INET, SOCK_STREAM)
    sk.connect(('127.0.0.1', 8080))

    t1 = threading.Thread(target=fasong)
    t2 = threading.Thread(target=jieshou)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    sk.close()

相关文章:

  • GO 快速升级Go版本
  • 【Qt之QQuickWidget】QML嵌入QWidget中
  • c++day4
  • 【嵌入式Linux应用开发基础】网络编程(1):TCP/IP协议栈
  • WIN10 本地部署 BGE Embedding 向量化模型
  • unxi-进程间通信
  • 使用PHP接入纯真IP库:实现IP地址地理位置查询
  • akka现有的分布式定时任务框架总结
  • 条件渲染
  • .Net 9下使用Tensorflow.net---DNN_Keras
  • AI时代前端开发技能变革与ScriptEcho:拥抱AI,提升效率
  • MongoDB 复制(副本集)
  • Uncaught TypeError: Module._malloc is not a function
  • 【学习笔记16】Java中常见的Exception(异常)
  • ROS2 同一个pkg里定义自定义action、msg报错
  • 爬虫解析库:Beautiful Soup的详细使用
  • 怎样把外网的文件放到内网?
  • Python3 运算符
  • 清影2.0(AI视频生成)技术浅析(五):音频处理技术
  • 超高速工业相机的应用
  • 欧盟拟对发往欧洲的小额包裹加收手续费,外交部回应
  • 海南省市监局与香港标准及检定中心签署合作协议,加快检验检测国际化
  • 法国参议院调查委员会公布雀巢“巴黎水”丑闻调查报告
  • 央企通号建设集团有限公司原党委常委、副总经理叶正兵被查
  • 达恩当选罗马尼亚总统
  • 世卫大会拒绝涉台提案,外交部:坚持一个中国原则是人心所向