用python streamlit sqlite3 写一个聊天室
图:


代码:
import streamlit as st
import sqlite3
import datetime
from typing import List, Tuple
import time
import os
import io
from PIL import Image
import base64# 创建uploads目录
if not os.path.exists('uploads'):os.makedirs('uploads')# 数据库初始化
def init_db():conn = sqlite3.connect('chat.db', check_same_thread=False)cursor = conn.cursor()# 创建用户表cursor.execute('''CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT,username TEXT UNIQUE NOT NULL,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')# 创建消息表(修改以支持文件类型)cursor.execute('''CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY AUTOINCREMENT,user_id INTEGER,message_type TEXT DEFAULT 'text', -- text, image, filemessage_content TEXT, -- 文本内容或文件路径file_name TEXT, -- 原始文件名file_size INTEGER, -- 文件大小timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,FOREIGN KEY (user_id) REFERENCES users (id))''')conn.commit()return conn# 用户管理函数
def add_user(username: str, conn) -> int:cursor = conn.cursor()try:cursor.execute('INSERT INTO users (username) VALUES (?)', (username,))conn.commit()return cursor.lastrowidexcept sqlite3.IntegrityError:# 用户已存在,返回现有用户IDcursor.execute('SELECT id FROM users WHERE username = ?', (username,))return cursor.fetchone()[0]def get_user_id(username: str, conn) -> int:cursor = conn.cursor()cursor.execute('SELECT id FROM users WHERE username = ?', (username,))result = cursor.fetchone()return result[0] if result else None# 消息管理函数
def add_message(user_id: int, message_type: str, message_content: str, file_name: str = None, file_size: int = None, conn=None):cursor = conn.cursor()cursor.execute('''INSERT INTO messages (user_id, message_type, message_content, file_name, file_size) VALUES (?, ?, ?, ?, ?)''', (user_id, message_type, message_content, file_name, file_size))conn.commit()def get_recent_messages(limit: int = 50, conn=None) -> List[Tuple]:cursor = conn.cursor()cursor.execute('''SELECT m.id, u.username, m.message_type, m.message_content, m.file_name, m.file_size, m.timestamp FROM messages m JOIN users u ON m.user_id = u.id ORDER BY m.timestamp DESC LIMIT ?''', (limit,))return cursor.fetchall()# 文件处理函数
def save_uploaded_file(uploaded_file) -> str:"""保存上传的文件并返回文件路径"""# 生成唯一文件名timestamp = int(time.time())file_extension = os.path.splitext(uploaded_file.name)[1]filename = f"{timestamp}_{uploaded_file.name}"filepath = os.path.join('uploads', filename)# 保存文件with open(filepath, "wb") as f:f.write(uploaded_file.getbuffer())return filepathdef get_file_size(file_path: str) -> int:"""获取文件大小"""return os.path.getsize(file_path)def format_file_size(size_bytes: int) -> str:"""格式化文件大小"""if size_bytes == 0:return "0B"size_names = ["B", "KB", "MB", "GB"]i = 0while size_bytes >= 1024 and i < len(size_names) - 1:size_bytes /= 1024.0i += 1return f"{size_bytes:.1f}{size_names[i]}"# 图片处理函数
def resize_image(image_data, max_size=(400, 400)):"""调整图片大小"""image = Image.open(io.BytesIO(image_data))image.thumbnail(max_size, Image.Resampling.LANCZOS)# 转换为base64buffered = io.BytesIO()image.save(buffered, format="PNG")img_str = base64.b64encode(buffered.getvalue()).decode()return img_str# 主应用
def main():st.set_page_config(page_title="聊天室",page_icon="💬",layout="wide")# 初始化数据库conn = init_db()# 自定义CSS样式st.markdown("""<style>.chat-container {max-height: 500px;overflow-y: auto;border: 1px solid #ddd;border-radius: 10px;padding: 10px;margin-bottom: 20px;background-color: #f9f9f9;}.message {margin: 10px 0;padding: 10px;border-radius: 10px;background-color: #e3f2fd;}.own-message {background-color: #c8e6c9;margin-left: 20%;}.other-message {background-color: #f5f5f5;margin-right: 20%;}.timestamp {font-size: 0.8em;color: #666;float: right;}.username {font-weight: bold;color: #1976d2;}.file-message {background-color: #fff3e0;border-left: 4px solid #ff9800;}.image-message {background-color: #e8f5e8;border-left: 4px solid #4caf50;}.uploaded-image {max-width: 100%;max-height: 300px;border-radius: 5px;margin: 5px 0;}.file-info {font-size: 0.9em;color: #666;margin-top: 5px;}</style>""", unsafe_allow_html=True)# 标题st.title("💬 实时聊天室(支持图片和文件)")# 用户会话状态if 'user_id' not in st.session_state:st.session_state.user_id = Noneif 'username' not in st.session_state:st.session_state.username = None# 用户登录/注册if st.session_state.user_id is None:st.subheader("登录/注册")username = st.text_input("请输入用户名:", max_chars=20)if st.button("进入聊天室") and username:if len(username.strip()) > 0:st.session_state.user_id = add_user(username.strip(), conn)st.session_state.username = username.strip()st.rerun()else:st.error("用户名不能为空")else:# 显示当前用户信息col1, col2 = st.columns([3, 1])with col1:st.success(f"欢迎, {st.session_state.username}!")with col2:if st.button("退出"):st.session_state.user_id = Nonest.session_state.username = Nonest.rerun()# 聊天界面st.subheader("聊天室")# 自动刷新消息if 'last_refresh' not in st.session_state:st.session_state.last_refresh = time.time()# 消息显示区域messages_container = st.container()with messages_container:st.markdown("<div class='chat-container'>", unsafe_allow_html=True)messages = get_recent_messages(50, conn)messages.reverse() # 按时间顺序显示for msg_id, username, message_type, message_content, file_name, file_size, timestamp in messages:is_own_message = (username == st.session_state.username)if message_type == 'text':message_class = "own-message" if is_own_message else "other-message"st.markdown(f"""<div class="message {message_class}"><span class="username">{username}</span>: {message_content}<span class="timestamp">{timestamp}</span></div>""", unsafe_allow_html=True)elif message_type == 'image':message_class = "image-message"if is_own_message:message_class += " own-message"else:message_class += " other-message"# 显示图片if os.path.exists(message_content):with open(message_content, "rb") as f:image_data = f.read()img_str = resize_image(image_data)st.markdown(f"""<div class="message {message_class}"><span class="username">{username}</span> 发送了一张图片:<br><img src="data:image/png;base64,{img_str}" class="uploaded-image"><span class="timestamp">{timestamp}</span></div>""", unsafe_allow_html=True)elif message_type == 'file':message_class = "file-message"if is_own_message:message_class += " own-message"else:message_class += " other-message"# 显示文件信息和下载链接file_size_str = format_file_size(file_size)# 检查文件是否存在if os.path.exists(message_content):# 读取文件内容用于下载with open(message_content, "rb") as f:file_data = f.read()# 创建下载按钮st.markdown(f"""<div class="message {message_class}"><span class="username">{username}</span> 发送了一个文件:<br><strong>{file_name}</strong><div class="file-info">大小: {file_size_str}</div><span class="timestamp">{timestamp}</span></div>""", unsafe_allow_html=True)# 添加下载按钮st.download_button(label=f"📥 下载 {file_name}",data=file_data,file_name=file_name,mime="application/octet-stream",key=f"download_{msg_id}",use_container_width=True)else:st.markdown(f"""<div class="message {message_class}"><span class="username">{username}</span> 发送了一个文件:<br><strong>{file_name}</strong><div class="file-info">大小: {file_size_str} (文件已丢失)</div><span class="timestamp">{timestamp}</span></div>""", unsafe_allow_html=True)st.markdown("</div>", unsafe_allow_html=True)# 消息输入区域st.subheader("发送消息")# 选项卡:文本消息、图片、文件tab1, tab2, tab3 = st.tabs(["📝 文本消息", "🖼️ 发送图片", "📎 发送文件"])with tab1:# 文本消息输入if 'message_input' not in st.session_state:st.session_state.message_input = ""message_input = st.text_area("输入消息:", height=80, max_chars=500, value=st.session_state.message_input,key="message_input_widget")if st.button("发送文本消息", use_container_width=True) and message_input:if len(message_input.strip()) > 0:add_message(st.session_state.user_id, 'text', message_input.strip(), conn=conn)st.session_state.message_input = ""st.rerun()with tab2:# 图片上传uploaded_image = st.file_uploader("选择图片文件", type=['png', 'jpg', 'jpeg', 'gif', 'bmp'],key="image_uploader")if uploaded_image is not None:# 显示预览image = Image.open(uploaded_image)st.image(image, caption="图片预览", use_column_width=True)if st.button("发送图片", use_container_width=True):# 保存图片file_path = save_uploaded_file(uploaded_image)file_size = get_file_size(file_path)# 添加到消息add_message(st.session_state.user_id, 'image', file_path, uploaded_image.name, file_size, conn=conn)st.rerun()with tab3:# 文件上传uploaded_file = st.file_uploader("选择文件", type=None, # 允许所有文件类型key="file_uploader")if uploaded_file is not None:file_size = len(uploaded_file.getvalue())file_size_str = format_file_size(file_size)st.write(f"**文件名:** {uploaded_file.name}")st.write(f"**文件大小:** {file_size_str}")if st.button("发送文件", use_container_width=True):# 保存文件file_path = save_uploaded_file(uploaded_file)# 添加到消息add_message(st.session_state.user_id, 'file', file_path, uploaded_file.name, file_size, conn=conn)st.rerun()# 刷新按钮if st.button("刷新消息", use_container_width=True):st.rerun()# 自动刷新(每5秒)if time.time() - st.session_state.last_refresh > 5:st.session_state.last_refresh = time.time()st.rerun()if __name__ == "__main__":main()streamlit run app.py
聊天室应用
一个基于Python Streamlit和SQLite3的实时聊天室应用,支持文本、图片和文件传输。
功能特性
✅ 用户注册/登录
✅ 实时消息发送和接收
✅ 图片上传和显示
✅ 文件上传和下载
✅ 消息历史记录
✅ 自动刷新消息
✅ 响应式界面设计
✅ SQLite3数据库存储
✅ 文件本地存储管理
安装和运行
1. 安装依赖
pip install -r requirements.txt
streamlit>=1.28.0
Pillow>=10.0.0
2. 运行应用
streamlit run app.py
3. 访问应用
打开浏览器访问 http://localhost:8501
新功能说明
图片上传功能
支持格式:PNG、JPG、JPEG、GIF、BMP
自动调整图片大小显示
图片预览功能
本地文件存储
文件上传功能
支持所有文件类型
显示文件大小信息
文件信息展示
本地文件存储管理
消息类型区分
文本消息:绿色背景
图片消息:浅绿色背景,带图片边框
文件消息:橙色背景,显示文件信息
项目结构
chatroom/ ├── app.py # 主应用文件 ├── requirements.txt # 项目依赖 ├── chat.db # SQLite数据库文件(自动创建) └── README.md # 项目说明
数据库设计
用户表 (users)
id: 用户ID(主键)username: 用户名(唯一)created_at: 创建时间
消息表 (messages)
id: 消息ID(主键)user_id: 用户ID(外键)message: 消息内容timestamp: 发送时间
使用说明
登录/注册: 首次使用需要输入用户名,系统会自动创建用户
发送消息: 在底部输入框输入消息并点击"发送"
查看消息: 消息会自动显示在聊天区域,最新消息在底部
刷新消息: 点击"刷新"按钮或等待5秒自动刷新
退出: 点击"退出"按钮可以退出当前用户
技术栈
前端: Streamlit
后端: Python
数据库: SQLite3
实时更新: Streamlit自动刷新机制
注意事项
应用支持多用户同时在线聊天
消息历史最多显示50条最新消息
用户名不能重复
消息内容限制500字符以内
应用会自动创建数据库文件
