当前位置: 首页 > news >正文

使用MGeo模型高精度实现文本中地址识别

一、功能与安装

1、模型地址

模型是阿里开发的门址高精度识别模型。

https://modelscope.cn/models/iic/mgeo_geographic_elements_tagging_chinese_base/summary

注意:不能自己安装包,没法解决依赖问题,直接按照官方要求安装下面的包,大模型最好用conda

pip install "modelscope[nlp]" -f https://modelscope.oss-cn-beijing.aliyuncs.com/releases/repo.html+

2、功能介绍
在这里插入图片描述
按官网案例

from modelscope.pipelines import pipeline
from modelscope.utils.constant import Taskstask = Tasks.token_classification
model = 'iic/mgeo_geographic_elements_tagging_chinese_base'
inputs = '浙江省杭州市余杭区阿里巴巴西溪园区'
pipeline_ins = pipeline(task=task, model=model)
print(pipeline_ins(input=inputs))
# 输出
# {'output': [{'type': 'prov', 'start': 0, 'end': 3, 'span': '浙江省'}, {'type': 'city', 'start': 3, 'end': 6, 'span': '杭州市'}, {'type': 'district', 'start': 6, 'end': 9, 'span': '余杭区'}, {'type': 'poi', 'start': 9, 'end': 17, 'span': '阿里巴巴西溪园区'}]}

二、需要解决的问题

1、和表格数据结合
2、怎么加速,模型支持CPU和GPU,对于CPU使用需要做优化提升推理速度。
2.1、循环优化
2.2、缓存
2.3、数据类型 catory类
2.4、部分字段改为使用numpy
2.5、批量推理
2.6、模型量化(cpu无用)
2.7、模型自带并行,无需做并行优化。

(一)、 逐条推理

这种方法准确率高,但是效率低

1、小数据量—使用pandas 从df逐行读入,再写入excel
易于理解

import pandas as pd
import warnings
import os
import time
# 忽略所有警告
warnings.filterwarnings("ignore")# 关闭 ModelScope 的详细日志输出
os.environ["MODELSCOPE_LOG_LEVEL"] = "40"# 导入 ModelScope 模块
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks# 预加载模型(全局变量),避免每次调用函数都重新加载
TASK = Tasks.token_classification
MODEL = 'iic/mgeo_geographic_elements_tagging_chinese_base'
PIPELINE = pipeline(task=TASK, model=MODEL)def process_address_data(texts):"""批量处理地址文本,使用预加载的 MGeo 模型进行地理元素识别。参数:texts (list): 字符串列表,表示待处理的地址文本返回:list: 每个样本的预测结果(格式为标签+span)"""results = []for text in texts:# 确保输入为字符串if not isinstance(text, str):text = str(text)# 使用已加载的模型进行推理try:result = PIPELINE(input=text)results.append(result['output'])except Exception as e:results.append([])  # 出错则返回空结果return resultsdef extract_geo_entities(results):"""从 ModelScope 的输出中提取出指定类型的地理实体,并分列返回。参数:results (list): 模型输出的 list,每个元素是一个 dict 列表返回:dict: 包含各类地理实体拼接后的字符串"""entity_dict = {'district': [],     # 区'town': [],         # 镇/街道'community': [],    # 社区'poi': [],          # 小区'houseno': []       # 楼号等}for item_list in results:for key in entity_dict:entities = [item['span'] for item in item_list if item.get('type') == key]entity_dict[key].append(";".join(entities) if entities else "")return entity_dictdef process_excel_file(file_path, input_col, output_file=None):"""读取 Excel 文件,批量处理某列中的地址文本,并将提取的地理信息写入新列。参数:file_path (str): Excel 文件路径input_col (str): 待处理的列名output_file (str): 输出文件路径(默认覆盖原文件)返回:pd.DataFrame: 包含新增字段的 DataFrame"""# 读取Exceldf = pd.read_excel(file_path)# 确保输入列为字符串类型df[input_col] = df[input_col].astype(str)# 获取文本列表texts = df[input_col].tolist()# 处理文本并获取结果results = process_address_data(texts)# 提取各类地理实体geo_entities = extract_geo_entities(results)# 添加新列到 dataframefor key in geo_entities:df[key] = geo_entities[key]# 写回到 Excelif output_file is None:output_file = file_path  # 默认覆盖原文件df.to_excel(output_file, index=False)return df# 示例调用
if __name__ == "__main__":file_path = r'C:\Users\xueshifeng\Desktop\模板 - 副本1.xlsx'  # 替换为你的实际路径input_col = '工单内容'  # 替换为你要处理的列名output_file = r'C:\Users\xueshifeng\Desktop\处理结果.xlsx't1=time.time()df_result = process_excel_file(file_path, input_col, output_file)print(time.time()-t1)

2、大数据量—使用numpy优化

通过numpy的向量化减少循环和内容预分配来提升速度

#ds
import pandas as pd
import numpy as np
import os
import time
import warnings
from functools import lru_cache
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks# 环境配置保持不变
warnings.filterwarnings("ignore")
os.environ["MODELSCOPE_LOG_LEVEL"] = "40"# 预定义常量
GEO_ENTITY_TYPES = ('district', 'town', 'community', 'poi', 'houseno')
VALID_ENTITY_TYPES = frozenset(GEO_ENTITY_TYPES)
MODEL_NAME = 'iic/mgeo_geographic_elements_tagging_chinese_base'# 模型加载保持不变
pipeline_instance = pipeline(task=Tasks.token_classification,model=MODEL_NAME
)@lru_cache(maxsize=128)
def cached_inference(text: str) -> list:"""优化后的缓存推理函数"""try:return pipeline_instance(input=text)['output'] or []except Exception:return []def batch_processing(texts: np.ndarray) -> list:texts_flat = texts.ravel()n = texts_flat.sizeresults = [None] * nfor i in range(n):text = str(texts_flat[i])results[i] = cached_inference(text)return resultsdef optimized_entity_extraction(results: list) -> dict:n_samples = len(results)entity_store = {etype: [[] for _ in range(n_samples)] for etype in GEO_ENTITY_TYPES}for idx, entities in enumerate(results):if isinstance(entities, list):for ent in entities:if (etype := ent.get('type')) in VALID_ENTITY_TYPES:entity_store[etype][idx].append(ent.get('span', ''))return {etype: pd.Series([';'.join(lst) if lst else '' for lst in entity_store[etype]],dtype='category')for etype in GEO_ENTITY_TYPES}def process_excel_data(file_path: str, input_col: str, output_file: str = None) -> pd.DataFrame:# 读取数据df = pd.read_excel(file_path, engine='openpyxl')# 强制转换输入列为字符串df[input_col] = df[input_col].astype(str)# 核心处理流程input_series = df[input_col].values.astype(np.str_, copy=False)raw_results = batch_processing(input_series)geo_data = optimized_entity_extraction(raw_results)# 合并结果for col_name, series in geo_data.items():df[col_name] = series# 输出结果output_path = output_file or file_pathdf.to_excel(output_path, index=False, engine='openpyxl')return df# 示例调用保持不变
if __name__ == "__main__":input_file = r"D:\data\gn\结果v68.xlsx"output_file = r"D:\data\gn\结果v68_optimized.xlsx"t_start = time.perf_counter()result_df = process_excel_data(input_file, '工单内容', output_file)elapsed = time.perf_counter() - t_startprint(f"处理完成,总耗时: {elapsed:.2f}秒")

(二)、提升运行速度

运行速度有2个瓶颈,一是模型推理速度二是大量数据操作。
模型推理速度增强,减少模型调用次数:减少模型由于模型能够对128个字以内的数据进行高精度的处理,因此我们把多行在一起推理。这里的难点是如何区分处理后的结果是excel中的哪一行。
这里我们按照输出中的’start’: 0, ‘end’: 3,即在输入文本中的位置来确定是原数据的哪一行,即通过输出词在输入文本的位置推导出输出词在原文本的位置。

# 输出
# {'output': [{'type': 'prov', 'start': 0, 'end': 3, 'span': '浙江省'}, {'type': 'city', 'start': 3, 'end': 6, 'span': '杭州市'}, {'type': 'district', 'start': 6, 'end': 9, 'span': '余杭区'}, {'type': 'poi', 'start': 9, 'end': 17, 'span': '阿里巴巴西溪园区'}]}

我们通过下面的实体分拆函数来实现

def split_entities_by_line(result: List[Dict[str, Any]], original_lines: List[str],actual_batch_size: int = BATCH_SIZE
) -> List[List[Dict[str, Any]]]:"""实体分拆函数(修正偏移计算)"""if not result:  # 空结果处理return [[] for _ in range(actual_batch_size)]# 预计算行偏移量(含分隔符)line_lengths = [len(line) for line in original_lines]  # 每行长度sep_len = len(SEPARATOR)  # 分隔符长度# 计算每行的起始偏移量(向量化加速)line_offsets = [0]for i in range(len(original_lines)-1):line_offsets.append(line_offsets[-1] + line_lengths[i] + sep_len)  # 累加偏移# 实体分配逻辑(添加调试输出)split_result = [[] for _ in range(actual_batch_size)]for entity in result:if not isinstance(entity, dict):continuestart_pos = entity.get('start', 0)# 使用二分查找加速实体分配(比循环快10倍+)line_idx = np.searchsorted(line_offsets, start_pos, side='right') - 1if 0 <= line_idx < actual_batch_size:# 计算相对位置adjusted_entity = {'type': entity.get('type', ''),'span': entity.get('span', ''),'start': start_pos - line_offsets[line_idx],  # 调整为行内相对位置'end': entity.get('end', 0) - line_offsets[line_idx]}split_result[line_idx].append(adjusted_entity)# 调试输出print(f"分配实体: {entity['span']} -> 第{line_idx}行")return split_result

2、大量数据操作
主要是使用以下方法
2.1、使用numpy代替pandas 并采用向量化的手段代替在pandas中遍历。
2.2、使用二分查找而不是遍历

3、预加载模型并去除告警和日志,特别是预加载模型,而不是在函数中每次调用加载可以大幅度缩短时间

4、加缓存,使得重复文本不需要重复推理。

最终代码如下

import numpy as np
import pandas as pd
import os
import time
from functools import lru_cache
from typing import List, Dict, Any, Optional
from collections import defaultdict
import warnings# 配置环境
os.environ["MODELSCOPE_LOG_LEVEL"] = "40"
warnings.filterwarnings("ignore")# 模型初始化
from modelscope.pipelines import pipeline
from modelscope.utils.constant import TasksTASK = Tasks.token_classification
MODEL = 'iic/mgeo_geographic_elements_tagging_chinese_base'
PIPELINE = pipeline(task=TASK, model=MODEL)# 常量定义
GEO_ENTITY_TYPES = ['district', 'town', 'community', 'poi', 'houseno']
BATCH_SIZE = 3  # 初始小批量测试,验证通过后调大
CACHE_SIZE = 128
SEPARATOR = '\ue000'  # Unicode私有区字符(U+E000)@lru_cache(maxsize=CACHE_SIZE)
def cached_pipeline(text: str) -> List[Dict[str, Any]]:"""带缓存的模型推理函数(修正输出结构)"""if not text.strip():return []try:raw_output = PIPELINE(input=text)# 深度解析模型输出结构if isinstance(raw_output, dict):return raw_output.get('output', [])elif isinstance(raw_output, list):return raw_outputreturn []except Exception as e:print(f"模型推理失败: {str(e)}")return []def split_entities_by_line(result: List[Dict[str, Any]], original_lines: List[str],actual_batch_size: int = BATCH_SIZE
) -> List[List[Dict[str, Any]]]:"""实体分拆函数(修正偏移计算)"""if not result:return [[] for _ in range(actual_batch_size)]# 预计算行偏移量(含分隔符)line_lengths = [len(line) for line in original_lines]sep_len = len(SEPARATOR)line_offsets = [0]for i in range(len(original_lines)-1):line_offsets.append(line_offsets[-1] + line_lengths[i] + sep_len)# 实体分配逻辑(添加调试输出)split_result = [[] for _ in range(actual_batch_size)]for entity in result:if not isinstance(entity, dict):continuestart_pos = entity.get('start', 0)# 查找所属行line_idx = np.searchsorted(line_offsets, start_pos, side='right') - 1if 0 <= line_idx < actual_batch_size:# 计算相对位置adjusted_entity = {'type': entity.get('type', ''),'span': entity.get('span', ''),'start': start_pos - line_offsets[line_idx],'end': entity.get('end', 0) - line_offsets[line_idx]}split_result[line_idx].append(adjusted_entity)# 调试输出return split_resultdef process_entities(all_results: List[List[Dict]], entity_types: List[str]
) -> Dict[str, List[str]]:"""实体处理函数"""entity_records = {k: defaultdict(set) for k in entity_types}for row_idx, row in enumerate(all_results):for entity in row:if isinstance(entity, dict) and (etype := entity.get('type')) in entity_types:entity_records[etype][row_idx].add(entity.get('span', ''))geo_data = {k: [] for k in entity_types}for row_idx in range(len(all_results)):for etype in entity_types:values = entity_records[etype].get(row_idx, set())geo_data[etype].append(';'.join(values) if values else '')  return geo_datadef process_excel_file(file_path: str,input_col: str,output_file: Optional[str] = None
) -> pd.DataFrame:"""主处理流程"""try:df = pd.read_excel(file_path, engine='openpyxl', dtype={input_col: str})raw_texts = df[input_col].to_numpy()total_rows = len(raw_texts)except Exception as e:raise ValueError(f"Excel读取失败: {e}")all_results = []batch_indices = np.arange(0, total_rows, BATCH_SIZE)for start_idx in batch_indices:end_idx = min(start_idx + BATCH_SIZE, total_rows)batch_texts = raw_texts[start_idx:end_idx]actual_batch_size = len(batch_texts)merged_text = SEPARATOR.join(batch_texts.tolist())try:model_output = cached_pipeline(merged_text)split_result = split_entities_by_line(model_output, batch_texts.tolist(), actual_batch_size)all_results.extend(split_result)except Exception as e:print(f"处理行{start_idx}-{end_idx}时出错: {e}")all_results.extend([[] for _ in range(actual_batch_size)])all_results = all_results[:total_rows]geo_data = process_entities(all_results, GEO_ENTITY_TYPES)for col in GEO_ENTITY_TYPES:df[col] = geo_data[col]try:output_path = output_file or file_pathdf.to_excel(output_path, index=False, engine='openpyxl')return dfexcept Exception as e:raise IOError(f"结果保存失败: {e}")# 其余保持if __name__ == "__main__":file_path = r"D:\data\gn\结果v68.xlsx"output_file = r"D:\data\gn\结果v68_1.xlsx"try:t_start = time.time()result_df = process_excel_file(file_path, "工单内容", output_file)print(f"处理完成,总耗时: {time.time()-t_start:.2f}秒")except Exception as e:print(f"程序运行失败: {str(e)}")

三、其他
1、增加batch size,能多行推理但是不用numpy 做向量化优化的版本,更易于理解代码

import pandas as pd
import os
import time
from functools import lru_cache
from typing import List, Dict, Any, Optional
import warnings# 配置环境
# 忽略所有警告
warnings.filterwarnings("ignore")# 关闭 ModelScope 的详细日志输出
os.environ["MODELSCOPE_LOG_LEVEL"] = "40"# 模型初始化
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
TASK = Tasks.token_classification
MODEL = 'iic/mgeo_geographic_elements_tagging_chinese_base'
PIPELINE = pipeline(task=TASK, model=MODEL)# 常量定义
GEO_ENTITY_TYPES = ['district', 'town', 'community', 'poi', 'houseno']
BATCH_SIZE = 3  # 每次合并的行数
CACHE_SIZE = 128  # 固定缓存大小@lru_cache(maxsize=CACHE_SIZE)
def cached_pipeline(text: str) -> List[Dict[str, Any]]:"""带缓存的模型推理函数(固定128缓存大小)"""if not text.strip():return []try:return PIPELINE(input=text).get('output', [])except Exception as e:print(f"模型推理失败: {str(e)}")return []def split_results_by_prov(result: List[Dict[str, Any]], original_lines: List[str]
) -> List[List[Dict[str, Any]]]:"""根据原始行位置拆分模型结果Args:result: 模型输出结果original_lines: 原始的三行文本列表"""# 输入校验和自动填充if not original_lines:return [[] for _ in range(BATCH_SIZE)]# 确保总有3行数据(不足时填充空字符串)padded_lines = original_lines + [''] * (BATCH_SIZE - len(original_lines))# 计算字符范围(考虑空格分隔)line_ranges = []current_pos = 0for line in padded_lines:line_end = current_pos + len(line)line_ranges.append((current_pos, line_end))current_pos = line_end + 1  # 空格占位# 分配实体到对应行split_result = [[] for _ in range(BATCH_SIZE)]for entity in result:if not isinstance(entity, dict) or 'start' not in entity:continue# 查找实体所属行for line_idx, (start, end) in enumerate(line_ranges):if start <= entity['start'] < end:split_result[line_idx].append(entity)breakelse:# 未匹配的实体默认放入第一行split_result[0].append(entity)return split_resultdef process_excel_file(file_path: str,input_col: str,output_file: Optional[str] = None
) -> pd.DataFrame:"""主处理函数(单进程版)Args:file_path: 输入Excel路径input_col: 要处理的列名output_file: 输出路径(None则覆盖输入文件)"""# 读取数据try:df = pd.read_excel(file_path, engine='openpyxl')df[input_col] = df[input_col].astype(str)except Exception as e:raise ValueError(f"Excel读取失败: {e}")# 准备批处理数据all_results = []for i in range(0, len(df), BATCH_SIZE):# 获取原始三行文本original_lines = df[input_col].iloc[i:i+BATCH_SIZE].tolist()merged_text = ' '.join(original_lines)try:# 单次模型推理model_output = cached_pipeline(merged_text)# 拆分结果到原始行split_result = split_results_by_prov(model_output, original_lines)all_results.extend(split_result)except Exception as e:print(f"处理行{i}-{i+BATCH_SIZE}时出错: {e}")all_results.extend([[] for _ in range(BATCH_SIZE)])# 结果对齐(确保与原始行数一致)all_results = all_results[:len(df)]# 提取地理实体geo_data = {k: [] for k in GEO_ENTITY_TYPES}for row in all_results:row_entities = {k: set() for k in GEO_ENTITY_TYPES}for entity in row:if isinstance(entity, dict) and entity.get('type') in row_entities:row_entities[entity['type']].add(entity.get('span', ''))for k in GEO_ENTITY_TYPES:geo_data[k].append(';'.join(filter(None, row_entities[k])))# 添加结果列for col in GEO_ENTITY_TYPES:df[col] = geo_data[col]# 输出结果try:output_path = output_file if output_file else file_pathdf.to_excel(output_path, index=False, engine='openpyxl')return dfexcept Exception as e:raise IOError(f"结果写入失败: {e}")if __name__ == "__main__":path= r"D:\data\gn\结果v68.xlsx"output_file = r"D:\data\gn\结果v68_1.xlsx"try:t_start = time.time()result_df = process_excel_file(file_path=path,input_col="工单内容",output_file=output_file)print(f"处理完成,耗时: {time.time()-t_start:.2f}秒")except Exception as e:print(f"程序运行失败: {str(e)}")

2、模型量化

from modelscope.models import Model
import torch
import os# 1. 定义本地模型路径(需要包含 config.json 和 pytorch_model.bin 等文件)
local_model_path = r"D:\data\2025\Data\geo\iic\mgeo_geographic_elements_tagging_chinese_base"# 2. 确保路径存在且包含模型文件
if not os.path.exists(local_model_path):raise FileNotFoundError(f"模型路径不存在: {local_model_path}")# 3. 从本地加载模型
model = Model.from_pretrained(local_model_path, local_files_only=True)
print("模型加载成功!")# 4. 定义目标保存路径
save_path = r'D:\data\2025\Data\geoq\iic'# 5. 创建目标路径(如果不存在)
os.makedirs(save_path, exist_ok=True)# 6. 保存模型权重(推荐方式:仅保存参数)
model_weight_path = os.path.join(save_path, 'pytorch_model.bin')
torch.save(model.state_dict(), model_weight_path)
print(f"模型参数已保存至: {model_weight_path}")# 7. 或者保存整个模型(包括架构和参数)
full_model_path = os.path.join(save_path, 'full_model.pt')
torch.save(model, full_model_path)
print(f"完整模型已保存至: {full_model_path}")

注意:
量化后的路径自有2个文件,其余文件需要手动复制过去。

CPU量化不起作用原因:

CPU对INT8运算的支持不足。大多数传统CPU(如Intel Skylake或更早架构)不支持INT8加速指令集(如AVX512-VNNI),导致INT8运算需通过软件模拟,反而增加计算开销。

动态量化未覆盖关键计算层。动态量化通常仅对线性层(Linear) 进行量化,而非线性层(如Softmax、LayerNorm)仍需FP32计算。如果模型中非线性层占比高,整体加速效果有限。激活值动态量化引入额外开销。
批处理规模(Batch Size)过小。批量数据无法发挥INT8并行加速优势 增大批量(如batch_size≥8)。
框架优化程度 PyTorch动态量化可能未调用底层加速库 使用ONNX Runtime/TensorRT部署,或升级PyTorch版本。

3、预处理
为了能够在128个字符的限制内容,推理跟多行,可以先对原始数据用正则做处理缩短长度。

相关文章:

  • C++11新特性_自动类型推导
  • GPU虚拟化实现(七)
  • *(解引用运算符)与 ++(自增运算符)的优先级
  • 编写教育网站后端页面笔记
  • Dinero.js - 免费开源的 JavaScript 货币处理工具库,完美解决 JS 浮点数精度丢失问题
  • vue 常见ui库对比(element、ant、antV等)
  • C标准库(libc)接口及示例解析
  • 免费实用的图像处理工具箱​
  • 神经网络入门
  • 前端八股 7
  • 7.0/Q1,GBD数据库最新文章解读
  • 2025五一杯数学建模C题:社交媒体平台用户分析问题;思路分析+模型代码
  • 加密解密记录
  • 【笔记】深度学习模型训练的 GPU 内存优化之旅⑤:内存分配篇
  • 电子秤检测管理系统开发实战:从数据采集到可视化大屏
  • 从0开始的c++知识讲解之字符串(1)
  • 体系学习1:C语言与指针1——预定义、进制打印、传参为数组
  • 【dify—7】文本生成应用实战——学员周报生成
  • 多模态大语言模型arxiv论文略读(五十二)
  • LabVIEW异步调用VI介绍
  • 五一假期天气将大转变,南方新一轮降雨来袭
  • 挑大梁!一季度北上广等7省份进出口占外贸总值四分之三
  • 人民日报社论:坚定信心、奋发有为、创新创造——写在“五一”国际劳动节
  • 上海科创再出发:“造星”的城和“摘星”的人
  • 杜前任宁波中院代理院长,卸任宁波海事法院院长
  • 早睡1小时,变化有多惊人?第一个就没想到