Python 实现简易版的文件管理(结合网络编程)
目录
- 一、Python 代码实现
- 1. 服务器端
- 2. 客户端
- 二、结果展示
- 1. 查看当前路径下的内容 ls
- 2. 切换当前路径 cd
- 3. 查看当前路径 pwd
- 4. 显示根目录下的树状结构 tree
- 5. 在当前路径下创建目录 mkdir
- 6. 删除当前路径下的文件或目录 rm
- 7. 复制文件 mv
- 8. 移动文件 cp
- 9. 用户从当前路径下载文件到指定路径 gets
- 10. 用户从指定路径上传文件到当前路径 puts
一、Python 代码实现
以下是一个基于 socket、os 和 struct 等库实现的 Python TCP 网盘系统(分为客户端和服务器端),支持常见文件操作,并通过定长报头解决粘包问题。
注:先运行服务器端,再运行客户端。
1. 服务器端
# !/usr/bin/python
# -*- coding:utf-8 -*-import time
from socket import *
import struct
import os
import shutil
from multiprocessing import Poolclass Server:def __init__(self, ip, port):# 创建套接字self.s_sock = socket(AF_INET, SOCK_STREAM)self.ip = ipself.port = portdef tcp_init(self):# 重用对应地址和端口(实现端口复用)self.s_sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)# 绑定本地 IP 地址和端口self.s_sock.bind((self.ip, self.port))# 端口激活self.s_sock.listen(128)# 获取当前脚本的工作目录:D:\PycharmProjects\Learn_Catalog\net_disk
BASE_DIR = os.getcwd()class User: # 一个user对象对应一个客户端def __init__(self, new_sock):self.new_sock = new_sockself.abs_path = BASE_DIR # 绝对路径(当前的工作目录)self.tree = '' # 以树状图列出 net_disk 下的所有内容def send_train(self, send_message): # 发送数据send_bytes = send_message.encode('utf-8')train_head_bytes = struct.pack('I', len(send_bytes)) # 封装火车头total_trans = 0while total_trans < len(train_head_bytes + send_bytes):trans = self.new_sock.send((train_head_bytes + send_bytes)[total_trans:])if trans == 0:breakelse:total_trans += transdef recv_train(self): # 接收数据train_head_bytes = self.new_sock.recv(4)train_head = struct.unpack('I', train_head_bytes)[0] # 解封火车头total_packet = b''while len(total_packet) < train_head:packet = self.new_sock.recv(train_head - len(total_packet))if not packet:self.send_train('Client disconnected.' + '\n')breakelse:total_packet += packetreturn total_packet.decode('utf-8')def deal_command(self):while True:command = self.recv_train()if command[:2] == 'ls': # 查看当前路径下的内容self.do_ls()elif command[:2] == 'cd': # 切换工作目录self.do_cd(command)elif command[:3] == 'pwd': # 查看当前目录的绝对路径self.do_pwd()elif command[:4] == 'tree': # 显示根目录下的树状结构self.do_tree()elif command[:5] == 'mkdir': # 在当前路径下创建目录self.do_mkdir(command)elif command[:2] == 'rm': # 删除当前路径下的文件或目录self.do_rm(command)elif command[:2] == 'mv': # 移动文件self.do_mv(command)elif command[:2] == 'cp': # 复制文件self.do_cp(command)elif command[:4] == 'gets': # 用户下载文件self.do_gets(command)elif command[:4] == 'puts': # 用户上传文件self.do_puts(command)elif command[:4] == 'exit':print('User has logged out.')else:self.send_train('Warning: Wrong command.' + '\n')def do_ls(self): # 将当前路径下的信息发给客户端if len(os.listdir(self.abs_path)) == 0:self.send_train('Warning: There are no files or folders in the current path.' + '\n')else:ls_data = ''ls_data += (f'{'链接数':^6}{'用户ID':^6}{'组ID':^8}{'大小':^7}{'名称':^18}{'最近修改时间':^18}'+ '\n' + ('-' * 80) + '\n')# os.listdir : 返回指定的文件夹包含的文件或文件夹的名字的列表for file in os.listdir(self.abs_path):ls_data += (f'{os.stat(file).st_nlink:^8}{os.stat(file).st_uid:^8}{os.stat(file).st_gid:^8}'f'{os.stat(file).st_size:^8}{file:^20}{time.ctime(os.stat(file).st_mtime):^20}' + '\n')self.send_train(ls_data)def do_cd(self, command):cmd = command.strip().split() # 以空格分割if len(cmd) < 2:self.send_train('Usage: cd <dir>' + '\n')else:try: # 存在该目录os.chdir(cmd[1])new_path = os.getcwd()# 只能 cd BASE_DIR 之下的目录if new_path.startswith(BASE_DIR):self.abs_path = new_pathrel_path = os.path.relpath(self.abs_path, start=BASE_DIR)self.send_train('>>> ' + rel_path + '\n')else:os.chdir(self.abs_path)self.send_train('Warning: No access permission.' + '\n')except Exception as e:print(e)self.send_train('Warning: No such directory.' + '\n')def do_pwd(self): # 将当前路径传输给客户端rel_path = os.path.relpath(self.abs_path, start=BASE_DIR)self.send_train('>>> ' + rel_path + '\n')def do_tree(self): # 以树状图列出目录的内容if self.tree == '':self.list_directory_tree(BASE_DIR)self.send_train(self.tree)def list_directory_tree(self, init_path, level=0):# 显示当前目录名称,按缩进表示层级,os.path.basename:从完整路径中提取文件名self.tree += (" " * level + f"|-- {os.path.basename(init_path)}" + '\n')if os.path.isdir(init_path):for item in os.listdir(init_path):item_path = os.path.join(init_path, item)# 递归遍历子目录和文件self.list_directory_tree(item_path, level + 1)def do_mkdir(self, command):cmd = command.strip().split() # 以空格分割if len(cmd) < 2:self.send_train('Usage: mkdir <dir>' + '\n')else:if os.path.exists(os.path.join(self.abs_path, cmd[1])):self.send_train('Warning: The folder already exists.' + '\n')else:# 在当前路径下创建新文件夹os.makedirs(os.path.join(self.abs_path, cmd[1]), exist_ok=True)self.send_train('>>> ' + f'The {cmd[1]} folder was created successfully!' + '\n')# 重新构造树状图self.tree = ''def do_rm(self, command):cmd = command.strip().split() # 以空格分割if len(cmd) < 2:self.send_train('Usage: rm <target>' + '\n')else:# 删除当前目录下的文件和空文件夹target_path = os.path.join(self.abs_path, cmd[1])# 如果该文件或文件夹存在if os.path.exists(target_path):# 如果是文件则删除if os.path.isfile(target_path):# 使用 os.remove 时,路径指向的是文件而非目录os.remove(target_path)self.send_train('>>> ' + 'File deleted successfully!' + '\n')# 重新构造树状图self.tree = ''# 如果是文件夹且为空则删除elif os.path.isdir(target_path):if len(os.listdir(target_path)) == 0:os.rmdir(target_path)self.send_train('>>> ' + 'Empty folder deleted successfully!' + '\n')self.tree = ''else:self.send_train('Warning: The folder is not empty, deletion failed.' + '\n')else:self.send_train('Warning: The file or folder does not exist.' + '\n')def do_mv(self, command):# 将当前路径下的某文件或文件夹移动到其他路径下cmd = command.strip().split() # 以空格分割if len(cmd) < 3:self.send_train("Usage: mv <target> <dst>" + '\n')else:src = os.path.join(self.abs_path, cmd[1])dst = os.path.join(BASE_DIR, cmd[2])# 如果目标路径存在且是目录if os.path.isdir(dst):# 如果在源路径下存在该文件或文件夹if os.path.exists(src):# 如果目标路径存在同名文件或文件夹if os.path.exists(os.path.join(dst, cmd[1])):self.send_train('Warning: A file with the same name exists in the target path.' + '\n')else:shutil.move(src, dst)self.tree = ''self.send_train('>>> ' + 'File moved successfully!' + '\n')else:self.send_train('Warning: The file does not exist in the source path.' + '\n')else:self.send_train('Warning: The target path is not valid.' + '\n')def do_cp(self, command):# 将当前路径下的某文件复制到其他路径下cmd = command.strip().split() # 以空格分割if len(cmd) < 3:self.send_train("Usage: cp <target> <dst>" + '\n')else:src_cp = os.path.join(self.abs_path, cmd[1])dst_cp = os.path.join(BASE_DIR, cmd[2])# 如果目标路径存在且是目录if os.path.isdir(dst_cp):# 如果在源路径下存在该文件或文件夹if os.path.exists(src_cp):# 如果目标路径存在同名文件或文件夹if os.path.exists(os.path.join(dst_cp, cmd[1])):self.send_train('Warning: A file with the same name exists in the target path.' + '\n')else:if os.path.isdir(src_cp):shutil.copytree(src_cp, dst_cp)else:shutil.copy2(src_cp, dst_cp)self.tree = ''self.send_train('>>> ' + 'File copied successfully!' + '\n')else:self.send_train('Warning: The file does not exist in the source path.' + '\n')else:self.send_train('Warning: The target path is not valid.' + '\n')def do_gets(self, command): # 用户在当前路径下下载文件cmd = command.strip().split() # 以空格分割if len(cmd) < 3:passelse:self.send_train(self.abs_path)file_path = os.path.join(self.abs_path, cmd[1])# 如果文件存在if os.path.exists(file_path) and os.path.isfile(file_path):# 如果目的文件夹存在if os.path.isdir(cmd[2]):# 如果目的文件夹有同名文件final_path = os.path.join(cmd[2], cmd[1])if os.path.exists(final_path):passelse:self.send_file_train(file_path)else:passelse:passdef send_file_train(self, f_path):self.new_sock.send(struct.pack('I', os.path.getsize(f_path)))f = open(f_path, 'rb')while True:send_data = f.read(1024)if send_data:self.new_sock.sendall(send_data)else:breakf.close()def do_puts(self, command): # 用户上传文件在当前路径下cmd = command.strip().split() # 以空格分割if len(cmd) < 2:passelse:# 如果文件存在if os.path.exists(cmd[1]):self.send_train(self.abs_path)file_name = os.path.basename(cmd[1]) # 文件名称# 如果当前路径下存在同名文件if os.path.exists(os.path.join(self.abs_path, file_name)):passelse:write_path = self.abs_path + os.sep + file_name # 在当前目录下创建名字一样的文件self.recv_file_train(write_path) # 写文件else:passdef recv_file_train(self, f_path):f_wb = open(f_path, 'wb')head = self.new_sock.recv(4)train_head = struct.unpack('I', head)recv_data = self.new_sock.recv(train_head[0])f_wb.write(recv_data)f_wb.close()# 进程池 apply_async 只支持纯方法传入,不支持对象方法
def pool_task(u):u.deal_command()if __name__ == '__main__':server = Server('192.168.31.66', 8888)server.tcp_init()po = Pool(5) # 进程池,并行处理5个进程while True:# 等待客户端连接,跟客户端进行后续通信的是 new_clientnew_client, client_addr = server.s_sock.accept()print(f'{client_addr} connection successful!')user = User(new_client)po.apply_async(pool_task, (user,))# -----------------------------------------------------data = '-' * 50 + '\n' + '请选择操作 : ' + '\n'data += '查看当前路径下的内容 → ls' + '\n'data += '切换当前路径 → cd [路径](net_disk下的相对路径)' + '\n'data += '查看当前路径 → pwd' + '\n'data += '显示根目录下的树状结构 → tree' + '\n'data += '在当前路径下创建目录 → mkdir [文件/目录名称]' + '\n'data += '删除当前路径下的文件或目录 → rm [文件/目录名称]' + '\n'data += '复制文件 → mv [文件名称] [目标路径](net_disk下的相对路径)' + '\n'data += '移动文件 → cp [文件名称] [目标路径](net_disk下的相对路径)' + '\n'data += '用户从指定路径上传文件到当前路径 → puts [源路径](绝对路径)' + '\n'data += '用户从当前路径下载文件到指定路径 → gets [文件名称] [目标路径](绝对路径)' + '\n'data += '-' * 50 + '\n'user.send_train(data)
2. 客户端
# !/usr/bin/python
# -*- coding:utf-8 -*-from socket import *
import struct
import osclass Client:def __init__(self, ip, port):self.client = socket(AF_INET, SOCK_STREAM)self.ip = ipself.port = portdef tcp_connect(self):self.client.connect((self.ip, self.port))def send_train(self, send_messages):send_bytes = send_messages.encode('utf-8')train_head_bytes = struct.pack('I', len(send_bytes))self.client.send(train_head_bytes + send_bytes)def recv_train(self):train_head_bytes = self.client.recv(4)train_head = struct.unpack('I', train_head_bytes)recv_data = self.client.recv(train_head[0])return recv_data.decode('utf-8')def send_command(self):while True:# 读取命令并发送到服务器端command = input()self.send_train(command)if command[:4] == 'puts': # 用户上传文件self.do_puts(command)elif command[:4] == 'gets': # 用户下载文件self.do_gets(command)elif command[:2] == 'cp': # 复制文件print(self.recv_train())elif command[:2] == 'mv': # 移动文件print(self.recv_train())elif command[:2] == 'rm': # 删除当前路径下的文件或目录print(self.recv_train())elif command[:5] == 'mkdir': # 在当前路径下创建目录print(self.recv_train())elif command[:4] == 'tree': # 显示根目录下的树状结构print(self.recv_train())elif command[:3] == 'pwd': # 查看当前目录的绝对路径print(self.recv_train())elif command[:2] == 'cd': # 切换工作目录print(self.recv_train())elif command[:2] == 'ls': # 查看当前路径下的内容print(self.recv_train())elif command[:4] == 'exit':exit(0)else:print(self.recv_train())def do_gets(self, command): # 下载文件cmd = command.strip().split() # 以空格分割if len(cmd) < 3:print('Usage: gets <target> <dst>\n')else:abs_path = self.recv_train()file_path = os.path.join(abs_path, cmd[1])# 如果文件存在if os.path.exists(file_path) and os.path.isfile(file_path):# 如果目的文件夹存在if os.path.isdir(cmd[2]):# 如果目的文件夹有同名文件final_path = os.path.join(cmd[2], cmd[1])if os.path.exists(final_path):print('Warning: A file with the same name exists in the destination directory.\n')else:self.recv_file_train(final_path)print(">>> File downloaded successfully!\n")else:print('Warning: The download path is incorrect.\n')else:print("Warning: The file does not exist in the source directory.\n")def recv_file_train(self, f_path):f_wb = open(f_path, 'wb')head = self.client.recv(4)train_head = struct.unpack('I', head)recv_data = self.client.recv(train_head[0])f_wb.write(recv_data)f_wb.close()def do_puts(self, command): # 上传文件cmd = command.strip().split() # 以空格分割if len(cmd) < 2:print('Usage: puts <target>\n')else:# 如果文件存在if os.path.exists(cmd[1]):file_path = self.recv_train()file_name = os.path.basename(cmd[1]) # 文件名称# 如果当前路径下存在同名文件if os.path.exists(os.path.join(file_path, file_name)):print('Warning: A file with the same name exists in the destination directory.\n')else:self.send_file_train(cmd[1])print('>>> File uploaded successfully!\n')else:print('Warning: The file does not exist in this directory.\n')def send_file_train(self, f_path):f = open(f_path, 'rb')self.client.send(struct.pack('I', os.path.getsize(f_path)))while True:send_data = f.read(1024)if send_data:self.client.sendall(send_data)else:breakf.close()if __name__ == '__main__':client = Client('192.168.31.66', 8888)client.tcp_connect()print(client.recv_train())client.send_command()
二、结果展示
1. 查看当前路径下的内容 ls
def do_ls(self): # 将当前路径下的信息发给客户端if len(os.listdir(self.abs_path)) == 0:self.send_train('Warning: There are no files or folders in the current path.' + '\n')else:ls_data = ''ls_data += (f'{'链接数':^6}{'用户ID':^6}{'组ID':^8}{'大小':^7}{'名称':^18}{'最近修改时间':^18}'+ '\n' + ('-' * 80) + '\n')# os.listdir : 返回指定的文件夹包含的文件或文件夹的名字的列表for file in os.listdir(self.abs_path):ls_data += (f'{os.stat(file).st_nlink:^8}{os.stat(file).st_uid:^8}{os.stat(file).st_gid:^8}'f'{os.stat(file).st_size:^8}{file:^20}{time.ctime(os.stat(file).st_mtime):^20}' + '\n')self.send_train(ls_data)
参考文章:
【Python os.stat() 方法 - 菜鸟教程】
【Python OS 文件/目录方法 - 菜鸟教程】
2. 切换当前路径 cd
def do_cd(self, command):cmd = command.strip().split() # 以空格分割if len(cmd) < 2:self.send_train('Usage: cd <dir>' + '\n')else:try: # 存在该目录os.chdir(cmd[1])new_path = os.getcwd()# 只能 cd BASE_DIR 之下的目录if new_path.startswith(BASE_DIR):self.abs_path = new_pathrel_path = os.path.relpath(self.abs_path, start=BASE_DIR)self.send_train('>>> ' + rel_path + '\n')else:os.chdir(self.abs_path)self.send_train('Warning: No access permission.' + '\n')except Exception as e:print(e)self.send_train('Warning: No such directory.' + '\n')
参考文章:
【Python 异常处理 - 菜鸟教程】
【Python startswith()方法 - 菜鸟教程】
3. 查看当前路径 pwd
def do_pwd(self): # 将当前路径传输给客户端rel_path = os.path.relpath(self.abs_path, start=BASE_DIR)self.send_train('>>> ' + rel_path + '\n')
4. 显示根目录下的树状结构 tree
def do_tree(self): # 以树状图列出目录的内容if self.tree == '':self.list_directory_tree(BASE_DIR)self.send_train(self.tree)def list_directory_tree(self, init_path, level=0):# 显示当前目录名称,按缩进表示层级,os.path.basename:从完整路径中提取文件名self.tree += (" " * level + f"|-- {os.path.basename(init_path)}" + '\n')if os.path.isdir(init_path):for item in os.listdir(init_path):item_path = os.path.join(init_path, item)# 递归遍历子目录和文件self.list_directory_tree(item_path, level + 1)
5. 在当前路径下创建目录 mkdir
def do_mkdir(self, command):cmd = command.strip().split() # 以空格分割if len(cmd) < 2:self.send_train('Usage: mkdir <dir>' + '\n')else:if os.path.exists(os.path.join(self.abs_path, cmd[1])):self.send_train('Warning: The folder already exists.' + '\n')else:# 在当前路径下创建新文件夹os.makedirs(os.path.join(self.abs_path, cmd[1]), exist_ok=True)self.send_train('>>> ' + f'The {cmd[1]} folder was created successfully!' + '\n')# 重新构造树状图self.tree = ''
6. 删除当前路径下的文件或目录 rm
def do_rm(self, command):cmd = command.strip().split() # 以空格分割if len(cmd) < 2:self.send_train('Usage: rm <target>' + '\n')else:# 删除当前目录下的文件和空文件夹target_path = os.path.join(self.abs_path, cmd[1])# 如果该文件或文件夹存在if os.path.exists(target_path):# 如果是文件则删除if os.path.isfile(target_path):# 使用 os.remove 时,路径指向的是文件而非目录os.remove(target_path)self.send_train('>>> ' + 'File deleted successfully!' + '\n')# 重新构造树状图self.tree = ''# 如果是文件夹且为空则删除elif os.path.isdir(target_path):if len(os.listdir(target_path)) == 0:os.rmdir(target_path)self.send_train('>>> ' + 'Empty folder deleted successfully!' + '\n')self.tree = ''else:self.send_train('Warning: The folder is not empty, deletion failed.' + '\n')else:self.send_train('Warning: The file or folder does not exist.' + '\n')
参考文章:
【Python os.remove() 方法 - 菜鸟教程】
【Python os.rmdir() 方法 - 菜鸟教程】
7. 复制文件 mv
def do_mv(self, command):# 将当前路径下的某文件或文件夹移动到其他路径下cmd = command.strip().split() # 以空格分割if len(cmd) < 3:self.send_train("Usage: mv <target> <dst>" + '\n')else:src = os.path.join(self.abs_path, cmd[1])dst = os.path.join(BASE_DIR, cmd[2])# 如果目标路径存在且是目录if os.path.isdir(dst):# 如果在源路径下存在该文件或文件夹if os.path.exists(src):# 如果目标路径存在同名文件或文件夹if os.path.exists(os.path.join(dst, cmd[1])):self.send_train('Warning: A file with the same name exists in the target path.' + '\n')else:shutil.move(src, dst)self.tree = ''self.send_train('>>> ' + 'File moved successfully!' + '\n')else:self.send_train('Warning: The file does not exist in the source path.' + '\n')else:self.send_train('Warning: The target path is not valid.' + '\n')
参考文章:【Python shutil.move函数用法介绍 | 极客教程】
8. 移动文件 cp
def do_cp(self, command):# 将当前路径下的某文件复制到其他路径下cmd = command.strip().split() # 以空格分割if len(cmd) < 3:self.send_train("Usage: cp <target> <dst>" + '\n')else:src_cp = os.path.join(self.abs_path, cmd[1])dst_cp = os.path.join(BASE_DIR, cmd[2])# 如果目标路径存在且是目录if os.path.isdir(dst_cp):# 如果在源路径下存在该文件或文件夹if os.path.exists(src_cp):# 如果目标路径存在同名文件或文件夹if os.path.exists(os.path.join(dst_cp, cmd[1])):self.send_train('Warning: A file with the same name exists in the target path.' + '\n')else:if os.path.isdir(src_cp):shutil.copytree(src_cp, dst_cp)else:shutil.copy2(src_cp, dst_cp)self.tree = ''self.send_train('>>> ' + 'File copied successfully!' + '\n')else:self.send_train('Warning: The file does not exist in the source path.' + '\n')else:self.send_train('Warning: The target path is not valid.' + '\n')
参考文章:【Python中的Shutil模块 | 极客笔记】
9. 用户从当前路径下载文件到指定路径 gets
服务器端:
def do_gets(self, command): # 用户在当前路径下下载文件cmd = command.strip().split() # 以空格分割if len(cmd) < 3:passelse:self.send_train(self.abs_path)file_path = os.path.join(self.abs_path, cmd[1])# 如果文件存在if os.path.exists(file_path) and os.path.isfile(file_path):# 如果目的文件夹存在if os.path.isdir(cmd[2]):# 如果目的文件夹有同名文件final_path = os.path.join(cmd[2], cmd[1])if os.path.exists(final_path):passelse:self.send_file_train(file_path)else:passelse:passdef send_file_train(self, f_path):self.new_sock.send(struct.pack('I', os.path.getsize(f_path)))f = open(f_path, 'rb')while True:send_data = f.read(1024)if send_data:self.new_sock.sendall(send_data)else:breakf.close()
客户端:
def do_gets(self, command): # 下载文件cmd = command.strip().split() # 以空格分割if len(cmd) < 3:print('Usage: gets <target> <dst>\n')else:abs_path = self.recv_train()file_path = os.path.join(abs_path, cmd[1])# 如果文件存在if os.path.exists(file_path) and os.path.isfile(file_path):# 如果目的文件夹存在if os.path.isdir(cmd[2]):# 如果目的文件夹有同名文件final_path = os.path.join(cmd[2], cmd[1])if os.path.exists(final_path):print('Warning: A file with the same name exists in the destination directory.\n')else:self.recv_file_train(final_path)print(">>> File downloaded successfully!\n")else:print('Warning: The download path is incorrect.\n')else:print("Warning: The file does not exist in the source directory.\n")def recv_file_train(self, f_path):f_wb = open(f_path, 'wb')head = self.client.recv(4)train_head = struct.unpack('I', head)recv_data = self.client.recv(train_head[0])f_wb.write(recv_data)f_wb.close()
10. 用户从指定路径上传文件到当前路径 puts
服务器端:
def do_puts(self, command): # 用户上传文件在当前路径下cmd = command.strip().split() # 以空格分割if len(cmd) < 2:passelse:# 如果文件存在if os.path.exists(cmd[1]):self.send_train(self.abs_path)file_name = os.path.basename(cmd[1]) # 文件名称# 如果当前路径下存在同名文件if os.path.exists(os.path.join(self.abs_path, file_name)):passelse:write_path = self.abs_path + os.sep + file_name # 在当前目录下创建名字一样的文件self.recv_file_train(write_path) # 写文件else:passdef recv_file_train(self, f_path):f_wb = open(f_path, 'wb')head = self.new_sock.recv(4)train_head = struct.unpack('I', head)recv_data = self.new_sock.recv(train_head[0])f_wb.write(recv_data)f_wb.close()
客户端:
def do_puts(self, command): # 上传文件cmd = command.strip().split() # 以空格分割if len(cmd) < 2:print('Usage: puts <target>\n')else:# 如果文件存在if os.path.exists(cmd[1]):file_path = self.recv_train()file_name = os.path.basename(cmd[1]) # 文件名称# 如果当前路径下存在同名文件if os.path.exists(os.path.join(file_path, file_name)):print('Warning: A file with the same name exists in the destination directory.\n')else:self.send_file_train(cmd[1])print('>>> File uploaded successfully!\n')else:print('Warning: The file does not exist in this directory.\n')def send_file_train(self, f_path):f = open(f_path, 'rb')self.client.send(struct.pack('I', os.path.getsize(f_path)))while True:send_data = f.read(1024)if send_data:self.client.sendall(send_data)else:breakf.close()
注:代码仅供参考,还有很多需要完善的地方,请自行优化。