7.3. Data Monitoring and Building a Local Knowledge Base
- Purpose: watch for newly generated Xiaohongshu drafts and record the style of each one. Later, given an input topic, drafts related to that topic can be retrieved so that new copy is generated in a similar style.
- Approach:
  - 1. File monitoring is implemented with the watchdog library. watchdog is a Python library for monitoring file-system changes; it emits events such as file creation, modification, and deletion. We start a thread that watches the xiaohongshu_drafts directory in real time and, whenever a new file appears, calls process_new_file(file_path) to update the knowledge base.
  - 2. process_new_file(file_path) reads the new file's content, then builds the knowledge base by splitting the text, converting the chunks into Document objects, and vectorizing them into the local store.
- Code implementation:
'''
Author: yeffky
Date: 2025-02-12 13:29:31
LastEditTime: 2025-02-17 14:28:11
'''
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer
from langchain.schema import Document
import time
import os
class NewFileHandler(FileSystemEventHandler):
    def on_created(self, event):
        # Fires for every new entry under the watched path; skip directories.
        if not event.is_directory:
            file_path = event.src_path
            process_new_file(file_path)
            print(f"New file ingested: {file_path}")
def process_new_file(file_path):
    # Load the new draft as a single Document.
    loader = TextLoader(file_path, encoding="utf-8")
    documents = loader.load()

    # Split on the '---' separators the drafts use between paragraphs,
    # measuring chunk length with the bert-base-chinese tokenizer.
    tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
    text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
        tokenizer,
        chunk_size=256,
        chunk_overlap=0,
        separators=['---']
    )
    text_chunks = text_splitter.split_text(documents[0].page_content)
    chunk_docs = [Document(page_content=chunk) for chunk in text_chunks]

    # Embed the chunks and append them to the local FAISS store,
    # creating the store on first use.
    embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh")
    vector_store_dir = "./vector_store"
    if os.path.exists(vector_store_dir):
        vector_store = FAISS.load_local(vector_store_dir, embeddings, allow_dangerous_deserialization=True)
        vector_store.add_documents(chunk_docs)
    else:
        vector_store = FAISS.from_documents(chunk_docs, embeddings)
    vector_store.save_local(vector_store_dir)
def start_observer():
    observer = Observer()
    observer.schedule(NewFileHandler(), path="./xiaohongshu_drafts", recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

if __name__ == "__main__":
    start_observer()
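
Once drafts have been indexed, retrieving style examples for a topic is a plain similarity search against the saved store. A minimal sketch (assuming ./vector_store already exists and was built with the same BAAI/bge-base-zh model):

from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings

embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh")
store = FAISS.load_local("./vector_store", embeddings, allow_dangerous_deserialization=True)
# The 3 chunks most similar to the topic; page_content holds the draft text.
for doc in store.similarity_search("手机推荐", k=3):
    print(doc.page_content[:80])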
7.4. Data Analysis
- Purpose: combine the previously crawled data with the local knowledge base and call the DeepSeek API to generate Xiaohongshu copy.
- Approach:
  - Generating copy with DeepSeek requires a system preset and a user prompt. Both templates are first loaded from files; the crawled product data and the style examples retrieved from the local knowledge base are then filled in to build the final preset and prompt.
- Code implementation:
'''
Author: yeffky
Date: 2025-02-11 11:17:04
LastEditTime: 2025-02-17 15:35:13
'''
import json
import os
import requests
from datetime import datetime
import random
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings

# Route Hugging Face downloads through a mirror (the endpoint needs a scheme).
os.environ['HF_ENDPOINT'] = 'https://hf-mirror.com'
today_date = datetime.now().strftime('%Y-%m-%d')
topic = "手机推荐"
def read_json_file(filename):
    with open(f'data/{filename}', 'r', encoding='utf-8') as f:
        return json.load(f)
def build_prompt(item):
    with open('./docs/prompt.txt', 'r', encoding='utf-8') as f:
        prompt = f.read()

    # Pull 5 style examples for the topic, then sample 3 of them so
    # consecutive runs do not always imitate the same drafts.
    embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-zh")
    vector_store = FAISS.load_local("./vector_store", embeddings, allow_dangerous_deserialization=True)
    retrieved_docs = vector_store.similarity_search(topic, k=5)
    random.shuffle(retrieved_docs)
    selected_docs = retrieved_docs[:3]
    style_examples = "\n---\n".join(doc.page_content for doc in selected_docs)

    return f"""{prompt},
{json.dumps(item, ensure_ascii=False, indent=2)}
**根据以下文案风格,做出创新**:
{style_examples}
**注意**:
- 在结尾加入提示,数据截至当前日期:{today_date}
- 每一段内容使用 --- 进行分割
"""
def build_preset():
    with open('./docs/preset.txt', 'r', encoding='utf-8') as f:
        preset = f.read()

    # Randomize tone and extra elements so each run varies.
    preset += f"""\n **主题**:{topic}
**创新要求**:
- 使用{random.choice(["轻松幽默", "专业严谨", "犀利吐槽"])}的语气
- 加入{random.choice(["emoji表情", "热门梗", "互动提问"])}元素
"""
    print(preset)
    return preset
def get_deepseek_response(preset, prompt, api_key):
    url = "https://api.deepseek.com/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    payload = json.dumps({
        "messages": [
            {"content": preset, "role": "system"},
            {"content": prompt, "role": "user"}
        ],
        "model": "deepseek-reasoner",
        "frequency_penalty": 0,
        "max_tokens": 2048,
        "presence_penalty": 0,
        "response_format": {"type": "text"},
        "stop": None,
        "stream": False,
        "stream_options": None,
        "temperature": 1,
        "top_p": 1,
        "tools": None,
        "tool_choice": "none",
        "logprobs": False,
        "top_logprobs": None
    })

    # Retry until a non-empty response comes back.
    response = None
    while not response:
        try:
            response = requests.post(url, data=payload, headers=headers, timeout=100)
            response.raise_for_status()
            if not response.json():
                response = None
                print("Empty response, retrying...")
            else:
                print("Response received:\n" + response.json()['choices'][0]['message']['content'])
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {str(e)}")
            response = None
    return response.json()['choices'][0]['message']['content']
def save_copywriting(content):
    base_path = './xiaohongshu_drafts/'
    filename = "小红书_推广文案_千战系列" + today_date + ".txt"
    with open(base_path + filename, 'w', encoding='utf-8') as f:
        f.write(content)
    print(f"Draft saved to: {filename}")
def analysis_data():
    API_KEY = os.getenv("DEEPSEEK_API_KEY")
    JSON_FILE = f'goods_{today_date}.json'
    items = read_json_file(JSON_FILE)
    print(f"Processing: {JSON_FILE}")
    prompt = build_prompt(items)
    preset = build_preset()
    try:
        response = get_deepseek_response(preset, prompt, API_KEY)
        save_copywriting(response)
    except Exception as e:
        print(f"Processing failed: {str(e)}")

if __name__ == "__main__":
    analysis_data()
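
To close the loop described in 7.3, the watcher can run in a background thread while generation happens in the foreground, so each saved draft is indexed into the knowledge base as soon as it is written. A minimal sketch, assuming the 7.3 script is importable as watcher.py and the 7.4 script as analysis.py (hypothetical file names):

import threading
import time

from watcher import start_observer   # hypothetical module name for the 7.3 script
from analysis import analysis_data   # hypothetical module name for the 7.4 script

# Run the file watcher as a daemon so new drafts are indexed automatically.
threading.Thread(target=start_observer, daemon=True).start()

analysis_data()   # generate and save one draft
time.sleep(5)     # give the watcher a moment to ingest it before exiting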