文章目录
- 1. 说明
- 2. 准备工作
- 3. 代码
-
- 3.1 导入库:
- 3.2 遍历发票指定处理方式
- 3.3 发票识别相关函数
- 3.4 发票字段定位函数
- 3.6 识别记录相关函数
- 3.6 识别结果校验
- 3.7 文件预处理等其他函数
- 3.8 main主函数
1. 说明
1.1 以paddle识别引擎为基础的增值税发票识别程序,可批量识别和累积纸质发票和电子发票数据。已在生产环境中测试。
1.2 识别的源发票数据:
- 文件夹中存放的用高速连续发票扫描仪批量扫描的JPG格式图片
- 文件夹中汇集的电子发票PDF格式文件
1.3 可选择识别引擎:快速-mb、平衡-sv、精细-pp(总体上,预识别用mb、精细识别用pp,速度和精确度比较好)。
1.4 适配断续工作,跳过已扫描的重复发票,边识别边存储。
1.5 可装在闲置低配置的win7老台式,资源利用,识别速度视电脑配置差异大概2-3秒一张。
1.6 在实际生产环境中测试,如果纸质发票不清晰,综合识别准确率大概85%-95%左右。如果数电发票比较多,识别准确率大概达到97%以上。
1.7 对于识别有误或缺失的数据,在结果中提示错误并链接原发票文件,以便人工直接对照修改。
1.8 其他:
- 公司名称税号可在代码中预置设定好,位置在发票字段定位函数Loc_range_content_pandas。
- 可自行预置对方公司名称错误的更正,详细可在Check_result函数中此处文字内容"字段修正:公司名错别字"所在位置的字典修改。
2. 准备工作
2.1 准备工作发票电子文件夹:已用高速连续发票扫描仪扫描完纸质发票的图片文件夹,和已汇集的电子发票PDF格式文件夹。
2.2 安装好辅助程序 acrobat pro dc
2.3 语言环境 anaconda,python3.7(虚拟环境)
2.4 环境中安装好所需要的库(自行安装好虚拟环境中所需的第三方库):
第三方库(需安装):opencv-python(cv2), numpy, paddlehub, pandas, psutil, openpyxl, paddleocr, pillow(PIL), pyzbar, pymupdf(fitz);标准库(随Python自带,无需安装):imghdr, shutil, glob, pathlib, tkinter, zipfile(ZipFile)
3. 代码
3.1 导入库:
import imghdr
import math
import os
import re
import shutil
from collections import OrderedDict
from datetime import datetime
from glob import glob
from pathlib import Path
from tkinter import filedialog
from tkinter import Tk
import cv2
import numpy as np
import paddlehub as hub
import pandas as pd
import psutil
from openpyxl import cell, load_workbook
from openpyxl.styles import Font, colors
from paddleocr import PaddleOCR, draw_ocr
from PIL import Image, ImageDraw, ImageEnhance, ImageFont
from pyzbar import pyzbar
from zipfile import ZipFile
import fitz
3.2 遍历发票指定处理方式
# walk_folder_ocr: go through the files directly inside origin_folder_path and run
# recognition on every '.pdf' / '.jpg' file that is not already listed in either
# of the two result tables, appending to those tables as it goes.
#
# Parameters:
#   origin_pandas      -- DataFrame of invoices recognised so far; the columns
#                         '01票号' and 'file_path' are read from it.
#   duplicate_pandas   -- DataFrame of duplicates found so far; the columns
#                         '重复票号' and 'file_path' are read from it.
#   origin_folder_path -- folder whose entries are listed with os.listdir.
#   walk_folder_args   -- must contain the keys 'ocr_engines', 'temp_folder_path'
#                         and 'engine_switch'.
# Returns: the tuple (result_pandas, duplicate_pandas).
#
# NOTE(review): this listing lost its indentation during extraction. The comments
# below follow reading order and deliberately do not assert how blocks are nested.
# NOTE(review): precise_engine, result_file_path, result_sheet_name,
# duplicate_sheet_name, ocr_excel_out, draw_result_out and enhance are read here
# but never assigned in this function -- presumably module-level globals; verify.
def walk_folder_ocr(origin_pandas,duplicate_pandas,origin_folder_path,**walk_folder_args):
ocr_engines = walk_folder_args['ocr_engines']
temp_folder_path = walk_folder_args['temp_folder_path']
# 'engine_switch' is only used for the Ocr_func call further down; the Tele_inv_ocr and
# Loc_range_content_pandas calls use precise_engine instead.
prepare_engine = walk_folder_args['engine_switch']
# Same object as origin_pandas (no copy is made).
result_pandas = origin_pandas
# Number of distinct files whose suffix is exactly ".jpg" or ".pdf" (case-sensitive);
# only used in the progress message at the end.
cnt_file = len({
p.resolve() for p in Path(origin_folder_path).glob("*") if p.suffix in [".jpg", ".pdf"]})
# inv_dict maps an invoice number to the list of file paths recorded for it.
inv_dict = {
}
# Seed inv_dict from the invoices already present in result_pandas.
if not result_pandas.empty:
for i, (index, row) in enumerate(result_pandas.iterrows()):
# NOTE(review): `is np.NAN` is an identity test; a missing value that is a different
# NaN object would not be skipped here. pd.isna() would be the robust check -- confirm.
if row['01票号'] is np.NAN:
continue
if row['01票号'] not in inv_dict:
inv_dict[row['01票号']] = [row['file_path']]
else:
inv_dict[row['01票号']].append(row['file_path'])
# Seed inv_dict from the duplicates already recorded, using the same rules.
if not duplicate_pandas.empty:
for i, (index, row) in enumerate(duplicate_pandas.iterrows()):
if row['重复票号'] is np.NAN:
continue
if row['重复票号'] not in inv_dict:
inv_dict[row['重复票号']] = [row['file_path']]
else:
inv_dict[row['重复票号']].append(row['file_path'])
# Progress counters start from the number of rows that already have a file_path.
cnt_done = 0
cnt_duplicate = 0
if not origin_pandas.empty:
cnt_done = len(origin_pandas.loc[origin_pandas['file_path'].notnull(),:])
if not duplicate_pandas.empty:
cnt_duplicate = len(duplicate_pandas.loc[duplicate_pandas['file_path'].notnull(),:])
# Main loop over the folder entries.
for file_name in os.listdir(origin_folder_path):
file_path = os.path.join(origin_folder_path, file_name)
if os.path.isfile(file_path):
# pathsplit is defined elsewhere; the 3rd and 4th values are used below as the file
# stem (fr) and the extension (ex).
pr,nm,fr,ex = pathsplit(file_path)
# Only the exact strings '.pdf' and '.jpg' pass this filter.
# NOTE(review): if pathsplit returns the extension unchanged, upper-case extensions are
# dropped here, which makes the later str.lower(ex) comparison redundant -- verify.
if ex not in ['.pdf','.jpg']:
continue
# Skip files whose path is already present in either table. Any exception raised by
# the lookups is swallowed, leaving both flags True (file treated as not yet processed).
inv_out_of_result_pandas = True
inv_out_of_duplicate_pandas = True
try:
inv_out_of_result_pandas = result_pandas.loc[result_pandas['file_path']==file_path,:].empty
inv_out_of_duplicate_pandas = duplicate_pandas.loc[duplicate_pandas['file_path']==file_path,:].empty
except:
pass
if not(inv_out_of_result_pandas and inv_out_of_duplicate_pandas):
continue
# Per-file accumulators: recognised fields and the error text for this file.
result_series_orderdic = OrderedDict()
err_info = ''
# PDF files: convert to .xlsx under <temp_folder_path>/temp_pdf_trans_excel (unless the
# target file already exists) and hand the converted file to Tele_inv_ocr.
if ex == '.pdf':
inv_code = ''
pdf_trans_file_fr = fr
pdf_trans_file_ex = '.xlsx'
pdf_trans_file_nm = pdf_trans_file_fr + pdf_trans_file_ex
pdf_trans_folder_name = 'temp_pdf_trans_excel'
pdf_trans_folder_path = os.path.join(temp_folder_path, pdf_trans_folder_name)
if not os.path.exists(pdf_trans_folder_path):
os.mkdir(pdf_trans_folder_path)
pdf_trans_file_path = os.path.join(pdf_trans_folder_path, pdf_trans_file_nm)
if not os.path.exists(pdf_trans_file_path):
trans_type = '.xlsx'
pdf_trans_file_path = Pdf_tans_to(file_path, pdf_trans_file_path, trans_type = trans_type, temp_pdf_trans_excel_out = True)
if os.path.exists(pdf_trans_file_path):
result_series_orderdic, err_info, inv_dict = Tele_inv_ocr(ocr_engines, result_series_orderdic, inv_dict, file_path, pdf_trans_file_path, err_info, engine_switch = precise_engine)
if len(result_series_orderdic) != 0:
if '01票号' in result_series_orderdic:
# In this branch the entry is indexed as [0].values[0] (first element, then a
# .values array); the JPG branch below reads .values[0] directly.
inv_code = result_series_orderdic['01票号'][0].values[0]
# Register this file under its invoice number.
if inv_code not in inv_dict:
inv_dict[inv_code] = [file_path]
else:
if file_path not in inv_dict[inv_code]:
inv_dict[inv_code].append(file_path)
# More than one path for the same invoice number: add a row to duplicate_pandas,
# pass the table to Log_result_file, and move on to the next file.
if len(inv_dict[inv_code]) > 1:
if duplicate_pandas.empty:
duplicate_pandas = pd.DataFrame(data={
'重复票号':[inv_code],'file_path':[file_path]})
else:
duplicate_pandas = pd.concat([duplicate_pandas, pd.DataFrame(data={
'重复票号':[inv_code],'file_path':[file_path]})], ignore_index = True, axis = 0)
Log_result_file(duplicate_pandas,result_file_path,duplicate_sheet_name)
cnt_duplicate = cnt_duplicate + 1
print(datetime.now().strftime("%H:%M:%S"),file_path, 'Skip. ','\n\t\tDuplicate:', inv_code,inv_dict[inv_code][0])
continue
# NOTE(review): which `if` this `else` pairs with cannot be determined from the
# flattened text. The statements after it convert the PDF to a JPG via Pdf_tans_jpg.
else:
pdf_trans_file_ex = '.jpg'
pdf_trans_file_nm = pdf_trans_file_fr + '.jpg'
pdf_trans_folder_name = 'temp_pdf_trans_jpg'
pdf_trans_folder_path = os.path.join(temp_folder_path, pdf_trans_folder_name)
pdf_trans_jpg_file_path = os.path.join(pdf_trans_folder_path, pdf_trans_file_nm)
pdf_trans_jpg_file_path = Pdf_tans_jpg(file_path, pdf_trans_jpg_file_path, temp_pdf_trans_jpg_out = True)
if len(pdf_trans_jpg_file_path)>0:
if os.path.exists(pdf_trans_jpg_file_path):
# Only a message is printed here; the converted image is not recognised.
print('\n\nPDF转成图片识别:',pdf_trans_jpg_file_path,'【此模块待添加。】\n\n')
# JPG files (scanned paper invoices).
elif str.lower(ex) == '.jpg':
known_dict = {
}
inv_code =''
temp_img_trans_excel_folder = os.path.join(temp_folder_path,'temp_img_trans_excel')
# NOTE(review): the cache name checked here is 'result_<stem>.xlsx', while Ocr_func
# writes 'result(<engine>)_<stem>.xlsx' -- this lookup may never hit; verify.
img_trans_xls_name = 'result_' + fr + '.xlsx'
img_trans_xls_path = os.path.join(temp_img_trans_excel_folder, img_trans_xls_name)
if os.path.exists(img_trans_xls_path):
origin_df = pd.read_excel(img_trans_xls_path, sheet_name=0,header=0,index_col=0,na_values=None, keep_default_na=False, dtype=object)
else:
# Try the QR code first: it can supply the invoice number before any OCR is run.
known_dict = Crop_known_from_qrcode(file_path)
if len(known_dict)>0:
inv_code = known_dict['01票号'].values[0]
if inv_code not in inv_dict:
inv_dict[inv_code] = [file_path]
else:
if file_path not in inv_dict[inv_code]:
inv_dict[inv_code].append(file_path)
# Same duplicate handling as in the PDF branch.
if len(inv_dict[inv_code]) > 1:
if duplicate_pandas.empty:
duplicate_pandas = pd.DataFrame(data={
'重复票号':[inv_code],'file_path':[file_path]})
else:
duplicate_pandas = pd.concat([duplicate_pandas, pd.DataFrame(data={
'重复票号':[inv_code],'file_path':[file_path]})], ignore_index = True, axis = 0)
Log_result_file(duplicate_pandas,result_file_path,duplicate_sheet_name)
cnt_duplicate = cnt_duplicate + 1
print(datetime.now().strftime("%H:%M:%S"),file_path, 'Skip. ','\n\t\tDuplicate:', inv_code,inv_dict[inv_code][0])
continue
# Whole-image OCR pass with the engine selected by 'engine_switch'.
origin_df = Ocr_func(ocr_engines, img_path = file_path, temp_folder_path = temp_folder_path,
range_title = '', known_dict=known_dict, ocr_excel_out = ocr_excel_out, draw_result_out = draw_result_out, engine_switch=prepare_engine)
# Field extraction from the OCR table, using precise_engine.
if not origin_df.empty:
result_series_orderdic, err_info = Loc_range_content_pandas(ocr_engines, origin_df, result_series_orderdic, err_info, known_dict, file_path, temp_folder_path, enhance = enhance, engine_switch=precise_engine)
# NOTE(review): direct subscript -- raises KeyError if '01票号' was not set and the
# returned object is still an OrderedDict; confirm Loc_range_content_pandas always sets it.
if len(result_series_orderdic['01票号']) > 0:
inv_code = result_series_orderdic['01票号'].values[0]
if inv_code not in inv_dict:
inv_dict[inv_code] = [file_path]
else:
if file_path not in inv_dict[inv_code]:
inv_dict[inv_code].append(file_path)
# Duplicate handling again, this time only when an invoice number was actually obtained.
if len(inv_code)>0 and inv_code in inv_dict and len(inv_dict[inv_code]) >1:
if duplicate_pandas.empty:
duplicate_pandas = pd.DataFrame(data={
'重复票号':[inv_code],'file_path':[file_path]})
else:
duplicate_pandas = pd.concat([duplicate_pandas, pd.DataFrame(data={
'重复票号':[inv_code],'file_path':[file_path]})], ignore_index = True, axis = 0)
Log_result_file(duplicate_pandas,result_file_path,duplicate_sheet_name)
cnt_duplicate = cnt_duplicate + 1
print(datetime.now().strftime("%H:%M:%S"),file_path, 'Skip. ','\n\t\tDuplicate:', inv_code,inv_dict[inv_code][0])
continue
# Assemble the recognised fields into a DataFrame: one entry per key of
# result_series_orderdic (list-valued entries contribute their first element), transposed.
bind_df = pd.DataFrame([result_series_orderdic[series_title][0] if isinstance(result_series_orderdic[series_title], list) else result_series_orderdic[series_title] for series_title in result_series_orderdic]).T
columns_list = ['01票号','02代码','03日期','04购方','05购方税号','06品名','07单位','08数量','09单价','10税前',
'11税率','12税额','13合计税前','14合计税额','15总额','16大写','17销方','18销方税号']
# Nothing recognised: fall back to an empty frame that still has the expected columns.
if len(bind_df) == 0:
bind_df = pd.DataFrame(columns = columns_list)
result_df = bind_df.copy()
result_df['file_path'] = ''
if len(result_df) == 0:
# NOTE(review): DataFrame.append was deprecated in pandas 1.4 and removed in 2.0;
# this line only works on older pandas.
result_df = result_df.append({
'file_path':file_path},ignore_index = True)
else:
result_df['file_path'].values[0] = file_path
result_df['err_info'] = ''
result_df.loc[result_df.index[0],'err_info'] = err_info
result_df = Fill_na_result(result_df)
# Add this file's rows to the running table, then run Check_result and write the
# workbook via Log_result_file / Add_hyperlink.
if result_pandas.empty:
result_pandas = result_df
else:
result_pandas = pd.concat([result_pandas, result_df], ignore_index = True, axis = 0)
result_pandas = Check_result(result_pandas)
Log_result_file(result_pandas,result_file_path,result_sheet_name)
Add_hyperlink(result_file_path,result_sheet_name)
cnt_done = cnt_done + 1
print(datetime.now().strftime("%H:%M:%S"),file_name, inv_code,'done: ' + str(cnt_done) + ' / ' + str(cnt_file))
return result_pandas,duplicate_pandas
3.3 发票识别相关函数
def Ocr_func(ocr_engines, img_path, temp_folder_path, range_title='', known_dict=None,
             ocr_excel_out=True, draw_result_out=False, engine_switch=0) -> object:
    """Run one OCR engine on an image and return its text boxes as a DataFrame.

    Args:
        ocr_engines: Container indexed by ``engine_switch``. The object at
            index 1 must provide ``.ocr(path, cls=True)``; the objects at
            index 0 and 2 must provide ``.recognize_text(...)``.
        img_path: Path of the image to recognise.
        temp_folder_path: Folder under which ``temp_img_trans_excel`` and
            ``temp_draw_result`` are created when missing.
        range_title: Optional suffix appended to the output file names.
        known_dict: Not used by this function; accepted so existing callers
            that pass it keep working.
        ocr_excel_out: When true, the result table is written to
            ``result(<engine>)_<stem>[_<range_title>].xlsx``.
        draw_result_out: When true and ``engine_switch`` is 1, an annotated
            image is saved to the draw-result folder. Engines 0 and 2 always
            call ``recognize_text`` with ``visualization=True``.
        engine_switch: 0 -> 'mb', 1 -> 'pp', 2 -> 'sv'.

    Returns:
        DataFrame with one row per detected text box. Engine 1 yields the
        columns ``content, trust, luw, luh, ruw, ruh, rdw, rdh, ldw, ldh``;
        engines 0 and 2 yield ``content, confdence, luw, ... ldh`` (the
        spelling of ``confdence`` is kept because other code may read it).
        An empty DataFrame is returned when engines 0/2 detect nothing.

    Raises:
        ValueError: If ``engine_switch`` is not 0, 1 or 2.
    """
    engine_tags = {0: 'mb', 1: 'pp', 2: 'sv'}
    # Fail early with a clear message instead of hitting an unbound local later.
    if engine_switch not in engine_tags:
        raise ValueError('engine_switch must be 0, 1 or 2, got %r' % (engine_switch,))
    engine = engine_tags[engine_switch]

    _, _, fr, ex = pathsplit(img_path)
    temp_img_trans_excel_folder = os.path.join(temp_folder_path, 'temp_img_trans_excel')
    temp_draw_result_folder = os.path.join(temp_folder_path, 'temp_draw_result')

    if range_title == '':
        img_trans_xls_name = 'result(' + engine + ')_' + fr + '.xlsx'
    else:
        img_trans_xls_name = 'result(' + engine + ')_' + fr + '_' + range_title + '.xlsx'
    img_trans_xls_path = os.path.join(temp_img_trans_excel_folder, img_trans_xls_name)

    if not os.path.exists(temp_img_trans_excel_folder):
        Create_clear_dir(temp_img_trans_excel_folder)
    if not os.path.exists(temp_draw_result_folder):
        Create_clear_dir(temp_draw_result_folder)

    if engine_switch == 1:
        paddle_ocr = ocr_engines[engine_switch]
        results = paddle_ocr.ocr(img_path, cls=True)
        # NOTE(review): assumes ``results`` is a flat list of
        # [four corner points, (text, score)] entries -- confirm against the
        # installed PaddleOCR version.
        df0 = pd.DataFrame(data=results, columns=['pix', 'result'])
        df1 = pd.concat(
            [pd.DataFrame(df0['pix'].values.tolist(), columns=['lu', 'ru', 'rd', 'ld']),
             pd.DataFrame(df0['result'].values.tolist(), columns=['content', 'trust'])],
            axis=1)
        df = df1[['content', 'trust']]
        # Split every corner point into separate width / height columns.
        for title in ('lu', 'ru', 'rd', 'ld'):
            corner_df = pd.DataFrame(df1[title].values.tolist(), columns=[title + 'w', title + 'h'])
            df = pd.concat([df, corner_df], axis=1)
        if ocr_excel_out:
            df.to_excel(img_trans_xls_path, index=False)
        if draw_result_out:
            image = Image.open(img_path).convert('RGB')
            # Fix: the annotation must be built from ``results``. The previous code
            # iterated an always-empty placeholder, so nothing was ever drawn.
            boxes = [line[0] for line in results]
            txts = [line[1][0] for line in results]
            scores = [line[1][1] for line in results]
            im_show = draw_ocr(image, boxes, txts, scores, font_path='./fonts/simfang.ttf')
            im_show = Image.fromarray(im_show)
            if range_title == '':
                draw_result_name = 'draw_result_' + fr + ex
            else:
                draw_result_name = 'draw_result_' + fr + '_' + range_title + ex
            im_show.save(os.path.join(temp_draw_result_folder, draw_result_name))
    else:
        hub_ocr = ocr_engines[engine_switch]
        img = cv_imread(img_path)
        hub_result = hub_ocr.recognize_text(
            images=[img],
            use_gpu=False,
            output_dir=temp_draw_result_folder,
            visualization=True,
            box_thresh=0.5,
            text_thresh=0.5)
        results = hub_result[0]['data']
        column_list = ['content', 'confdence', 'luw', 'luh', 'ruw', 'ruh', 'rdw', 'rdh', 'ldw', 'ldh']
        # Collect plain rows and build the frame once instead of one concat per row.
        rows = []
        for information in results:
            box = information['text_box_position']
            rows.append([
                information['text'], information['confidence'],
                box[0][0], box[0][1], box[1][0], box[1][1],
                box[2][0], box[2][1], box[3][0], box[3][1],
            ])
        # dtype=object keeps the column dtypes of the former row-by-row build; no
        # detections still give a completely empty frame.
        df = pd.DataFrame(rows, columns=column_list, dtype=object) if rows else pd.DataFrame()
        if ocr_excel_out:
            df.to_excel(img_trans_xls_path, index=False)
    return df
def Crop_known_from_qrcode(file_path) -> dict:
    """Extract invoice fields from the QR code of a scanned invoice image.

    A region of the image is cropped to ``temp_crop_qrcode/qrcode_<file name>``
    under the module-level ``temp_folder_path`` (via ``Crop_qrcode_image``)
    and decoded with ``qrcode_recongnize``. The decoded text is split on
    commas and the fields at positions 2-5 are stored.

    Args:
        file_path: Path of the invoice image.

    Returns:
        Dict mapping the titles '02代码', '01票号', '13合计税前' and '04日期'
        to a ``pandas.Series`` holding the corresponding payload field.
        Empty when the crop fails, nothing usable is decoded, or the payload
        has fewer than six comma-separated fields.
    """
    known_dict = {}
    _, nm, _, _ = pathsplit(file_path)

    # NOTE(review): temp_folder_path is not a parameter; it is read from module scope.
    qrcode_folder_path = os.path.join(temp_folder_path, 'temp_crop_qrcode')
    if not os.path.exists(qrcode_folder_path):
        Create_clear_dir(qrcode_folder_path)
    qrcode_file_path = os.path.join(qrcode_folder_path, 'qrcode_' + nm)

    if not Crop_qrcode_image(file_path, qrcode_file_path):
        return known_dict

    qrcode_result = qrcode_recongnize(qrcode_file_path)
    # Payloads of 20 characters or fewer are treated as unusable, as before.
    if not qrcode_result or len(qrcode_result) <= 20:
        return known_dict

    # NOTE(review): walk_folder_ocr's column list names the date column '03日期';
    # '04日期' is kept unchanged because the consumers of known_dict are outside
    # this view -- confirm which key they expect.
    field_titles = ['02代码', '01票号', '13合计税前', '04日期']
    first_field = 2  # the payload fields used here start at position 2
    qrcode_list = qrcode_result.split(',')
    # A long but malformed payload used to raise IndexError; now it yields no fields.
    if len(qrcode_list) < first_field + len(field_titles):
        return known_dict

    for position, range_title in enumerate(field_titles, start=first_field):
        known_dict[range_title] = pd.Series(data=qrcode_list[position], name=range_title)
    return known_dict
def Crop_qrcode_image(origin_file_path, crop_file_path,
                      crop_box=(100, 400, 50, 350), magnify_size=(1200, 1200)):
    """Crop a fixed region of an image, enlarge it and save it as a JPEG.

    Args:
        origin_file_path: Path of the source image, loaded with ``cv_imread``.
        crop_file_path: Path the enlarged crop is written to.
        crop_box: ``(top, bottom, left, right)`` pixel bounds of the crop.
            The default is the region this function always used.
        magnify_size: Size passed to ``cv2.resize`` for the enlarged crop.

    Returns:
        True when the crop was encoded and ``crop_file_path`` exists
        afterwards; False when the image could not be loaded, the crop is
        empty, or encoding failed.
    """
    img_inv = cv_imread(origin_file_path)
    # NOTE(review): cv_imread is defined elsewhere; None is assumed to mean
    # "could not read the image" -- confirm.
    if img_inv is None:
        return False

    top, bottom, left, right = crop_box
    img_crop = img_inv[top:bottom, left:right]
    # An image that lies entirely outside the crop box yields an empty array,
    # which cv2.resize rejects.
    if img_crop.size == 0:
        return False

    img_magnify = cv2.resize(img_crop, magnify_size)
    # imencode + tofile is kept from the original (presumably so that non-ASCII
    # paths work -- verify); the success flag is now checked before writing.
    encoded_ok, buffer = cv2.imencode('.jpg', img_magnify)
    if not encoded_ok:
        return False
    buffer.tofile(crop_file_path)
    return os.path.exists(crop_file_path)
def qrcode_recongnize(file_path, method = 'cv2', drawframe = False, enhance=False):
pr = os.path.split(file_path)[0]
nm = os.path.split(file_path)[1]
output_img_path = os.path.join(pr, 'draw_qrcode_' + nm)
if method =='cv2':
img = cv_imread(file_path)
gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
barcodes =pyzbar.decode(gray_img)
barcodeData = ''
if len(barcodes) >0 :
for barcode in barcodes:
(x, y, w, h) = barcode.rect
cv2.rectangle(img, (x, y), (x + w, y + h), (255, 255, 0), 2)
barcodeData = barcode.data.decode("utf-8")
if len(barcodeData) > 20:
if drawframe == True:
from PIL import Image, ImageFont, ImageDraw
barcodeType = barco