django的数据库原生操作sql
from django.db import connection
from django.db import transaction
from django.db.utils import (IntegrityError,OperationalError,ProgrammingError,DataError
)
from django.utils import timezoneclass Db(object):"""数据库操作工具类,封装了CRUD、批量操作、事务和软删除功能数据表必须要有:updated_at-更新时间created_at-创建时间is_deleted-软删除标志 0为删除,1已删除deleted_at-删除时间"""# ------------------------------# 核心查询方法# ------------------------------def execute_query(self, sql, params=None, fetchone=False, return_last_id=False):"""执行SQL查询:param sql: SQL语句:param params: 查询参数(元组):param fetchone: 是否只返回第一条记录(SELECT专用):param return_last_id: 是否返回最后插入的ID(INSERT/UPDATE/DELETE专用):return: SELECT返回字典列表/单条字典,其他语句返回受影响行数或最后插入ID"""try:with connection.cursor() as cursor:cursor.execute(sql, params or ())# 打印sql# print(f"{sql}====\n===={params}")if sql.strip().lower().startswith(('select', 'show')):columns = [col[0] for col in cursor.description]results = [dict(zip(columns, row)) for row in cursor.fetchall()]return results[0] if fetchone and results else resultselse:if return_last_id:try:return cursor.lastrowidexcept AttributeError:# 如果数据库后端不支持lastrowid,则返回受影响行数return cursor.rowcountelse:return cursor.rowcountexcept IntegrityError as e:error_msg = f"数据完整性错误(可能原因:主键冲突、唯一约束重复):{str(e)}"print(error_msg)raise ValueError(error_msg) from eexcept OperationalError as e:error_msg = f"数据库连接/操作失败(可能原因:数据库未启动、网络中断):{str(e)}"print(error_msg)raise ConnectionError(error_msg) from eexcept ProgrammingError as e:error_msg = f"SQL语法/表结构错误(可能原因:表/字段不存在、SQL拼写错误):{str(e)}"print(error_msg)raise SyntaxError(error_msg) from eexcept DataError as e:error_msg = f"数据格式错误(可能原因:类型不匹配、长度超限):{str(e)}"print(error_msg)raise TypeError(error_msg) from eexcept Exception as e:error_msg = f"意外错误:{str(e)}"print(error_msg)raise# ------------------------------# 插入操作# ------------------------------def insert_data(self, table, data, auto_time=True, return_last_id=True):"""插入单条数据:param table: 表名字符串:param data: 数据字典,格式 {字段名: 值}:param auto_time: 是否自动添加时间字段(默认True):param return_last_id: 是否返回最后插入的ID:return: 最后插入的ID或受影响的行数"""fields = list(data.keys())values = list(data.values())if auto_time:now = timezone.now()fields.extend(['created_at', 'updated_at'])values.extend([now, now])placeholders = ', '.join(['%s'] * len(fields))sql = f"INSERT INTO {table} ({', '.join(fields)}) VALUES ({placeholders})"return self.execute_query(sql, values, return_last_id=return_last_id)def batch_insert(self, table, data_list, auto_time=True, batch_size=1000):"""批量插入数据(自动分批次):param table: 表名字符串:param data_list: 数据字典列表,格式 [{字段名: 值}, ...]:param auto_time: 是否自动添加时间字段(默认True):param batch_size: 每批次处理的记录数(默认1000):return: 总共插入的行数"""if not data_list:return 0total_rows = 0total_batches = (len(data_list) + batch_size - 1) // batch_sizefor i in range(total_batches):start = i * batch_sizeend = start + batch_sizebatch_data = data_list[start:end]first_item = batch_data[0].copy()fields = list(first_item.keys())if auto_time:now = timezone.now()fields.extend(['created_at', 'updated_at'])for item in batch_data:item['created_at'] = nowitem['updated_at'] = nowplaceholders = ', '.join(['%s'] * len(fields))value_groups = [tuple(item.values()) for item in batch_data]flat_values = [val for group in value_groups for val in group]sql = f"INSERT INTO {table} ({', '.join(fields)}) VALUES "sql += ', '.join([f"({placeholders})"] * len(batch_data))rows = self.execute_query(sql, flat_values)total_rows += rowsreturn total_rows# ------------------------------# 更新操作# ------------------------------def update_data(self, table, data, where, params=None, auto_time=True, return_last_id=False):"""更新数据:param table: 表名字符串:param data: 要更新的数据字典,格式 {字段名: 值}:param where: WHERE条件(不带WHERE关键字),例如 "id = %s":param params: WHERE条件的参数列表:param auto_time: 是否自动更新updated_at字段(默认True):param return_last_id: 是否返回最后插入的ID(在支持的数据库中可能返回更新的行ID):return: 受影响的行数或最后插入ID"""set_items = []values = list(data.values())if auto_time:data['updated_at'] = timezone.now()set_items = [f"{field} = %s" for field in data.keys()]values = list(data.values())else:set_items = [f"{field} = %s" for field in data.keys()]set_clause = ', '.join(set_items)sql = f"UPDATE {table} SET {set_clause} WHERE {where}"if params:values.extend(params)return self.execute_query(sql, values, return_last_id=return_last_id)def batch_update(self, table, data_list, where_field='id', auto_time=True, batch_size=1000):"""批量更新数据(使用CASE WHEN优化):param table: 表名字符串:param data_list: 数据字典列表,每个字典必须包含where_field字段:param where_field: 用于匹配记录的字段(默认id):param auto_time: 是否自动更新updated_at字段(默认True):param batch_size: 每批次处理的记录数(默认1000):return: 总共更新的行数"""if not data_list:return 0total_rows = 0total_batches = (len(data_list) + batch_size - 1) // batch_sizefor i in range(total_batches):start = i * batch_sizeend = start + batch_sizebatch_data = data_list[start:end]case_clauses = []values = []update_fields = {k for item in batch_data for k in item.keys() if k != where_field}for field in update_fields:case_sql = f"{field} = CASE {where_field} "for item in batch_data:case_sql += f"WHEN %s THEN %s "values.extend([item[where_field], item[field]])case_sql += "END"case_clauses.append(case_sql)where_ids = [item[where_field] for item in batch_data]values.extend(where_ids)if auto_time:now = timezone.now()case_clauses.append(f"updated_at = %s")values.append(now)set_clause = ', '.join(case_clauses)where_placeholders = ', '.join(['%s'] * len(where_ids))sql = f"UPDATE {table} SET {set_clause} WHERE {where_field} IN ({where_placeholders})"rows = self.execute_query(sql, values)total_rows += rowsreturn total_rows# ------------------------------# 删除/软删除操作# ------------------------------def soft_delete(self, table, where, params=None, return_last_id=False):"""软删除数据(标记is_deleted=1):param table: 表名字符串:param where: WHERE条件(不带WHERE关键字),例如 "id = %s":param params: WHERE条件的参数列表:param return_last_id: 是否返回最后插入的ID(在支持的数据库中可能返回删除的行ID):return: 受影响的行数或最后插入ID"""data = {'is_deleted': 1, 'deleted_at': timezone.now()}return self.update_data(table, data, where, params, auto_time=False, return_last_id=return_last_id)def delete_data(self, table, where, params=None, hard_delete=False, return_last_id=False):"""删除数据(默认软删除):param table: 表名字符串:param where: WHERE条件(不带WHERE关键字),例如 "id = %s":param params: WHERE条件的参数列表:param hard_delete: 是否执行硬删除(物理删除):param return_last_id: 是否返回最后插入的ID(在支持的数据库中可能返回删除的行ID):return: 受影响的行数或最后插入ID"""if hard_delete:sql = f"DELETE FROM {table} WHERE {where}"return self.execute_query(sql, params, return_last_id=return_last_id)else:return self.soft_delete(table, where, params, return_last_id=return_last_id)# ------------------------------# 查询操作# ------------------------------def get_list(self, table, where=None, params=None, order_by=None, limit=None, offset=None,with_deleted=False, only_deleted=False, fields=None):"""查询列表数据:param table: 表名字符串:param where: WHERE条件(不带WHERE关键字):param params: WHERE条件的参数列表:param order_by: 排序字段,例如 "created_at DESC":param limit: 返回记录数限制:param offset: 偏移量(用于分页):param with_deleted: 是否包含已删除数据:param only_deleted: 是否只返回已删除数据:param fields: 要返回的字段列表,默认返回所有字段:return: 符合条件的记录列表"""if fields:select_str = ', '.join(fields)else:select_str = '*'conditions = []query_params = []if where:conditions.append(where)if params:query_params.extend(params)if not with_deleted:if only_deleted:conditions.append("is_deleted = 1")else:conditions.append("is_deleted = 0")order_str = f"ORDER BY {order_by}" if order_by else ""limit_str = f"LIMIT {limit}" if limit is not None else ""offset_str = f"OFFSET {offset}" if offset is not None else ""where_clause = " AND ".join(conditions) if conditions else ""where_str = f"WHERE {where_clause}" if where_clause else ""sql = f"SELECT {select_str} FROM {table} {where_str} {order_str} {limit_str} {offset_str}"return self.execute_query(sql, query_params)def get_one(self, table, where, params=None, with_deleted=False, only_deleted=False, fields=None):"""查询单条数据:param table: 表名字符串:param where: WHERE条件(不带WHERE关键字):param params: WHERE条件的参数列表:param with_deleted: 是否包含已删除数据:param only_deleted: 是否只返回已删除数据:param fields: 要返回的字段列表,默认返回所有字段:return: 符合条件的单条记录或None"""return self.get_list(table, where, params,with_deleted=with_deleted,only_deleted=only_deleted,fields=fields,limit=1)def get_count(self, table, where=None, params=None, with_deleted=False, only_deleted=False):"""查询记录总数:param table: 表名字符串:param where: WHERE条件(不带WHERE关键字):param params: WHERE条件的参数列表:param with_deleted: 是否包含已删除数据:param only_deleted: 是否只返回已删除数据:return: 记录总数"""conditions = []query_params = []if where:conditions.append(where)if params:query_params.extend(params)if not with_deleted:if only_deleted:conditions.append("is_deleted = 1")else:conditions.append("is_deleted = 0")where_clause = " AND ".join(conditions) if conditions else ""where_str = f"WHERE {where_clause}" if where_clause else ""sql = f"SELECT COUNT(*) AS count FROM {table} {where_str}"result = self.execute_query(sql, query_params, fetchone=True)return result['count'] if result else 0# ------------------------------# 事务操作# ------------------------------def execute_transaction(self, operations):"""执行一组SQL操作作为事务:param operations: SQL操作列表,格式 [(SQL语句, 参数元组), ...]:return: 每个操作的返回结果列表:raises: 任何操作失败时抛出异常,所有操作回滚"""with transaction.atomic():results = []for idx, (sql, params) in enumerate(operations):try:result = self.execute_query(sql, params)results.append(result)except Exception as e:raise RuntimeError(f"事务中第{idx + 1}条SQL执行失败:{str(e)}") from ereturn results# ------------------------------# 数据恢复# ------------------------------def restore_data(self, table, where, params=None):"""恢复软删除的数据(is_deleted=0):param table: 表名字符串:param where: WHERE条件(不带WHERE关键字),例如 "id = %s":param params: WHERE条件的参数列表:return: 受影响的行数"""data = {'is_deleted': 0, 'deleted_at': None}return self.update_data(table, data, where, params, auto_time=True)
def get_count(self, table, where=None, params=None, with_deleted=False, only_deleted=False):"""查询记录总数:param table: 表名字符串:param where: WHERE条件(不带WHERE关键字):param params: WHERE条件的参数列表:param with_deleted: 是否包含已删除数据:param only_deleted: 是否只返回已删除数据:return: 记录总数"""conditions = []query_params = []if where:conditions.append(where)if params:query_params.extend(params)# if not with_deleted:# if only_deleted:# conditions.append("is_deleted = 1")# else:# conditions.append("is_deleted = 0")where_clause = " AND ".join(conditions) if conditions else ""where_str = f"WHERE {where_clause}" if where_clause else ""sql = f"SELECT COUNT(*) AS count FROM {table} {where_str}"result = self.execute_query(sql, query_params, fetchone=True)return result['count'] if result else 0