端口扫描器的几种代码实现方案

这雨夜太漫长 失眠的我 在谁梦里 歌唱

  搞安全的应该都知道端口扫描在渗透测试、漏洞扫描过程中的重要性,其与URL爬虫等技术构成了漏洞扫描的第一阶段,即目标信息收集。因此能否开发出一款高效稳定的端口扫描器,往往决定了漏洞扫描器的好坏。那么说到端口扫描器,我们往往会先想到nmap、masscan等神器,它们是这个领域的标杆。但本篇并不是为了介绍这几款工具,而是谈谈如何自研一款高效稳定的端口扫描器。

  端口扫描器,顾名思义就是为了探测服务器上的某个端口是否开放,究其原理可以分为很多种探测方式,比如tcp三次握手扫描,syn扫描等等,本篇并不打算详细介绍这些扫描方式的区别,有兴趣的可以看下nmap的文档,对这几种扫描方式有详细的介绍。
  那么说下本文重点,基于这几天我研究并尝试利用python、go开发tcp扫描器、tcp-syn扫描器,以及对比它们之间的速度性能、稳定性差异情况,将测试结果在此做个记录,并分享一下代码以及方案。

说明:文章结尾将给出本篇所使用代码的Github地址,可供大家测试,代码测试环境为centos7。

scan for Python Socket

Python的Socket模块可以创建套接字,创建tcp三次握手连接,以此探测目标端口是否存活。本篇将使用socket模块编写tcp扫描以及syn扫描,并对比两者的差异。

tcp scan

快来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#! -*- coding:utf-8 -*-
import time
import socket
socket_timeout = 0.1
def tcp_scan(ip,port):
try:
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.settimeout(socket_timeout)
c=s.connect_ex((ip,port))
if c==0:
print "%s:%s is open" % (ip,port)
else:
# print "%s:%s is not open" % (ip,port)
pass
except Exception,e:
print e
s.close()
if __name__=="__main__":
s_time = time.time()
ip = "14.215.177.38"
for port in range(0,1024):
''' 此处可用协作 '''
tcp_scan(ip,port)
e_time = time.time()
print "scan time is ",e_time-s_time

运行结果:

说明一下:可以看到此代码扫描1024个端口用了102s,当然代码并没有用多线程、协程等方式提高扫描效率(使用协程测试过扫65535个端口用时400s左右),因为python在这方面的能力比较弱;由于扫描过程中会建立tcp三次握手,因此比较消耗资源。

tcp syn scan

  相对tcp扫描,tcp syn扫描方式更为隐蔽,也更节省资源,那么如何利用socket模块实现tcp syn扫描呢?这里需要用到SOCK_RAW,这个在socket编程中相对少用,资料也不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: UTF-8 -*-
import time
import random
import socket
import sys
from struct import *
'''
Warning:must run it as root
yum install python-devel libpcap-devel
pip install pcap
'''
def checksum(msg):
''' Check Summing '''
s = 0
for i in range(0,len(msg),2):
w = (ord(msg[i]) << 8) + (ord(msg[i+1]))
s = s+w
s = (s>>16) + (s & 0xffff)
s = ~s & 0xffff
return s
def CreateSocket(source_ip,dest_ip):
''' create socket connection '''
try:
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_TCP)
except socket.error, msg:
print 'Socket create error: ',str(msg[0]),'message: ',msg[1]
sys.exit()
''' Set the IP header manually '''
s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
return s
def CreateIpHeader(source_ip, dest_ip):
''' create ip header '''
# packet = ''
# ip header option
headerlen = 5
version = 4
tos = 0
tot_len = 20 + 20
id = random.randrange(18000,65535,1)
frag_off = 0
ttl = 255
protocol = socket.IPPROTO_TCP
check = 10
saddr = socket.inet_aton ( source_ip )
daddr = socket.inet_aton ( dest_ip )
hl_version = (version << 4) + headerlen
ip_header = pack('!BBHHHBBH4s4s', hl_version, tos, tot_len, id, frag_off, ttl, protocol, check, saddr, daddr)
return ip_header
def create_tcp_syn_header(source_ip, dest_ip, dest_port):
''' create tcp syn header function '''
source = random.randrange(32000,62000,1) # randon select one source_port
seq = 0
ack_seq = 0
doff = 5
''' tcp flags '''
fin = 0
syn = 1
rst = 0
psh = 0
ack = 0
urg = 0
window = socket.htons (8192) # max windows size
check = 0
urg_ptr = 0
offset_res = (doff << 4) + 0
tcp_flags = fin + (syn<<1) + (rst<<2) + (psh<<3) + (ack<<4) + (urg<<5)
tcp_header = pack('!HHLLBBHHH', source, dest_port, seq, ack_seq, offset_res, tcp_flags, window, check, urg_ptr)
''' headers option '''
source_address = socket.inet_aton( source_ip )
dest_address = socket.inet_aton( dest_ip )
placeholder = 0
protocol = socket.IPPROTO_TCP
tcp_length = len(tcp_header)
psh = pack('!4s4sBBH', source_address, dest_address, placeholder, protocol, tcp_length);
psh = psh + tcp_header;
tcp_checksum = checksum(psh)
''' Repack the TCP header and fill in the correct checksum '''
tcp_header = pack('!HHLLBBHHH', source, dest_port, seq, ack_seq, offset_res, tcp_flags, window, tcp_checksum, urg_ptr)
return tcp_header
def syn_scan(source_ip, dest_ip, des_port) :
s = CreateSocket(source_ip, dest_ip)
ip_header = CreateIpHeader(source_ip, dest_ip)
tcp_header = create_tcp_syn_header(source_ip, dest_ip, des_port)
packet = ip_header + tcp_header
s.sendto(packet, (dest_ip, 0))
data = s.recvfrom(1024) [0][0:]
ip_header_len = (ord(data[0]) & 0x0f) * 4
# ip_header_ret = data[0: ip_header_len - 1]
tcp_header_len = (ord(data[32]) & 0xf0)>>2
tcp_header_ret = data[ip_header_len:ip_header_len+tcp_header_len - 1]
''' SYN/ACK flags '''
if ord(tcp_header_ret[13]) == 0x12:
print "%s:%s is open" % (dest_ip,des_port)
else:
print "%s:%s is not open" % (dest_ip,des_port)
if __name__=="__main__":
t_s = time.time()
source_ip = '' # 填写本机ip
dest_ip = '14.215.177.38'
for des_port in range(1024):
syn_scan(source_ip, dest_ip, des_port)
t_e = time.time()
print "time is ",(t_e-t_s)

有一点需要注意的,运行这段代码前,需要在系统上安装依赖:

1
2
yum install python-devel libpcap-devel
pip install pcap

运行结果:

说明:从运行结果上来看,并没有很准确,而且速度也不快,不清楚是不是代码上有问题。

scan for Python scapy

除了socket模块外,python还有一个scapy模块,可以用来模拟发包,但只能在linux下使用,其他操作系统不建议使用此模块。

tcp syn csan

代码在这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#! -*- coding:utf-8 -*-
import time
from scapy.all import *
ip = "14.215.177.38"
TIMEOUT = 0.5
threads = 500
port_range = 1024
retry = 1
def is_up(ip):
""" Tests if host is up """
icmp = IP(dst=ip)/ICMP()
resp = sr1(icmp, timeout=TIMEOUT)
if resp == None:
return False
else:
return True
def reset_half_open(ip, ports):
# Reset the connection to stop half-open connections from pooling up
sr(IP(dst=ip)/TCP(dport=ports, flags='AR'), timeout=TIMEOUT)
def is_open(ip, ports):
to_reset = []
results = []
p = IP(dst=ip)/TCP(dport=ports, flags='S') # Forging SYN packet
answers, un_answered = sr(p, verbose=False, retry=retry ,timeout=TIMEOUT) # Send the packets
for req, resp in answers:
if not resp.haslayer(TCP):
continue
tcp_layer = resp.getlayer(TCP)
if tcp_layer.flags == 0x12:
# port is open
to_reset.append(tcp_layer.sport)
results.append(tcp_layer.sport)
elif tcp_layer.flags == 0x14:
# port is open
pass
reset_half_open(ip, to_reset)
return results
def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i:i + n]
if __name__ == '__main__':
start_time = time.time()
open_port_list = []
for ports in chunks(list(range(port_range)), threads):
results = is_open(ip, ports)
if results:
open_port_list += results
end_time = time.time()
print "%s %s" % (ip,open_port_list)
print "%s Scan Completed in %fs" % (ip, end_time-start_time)

运行结果:

说明:由于scapy可以一次性发多个syn包,因此速度相对socket更快一些,但稳定性没有很好。

scan for python+nmap

文章开头提到了nmap,其实在python中也可以直接调用nmap,看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#! -*- coding:utf-8 -*-
'''
pip install python-nmap
'''
import nmap
nm =nmap.PortScanner()
def scan(ip,port,arg):
try:
nm.scan(ip, arguments=arg+str(port))
except nmap.nmap.PortScannerError:
print "Please run -O method for root privileges"
else:
for host in nm.all_hosts():
for proto in nm[host].all_protocols():
lport = nm[host][proto].keys()
lport.sort()
for port in lport:
print ('port : %s\tstate : %s' % (port, nm[host][proto][port]['state']))
if __name__=="__main__":
port="80,443,22,21"
scan(ip="14.215.177.38",port=port,arg="-sS -Pn -p")
# tcp scan -sT
# tcp syn scan -sS

运行结果:

由于nmap扫描速度相对比较慢,因此这里只演示扫描4个端口,不做速度的对比,当然其稳定性还是可以的。

scan for go

  前文一直在介绍使用python语言开发端口扫描器,然而由于python在多线程方面的弱势,扫描器的性能可想而知,因此我又利用go语言的高并发性优势,尝试开发端口扫描器。(题外话:为此我花了半天时间看了下go语言的基础,勉强看懂了扫描代码,并做了一些修改)

tcp scan

直接看代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package main
// port tcp scan
import (
"fmt"
"net"
"os"
"runtime"
"strconv"
"sync"
"time"
)
func loop(inport chan int, startport, endport int) {
for i := startport; i <= endport; i++ {
inport <- i
}
close(inport)
}
type ScanSafeCount struct {
// 结构体
count int
mux sync.Mutex
}
var scanCount ScanSafeCount
func scanner(inport int, outport chan int, ip string, endport int) {
// 扫描函数
in := inport // 定义要扫描的端口号
// fmt.Printf(" %d ", in) // 输出扫描的端口
host := fmt.Sprintf("%s:%d", ip, in) // 类似(ip,port)
tcpAddr, err := net.ResolveTCPAddr("tcp4", host) // 根据域名查找ip
if err != nil {
// 域名解析ip失败
outport <- 0
} else {
conn, err := net.DialTimeout("tcp", tcpAddr.String(), 10*time.Second) //建立tcp连接
if err != nil {
// tcp连接失败
outport <- 0
} else {
// tcp连接成功
outport <- in // 将端口写入outport信号
fmt.Printf("\n *************( %d 可以 )*****************\n", in)
conn.Close()
}
}
// 线程锁
scanCount.mux.Lock()
scanCount.count = scanCount.count - 1
if scanCount.count <= 0 {
close(outport)
}
scanCount.mux.Unlock()
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) // 设置最大可使用的cpu核数
// 定义变量
inport := make(chan int) // 信号变量,类似python中的queue
outport := make(chan int)
collect := []int{} // 定义一个切片变量,类似python中的list
// fmt.Println(os.Args, len(os.Args)) // 获取命令行参数并输出
if len(os.Args) != 4 {
// 命令行参数个数有误
fmt.Println("使用方式: port_scanner IP startport endport")
os.Exit(0)
}
s_time := time.Now().Unix()
// fmt.Println("扫描开始:") // 获取当前时间
ip := string(os.Args[1]) // 获取参数中的ip
startport, _ := strconv.Atoi(os.Args[2]) // 获取参数中的启始端口
endport, _ := strconv.Atoi(os.Args[3]) // 获取参数中的结束端口
if startport > endport {
fmt.Println("Usage: scanner IP startport endport")
fmt.Println("Endport must be larger than startport")
os.Exit(0)
} else {
// 定义scanCount变量为ScanSafeCount结构体,即计算扫描的端口数量
scanCount = ScanSafeCount{count: (endport - startport + 1)}
}
fmt.Printf("扫描 %s:%d----------%d\n", ip, startport, endport)
go loop(inport, startport, endport) // 执行loop函数将端口写入input信号
for v := range inport {
// 开始循环input
go scanner(v, outport, ip, endport)
}
// 输出结果
for port := range outport {
if port != 0 {
collect = append(collect, port)
}
}
fmt.Println("--")
fmt.Println(collect)
e_time := time.Now().Unix()
fmt.Println("扫描时间:", e_time-s_time)
}

  代码我就不解释了(我在代码中加了些注释,应该大致可以看懂),本文也不打算介绍go的用法,毕竟自己也是刚开始学习go,有兴趣的可以看看go的文档,然后再回过头来看看这段代码。

代码运行结果:

说明:由于是tcp扫描,所以多少还是占资源的,而且测试发现稳定性不是很好。

tcp syn scan

看代码看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45