当前位置: 首页 > news >正文

(原创)用python语言基于paddleocr构建批量识别实现纸质和电子的增值税专用发票程序

文章目录

  • 1. 说明
  • 2. 准备工作
  • 3. 代码
    • 3.1 导入库:
    • 3.2 遍历发票指定处理方式
    • 3.3 发票识别相关函数
    • 3.4 发票字段定位函数
    • 3.6 识别记录相关函数
    • 3.6 识别结果校验
    • 3.7 文件预处理等其他函数
    • 3.8 main主函数

1. 说明

1.1 以paddle识别引擎为基础的增值税发票识别程序,可批量识别和累积纸质发票和电子发票数据。已经生产环境中测试。
1.2 识别的源发票数据:
    - 文件夹中存放的用高速连续发票扫描仪批量扫描的JPG格式图片
    - 文件夹中汇集的电子发票PDF格式文件
1.3 可选择用识别引擎:快速-mb 平衡:sv 精细-pp (总体上,预识别用mb,精细用pd,速度和精确度比较好。
1.4 适配断续工作,跳过已扫描的重复发票,边识别边存储。
1.5 可装在闲置低配置的win7老台式,资源利用,识别速度视电脑配置差异大概2-3秒一张。
1.6 在实际生产环境中测试,如果纸质发票不清晰,综合识别准确率大概85%-95%左右。如果数电发票比较多,识别准确率大概达到97%以上。
1.7 对于识别有误或缺失的数据,在结果中提示错误并链接原发票文件,以便人工直接对照修改。
1.8 其他: 
     - 公司名称税号可在代码中预置设定好,位置在发票字段定位函数Loc_range_content_pandas。
     - 可自行预置对方公司名称错误的更正,详细可在Check_result函数中此处文字内容"字段修正:公司名错别字"所在位置的字典修改。

2. 准备工作

2.1 准备工作发票电子文件夹:已用高速连续发票扫描仪扫描完纸质发票的图片文件夹,和已汇集的电子发票PDF格式文件夹。
2.2 安装好辅助程序 acrobat pro dc
2.3 语言环境 anaconda,python3.7(虚拟环境)
2.4 环境中安装好所需要的库(自行安装好虚拟环境中所需的第三方库):
    imghdr, shutil, glob, pathlib, tkinter, cv2, numpy, paddlehub, pandas, psutil, openpyxl, paddleocr, pillow, pyzbar, ZipFile, pymupdf

3. 代码

3.1 导入库:

# -*- coding: utf-8 -*-
# 程序名: final_inv_ocr
# Author: ddxn417
# email:allenzhang0182@qq.com
import imghdr
import math
import os
import re
import shutil
from collections import OrderedDict
from datetime import datetime
from glob import glob
from pathlib import Path
from tkinter import filedialog
from tkinter import Tk
import cv2
import numpy as np
import paddlehub as hub
import pandas as pd
import psutil
from openpyxl import cell, load_workbook
from openpyxl.styles import Font, colors
from paddleocr import PaddleOCR, draw_ocr
from PIL import Image, ImageDraw, ImageEnhance, ImageFont
from pyzbar import pyzbar
from zipfile import ZipFile
import fitz #pip install pymupdf

3.2 遍历发票指定处理方式

# 遍历文件夹内的发票文件,识别。
def walk_folder_ocr(origin_pandas,duplicate_pandas,origin_folder_path,**walk_folder_args):
    ocr_engines = walk_folder_args['ocr_engines']
    temp_folder_path = walk_folder_args['temp_folder_path']
    prepare_engine = walk_folder_args['engine_switch']
    result_pandas = origin_pandas
    # 获取文件夹内所有的jpg和pdf文件个数
    cnt_file = len({
   p.resolve() for p in Path(origin_folder_path).glob("*") if p.suffix in [".jpg", ".pdf"]})
    # 如果要包括子目录中的文件,则为:
    # cnt_total = len({p.resolve() for p in Path(origin_folder_path).glob("**/*") if p.suffix in [".jpg", ".pdf"]})
    inv_dict = {
   }  #发票字典初始化  
    #从origin_pandas 构建inv_dict字典(票号:文件路径)
    if not result_pandas.empty:
        for i, (index, row) in enumerate(result_pandas.iterrows()):
            if row['01票号'] is np.NAN: #如果票号是空,则跳过
                continue
            if row['01票号'] not in inv_dict:
                inv_dict[row['01票号']] = [row['file_path']]
            else:
                inv_dict[row['01票号']].append(row['file_path'])
    if not duplicate_pandas.empty:
        for i, (index, row) in enumerate(duplicate_pandas.iterrows()):
            if row['重复票号'] is np.NAN: #如果票号是空,则跳过
                continue
            if row['重复票号'] not in inv_dict:
                inv_dict[row['重复票号']] = [row['file_path']]
            else:
                inv_dict[row['重复票号']].append(row['file_path'])   
    cnt_done = 0
    cnt_duplicate = 0
    if not origin_pandas.empty:
        cnt_done = len(origin_pandas.loc[origin_pandas['file_path'].notnull(),:])
    if not duplicate_pandas.empty:
        cnt_duplicate = len(duplicate_pandas.loc[duplicate_pandas['file_path'].notnull(),:])
    for file_name in os.listdir(origin_folder_path): #只在本层文件夹内遍历
        file_path = os.path.join(origin_folder_path, file_name)
        if os.path.isfile(file_path): #排除file_name是文件夹的情况
            pr,nm,fr,ex = pathsplit(file_path)
            if ex not in ['.pdf','.jpg']:
                continue

            inv_out_of_result_pandas = True
            inv_out_of_duplicate_pandas = True
            # 在上次结果文件和重复文件记录中查找文件路径:
            try:
               inv_out_of_result_pandas = result_pandas.loc[result_pandas['file_path']==file_path,:].empty
               inv_out_of_duplicate_pandas = duplicate_pandas.loc[duplicate_pandas['file_path']==file_path,:].empty
            except:
                pass
            #如果文件路径在上次结果文件和重复文件记录中查询结果不为空,即曾识别过,则跳过该文件
            if not(inv_out_of_result_pandas and inv_out_of_duplicate_pandas):
                continue        
            result_series_orderdic = OrderedDict() #定义series有序字典
            err_info = '' #错误记录初始化
            if ex == '.pdf':
                inv_code = ''
                pdf_trans_file_fr = fr
                pdf_trans_file_ex = '.xlsx'
                # pdf_trans_file_ex = '.txt'
                pdf_trans_file_nm = pdf_trans_file_fr + pdf_trans_file_ex
                pdf_trans_folder_name = 'temp_pdf_trans_excel'
                pdf_trans_folder_path = os.path.join(temp_folder_path, pdf_trans_folder_name)
                
                if not os.path.exists(pdf_trans_folder_path):
                    os.mkdir(pdf_trans_folder_path)

                pdf_trans_file_path = os.path.join(pdf_trans_folder_path, pdf_trans_file_nm)
                
                if not os.path.exists(pdf_trans_file_path):
                    trans_type = '.xlsx'
                    # trans_type = '.txt'
                    pdf_trans_file_path = Pdf_tans_to(file_path, pdf_trans_file_path, trans_type = trans_type, temp_pdf_trans_excel_out = True)
                
                if os.path.exists(pdf_trans_file_path):
                    result_series_orderdic, err_info, inv_dict = Tele_inv_ocr(ocr_engines, result_series_orderdic, inv_dict, file_path, pdf_trans_file_path, err_info, engine_switch = precise_engine)   

                if len(result_series_orderdic) != 0:
                    if '01票号' in result_series_orderdic:
                        inv_code = result_series_orderdic['01票号'][0].values[0]
                        #票号添加到票号字典
                        if inv_code not in inv_dict:
                            inv_dict[inv_code] = [file_path]
                        else:
                            if file_path not in inv_dict[inv_code]:
                                inv_dict[inv_code].append(file_path)
                        if len(inv_dict[inv_code]) > 1: #如果该票号的发票重复,跳出本张图片循环
                            if duplicate_pandas.empty:
                                duplicate_pandas = pd.DataFrame(data={
   '重复票号':[inv_code],'file_path':[file_path]}) 
                            else:
                                duplicate_pandas = pd.concat([duplicate_pandas, pd.DataFrame(data={
   '重复票号':[inv_code],'file_path':[file_path]})], ignore_index = True, axis = 0)    
                            Log_result_file(duplicate_pandas,result_file_path,duplicate_sheet_name)
                            cnt_duplicate = cnt_duplicate + 1
                            print(datetime.now().strftime("%H:%M:%S"),file_path, 'Skip. ','\n\t\tDuplicate:', inv_code,inv_dict[inv_code][0])
                            #发票号重复,跳出本次识别
                            continue 
                else:
                    #如果没有结果,转成图片识别
                    pdf_trans_file_ex = '.jpg'
                    pdf_trans_file_nm = pdf_trans_file_fr + '.jpg'
                    pdf_trans_folder_name = 'temp_pdf_trans_jpg'
                    pdf_trans_folder_path = os.path.join(temp_folder_path, pdf_trans_folder_name)
                    pdf_trans_jpg_file_path = os.path.join(pdf_trans_folder_path, pdf_trans_file_nm)
                    pdf_trans_jpg_file_path = Pdf_tans_jpg(file_path, pdf_trans_jpg_file_path, temp_pdf_trans_jpg_out = True)

                    if len(pdf_trans_jpg_file_path)>0:
                        if os.path.exists(pdf_trans_jpg_file_path):
                            #如果传回了转成图片的路径,并且路径存在,读取jpg路径,付给file_path,转成ocr识别:
                            print('\n\nPDF转成图片识别:',pdf_trans_jpg_file_path,'【此模块待添加。】\n\n')
                        

            elif str.lower(ex) == '.jpg':        
                known_dict = {
   } #初始化
                inv_code ='' #初始化
                temp_img_trans_excel_folder = os.path.join(temp_folder_path,'temp_img_trans_excel')
                img_trans_xls_name = 'result_' + fr +  '.xlsx' 
                img_trans_xls_path = os.path.join(temp_img_trans_excel_folder, img_trans_xls_name)

                if os.path.exists(img_trans_xls_path):
                    origin_df = pd.read_excel(img_trans_xls_path, sheet_name=0,header=0,index_col=0,na_values=None, keep_default_na=False, dtype=object) #读取表格
                else:
                    known_dict = Crop_known_from_qrcode(file_path)
                    if len(known_dict)>0:
                        inv_code = known_dict['01票号'].values[0]
                        #票号添加到票号字典
                        if inv_code not in inv_dict:
                            inv_dict[inv_code] = [file_path]
                        else:
                            if file_path not in inv_dict[inv_code]:
                                inv_dict[inv_code].append(file_path)
                        if len(inv_dict[inv_code]) > 1: #如果该票号的发票重复,跳出本张图片循环
                            if duplicate_pandas.empty:
                                duplicate_pandas = pd.DataFrame(data={
   '重复票号':[inv_code],'file_path':[file_path]}) 
                            else:
                                duplicate_pandas = pd.concat([duplicate_pandas, pd.DataFrame(data={
   '重复票号':[inv_code],'file_path':[file_path]})], ignore_index = True, axis = 0)    
                            Log_result_file(duplicate_pandas,result_file_path,duplicate_sheet_name)
                            cnt_duplicate = cnt_duplicate + 1
                            print(datetime.now().strftime("%H:%M:%S"),file_path, 'Skip. ','\n\t\tDuplicate:', inv_code,inv_dict[inv_code][0])
                            #发票号重复,跳出本次识别
                            continue 
                    origin_df = Ocr_func(ocr_engines, img_path = file_path, temp_folder_path = temp_folder_path, 
                        range_title = '', known_dict=known_dict, ocr_excel_out = ocr_excel_out, draw_result_out = draw_result_out, engine_switch=prepare_engine)  #识别为原始文本df
                if not origin_df.empty:
                    result_series_orderdic, err_info = Loc_range_content_pandas(ocr_engines, origin_df, result_series_orderdic, err_info, known_dict, file_path, temp_folder_path, enhance = enhance, engine_switch=precise_engine) #处理为结果series字典
                    if len(result_series_orderdic['01票号']) > 0:
                        inv_code = result_series_orderdic['01票号'].values[0]
                        # assert isinstance(inv_code,str)
                        # assert len(inv_code) == 8 or len(inv_code) == 20
                        if inv_code not in inv_dict:
                            inv_dict[inv_code] = [file_path]
                        else:
                            if file_path not in inv_dict[inv_code]:
                                inv_dict[inv_code].append(file_path)
            if len(inv_code)>0 and inv_code in inv_dict and len(inv_dict[inv_code]) >1:
            # duplicate_df = pd.read_excel(result_file_path, sheet_name=duplicate_sheet_name,index_col=0,header = 0,keep_default_na=True,dtype=object) #读取表格
                if duplicate_pandas.empty:
                    duplicate_pandas = pd.DataFrame(data={
   '重复票号':[inv_code],'file_path':[file_path]}) 
                else:
                    duplicate_pandas = pd.concat([duplicate_pandas, pd.DataFrame(data={
   '重复票号':[inv_code],'file_path':[file_path]})], ignore_index = True, axis = 0)    
                Log_result_file(duplicate_pandas,result_file_path,duplicate_sheet_name)
                cnt_duplicate = cnt_duplicate + 1
                print(datetime.now().strftime("%H:%M:%S"),file_path, 'Skip. ','\n\t\tDuplicate:', inv_code,inv_dict[inv_code][0])
                continue #如果发票号不只一张,跳出本次识别
            #series列表合成dataframe:
            bind_df = pd.DataFrame([result_series_orderdic[series_title][0] if isinstance(result_series_orderdic[series_title], list) else result_series_orderdic[series_title] for series_title in result_series_orderdic]).T
            columns_list =  ['01票号','02代码','03日期','04购方','05购方税号','06品名','07单位','08数量','09单价','10税前',
                    '11税率','12税额','13合计税前','14合计税额','15总额','16大写','17销方','18销方税号'] 
            if len(bind_df) == 0:
                bind_df = pd.DataFrame(columns = columns_list)
            result_df = bind_df.copy() #浅拷贝,防止下面填充提示错误
            result_df['file_path'] = ''
            if len(result_df) == 0:
                result_df = result_df.append({
   'file_path':file_path},ignore_index = True) #追加文件路径到第一行
            else:
                result_df['file_path'].values[0] = file_path #追加文件路径到第一行
            result_df['err_info'] = ''
            result_df.loc[result_df.index[0],'err_info'] = err_info #追加错误提示到第一行
            # 填充处理:务必先处理na值,再进行后续处理。
            
            result_df = Fill_na_result(result_df)

            if result_pandas.empty:
                result_pandas = result_df
            else:
                result_pandas = pd.concat([result_pandas, result_df], ignore_index = True, axis = 0)

            result_pandas = Check_result(result_pandas) #检查和修改结果 每识别一个文件,重新检查前面所有的发票
            #每识别一个文件,写入结果文件,防止中间出错导致未保存结果而重复识别,以实现断点接续,提高总体的效率:
            Log_result_file(result_pandas,result_file_path,result_sheet_name)
            # writer = pd.ExcelWriter(result_file_path, engine='openpyxl', mode='a', if_sheet_exists='replace')
            # duplicate_pandas.to_excel(writer,sheet_name=duplicate_sheet_name)
            # writer.close()

            #-----添加文件路径超链接------
            Add_hyperlink(result_file_path,result_sheet_name)

            cnt_done = cnt_done + 1
            print(datetime.now().strftime("%H:%M:%S"),file_name, inv_code,'done: ' + str(cnt_done) + ' / ' + str(cnt_file))
    # cnt_dict = {'cnt_file':cnt_file,'cnt_done':cnt_file,'cnt_done':cnt_duplicate}
    return result_pandas,duplicate_pandas

3.3 发票识别相关函数

# ocr image to origin_DataFrame. 
def Ocr_func(ocr_engines, img_path, temp_folder_path,  range_title='', known_dict = {
   }, ocr_excel_out = True, draw_result_out = False, engine_switch = 0) ->object: #DataFrame            

    p,n,fr,ex = pathsplit(img_path) #拆分路径

    temp_img_trans_excel_folder = os.path.join(temp_folder_path,'temp_img_trans_excel')
    temp_draw_result_folder = os.path.join(temp_folder_path,'temp_draw_result')
    if engine_switch == 0:
        engine = 'mb'
    elif engine_switch == 1:
        engine = 'pp'
    elif engine_switch == 2:
        engine = 'sv'
    if range_title =='':
        img_trans_xls_name = 'result(' + engine + ')_' + fr + '.xlsx' 
    else:
        img_trans_xls_name = 'result(' + engine + ')_' + fr + '_' + range_title + '.xlsx' 
    img_trans_xls_path = os.path.join(temp_img_trans_excel_folder, img_trans_xls_name)

    if not os.path.exists(temp_img_trans_excel_folder):
        Create_clear_dir(temp_img_trans_excel_folder)
    if not os.path.exists(temp_draw_result_folder):
        Create_clear_dir(temp_draw_result_folder)

    result = '' #结果初始化

    if engine_switch == 1:
        paddleOcr = ocr_engines[engine_switch] 
        results = paddleOcr.ocr(img_path, cls=True)  #识别图像----------------
        df0 = pd.DataFrame(data=results,columns=['pix','result'])
        df1 = pd.concat([pd.DataFrame(df0['pix'].values.tolist(),columns=['lu','ru','rd','ld']), pd.DataFrame(df0['result'].values.tolist(),columns=['content','trust'])], axis=1)
        title_list = ['lu', 'ru', 'rd', 'ld']
        df = df1[['content','trust']]
        for i, title in enumerate(title_list):
            df = pd.concat([df, pd.DataFrame(df1[title].values.tolist(), columns=[title + 'w', title + 'h'])], axis=1)

        if ocr_excel_out == True:
            df.to_excel(img_trans_xls_path, index=False)

        if draw_result_out == True:
            # draw result
            from PIL import Image
            image = Image.open(img_path).convert('RGB')
            boxes = [line[0] for line in result]
            txts = [line[1][0] for line in result]
            scores = [line[1][1] for line in result]
            im_show = draw_ocr(image, boxes, txts, scores, font_path='./fonts/simfang.ttf')
            im_show = Image.fromarray(im_show)
            if range_title =='':
                draw_result_name = 'draw_result_' + fr + ex
            else:
                draw_result_name = 'draw_result_' + fr + '_' + range_title + ex 
            draw_result_path = os.path.join(temp_draw_result_folder, draw_result_name)
            im_show.save(draw_result_path)
        
    elif engine_switch == 0 or engine_switch == 2:
        hubOcr = ocr_engines[engine_switch]
        img = cv_imread(img_path)
        np_images = [img]
#         np_images = [cv2.imdecode(np.fromfile(jpgfile, dtype=np.uint8), cv2.IMREAD_COLOR)]
        #---------使用识别引擎:
        hub_result = hubOcr.recognize_text(
            images=np_images,  # 图片数据,ndarray.shape 为 [H, W, C],BGR格式
            use_gpu=False,  # 是否使用 GPU。否即False,是即请先设置CUDA_VISIBLE_DEVICES环境变量
            output_dir=temp_draw_result_folder,  # 图片的保存路径
            visualization=True,  # 是否将识别结果保存为图片文件
            box_thresh=0.5,  # 检测文本框置信度的阈值
            text_thresh=0.5)  # 识别中文文本置信度的阈值
        results = hub_result[0]['data']
        df = pd.DataFrame()
        column_list = ['content','confdence','luw','luh','ruw','ruh','rdw','rdh','ldw','ldh']
        for infomation in results:
            content = infomation['text']
            confidence = infomation['confidence']
            box = infomation['text_box_position']
            luw,luh,ruw,ruh = box[0][0],box[0][1],box[1][0],box[1][1]
            rdw,rdh,ldw,ldh = box[2][0],box[2][1],box[3][0],box[3][1]
            line = [content,confidence,luw,luh,ruw,ruh,rdw,rdh,ldw,ldh]
            line_df = pd.DataFrame(data = line,index = column_list).T
            if df.empty:
                df = line_df
            else:
                df = pd.concat([df, line_df], axis=0, ignore_index=True)
        if ocr_excel_out == True:
            df.to_excel(img_trans_xls_path, index = False)
    return df


# 识别发票二维码信息
def Crop_known_from_qrcode(file_path) ->dict:
    known_dict = {
   } #返回值初始化
    pr,nm,fr,ex = pathsplit(file_path)
    qrcode_folder_name = 'temp_crop_qrcode'
    qrcode_folder_path = os.path.join(temp_folder_path, qrcode_folder_name)
    if not os.path.exists(qrcode_folder_path):
        Create_clear_dir(qrcode_folder_path)
    qrcode_file_name = 'qrcode_' + nm
    qrcode_file_path = os.path.join(qrcode_folder_path, qrcode_file_name)
    qrcode_image_crop = Crop_qrcode_image(file_path, qrcode_file_path)  # -----------切割处理二维码图片

    qrcode_result = ''
    if qrcode_image_crop == True: #如果二维码切图返回为True
        qrcode_result = qrcode_recongnize(qrcode_file_path)    #------------二维码识别
    if len(qrcode_result) > 0:
        if len(qrcode_result) > 20:
            qrcode_list = qrcode_result.split(',') 
            for index, range_title in enumerate(['02代码','01票号','13合计税前','04日期']): #二维码各字段结果逐个赋值给knowndict
                known_dict[range_title] = pd.Series(data=qrcode_list[index+2],name = range_title)
    
    return known_dict



#切割二维码图片并放大像素
def Crop_qrcode_image(origin_file_path,crop_file_path):
    # 切割二维码图片
    result = False #结果初始化
    img_inv = cv_imread(origin_file_path)
    img_crop = img_inv[100:400, 50:350]  # h, w
    img_magnify = cv2.resize(img_crop, (1200, 1200))
    cv2.imencode('.jpg', img_magnify)[1].tofile(crop_file_path) 
    if os.path.exists(crop_file_path):
        result = True
    return result


# 二维码识别:
def qrcode_recongnize(file_path, method = 'cv2', drawframe = False, enhance=False): #method:pil or cv2
    pr = os.path.split(file_path)[0]
    nm = os.path.split(file_path)[1]
    output_img_path = os.path.join(pr, 'draw_qrcode_' + nm)
    
    #方式一:cv2 方式
    if method =='cv2':
        img = cv_imread(file_path)
        gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        barcodes =pyzbar.decode(gray_img)
    #     print(barcodes)
        barcodeData = ''
        if len(barcodes) >0 :
            for barcode in barcodes:
                # 提取条形码的边界框的位置
                # 画出图像中条形码的边界框
                (x, y, w, h) = barcode.rect
                cv2.rectangle(img, (x, y), (x + w, y + h), (255, 255, 0), 2)
                # 条形码数据为字节对象,所以如果我们想在输出图像上
                #  画出来,就需要先将它转换成字符串
                barcodeData = barcode.data.decode("utf-8")
                if len(barcodeData) > 20:
                    if drawframe == True:
                        from PIL import Image, ImageFont, ImageDraw
                        # 绘出图像上条形码的数据和条形码类型
                        barcodeType = barco

相关文章:

  • 3.2-A-L1-2-第15讲-冒泡排序 mochen @denglexi
  • 实验-基于ESP32-S3的敏捷快速原型开发
  • 创建一个MCP服务器,并在Cline中使用,增强自定义功能。
  • 形象生动讲解Linux 虚拟化 I/O
  • 【ComfyUI】[进阶工作流] 高级采样器与Refiner的工作流优化
  • python django
  • Nginx配置详解
  • 加入二极管的NE555 PWM 电路
  • 使用python实现线性回归
  • 修改DOSBox的窗口大小
  • 启动你的RocketMQ之旅(四)-Producer启动和发送流程(下)
  • 国产开源AI平台Cherry Studio详解:联网搜索升级与ChatBox对比指南
  • spring.profiles.active和spring.profiles.include的使用及区别说明
  • 基于html的俄罗斯方块小游戏(附程序)
  • MCAL-I/O驱动
  • 考研408数据结构第三章(栈、队列和数组)核心易错点深度解析
  • 01_NLP基础之文本处理的基本方法
  • 附录-Python — 包下载缓慢,配置下载镜像
  • 河南理工XCPC萌新选拔赛
  • SEO长尾词优化进阶法则
  • 企业网站的意思/广告投放这个工作难不难做
  • 优化网站建设公司/百度关键词规划师工具
  • 现在外国有哪个网站可以做卖东西/油烟机seo关键词
  • 云端物联网管理平台/宁波企业seo推广
  • 西班牙语 b2b网站开发/如何做推广呢
  • 2016企业网站建设合同/百度竞价代运营外包