DeepSeek联网搜索
deepseek
- 0、前言
- 1、未联网
- 2、联网
- 2.1 SerpAPI
- 2.2 SerpAPI+Deepseek
0、前言
为获取最新消息,需给deepseek联网
1、未联网
from dotenv import load_dotenv
from langchain_deepseek import ChatDeepSeek
load_dotenv()
# 1、模型
model = ChatDeepSeek(model="deepseek-chat")
resp = model.invoke("黄旭华先生何时逝世?")
print(resp.content)
运行结果:
2、联网
2.1 SerpAPI
SerpApi是一个提供搜索引擎结果页面(SERP)数据的API服务,它可以帮助用户轻松获取Google、Bing等搜索引擎的结果,而无需直接与搜索引擎进行交互。
SerpAPI文档
安装:
/usr/bin/python3 -m pip install google-search-results
注意:
如果你通过 pip install serpapi 安装了名为 serpapi 的第三方库(非官方),它可能不包含 GoogleSearch 类。
官方库的 PyPI 包名是 google-search-results,但导入时仍然用 serpapi。
使用:
注册 SerpAPI 获取 API 密钥(免费套餐可用)
from serpapi import GoogleSearch
from dotenv import load_dotenv
# 配置参数
params = {
"q": "Python tutorial", # 搜索关键词
"engine": "google", # 搜索引擎(默认)
"location": "北京",
"api_key": "xxx", # 替换为你的API密钥
"google_domain": "google.com",
"gl": "cn", # 国家/地区(如中国)
"hl": "zh-cn" # 语言(中文简体)
}
# 执行搜索
client = GoogleSearch(params)
results = client.get_dict()
# 提取结果
for result in results["organic_results"]:
print(f"标题: {result['title']}")
print(f"链接: {result['link']}")
print(f"摘要: {result['snippet']}\n")
2.2 SerpAPI+Deepseek
import os
from serpapi import GoogleSearch
from dotenv import load_dotenv
from typing import List, Optional
from langchain_core.prompts import PromptTemplate
from langchain_deepseek import ChatDeepSeek
from pydantic import BaseModel
load_dotenv()
class SearchResultItem(BaseModel):
title: str #标题
link: str #链接
snippet: str #摘要
class SearchResults(BaseModel):
results: List[SearchResultItem]
def get_search_results(query: str) -> SearchResults:
# 配置参数
params = {
"q": query, # 搜索关键词
"engine": "google", # 搜索引擎(默认)
"location": "北京",
"api_key": os.environ["SERPAPI_KEY"], # 替换为你的API密钥
"google_domain": "google.com",
"gl": "cn", # 国家/地区(如中国)
"hl": "zh-cn" # 语言(中文简体)
}
# 执行搜索
client = GoogleSearch(params)
results = client.get_dict()
# 提取结果
search_results = SearchResults(results=[SearchResultItem(title=result['title'],
link=result['link'],
snippet=result['snippet'])
for result in results["organic_results"]]
)
return search_results
# 1、模型
model = ChatDeepSeek(model="deepseek-chat")
# 2、提示词模版
prompt = (
PromptTemplate.from_template("根据搜索引擎的结果,回答用户问题。用户问题:{question}")
+ "google搜索结果:{search_results}"
)
# 3、
chain = {
"question": lambda x: x,
"search_results": lambda x: get_search_results(x)
} | prompt | model
resp = chain.invoke(input="黄旭华先生何时逝世?")
print(resp.content)
因serpapi.com网站验证不了手机号获取不了apikey,故用curl验证结果
各种api开发文档
import requests
from typing import List, Optional
from pydantic import BaseModel
class SearchResultItem(BaseModel):
title: str # 标题
link: str # 链接
snippet: str # 摘要
class SearchResults(BaseModel):
results: List[SearchResultItem]
def get_search_results_for_test(query) -> SearchResults:
url = "https://api.acedata.cloud/serp/google"
headers = {
"authorization": "Bearer f8ddaed2e30345948bc550d9caabb783",
"accept": "application/json",
"content-type": "application/json"
}
payload = {
"query": query,
"language": "zh-cn",
"country": "cn",
"page": 1,
"number": 1
}
response = requests.post(url, json=payload, headers=headers).json()
print(response)
# 提取结果
search_results = SearchResults(results=[SearchResultItem(title=result['title'],
link=result['link'],
snippet=result['snippet'])
for result in response["organic"]]
)
print(f"====serpapi response: {search_results}=======")
return search_results
if __name__ == '__main__':
d = get_search_results_for_test("黄旭华先生何时逝世?")
运行结果:
替换chain中的get_search_results方法:
import os
import requests
from serpapi import GoogleSearch
from dotenv import load_dotenv
from typing import List, Optional
from langchain_core.prompts import PromptTemplate
from langchain_deepseek import ChatDeepSeek
from pydantic import BaseModel
from langchain.callbacks import StdOutCallbackHandler, FileCallbackHandler
load_dotenv()
class SearchResultItem(BaseModel):
title: str # 标题
link: str # 链接
snippet: str # 摘要
class SearchResults(BaseModel):
results: List[SearchResultItem]
def get_search_results(query: str) -> SearchResults:
# 配置参数
params = {
"query": query, # 搜索关键词
"engine": "google", # 搜索引擎(默认)
"location": "北京",
"api_key": "f8ddaed2e30345948bc550d9caabb783", # 替换为你的API密钥
"google_domain": "google.com",
"gl": "cn", # 国家/地区(如中国)
"hl": "zh-cn" # 语言(中文简体)
}
# 执行搜索
client = GoogleSearch(params)
results = client.get_dict()
print(results)
# 提取结果
search_results = SearchResults(results=[SearchResultItem(title=result['title'],
link=result['link'],
snippet=result['snippet'])
for result in results["organic_results"]]
)
return search_results
def get_search_results_for_test(query) -> SearchResults:
url = "https://api.acedata.cloud/serp/google"
headers = {
"authorization": "Bearer f8ddaed2e30345948bc550d9caabb783",
"accept": "application/json",
"content-type": "application/json"
}
payload = {
"query": query,
"language": "zh-cn",
"country": "cn",
"page": 1,
"number": 1
}
response = requests.post(url, json=payload, headers=headers).json()
# 提取结果
search_results = SearchResults(results=[SearchResultItem(title=result['title'],
link=result['link'],
snippet=result['snippet'])
for result in response["organic"]]
)
print(f"====serpapi response: {search_results}=======")
return search_results
# 1、模型
model = ChatDeepSeek(model="deepseek-chat")
# 2、提示词模版
prompt = (
PromptTemplate.from_template("根据搜索引擎的结果,回答用户问题。用户问题:{question}")
+ "google搜索结果:{search_results}"
)
# 3、
chain = {
"question": lambda x: x,
"search_results": lambda x: get_search_results_for_test(x)
} | prompt | model
resp = chain.invoke(input="黄旭华先生何时逝世?")
# 打印 Chain 的组成结构
print(f"-----chain:{chain}-----------") # 显示 Chain 类型和组件
print(f"====resp: {resp.content}=========")
运行结果如下:
注意:
chain的input参数需与lambda想对应,使用字典则如下:
chain = {
"question": lambda x: x["question"],
"search_results": lambda x: get_search_results_for_test(x["question"])
} | prompt | model
resp = chain.invoke(input={"question": "黄旭华先生何时逝世?"})