基于TCP的简易端口扫描器
端口扫描是网络侦察和信息收集的基础手段之一。在多种扫描技术中,TCP全连接扫描因其实现简单、结果可靠而成为初学者的理想选择。其核心原理是尝试与目标主机的指定端口完成完整的TCP三次握手。本文将简要介绍其工作原理,并使用
[Python/Go/你使用的语言] 一步步实现一个轻量级的端口扫描器。
一、定义扫描函数
#定义一个开始扫描的函数
def star_scan(ip,port):#异常处理try:s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 创建一个使用tcp的套接字s.settimeout(1.0) #设置超时时间s.connect((ip,port))except Exception as e:# print(e) #防止超时提醒过多passelse:#避免因未接收到返回数据而报错误判,所以注释掉了# res = s.recv(3096).decode('utf-8').encode()# print(res)print("[+]{}:{} \topen".format(ip,port))finally:s.close()
二、定义创建扫描列表函数
def make_port_list(ports):#接收的列表new_port_list = []# 判断并处理传入的ports的值,判断间隔为 "," or "-"# 情况一:22,80,443if "," in ports:#以","为分隔符生成列表temp_list = ports.split(",")#在temp_list列表中循环遍历for port in temp_list:#判断是否有"-"在列表内if "-" in port:#根据传入的值从首位遍历值末位for p in range(int(port.split("-")[0]),int(port.split("-")[1]) + 1):new_port_list.append(p)# 如果在"-"则代表传入的为1-100格式# 否则就是20,22,23,80这种格式,直接传入遍历的值即可else:new_port_list.append(port)# 情况二:1-100elif "-" in ports:# 从第一个数通过 +1 一直遍历至最后一个数for p in range(int(port.split("-")[0]),int(port.split("-")[1]) + 1):new_port_list.append(p)# 情况三:使用者输入的内容为单一端口else:new_port_list.append(ports)# 返回端口列表的值return new_port_list
三、加入协程函数
import socket
import sys
# sys 导入系统相关的功能,通常用于访问命令行参数、退出程序等
from queue import Queue
# 导入线程安全的队列数据结构,用于在多线程/协程间安全地传递数据
from gevent import monkey;monkey.patch_socket()
# gevent是一个基于协程的Python网络库
# monkey模块提供了"猴子补丁"功能
# monkey.patch_socket()将标准库的socket模块替换为gevent的异步版本
# 效果:使普通的同步socket代码变成异步非阻塞的,无需修改原有代码
import gevent
# 导入gevent主模块,用于创建和管理协程# socket套接字基本格式
# socket.socket(family.type)print('''____ _ ____
| _ \ ___ _ __| |_/ ___| ___ __ _ _ __
| |_) / _ \| '__| __\___ \ / __/ _` | '_ \
| __/ (_) | | | |_ ___) | (_| (_| | | | |
|_| \___/|_| \__|____/ \___\__,_|_| |_|''')#定义生成扫描列表的函数
def make_port_list(ports):#接收的列表new_port_list = []# 判断并处理传入的ports的值,判断间隔为 "," or "-"# 情况一:22,80,443if "," in ports:#以","为分隔符生成列表temp_list = ports.split(",")#在temp_list列表中循环遍历for port in temp_list:#判断是否有"-"范围类参数在列表内if "-" in port:#根据传入的值从首位遍历值末位for p in range(int(port.split("-")[0]),int(port.split("-")[1]) + 1):new_port_list.append(p)# 如果在"-"则代表传入的为1-100格式# 否则就是20,22,23,80这种格式,直接传入遍历的值即可else:new_port_list.append(port)# 情况二:1-100elif "-" in ports:# 从第一个数通过 +1 一直遍历至最后一个数for p in range(int(port.split("-")[0]),int(port.split("-")[1]) + 1):new_port_list.append(p)# 情况三:使用者输入的内容为单一端口else:new_port_list.append(ports)# 返回端口列表的值return new_port_list# 定义多协程函数
def coroutines(batch_size=None):#开启多协程cos = []#空列表cos用于存储所有的协程对象# 如果指定了batch_size,就使用它,否则使用队列中的所有任务if batch_size is None:num = ip_port.qsize()else:num = min(batch_size, ip_port.qsize()) # 确保不超过队列中剩余的任务数# ip_port.qsize():获取队列 ip_port 中的任务数量# num:保存任务总数# print(num):打印任务数量,用于调试 for i in range(num):# 循环任务数量,为每个任务创建协程# 调用工作函数,cor = gevent.spawn(star_scan)# gevent.spawn()函数用于创建一个新的协程,并调用star_scan函数# spawn方法不会立即执行函数,而是创建协程对象# 将返回的协程对象保存至变量corcos.append(cor)#将cor中的协程对象添加至cor列表中gevent.joinall(cos)#等待列表中的所有协程执行完毕#定义一个开始扫描的函数
def star_scan():#动态任务分配sockets = ip_port.get()ip = sockets[0]port = int(sockets[1])#异常处理try:s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 创建一个使用tcp的套接字s.settimeout(2.0) #设置超时时间s.connect((ip,port))except Exception as e:# print(e) #防止超时提醒过多passelse:#避免因未接收到返回数据而报错误判,所以注释掉了# res = s.recv(3096).decode('utf-8').encode()# print(res)print("[+]{}:{} \topen".format(ip,port))finally:s.close()ip_port = Queue()
#创建队列
ip = "113.44.158.248"
port = "1,2-2400"
# 将make_port_list返回的值传入port_list
port_list = make_port_list(port)
print("total port num:{}".format(len(port_list)))
for port in port_list:ip_port.put([ip,port])coroutines()
#开启协程
#将协程设置为每50个创建一次
num = 50
if len(port_list)%num == 0:
#如果port_list中的数量能被50整除turn = int(len(port_list)/num)#计算需要多少批次for i in range(turn):coroutines(num)#每批处理50个端口
else:turn = int(len(port_list)/num)+1# 因为不能被整除,所以 批次 = 整数部分 + 1for i in range(turn):if i == turn-1:#如果是最后一批coroutines(len(port_list)%num)#取余,处理剩余端口else:coroutines(num)#正常处理50个端口
四、命令行传入
def arguments():# 创建对象parser = argparse.ArgumentParser(description='python PortScanner.py [IP] [Port]')# 保存定义的命令行参数信息parser.add_argument('IP', type=str, help='待扫描IP地址')parser.add_argument('Port', type=str, help='指定端口')'''add_argument方法:用于添加命令行参数'input'指定的 name:指定命令行参数的名称type:指定参数的类型help:用于描述该参数的作用'''# 如果参数是可选参数,使用 - -- 前缀来定义# parser.add_argument('-P', '--Port', action='store_true', help='待扫描端口' \# '[20,21,22] or [1-999]')'''--output 为可选参数,类型为字符串-v 和 --verbose 是同一个可选参数的两种类型,类型为布尔值如果用户指定了该参数时,值为 True ,否则为 Falseaction='store_true' 表示当指定这个参数时,将其值设置为 True'''args = parser.parse_args()# parse_args():解析命令行参数,返回命名空间IP = args.IPPort = args.Portreturn(IP,Port)
五、最后整理一下
import socket
import sys
# sys 导入系统相关的功能,通常用于访问命令行参数、退出程序等
from queue import Queue
# 导入线程安全的队列数据结构,用于在多线程/协程间安全地传递数据
from gevent import monkey;monkey.patch_socket()
# gevent是一个基于协程的Python网络库
# monkey模块提供了"猴子补丁"功能
# monkey.patch_socket()将标准库的socket模块替换为gevent的异步版本
# 效果:使普通的同步socket代码变成异步非阻塞的,无需修改原有代码
import gevent
# 导入gevent主模块,用于创建和管理协程
import argparse# socket套接字基本格式
# socket.socket(family.type)print('''____ _ ____
| _ \ ___ _ __| |_/ ___| ___ __ _ _ __
| |_) / _ \| '__| __\___ \ / __/ _` | '_ \
| __/ (_) | | | |_ ___) | (_| (_| | | | |
|_| \___/|_| \__|____/ \___\__,_|_| |_|''')#定义生成扫描列表的函数
def make_port_list(ports):#接收的列表new_port_list = []# 判断并处理传入的ports的值,判断间隔为 "," or "-"# 情况一:22,80,443if "," in ports:#以","为分隔符生成列表temp_list = ports.split(",")#在temp_list列表中循环遍历for port in temp_list:#判断是否有"-"范围类参数在列表内if "-" in port:#根据传入的值从首位遍历值末位for p in range(int(port.split("-")[0]),int(port.split("-")[1]) + 1):new_port_list.append(p)# 如果在"-"则代表传入的为1-100格式# 否则就是20,22,23,80这种格式,直接传入遍历的值即可else:new_port_list.append(port)# 情况二:1-100elif "-" in ports:# 从第一个数通过 +1 一直遍历至最后一个数for p in range(int(port.split("-")[0]),int(port.split("-")[1]) + 1):new_port_list.append(p)# 情况三:使用者输入的内容为单一端口else:new_port_list.append(ports)# 返回端口列表的值return new_port_list# 定义多协程函数
def coroutines(batch_size=None):#开启多协程cos = []#空列表cos用于存储所有的协程对象# 如果指定了batch_size,就使用它,否则使用队列中的所有任务if batch_size is None:num = ip_port.qsize()else:num = min(batch_size, ip_port.qsize()) # 确保不超过队列中剩余的任务数# ip_port.qsize():获取队列 ip_port 中的任务数量# num:保存任务总数# print(num):打印任务数量,用于调试 for i in range(num):# 循环任务数量,为每个任务创建协程# 调用工作函数,cor = gevent.spawn(star_scan)# gevent.spawn()函数用于创建一个新的协程,并调用star_scan函数# spawn方法不会立即执行函数,而是创建协程对象# 将返回的协程对象保存至变量corcos.append(cor)#将cor中的协程对象添加至cor列表中gevent.joinall(cos)#等待列表中的所有协程执行完毕#定义一个开始扫描的函数
def star_scan():#动态任务分配sockets = ip_port.get()ip = sockets[0]port = int(sockets[1])#异常处理try:s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)# 创建一个使用tcp的套接字s.settimeout(2.0) #设置超时时间s.connect((ip,port))except Exception as e:# print(e) #防止超时提醒过多passelse:#避免因未接收到返回数据而报错误判,所以注释掉了# res = s.recv(3096).decode('utf-8').encode()# print(res)print("[+]{}:{} \topen".format(ip,port))finally:s.close()def arguments():# 创建对象parser = argparse.ArgumentParser(description='python PortScanner.py [IP] [Port]')# 保存定义的命令行参数信息parser.add_argument('IP', type=str, help='待扫描IP地址')parser.add_argument('Port', type=str, help='指定端口')'''add_argument方法:用于添加命令行参数'input'指定的 name:指定命令行参数的名称type:指定参数的类型help:用于描述该参数的作用'''# 如果参数是可选参数,使用 - -- 前缀来定义# parser.add_argument('-P', '--Port', action='store_true', help='待扫描端口' \# '[20,21,22] or [1-999]')'''--output 为可选参数,类型为字符串-v 和 --verbose 是同一个可选参数的两种类型,类型为布尔值如果用户指定了该参数时,值为 True ,否则为 Falseaction='store_true' 表示当指定这个参数时,将其值设置为 True'''args = parser.parse_args()# parse_args():解析命令行参数,返回命名空间IP = args.IPPort = args.Portreturn(IP,Port)
IP,Port = arguments()
ip_port = Queue()
#创建队列
ip = IP
port = Port
# 将make_port_list返回的值传入port_list
port_list = make_port_list(port)
print("total port num:{}".format(len(port_list)))
for port in port_list:ip_port.put([ip,port])coroutines()
#开启协程
#将协程设置为每50个创建一次
num = 50
if len(port_list)%num == 0:
#如果port_list中的数量能被50整除turn = int(len(port_list)/num)#计算需要多少批次for i in range(turn):coroutines(num)#每批处理50个端口
else:turn = int(len(port_list)/num)+1# 因为不能被整除,所以 批次 = 整数部分 + 1for i in range(turn):if i == turn-1:#如果是最后一批coroutines(len(port_list)%num)#取余,处理剩余端口else:coroutines(num)#正常处理50个端口
六、效果展示
: (