自建本地DNS过滤系统:实现局域网广告和垃圾网站屏蔽
前言
在日常上网过程中,广告和恶意网站不仅影响浏览体验,还可能带来安全隐患。虽然浏览器插件可以部分解决问题,但对于家庭或小型办公室网络,搭建一个本地的DNS过滤系统可以为所有设备提供统一的保护。本文将介绍如何使用Python实现一个轻量级的DNS过滤代理服务器。
技术原理
DNS过滤原理
DNS过滤的核心思想是在DNS解析阶段进行拦截:
- 客户端发起DNS查询请求
- 本地DNS服务器接收请求
- 检查域名是否在黑名单中
- 如果在黑名单中,返回无效IP(如0.0.0.0)
- 如果不在黑名单中,转发到上游DNS服务器
系统架构
客户端设备 → 本地DNS服务器 → 黑名单检查 → 上游DNS服务器↓返回结果/拦截
实现方案
1. DNS过滤服务器核心代码
#!/usr/bin/env python3
# dns_filter_server.pyimport socket
import struct
import threading
import json
import time
import logging
from datetime import datetime
import hashlibclass DNSFilterServer:def __init__(self, config_file='config.json'):"""初始化DNS过滤服务器"""self.load_config(config_file)self.setup_logging()self.load_blacklist()self.cache = {} # DNS缓存self.statistics = {'total_queries': 0,'blocked_queries': 0,'cached_responses': 0}def load_config(self, config_file):"""加载配置文件"""try:with open(config_file, 'r', encoding='utf-8') as f:self.config = json.load(f)except FileNotFoundError:# 默认配置self.config = {'listen_host': '0.0.0.0','listen_port': 53,'upstream_dns': '8.8.8.8','upstream_port': 53,'blacklist_file': 'blacklist.txt','whitelist_file': 'whitelist.txt','cache_ttl': 300,'log_file': 'dns_filter.log'}self.save_config(config_file)def save_config(self, config_file):"""保存配置文件"""with open(config_file, 'w', encoding='utf-8') as f:json.dump(self.config, f, indent=2)def setup_logging(self):"""设置日志"""logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler(self.config['log_file']),logging.StreamHandler()])self.logger = logging.getLogger(__name__)def load_blacklist(self):"""加载黑名单和白名单"""self.blacklist = set()self.whitelist = set()# 加载黑名单try:with open(self.config['blacklist_file'], 'r', encoding='utf-8') as f:for line in f:domain = line.strip().lower()if domain and not domain.startswith('#'):self.blacklist.add(domain)self.logger.info(f"加载了 {len(self.blacklist)} 个黑名单域名")except FileNotFoundError:self.logger.warning("黑名单文件不存在,创建默认文件")self.create_default_blacklist()# 加载白名单try:with open(self.config['whitelist_file'], 'r', encoding='utf-8') as f:for line in f:domain = line.strip().lower()if domain and not domain.startswith('#'):self.whitelist.add(domain)self.logger.info(f"加载了 {len(self.whitelist)} 个白名单域名")except FileNotFoundError:passdef create_default_blacklist(self):"""创建默认黑名单"""default_domains = ['# 广告域名','doubleclick.net','googleadservices.com','googlesyndication.com','google-analytics.com','googletagmanager.com','facebook.com/tr','amazon-adsystem.com','# 追踪域名','scorecardresearch.com','quantserve.com','outbrain.com','taboola.com','# 恶意网站','malware-example.com',]with open(self.config['blacklist_file'], 'w', encoding='utf-8') as f:f.write('\n'.join(default_domains))def parse_dns_query(self, data):"""解析DNS查询"""# DNS头部格式header = struct.unpack('>HHHHHH', data[:12])transaction_id = header[0]flags = header[1]questions = header[2]# 解析查询域名offset = 12domain_parts = []while True:length = data[offset]if length == 0:breakoffset += 1domain_parts.append(data[offset:offset+length].decode('ascii'))offset += lengthdomain = '.'.join(domain_parts).lower()# 查询类型和类query_type, query_class = struct.unpack('>HH', data[offset+1:offset+5])return {'transaction_id': transaction_id,'domain': domain,'query_type': query_type,'query_class': query_class,'raw_data': data}def is_blocked(self, domain):"""检查域名是否应被屏蔽"""# 检查白名单if domain in self.whitelist:return False# 检查完整域名if domain in self.blacklist:return True# 检查子域名parts = domain.split('.')for i in range(len(parts)):parent = '.'.join(parts[i:])if parent in self.blacklist:return Truereturn Falsedef create_blocked_response(self, query):"""创建屏蔽响应"""# DNS响应头部response = struct.pack('>H', query['transaction_id']) # Transaction IDresponse += struct.pack('>H', 0x8180) # Flags: Response, Authoritativeresponse += struct.pack('>H', 1) # Questionsresponse += struct.pack('>H', 1) # Answer RRsresponse += struct.pack('>H', 0) # Authority RRsresponse += struct.pack('>H', 0) # Additional RRs# 复制查询部分response += query['raw_data'][12:]# 答案部分response += struct.pack('>H', 0xC00C) # 指向域名的指针response += struct.pack('>H', query['query_type']) # Typeresponse += struct.pack('>H', query['query_class']) # Classresponse += struct.pack('>I', 60) # TTLresponse += struct.pack('>H', 4) # Data lengthresponse += socket.inet_aton('0.0.0.0') # IP地址return responsedef forward_to_upstream(self, data):"""转发到上游DNS服务器"""try:# 检查缓存cache_key = hashlib.md5(data).hexdigest()if cache_key in self.cache:cached_time, cached_response = self.cache[cache_key]if time.time() - cached_time < self.config['cache_ttl']:self.statistics['cached_responses'] += 1return cached_response# 创建上游连接upstream_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)upstream_socket.settimeout(5)# 发送查询upstream_socket.sendto(data, (self.config['upstream_dns'], self.config['upstream_port']))# 接收响应response, _ = upstream_socket.recvfrom(4096)upstream_socket.close()# 缓存响应self.cache[cache_key] = (time.time(), response)# 清理过期缓存if len(self.cache) > 1000:self.cleanup_cache()return responseexcept socket.timeout:self.logger.error("上游DNS服务器超时")return Noneexcept Exception as e:self.logger.error(f"转发DNS请求失败: {e}")return Nonedef cleanup_cache(self):"""清理过期缓存"""current_time = time.time()expired_keys = [k for k, (t, _) in self.cache.items() if current_time - t > self.config['cache_ttl']]for key in expired_keys:del self.cache[key]def handle_dns_query(self, data, addr, server_socket):"""处理DNS查询"""try:# 解析查询query = self.parse_dns_query(data)domain = query['domain']self.statistics['total_queries'] += 1# 检查是否应该屏蔽if self.is_blocked(domain):self.logger.info(f"屏蔽域名: {domain} 来自 {addr[0]}")response = self.create_blocked_response(query)self.statistics['blocked_queries'] += 1else:self.logger.debug(f"转发查询: {domain} 来自 {addr[0]}")response = self.forward_to_upstream(data)if response is None:return# 发送响应server_socket.sendto(response, addr)except Exception as e:self.logger.error(f"处理DNS查询失败: {e}")def print_statistics(self):"""定期打印统计信息"""while True:time.sleep(60)self.logger.info(f"统计信息 - 总查询: {self.statistics['total_queries']}, "f"已屏蔽: {self.statistics['blocked_queries']}, "f"缓存命中: {self.statistics['cached_responses']}")def start(self):"""启动DNS服务器"""try:# 创建UDP socketserver_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)server_socket.bind((self.config['listen_host'], self.config['listen_port']))self.logger.info(f"DNS过滤服务器启动在 {self.config['listen_host']}:{self.config['listen_port']}")# 启动统计线程stats_thread = threading.Thread(target=self.print_statistics, daemon=True)stats_thread.start()while True:# 接收DNS查询data, addr = server_socket.recvfrom(4096)# 在新线程中处理查询thread = threading.Thread(target=self.handle_dns_query,args=(data, addr, server_socket))thread.daemon = Truethread.start()except PermissionError:self.logger.error("需要管理员权限来绑定53端口")except Exception as e:self.logger.error(f"服务器启动失败: {e}")finally:server_socket.close()# 主程序
if __name__ == '__main__':server = DNSFilterServer()server.start()
2. Web管理界面
#!/usr/bin/env python3
# web_admin.pyfrom flask import Flask, render_template_string, request, jsonify, redirect, url_for
import json
import os
from functools import wrapsapp = Flask(__name__)
app.secret_key = 'your-secret-key-here'# 简单的认证装饰器
def require_auth(f):@wraps(f)def decorated_function(*args, **kwargs):auth = request.authorizationif not auth or auth.username != 'admin' or auth.password != 'password':return 'Authentication required', 401, {'WWW-Authenticate': 'Basic realm="Login Required"'}return f(*args, **kwargs)return decorated_function# HTML模板
ADMIN_TEMPLATE = '''
<!DOCTYPE html>
<html>
<head><title>DNS过滤器管理界面</title><meta charset="utf-8"><style>body {font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial;max-width: 1200px;margin: 0 auto;padding: 20px;background: #f5f5f5;}.container {background: white;border-radius: 8px;padding: 20px;box-shadow: 0 2px 4px rgba(0,0,0,0.1);margin-bottom: 20px;}h1 {color: #333;border-bottom: 2px solid #4CAF50;padding-bottom: 10px;}h2 {color: #666;margin-top: 30px;}.stats {display: grid;grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));gap: 15px;margin: 20px 0;}.stat-card {background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);color: white;padding: 20px;border-radius: 8px;text-align: center;}.stat-value {font-size: 2em;font-weight: bold;}.stat-label {margin-top: 5px;opacity: 0.9;}textarea {width: 100%;height: 300px;font-family: 'Courier New', monospace;padding: 10px;border: 1px solid #ddd;border-radius: 4px;resize: vertical;}button {background: #4CAF50;color: white;border: none;padding: 10px 20px;border-radius: 4px;cursor: pointer;margin-right: 10px;font-size: 16px;}button:hover {background: #45a049;}.danger {background: #f44336;}.danger:hover {background: #da190b;}.domain-input {display: flex;margin-bottom: 10px;}.domain-input input {flex: 1;padding: 10px;border: 1px solid #ddd;border-radius: 4px 0 0 4px;font-size: 16px;}.domain-input button {border-radius: 0 4px 4px 0;margin: 0;}.message {padding: 10px;margin: 10px 0;border-radius: 4px;background: #d4edda;color: #155724;border: 1px solid #c3e6cb;}.error {background: #f8d7da;color: #721c24;border: 1px solid #f5c6cb;}table {width: 100%;border-collapse: collapse;margin-top: 20px;}th, td {text-align: left;padding: 12px;border-bottom: 1px solid #ddd;}th {background: #f8f9fa;font-weight: 600;}tr:hover {background: #f8f9fa;}</style><script>function refreshStats() {fetch('/api/stats').then(response => response.json()).then(data => {document.getElementById('total-queries').textContent = data.total_queries;document.getElementById('blocked-queries').textContent = data.blocked_queries;document.getElementById('cached-responses').textContent = data.cached_responses;const blockRate = data.total_queries > 0 ? ((data.blocked_queries / data.total_queries) * 100).toFixed(1) : 0;document.getElementById('block-rate').textContent = blockRate + '%';});}function addDomain(listType) {const input = document.getElementById(listType + '-input');const domain = input.value.trim();if (!domain) return;fetch('/api/add_domain', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({domain: domain, list: listType})}).then(response => response.json()).then(data => {if (data.success) {location.reload();} else {alert('添加失败: ' + data.message);}});}setInterval(refreshStats, 5000);window.onload = refreshStats;</script>
</head>
<body><div class="container"><h1>🛡️ DNS过滤器管理中心</h1><h2>📊 实时统计</h2><div class="stats"><div class="stat-card"><div class="stat-value" id="total-queries">-</div><div class="stat-label">总查询数</div></div><div class="stat-card" style="background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);"><div class="stat-value" id="blocked-queries">-</div><div class="stat-label">已屏蔽</div></div><div class="stat-card" style="background: linear-gradient(135deg, #4facfe 0%, #00f2fe 100%);"><div class="stat-value" id="cached-responses">-</div><div class="stat-label">缓存命中</div></div><div class="stat-card" style="background: linear-gradient(135deg, #43e97b 0%, #38f9d7 100%);"><div class="stat-value" id="block-rate">-</div><div class="stat-label">屏蔽率</div></div></div></div><div class="container"><h2>🚫 黑名单管理</h2><div class="domain-input"><input type="text" id="blacklist-input" placeholder="输入要屏蔽的域名 (例如: ads.example.com)"><button onclick="addDomain('blacklist')">添加到黑名单</button></div><textarea id="blacklist">{{ blacklist }}</textarea><button onclick="saveList('blacklist')">保存黑名单</button><button onclick="loadDefaultBlacklist()">加载默认黑名单</button></div><div class="container"><h2>✅ 白名单管理</h2><div class="domain-input"><input type="text" id="whitelist-input" placeholder="输入要放行的域名"><button onclick="addDomain('whitelist')">添加到白名单</button></div><textarea id="whitelist">{{ whitelist }}</textarea><button onclick="saveList('whitelist')">保存白名单</button></div><div class="container"><h2>⚙️ 系统配置</h2><table><tr><th>配置项</th><th>当前值</th></tr><tr><td>监听地址</td><td>{{ config.listen_host }}:{{ config.listen_port }}</td></tr><tr><td>上游DNS</td><td>{{ config.upstream_dns }}:{{ config.upstream_port }}</td></tr><tr><td>缓存TTL</td><td>{{ config.cache_ttl }} 秒</td></tr></table></div><script>function saveList(listType) {const content = document.getElementById(listType).value;fetch('/api/save_list', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({list: listType, content: content})}).then(response => response.json()).then(data => {if (data.success) {alert('保存成功!');} else {alert('保存失败: ' + data.message);}});}function loadDefaultBlacklist() {if (confirm('这将覆盖当前的黑名单,是否继续?')) {fetch('/api/load_default_blacklist', {method: 'POST'}).then(() => location.reload());}}</script>
</body>
</html>
'''@app.route('/')
@require_auth
def admin_panel():"""显示管理面板"""# 加载配置with open('config.json', 'r') as f:config = json.load(f)# 加载黑名单try:with open(config['blacklist_file'], 'r') as f:blacklist = f.read()except:blacklist = ''# 加载白名单try:with open(config['whitelist_file'], 'r') as f:whitelist = f.read()except:whitelist = ''return render_template_string(ADMIN_TEMPLATE, config=config,blacklist=blacklist,whitelist=whitelist)@app.route('/api/stats')
@require_auth
def get_stats():"""获取统计信息"""# 这里需要从DNS服务器获取实时统计# 简化起见,返回模拟数据return jsonify({'total_queries': 12345,'blocked_queries': 3456,'cached_responses': 8901})@app.route('/api/save_list', methods=['POST'])
@require_auth
def save_list():"""保存列表"""data = request.jsonlist_type = data['list']content = data['content']with open('config.json', 'r') as f:config = json.load(f)filename = config[f'{list_type}_file']try:with open(filename, 'w') as f:f.write(content)return jsonify({'success': True})except Exception as e:return jsonify({'success': False, 'message': str(e)})@app.route('/api/add_domain', methods=['POST'])
@require_auth
def add_domain():"""添加域名到列表"""data = request.jsondomain = data['domain'].lower().strip()list_type = data['list']if not domain:return jsonify({'success': False, 'message': '域名不能为空'})with open('config.json', 'r') as f:config = json.load(f)filename = config[f'{list_type}_file']try:with open(filename, 'a') as f:f.write(f'\n{domain}')return jsonify({'success': True})except Exception as e:return jsonify({'success': False, 'message': str(e)})if __name__ == '__main__':app.run(host='0.0.0.0', port=8080, debug=True)
3. 自动更新黑名单脚本
#!/usr/bin/env python3
# update_blacklist.pyimport requests
import time
import loggingclass BlacklistUpdater:def __init__(self):self.sources = [{'name': 'EasyList China','url': 'https://easylist-downloads.adblockplus.org/easylistchina.txt','format': 'adblock'},{'name': 'Anti-AD','url': 'https://raw.githubusercontent.com/privacy-protection-tools/anti-AD/master/anti-ad-domains.txt','format': 'hosts'},{'name': 'StevenBlack Hosts','url': 'https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts','format': 'hosts'}]logging.basicConfig(level=logging.INFO)self.logger = logging.getLogger(__name__)def download_list(self, source):"""下载黑名单"""try:response = requests.get(source['url'], timeout=30)if response.status_code == 200:return response.textelse:self.logger.error(f"下载失败 {source['name']}: HTTP {response.status_code}")return Noneexcept Exception as e:self.logger.error(f"下载失败 {source['name']}: {e}")return Nonedef parse_domains(self, content, format_type):"""解析域名列表"""domains = set()for line in content.split('\n'):line = line.strip()# 跳过注释和空行if not line or line.startswith('#') or line.startswith('!'):continueif format_type == 'hosts':# hosts格式: 0.0.0.0 domain.comparts = line.split()if len(parts) >= 2 and parts[0] in ['0.0.0.0', '127.0.0.1']:domain = parts[1].lower()if self.is_valid_domain(domain):domains.add(domain)elif format_type == 'adblock':# AdBlock格式: ||domain.com^if line.startswith('||') and '^' in line:domain = line[2:line.index('^')].lower()if self.is_valid_domain(domain):domains.add(domain)elif format_type == 'plain':# 纯域名格式domain = line.lower()if self.is_valid_domain(domain):domains.add(domain)return domainsdef is_valid_domain(self, domain):"""验证域名有效性"""if not domain or len(domain) > 253:return False# 排除本地域名if domain in ['localhost', 'local', '0.0.0.0', '127.0.0.1']:return False# 基本域名格式检查parts = domain.split('.')if len(parts) < 2:return Falsefor part in parts:if not part or len(part) > 63:return Falseif not all(c.isalnum() or c == '-' for c in part):return Falseif part.startswith('-') or part.endswith('-'):return Falsereturn Truedef merge_blacklists(self):"""合并所有黑名单源"""all_domains = set()for source in self.sources:self.logger.info(f"下载 {source['name']}...")content = self.download_list(source)if content:domains = self.parse_domains(content, source['format'])self.logger.info(f"从 {source['name']} 获取 {len(domains)} 个域名")all_domains.update(domains)return all_domainsdef update_blacklist_file(self, filename='blacklist.txt'):"""更新黑名单文件"""# 读取现有的自定义域名custom_domains = set()try:with open(filename, 'r') as f:for line in f:line = line.strip()if line and line.startswith('# CUSTOM:'):# 自定义域名标记domain = line[9:].strip()custom_domains.add(domain)except FileNotFoundError:pass# 合并所有域名all_domains = self.merge_blacklists()all_domains.update(custom_domains)# 写入文件with open(filename, 'w') as f:f.write(f"# 黑名单更新时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")f.write(f"# 总计 {len(all_domains)} 个域名\n\n")# 写入自定义域名if custom_domains:f.write("# === 自定义域名 ===\n")for domain in sorted(custom_domains):f.write(f"# CUSTOM: {domain}\n")f.write("\n")# 写入自动获取的域名f.write("# === 自动获取的域名 ===\n")for domain in sorted(all_domains - custom_domains):f.write(f"{domain}\n")self.logger.info(f"黑名单已更新,共 {len(all_domains)} 个域名")def run_periodic_update(self, interval_hours=24):"""定期更新黑名单"""while True:try:self.update_blacklist_file()except Exception as e:self.logger.error(f"更新失败: {e}")time.sleep(interval_hours * 3600)if __name__ == '__main__':updater = BlacklistUpdater()updater.update_blacklist_file()# 如需定期更新,取消下行注释# updater.run_periodic_update(24)
4. 系统服务配置(systemd)
# /etc/systemd/system/dns-filter.service
[Unit]
Description=DNS Filter Server
After=network.target[Service]
Type=simple
User=root
WorkingDirectory=/opt/dns-filter
ExecStart=/usr/bin/python3 /opt/dns-filter/dns_filter_server.py
Restart=always
RestartSec=10[Install]
WantedBy=multi-user.target
5. Docker部署方案
# Dockerfile
FROM python:3.9-slimWORKDIR /app# 安装依赖
RUN pip install flask requests# 复制文件
COPY dns_filter_server.py .
COPY web_admin.py .
COPY update_blacklist.py .# 创建配置文件
RUN echo '{}' > config.json# 暴露端口
EXPOSE 53/udp
EXPOSE 8080/tcp# 启动脚本
COPY start.sh .
RUN chmod +x start.shCMD ["./start.sh"]
#!/bin/bash
# start.sh# 启动Web管理界面
python3 web_admin.py &# 启动DNS服务器
python3 dns_filter_server.py
# docker-compose.yml
version: '3'services:dns-filter:build: .container_name: dns-filterrestart: alwaysports:- "53:53/udp"- "8080:8080"volumes:- ./data:/app/data- ./config.json:/app/config.json- ./blacklist.txt:/app/blacklist.txt- ./whitelist.txt:/app/whitelist.txtenvironment:- TZ=Asia/Shanghaidns:- 8.8.8.8- 8.8.4.4
部署步骤
方法一:直接部署
- 安装Python依赖
pip install flask requests
- 创建工作目录
mkdir /opt/dns-filter
cd /opt/dns-filter
-
复制所有Python脚本到工作目录
-
初始化配置文件
python3 dns_filter_server.py
# 首次运行会生成默认配置文件
- 启动服务
# 启动DNS服务器(需要root权限)
sudo python3 dns_filter_server.py &# 启动Web管理界面
python3 web_admin.py &
方法二:Docker部署
- 构建镜像
docker-compose build
- 启动服务
docker-compose up -d
- 查看日志
docker-compose logs -f
客户端配置
Windows
- 打开网络设置
- 更改适配器选项
- 右键点击网络连接,选择属性
- 选择"Internet 协议版本 4 (TCP/IPv4)"
- 设置DNS服务器为本地服务器IP
Linux
编辑 /etc/resolv.conf
:
nameserver 192.168.1.100 # 替换为你的服务器IP
路由器
在路由器管理界面中,将DHCP服务器的DNS设置为本地服务器IP
功能特点
- 实时DNS过滤:基于黑名单实时屏蔽广告和恶意网站
- 缓存机制:提高响应速度,减少上游查询
- Web管理界面:方便管理黑白名单
- 自动更新:定期从多个源更新黑名单
- 统计功能:实时查看屏蔽效果
- Docker支持:简化部署流程
性能优化建议
- 使用Redis缓存:对于大规模部署,可以使用Redis替代内存缓存
- 异步处理:使用asyncio提高并发处理能力
- 负载均衡:部署多个实例实现负载均衡
- 黑名单优化:使用布隆过滤器加速域名匹配
安全建议
- 更改默认密码:修改Web管理界面的默认用户名和密码
- 限制访问:使用防火墙限制管理界面访问
- 启用HTTPS:为Web管理界面配置SSL证书
- 日志审计:定期检查DNS查询日志
总结
通过本文介绍的方案,你可以快速搭建一个功能完善的本地DNS过滤系统,有效屏蔽广告和恶意网站,提升网络安全性和浏览体验。系统采用模块化设计,易于扩展和维护,适合家庭和小型办公室使用。
项目地址
完整代码已开源,欢迎Star和贡献代码:
[GitHub项目地址]
标签: #DNS #网络安全 #广告屏蔽 #Python #Docker
作者: 技术爱好者
发布时间: 2025-01-07