当前位置: 首页 > news >正文

python对mysql数据库的操作

现在遇到一个问题如何将数据批量的插入mysql数据库中

基础操作


import asyncio
from config import config
from mysql_pool import MysqlPool


class MysqlLoop(object):
    def __init__(self):
        self.logger = config.logger
        self.pool = MysqlPool()

    def loop_query(self, queries):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        results = loop.run_until_complete(
            self.some_query(loop, self.mysql_query, queries))
        return results

    def loop_many_query(self, queries):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        results = loop.run_until_complete(
            self.some_query(loop, self.mysql_many_query, queries))
        return results

    async def some_query(self, loop, func, args):
        tasks = []
        for item in args:
            tasks.append(self.make_future(loop, func, *item))
        results = await asyncio.gather(*tasks)
        return results

    async def make_future(self, loop, func, *args):
        future = loop.run_in_executor(None, func, *args)
        result = await future
        return result

    def mysql_query(self, sql, args=None):
        return self.pool.select_one(sql, args)

    def mysql_many_query(self, sql, args=None):
        return self.pool.select_all(sql, args)

import pymysql
import logging
import os
from dbutils.pooled_db import PooledDB


class MysqlPool(object):
    def __init__(self, db = 'spiders_binance'):
        self.db = db
        self.logger = logging
        self.pool = self.mysql_connection()

    def mysql_connection(self):
        host = 'rm-wz97166ln9cin6304zo.mysql.rds.aliyuncs.com' 
        pool = PooledDB(pymysql,
                        maxconnections=4,
                        maxcached=10,
                        host=host,
                        user='biteagle',
                        port=3306,
                        passwd="rfpMh@F36KsyQ2M",
                        db=self.db,
                        charset='utf8',
                        use_unicode=True)
        return pool

    def create_conn(self):
        conn = self.pool.connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        return conn, cursor

    def close_conn(self, conn, cursor):
        conn.close()
        cursor.close()

    def select_one(self, sql, args=None):
        conn, cur = self.create_conn()
        cur.execute(sql, args)
        result = cur.fetchone()
        self.close_conn(conn, cur)
        return result

    def select_all(self, sql, args=None):
        conn, cur = self.create_conn()
        cur.execute(sql, args)
        result = cur.fetchall()
        self.close_conn(conn, cur)
        return result

    def insert_one(self, sql, args=None):
        conn, cur = self.create_conn()
        result = cur.execute(sql, args)
        pk_id = cur.lastrowid
        conn.commit()
        self.close_conn(conn, cur)
        return pk_id

    def delete_one(self, sql, args=None):
        conn, cur = self.create_conn()
        result = cur.execute(sql, args)
        conn.commit()
        self.close_conn(conn, cur)
        return result

    def update_one(self, sql, args=None):
        conn, cur = self.create_conn()
        result = cur.execute(sql, args)
        conn.commit()
        self.close_conn(conn, cur)
        return result

    def update_many(self, table_name, col_list, data_list, pri_name='id'):
        # sql语句

        cols = ", ".join('`{}`=%s'.format(k) for k in col_list)
        update_many_article_into_news_sql = f"""
        UPDATE
        {table_name}
        SET
        {cols}
        WHERE
        {pri_name} = %s;
        """

        conn, cur = self.create_conn()
        # 批量插入
        try:
            res = cur.executemany(update_many_article_into_news_sql, data_list)
            # print(res)
            conn.commit()
        except Exception as e:
            self.logger.error(e)
            conn.rollback()
        finally:
            self.close_conn(conn, cur)

    def update(self, table_name, pk_id, update_data, pri_name='id'):
        """
        update data into mysql while pk = id
        """
        cols = ', '.join('`{}`=%s'.format(k) for k in update_data)
        update_sql = f"""
        UPDATE
        {table_name}
        SET
        {cols}
        WHERE
        {pri_name} = {pk_id};
        """

        self.update_one(update_sql, list(update_data.values()))

    def save_many(self, table_name, col_list, data_list):
        # sql语句

        cols = ", ".join('`{}`'.format(k) for k in col_list)
        val_cols = ', '.join('%s' for k in col_list)
        save_many_article_into_news_sql = f'INSERT IGNORE INTO {table_name}({cols}) VALUES ({val_cols})'

        conn, cur = self.create_conn()
        # 批量插入
        try:
            res = cur.executemany(save_many_article_into_news_sql, data_list)
            # print(res)
            conn.commit()
        except Exception as e:
            self.logger.error(e)
            conn.rollback()
        finally:
            self.close_conn(conn, cur)

    def save(self, table_name, save_data):
        """
        save data into mysql
        """

        cols = ", ".join('`{}`'.format(k) for k in save_data.keys())
        val_cols = ', '.join('%({})s'.format(k) for k in save_data.keys())
        save_article_into_news_sql = f"""
        INSERT IGNORE INTO 
        {table_name}
        (%s) 
        VALUES
        (%s)
        """

        # self.logger.info(f'save_data: {save_data}')

        news_id = self.insert_one(
            save_article_into_news_sql % (cols, val_cols), save_data)
        # self.logger.info('save succeed.')

        return news_id

    def delete(self, table_name, pk_id, pri_name='id'):
        delete_user_sql = f"""
        DELETE FROM
        {table_name}
        WHERE
        {pri_name} = %s;
        """

        self.delete_one(delete_user_sql, pk_id)


if __name__ == "__main__":
    from langchain.document_loaders import DirectoryLoader, TextLoader
    import pymysql
    import re
    data_directory = r"C:/Users/Asus/Desktop/back_bitspider2/chroma/data"
    pool = MysqlPool()
    cols = ['id', 'title', 'content']
     #按目录加载文档
    loader = DirectoryLoader(data_directory, glob='**/*.txt')
    docs = loader.load()
    from time import sleep
    # cursor.execute(sql,(id,title,content))
    # db.commit()
    # i = 0
    data_list = [
    ]
    for  i in range(len(docs)):
        if i < 99:
            continue
        str = docs[i].metadata["source"]
        match = re.search(r"chroma\\data\\(.*).txt", str)
        if match:
            title = match.group(1)
        # title = ""
        content = docs[i].page_content
        # data.append(id,title,content)
        data_list.append([i,title,content])
        
        
        
  
    pool.save_many('binance', cols, data_list)
    

相关文章:

  • 深度学习中多机训练概念下的DP与DDP
  • C++ 编程指南35 - 为保持ABI稳定,应避免模板接口
  • SQL查询语句的执行顺序
  • C++(初阶)(十一)——list
  • 数据结构实验6.1:矩阵的螺旋方阵输出
  • 在ArcGIS Pro中将栅格NoData值修改为特定值
  • QEMU源码全解析 —— 块设备虚拟化(19)
  • 【项目管理】第12章 项目质量管理-- 知识点整理
  • JavaScript 输入输出语句
  • Docker 部署 Kafka 完整指南
  • 系统编程3(共享内存/信号量)
  • 【数据结构与算法】——堆(补充)
  • 人的需求更多是动物本能—观《枪王》
  • 计算视觉与数学结构及AI拓展
  • 【家政平台开发(41)】家政平台性能蜕变:性能测试与优化全解析
  • Spring Boot 中应用的设计模式
  • Spring Security + JWT 实现前后端分离权限控制实战教程
  • 机器人仿真:URDF与Gazebo
  • 学习MySQL的第九天
  • 【C++】 —— 笔试刷题day_15
  • 全力打好二季度经济回升向好的硬仗,全方位夯实耕地保护和粮食安全根基……今天的市政府常务会议研究了这些重要事项
  • 美官员鼓动他国“退出WHO”遭冷遇,鲁比奥辩称美国没“退群”
  • 公元1059年:宋朝人为什么这么爱杜甫?
  • 深圳南山法院回应“执行款未到账”:张核子公司申请的执行异议成立
  • 同济大学党委常务副书记冯身洪履新中国科协党组副书记
  • 中方敦促美国停止将溯源问题政治化