python mysql-connector、PyMySQL基础
目录
一、python mysql mysql-connector。
创建数据库连接。
创建数据库、数据表。
插入单条数据、批量插入executemany()。
查询数据。fetchall()获取查询结果的所有数据。
更新表数据。
删除表数据。
删除表。
二、PyMySQL。
三、typing模块类型提示:Optional、Union。
一、python mysql mysql-connector。
- 配置 pip 镜像源教程:https://suisuipingan-hyl.blog.csdn.net/article/details/154346181?spm=1001.2014.3001.5502
 http://配置pip镜像源
- 使用 pip 命令安装 mysql-connector。
 python -m pip install mysql-connector (旧) pip install mysql-connector-python (推荐)
创建数据库连接。
import mysql.connectormydb_config = {"host": "127.0.0.1","port": 3306,"user": "root","password": "test123123" } conn = None try:conn = mysql.connector.connect(**mydb_config) #参数解包if conn.is_connected():print("连接成功") #连接成功 except mysql.connector.Error as e:print(f"连接失败:{str(e)}") finally:if conn.is_connected():conn.close()print("连接已关闭") #连接已关闭
创建数据库、数据表。
import mysql.connectormydb_config = {"host": "127.0.0.1","port": 3306,"user": "root","password": "test123123" } conn = None my_cursor = None #游标对象 try:conn = mysql.connector.connect(**mydb_config) #参数解包if conn.is_connected():print("连接成功") #连接成功my_cursor = conn.cursor()#创建数据库db_name = "python_test_db"create_db_sql = f"create database if not exists {db_name}"my_cursor.execute(create_db_sql) #执行SQLprint(f"数据库{db_name}已创建") #数据库python_test_db已创建#切换数据库conn.database = db_name#创建数据表create_table_sql = """create table if not exists students (id int auto_increment primary key,name varchar(50) not null,age int,gender int,create_time datetime default current_timestamp,check (gender in (0,1,2)))"""my_cursor.execute(create_table_sql)print(f"数据表students已创建") #数据表students已创建 except mysql.connector.Error as e:print(f"操作失败:{str(e)}") finally:if my_cursor:my_cursor.close()print("游标已关闭") #游标已关闭if conn and conn.is_connected():conn.close()print("连接已关闭") #连接已关闭
插入单条数据、批量插入executemany()。
mysql.connector 的 execute() 方法要求 SQL 语句的参数必须是可迭代对象(元组、列表等)。
其中:元组是不可变的(创建后不能修改),能避免参数被意外篡改。元组语法简洁,与 SQL 占位符 %s 的对应关系清晰。(元组中的元素会按顺序替换 %s)
import mysql.connectormydb_config = {"host": "127.0.0.1","port": 3306,"user": "root","password": "test123123","database": "python_test_db" #指定已创建的数据库 } conn = None my_cursor = None #游标对象 try:conn = mysql.connector.connect(**mydb_config) #参数解包if conn.is_connected():print("连接成功") #连接成功my_cursor = conn.cursor()#插入单条数据insert_sql = "insert into students (name,age,gender) values(%s,%s,%s)"student1 = ("小王",18,1) #1:男,2:女my_cursor.execute(insert_sql,student1)print(f"插入1条数据成功,id:{my_cursor.lastrowid}") #插入1条数据成功,id:1#批量插入多条数据students = [("李四", 22, 1),("小美", 19, 2),("未知用户", None, 0)]my_cursor.executemany(insert_sql, students)print(f"插入{my_cursor.rowcount}条数据成功") #插入3条数据成功conn.commit() #提交事务,数据表内容有更新,必须使用该语句except mysql.connector.Error as e:if conn:conn.rollback() #出错时回滚事务print(f"操作失败:{str(e)}") finally:if my_cursor:my_cursor.close()print("游标已关闭") #游标已关闭if conn and conn.is_connected():conn.close()print("连接已关闭") #连接已关闭
查询数据。fetchall()获取查询结果的所有数据。
import mysql.connectormydb_config = {"host": "127.0.0.1","port": 3306,"user": "root","password": "test123123","database": "python_test_db" #指定已创建的数据库 } conn = None my_cursor = None #游标对象 try:conn = mysql.connector.connect(**mydb_config) #参数解包if conn.is_connected():print("连接成功") #连接成功my_cursor = conn.cursor()#查询所有select_all_sql = "select id, name, age, gender, create_time from students"my_cursor.execute(select_all_sql)all_students = my_cursor.fetchall()#遍历元组列表for student in all_students:#id:2, name:李四, age:22, gender:1, create_time:2025-11-03 15:21:26#id:4, name:未知用户, age:None, gender:0, create_time:2025-11-03 15:21:26print(f"id:{student[0]}, name:{student[1]}, age:{student[2]}, gender:{student[3]}, create_time:{student[4]}")#where条件查询select_where_sql = "select name, age from students where age > %s"my_cursor.execute(select_where_sql, (20,)) #参数必须是元组for row in my_cursor.fetchall():print(f"年龄>20查询结果 => name:{row[0]}, age:{row[1]}") #年龄>20查询结果 => name:李四, age:22#limit查询select_limit_sql = "select name, age from students limit %s"my_cursor.execute(select_limit_sql, (2,))for row in my_cursor.fetchall():print(f"limit查询结果 => name:{row[0]}, age:{row[1]}") #limit查询结果 => name:小王, age:18、limit查询结果 => name:李四, age:22#limit + offset查询select_limit_offset_sql = "select name, age from students limit %s offset %s" #offset从第4条(索引3)开始读my_cursor.execute(select_limit_offset_sql, (1, 3))for row in my_cursor.fetchall():print(f"limit + offset查询结果 => name:{row[0]}, age:{row[1]}") #limit + offset查询结果 => name:未知用户, age:Noneexcept mysql.connector.Error as e:if conn:conn.rollback() #出错时回滚事务print(f"操作失败:{str(e)}") finally:if my_cursor:my_cursor.close()print("游标已关闭") #游标已关闭if conn and conn.is_connected():conn.close()print("连接已关闭") #连接已关闭
更新表数据。
import mysql.connectormydb_config = {"host": "127.0.0.1","port": 3306,"user": "root","password": "test123123","database": "python_test_db" #指定已创建的数据库 } conn = None my_cursor = None #游标对象 try:conn = mysql.connector.connect(**mydb_config) #参数解包if conn.is_connected():print("连接成功") #连接成功my_cursor = conn.cursor()#更新表数据update_sql = "update students set age = %s where name = %s"my_cursor.execute(update_sql, (100, "未知用户"))print("更新成功!受影响的行数:", my_cursor.rowcount) #更新成功!受影响的行数: 1conn.commit() #数据表内容有更新,必须使用该语句except mysql.connector.Error as e:if conn:conn.rollback() #出错时回滚事务print(f"操作失败:{str(e)}") finally:if my_cursor:my_cursor.close()print("游标已关闭") #游标已关闭if conn and conn.is_connected():conn.close()print("连接已关闭") #连接已关闭
删除表数据。
import mysql.connectormydb_config = {"host": "127.0.0.1","port": 3306,"user": "root","password": "test123123","database": "python_test_db" #指定已创建的数据库 } conn = None my_cursor = None #游标对象 try:conn = mysql.connector.connect(**mydb_config) #参数解包if conn.is_connected():print("连接成功") #连接成功my_cursor = conn.cursor()#删除表数据delete_sql = "delete from students where name = %s"my_cursor.execute(delete_sql,("未知用户",))print(f"删除成功!影响行数:{my_cursor.rowcount}") #删除成功!影响行数:1conn.commit() #数据表内容有更新,必须使用该语句except mysql.connector.Error as e:if conn:conn.rollback() #出错时回滚事务print(f"操作失败:{str(e)}") finally:if my_cursor:my_cursor.close()print("游标已关闭") #游标已关闭if conn and conn.is_connected():conn.close()print("连接已关闭") #连接已关闭
删除表。
import mysql.connectormydb_config = {"host": "127.0.0.1","port": 3306,"user": "root","password": "test123123","database": "python_test_db" #指定已创建的数据库 } conn = None my_cursor = None #游标对象 try:conn = mysql.connector.connect(**mydb_config) #参数解包if conn.is_connected():print("连接成功") #连接成功my_cursor = conn.cursor()#删除表delete_table_sql = "drop table if exists students"my_cursor.execute(delete_table_sql) #隐式提交,不需要conn.commit()except mysql.connector.Error as e:if conn:conn.rollback() #出错时回滚事务print(f"操作失败:{str(e)}") finally:if my_cursor:my_cursor.close()print("游标已关闭") #游标已关闭if conn and conn.is_connected():conn.close()print("连接已关闭") #连接已关闭
二、PyMySQL。
- PyMySQL(社区开发者维护) 与 mysql-connector-python(官方维护) 几乎一致,都是 Python 连接 MySQL 数据库的第三方库。也就是出生、兼容性、细节语法有些些不同。
 
- 安装执行命令:pip install pymysql。
 - 基础导入与创建连接:import pymysql 、pymysql.connect(** config)、conn.cursor()、cursor.execute() ... 。
 
- 实际开发自我选择即可,大差不多。
 
三、typing模块类型提示:Optional、Union。
- 用于静态类型检查工具或IDE检查。
 - Optional:可选。某个变量 / 参数可以是指定类型,也可以是None。
 - Optional [T] 等价:Union [ T, None ] , T是任意类型。
 from typing import Optionaldef hello(name: Optional[str]) -> str:if name is None:return "hello Guest!"return f"hello {name}!"print(hello("zhangsan")) #hello zhangsan! print(hello(None)) #hello Guest!
- Union:联合类型。某个变量 / 参数可以是多个指定类型中的任意一种。
 - Union [T1, T2, ..., Tn] 。其中T1 到 Tn 是不同类型。
 from typing import Uniondef add(a: Union[int, float],b: Union[int, float]) -> Union[int, float]:return a + bprint(add(2,3)) #5 print(add(2.5,3)) #5.5 print(add('2',3)) #提示:应为类型 'int | float',但实际为 'str'


