当前位置: 首页 > news >正文

DeepSeek联网搜索

deepseek

  • 0、前言
  • 1、未联网
  • 2、联网
    • 2.1 SerpAPI
    • 2.2 SerpAPI+Deepseek

0、前言

为获取最新消息,需给deepseek联网

1、未联网

from dotenv import load_dotenv
from langchain_deepseek import ChatDeepSeek

load_dotenv()

# 1、模型
model = ChatDeepSeek(model="deepseek-chat")

resp = model.invoke("黄旭华先生何时逝世?")
print(resp.content)

运行结果:
在这里插入图片描述

2、联网

2.1 SerpAPI

SerpApi‌是一个提供搜索引擎结果页面(SERP)数据的API服务,它可以帮助用户轻松获取Google、Bing等搜索引擎的结果,而无需直接与搜索引擎进行交互。
SerpAPI文档
安装:

/usr/bin/python3 -m pip install  google-search-results

注意:

如果你通过 pip install serpapi 安装了名为 serpapi 的第三方库(非官方),它可能不包含 GoogleSearch 类。
官方库的 PyPI 包名是 google-search-results,但导入时仍然用 serpapi。

使用:
注册 SerpAPI 获取 API 密钥(免费套餐可用)

from serpapi import GoogleSearch
from dotenv import load_dotenv

# 配置参数
params = {
    "q": "Python tutorial",  # 搜索关键词
    "engine": "google",  # 搜索引擎(默认)
    "location": "北京",
    "api_key": "xxx",  # 替换为你的API密钥
    "google_domain": "google.com",
    "gl": "cn",  # 国家/地区(如中国)
    "hl": "zh-cn"  # 语言(中文简体)
}

# 执行搜索
client = GoogleSearch(params)
results = client.get_dict()

# 提取结果
for result in results["organic_results"]:
    print(f"标题: {result['title']}")
    print(f"链接: {result['link']}")
    print(f"摘要: {result['snippet']}\n")

2.2 SerpAPI+Deepseek

import os

from serpapi import GoogleSearch
from dotenv import load_dotenv
from typing import List, Optional
from langchain_core.prompts import PromptTemplate
from langchain_deepseek import ChatDeepSeek
from pydantic import BaseModel

load_dotenv()


class SearchResultItem(BaseModel):
    title: str #标题
    link: str  #链接
    snippet: str #摘要


class SearchResults(BaseModel):
    results: List[SearchResultItem]


def get_search_results(query: str) -> SearchResults:
    # 配置参数
    params = {
        "q": query,  # 搜索关键词
        "engine": "google",  # 搜索引擎(默认)
        "location": "北京",
        "api_key": os.environ["SERPAPI_KEY"],  # 替换为你的API密钥
        "google_domain": "google.com",
        "gl": "cn",  # 国家/地区(如中国)
        "hl": "zh-cn"  # 语言(中文简体)
    }

    # 执行搜索
    client = GoogleSearch(params)
    results = client.get_dict()

    # 提取结果
    search_results = SearchResults(results=[SearchResultItem(title=result['title'],
                                                             link=result['link'],
                                                             snippet=result['snippet'])
                                            for result in results["organic_results"]]
                                   )
    return search_results


# 1、模型
model = ChatDeepSeek(model="deepseek-chat")

# 2、提示词模版
prompt = (
        PromptTemplate.from_template("根据搜索引擎的结果,回答用户问题。用户问题:{question}")
        + "google搜索结果:{search_results}"
)

# 3、
chain = {
            "question": lambda x: x,
            "search_results": lambda x: get_search_results(x)
        } | prompt | model
resp = chain.invoke(input="黄旭华先生何时逝世?")
print(resp.content)

因serpapi.com网站验证不了手机号获取不了apikey,故用curl验证结果
各种api开发文档

import requests
from typing import List, Optional
from pydantic import BaseModel


class SearchResultItem(BaseModel):
    title: str  # 标题
    link: str  # 链接
    snippet: str  # 摘要


class SearchResults(BaseModel):
    results: List[SearchResultItem]


def get_search_results_for_test(query) -> SearchResults:
    url = "https://api.acedata.cloud/serp/google"

    headers = {
        "authorization": "Bearer f8ddaed2e30345948bc550d9caabb783",
        "accept": "application/json",
        "content-type": "application/json"
    }

    payload = {
        "query": query,
        "language": "zh-cn",
        "country": "cn",
        "page": 1,
        "number": 1
    }

    response = requests.post(url, json=payload, headers=headers).json()
    print(response)

    # 提取结果
    search_results = SearchResults(results=[SearchResultItem(title=result['title'],
                                                             link=result['link'],
                                                             snippet=result['snippet'])
                                            for result in response["organic"]]
                                   )
    print(f"====serpapi response: {search_results}=======")
    return search_results


if __name__ == '__main__':
    d = get_search_results_for_test("黄旭华先生何时逝世?")

运行结果:
在这里插入图片描述
替换chain中的get_search_results方法:

import os
import requests
from serpapi import GoogleSearch
from dotenv import load_dotenv
from typing import List, Optional
from langchain_core.prompts import PromptTemplate
from langchain_deepseek import ChatDeepSeek
from pydantic import BaseModel
from langchain.callbacks import StdOutCallbackHandler, FileCallbackHandler

load_dotenv()


class SearchResultItem(BaseModel):
    title: str  # 标题
    link: str  # 链接
    snippet: str  # 摘要


class SearchResults(BaseModel):
    results: List[SearchResultItem]


def get_search_results(query: str) -> SearchResults:
    # 配置参数
    params = {
        "query": query,  # 搜索关键词
        "engine": "google",  # 搜索引擎(默认)
        "location": "北京",
        "api_key": "f8ddaed2e30345948bc550d9caabb783",  # 替换为你的API密钥
        "google_domain": "google.com",
        "gl": "cn",  # 国家/地区(如中国)
        "hl": "zh-cn"  # 语言(中文简体)
    }

    # 执行搜索
    client = GoogleSearch(params)
    results = client.get_dict()
    print(results)

    # 提取结果
    search_results = SearchResults(results=[SearchResultItem(title=result['title'],
                                                             link=result['link'],
                                                             snippet=result['snippet'])
                                            for result in results["organic_results"]]
                                   )
    return search_results


def get_search_results_for_test(query) -> SearchResults:
    url = "https://api.acedata.cloud/serp/google"

    headers = {
        "authorization": "Bearer f8ddaed2e30345948bc550d9caabb783",
        "accept": "application/json",
        "content-type": "application/json"
    }

    payload = {
        "query": query,
        "language": "zh-cn",
        "country": "cn",
        "page": 1,
        "number": 1
    }

    response = requests.post(url, json=payload, headers=headers).json()

    # 提取结果
    search_results = SearchResults(results=[SearchResultItem(title=result['title'],
                                                             link=result['link'],
                                                             snippet=result['snippet'])
                                            for result in response["organic"]]
                                   )
    print(f"====serpapi response: {search_results}=======")
    return search_results


# 1、模型
model = ChatDeepSeek(model="deepseek-chat")

# 2、提示词模版
prompt = (
        PromptTemplate.from_template("根据搜索引擎的结果,回答用户问题。用户问题:{question}")
        + "google搜索结果:{search_results}"
)

# 3、
chain = {
            "question": lambda x: x,
            "search_results": lambda x: get_search_results_for_test(x)
        } | prompt | model

resp = chain.invoke(input="黄旭华先生何时逝世?")

# 打印 Chain 的组成结构
print(f"-----chain:{chain}-----------")  # 显示 Chain 类型和组件
print(f"====resp: {resp.content}=========")

运行结果如下:
在这里插入图片描述
注意:
chain的input参数需与lambda想对应,使用字典则如下:

chain = {
            "question": lambda x: x["question"],
            "search_results": lambda x: get_search_results_for_test(x["question"])
        } | prompt | model

resp = chain.invoke(input={"question": "黄旭华先生何时逝世?"})

相关文章:

  • Docker:3、在VSCode上安装并运行python程序或JavaScript程序
  • windows系统本地部署DeepSeek-R1全流程指南:Ollama+Docker+OpenWebUI
  • GitLab 概念
  • Python自动化测试
  • 【分布式理论12】事务协调者高可用:分布式选举算法
  • 详解Virtualhome环境搭建教程 | 智能体
  • ES12的逻辑操作符 ,数字分隔符,字符串的replaceAll,FinalizationRegistry的用法以及使用场景
  • transfmer学习认识
  • 【iOS】Blocks
  • Mysql-------事务
  • PWM(脉宽调制)技术详解:从基础到应用实践示例
  • 动态规划之背包问题
  • 正式页面开发-登录注册页面
  • 阿里云k8s服务部署操作一指禅
  • ECharts极简入门
  • 基于STM32设计的自动追光系统(系统资料)
  • 基于Chatbox AI部署Deepseek等模型
  • 环境变量1
  • 在项目中调用本地Deepseek(接入本地Deepseek)
  • 基于ffmpeg+openGL ES实现的视频编辑工具-字幕添加(六)
  • 十年磨一剑!上海科学家首次揭示宿主识别肠道菌群调控免疫新机制
  • 蚊媒传染病、手足口病……上海疾控发布近期防病提示
  • “异常”只停留在医院里,用艺术为“泡泡宝贝”加油
  • 安徽省委常委、合肥市委书记费高云卸任副省长职务
  • 中东睿评|特朗普中东三国行:喧嚣的形式与空洞的实质
  • 金正恩观摩朝鲜人民军各兵种战术综合训练