0x1、前言

​ 在现场取证遇到分析流量包的情况会比较少,虽然流量类设备原理是把数据都抓出来进行解析,很大一定程度上已经把人可以做的事情交给了机器自动完成。

​ 可用于PCAP包分析的软件比如科来,Wireshark都是很好用的分析软件,找Pcap解析的编程类代码时发现已经有很多大佬写过Python脚本辅助解析Pcap,也有提取将Pcap信息以界面形式展示出来框架。

​ 本文对利用Python里的Scapy库提取协议五元组信息进行学习性总结,没有用于实战,因为实践过程中发现PCAP读包解包查包速度太慢了。

0x2、参考库

Python解析pcap包的常见库有Scapy、dpkt、Pyshark等。可以参考的源码,工具如下

https://github.com/thepacketgeek/cloud-pcap
https://github.com/madpowah/ForensicPCAP
https://github.com/le4f/pcap-analyzer
https://github.com/HatBoy/Pcap-Analyzer
https://github.com/caesar0301/awesome-pcaptools
https://asecuritysite.com/forensics/pcap
https://github.com/DanMcInerney/net-creds

提供解析Pcap包服务的网站https://packettotal.com/、https://www.capanalysis.net/ca/、https://asecuritysite.com/forensics/pcap?infile=smtp.pcap&infile=smtp.pcap、https://app.any.run

0x3、邮件协议

提取发件人的邮箱,就要熟悉SMTP的几个端口进行识别。

25  端口为SMTP(Simple Mail Transfer Protocol,简单邮件传输协议)服务所开放的,是用于发送邮件。
110端口是为POP3(邮件协议3)服务开放的,POP2、POP3都是主要用于接收邮件的,目前POP3使用的比较多,许多服务器都同时支持POP2和POP3。
143端口主要是用于“Internet Message AccessProtocol”v2(Internet消息访问协议,简称IMAP),和POP3一样,是用于电子邮件的接收的协议。
465 端口是SSL/TLS通讯协议的 内容一开始就被保护起来了 是看不到原文的。
465 端口(SMTPS):465端口是为SMTPS(SMTP-over-SSL)协议服务开放的,这是SMTP协议基于SSL安全协议之上的一种变种协议,它继承了SSL安全协议的非对称加密的高度安全可靠性,可防止邮件泄露。
587 端口是STARTTLS协议的 属于TLS通讯协议 只是他是在STARTTLS命令执行后才对之后的原文进行保护的。

SMTP常用命令语法

在Wireshark中SMTP协议数据是在建立TCP三次握手后会出现的,常用的命令如下。

SMTP命令不区分大小写,但参数区分大小写,有关这方面的详细说明请参考RFC821。
HELO <domain> <CRLF>。向服务器标识用户身份发送者能欺骗,说谎,但一般情况下服务器都能检测到。
MAIL FROM: <reverse-path> <CRLF>。<reverse-path>为发送者地址,此命令用来初始化邮件传输,即用来对所有的状态和缓冲区进行初始化。
RCPT TO:<forward-path> <CRLF>。 <forward-path>用来标志邮件接收者的地址,常用在MAIL FROM后,可以有多个RCPT TO。
DATA <CRLF>。将之后的数据作为数据发送,以<CRLF>.<CRLF>标志数据的结尾。
REST <CRLF>。重置会话,当前传输被取消。
NOOP <CRLF>。要求服务器返回OK应答,一般用作测试。
QUIT <CRLF>。结束会话。
VRFY <string> <CRLF>。验证指定的邮箱是否存在,由于安全方面的原因,服务器大多禁止此命令。
EXPN <string> <CRLF>。验证给定的邮箱列表是否存在,由于安全方面的原因,服务器大多禁止此命令。
HELP <CRLF>。查询服务器支持什么命令。

代码

forensicPCAP 可以根据解析PCAP包,然后利用CMD模块循环交互界面,输入命令调出对应函数。核心原理为

基于Scapy找到源端口或目的端口为110、143端口的数据记录保存作为邮件数据。

关键代码代码如下:

self.pcap是构造函数self.pcap = rdpcap(namefile)提前读取,然后enumerate()遍历。指定目标端口和源端口是110、143的数据筛选出来。

	def do_mail(self, arg, opts=None):
"""Print the number of mail's requests and store its
Usage :
- mail"""
sys.stdout.write(bcolors.TXT + "## Searching mail's request ... ")
sys.stdout.flush()
con = []
mailpkts = []
for i,packet in enumerate(self.pcap):
# TCP的包
if TCP in packet:
# 获取源端口或目的端口为110、143端口的数据记录
if packet.getlayer('TCP').dport == 110 or packet.getlayer('TCP').sport == 110 or packet.getlayer('TCP').dport == 143 or packet.getlayer('TCP').sport == 143 :
if packet.getlayer('TCP').flags == 2:
con.append(i)
mailpkts.append(packet)
sys.stdout.write("OK.\n")
print "## Result : Mail's request : " + str(len(con))
sys.stdout.write(bcolors.TXT + "## Saving mails ... ")
sys.stdout.flush()
res = ""
for packet in mailpkts:
if packet.getlayer('TCP').flags == 24:
res = res + packet.getlayer('Raw').load
sys.stdout.write(".")
sys.stdout.flush()
sys.stdout.write("OK\n")
sys.stdout.flush()
self.cmd = "mail"
self.last = res

0x4、DNS解析

forensicPCAP 解析DNS部分代码。bcolors.TXT是设定了一个字体显示的颜色,res = packet.getlayer('DNS').qd.qname是获取DNS域名解析记录。

############# do_dns() ###########
def do_dns(self, arg, opts=None):
"""Print all DNS requests in the PCAP file
Usage :
- dns"""
sys.stdout.write(bcolors.TXT + "## Listing all DNS requests ...")
sys.stdout.flush()
dns = []
dns.append([]) # 枚举PCAP包的数据
for i,packet in enumerate(self.pcap):
if DNS in packet:
# 获取DNS域名解析记录
res = packet.getlayer('DNS').qd.qname
if res[len(res) - 1] == '.':
res = res[:-1]
# 保存域名
dns.append([i, res]) sys.stdout.write("OK.\n")
# 统计DNS数目
print bcolors.TXT + "## Result : " + str(len(dns) - 1) + " DNS request(s)" + bcolors.ENDC
self.last = dns
self.cmd = "dns"

pcap-analyzer核心的代码是借鉴了forensicPCAP,获取解析为DNS的请求记录显示的时候会统计出次数最多的IP的前十名。

代码

def get_dns(file):
dns = []
# 打开PCAP文件
pcap = rdpcap(UPLOAD_FOLDER+file)
for packet in pcap:
if DNS in packet:
# 核心代码
res = packet.getlayer('DNS').qd.qname
if res[len(res) - 1] == '.':
res = res[:-1]
dns.append(res)
# 统计DNS协议,出现次数最多的IP前十名
dns = Counter(dns).most_common(10)

0x5、密码信息提取

net-creds 以Scapy为基础,解析PCAP中含有密码信息的一个脚本。

代码

主要代码由other_parser()函数实现,分割每个包中的HTTP等内容,然后搜索身份验证相关的关键字筛选出账户、密码。

def other_parser(src_ip_port, dst_ip_port, full_load, ack, seq, pkt, verbose):
'''
Pull out pertinent info from the parsed HTTP packet data
'''
user_passwd = None
http_url_req = None
method = None
http_methods = ['GET ', 'POST ', 'CONNECT ', 'TRACE ', 'TRACK ', 'PUT ', 'DELETE ', 'HEAD ']
http_line, header_lines, body = parse_http_load(full_load, http_methods)
headers = headers_to_dict(header_lines)
if 'host' in headers:
host = headers['host']
else:
host = '' if http_line != None:
method, path = parse_http_line(http_line, http_methods)
http_url_req = get_http_url(method, host, path, headers)
if http_url_req != None:
if verbose == False:
if len(http_url_req) > 98:
http_url_req = http_url_req[:99] + '...'
printer(src_ip_port, None, http_url_req) # Print search terms
searched = get_http_searches(http_url_req, body, host)
if searched:
printer(src_ip_port, dst_ip_port, searched) # Print user/pwds
if body != '':
user_passwd = get_login_pass(body)
if user_passwd != None:
try:
http_user = user_passwd[0].decode('utf8')
http_pass = user_passwd[1].decode('utf8')
# Set a limit on how long they can be prevent false+
if len(http_user) > 75 or len(http_pass) > 75:
return
user_msg = 'HTTP username: %s' % http_user
printer(src_ip_port, dst_ip_port, user_msg)
pass_msg = 'HTTP password: %s' % http_pass
printer(src_ip_port, dst_ip_port, pass_msg)
except UnicodeDecodeError:
pass # Print POST loads
# ocsp is a common SSL post load that's never interesting
if method == 'POST' and 'ocsp.' not in host:
try:
if verbose == False and len(body) > 99:
# If it can't decode to utf8 we're probably not interested in it
msg = 'POST load: %s...' % body[:99].encode('utf8')
else:
msg = 'POST load: %s' % body.encode('utf8')
printer(src_ip_port, None, msg)
except UnicodeDecodeError:
pass # Kerberos over TCP
decoded = Decode_Ip_Packet(str(pkt)[14:])
kerb_hash = ParseMSKerbv5TCP(decoded['data'][20:])
if kerb_hash:
printer(src_ip_port, dst_ip_port, kerb_hash) # Non-NETNTLM NTLM hashes (MSSQL, DCE-RPC,SMBv1/2,LDAP, MSSQL)
NTLMSSP2 = re.search(NTLMSSP2_re, full_load, re.DOTALL)
NTLMSSP3 = re.search(NTLMSSP3_re, full_load, re.DOTALL)
if NTLMSSP2:
parse_ntlm_chal(NTLMSSP2.group(), ack)
if NTLMSSP3:
ntlm_resp_found = parse_ntlm_resp(NTLMSSP3.group(), seq)
if ntlm_resp_found != None:
printer(src_ip_port, dst_ip_port, ntlm_resp_found) # Look for authentication headers
if len(headers) == 0:
authenticate_header = None
authorization_header = None
for header in headers:
authenticate_header = re.match(authenticate_re, header)
authorization_header = re.match(authorization_re, header)
if authenticate_header or authorization_header:
break if authorization_header or authenticate_header:
# NETNTLM
netntlm_found = parse_netntlm(authenticate_header, authorization_header, headers, ack, seq)
if netntlm_found != None:
printer(src_ip_port, dst_ip_port, netntlm_found) # Basic Auth
parse_basic_auth(src_ip_port, dst_ip_port, headers, authorization_header)

关键字列表:

# Regexs
authenticate_re = '(www-|proxy-)?authenticate'
authorization_re = '(www-|proxy-)?authorization'
ftp_user_re = r'USER (.+)\r\n'
ftp_pw_re = r'PASS (.+)\r\n'
irc_user_re = r'NICK (.+?)((\r)?\n|\s)'
irc_pw_re = r'NS IDENTIFY (.+)'
irc_pw_re2 = 'nickserv :identify (.+)'
mail_auth_re = '(\d+ )?(auth|authenticate) (login|plain)'
mail_auth_re1 = '(\d+ )?login '
NTLMSSP2_re = 'NTLMSSP\x00\x02\x00\x00\x00.+'
NTLMSSP3_re = 'NTLMSSP\x00\x03\x00\x00\x00.+'
# Prone to false+ but prefer that to false-
http_search_re = '((search|query|&q|\?q|search\?p|searchterm|keywords|keyword|command|terms|keys|question|kwd|searchPhrase)=([^&][^&]*))'

0x6、提取数据需要关注的元素

  • 源IP、目的IP、源端口、目的端口、协议、数据包大小
关联出受害者,攻击者控制的跳板
  • 协议:HTTP、FTP、邮件协议
- 先查看HTTP、HTTPS类,然后查看数据包长度。
- 提取相关IP信息。
- 判断是否是木马远控、密码账户、可疑IP
  • 账户密码字段
- 渗透测试,嗅探回来的数据包分析含有账户密码信息
- 暴力破解IP

0x7、测试dpkt解析pcap

本想借助dpkt解析mail、dns、http来辅助分析pcap包进行分析,查阅资料学习却发现并不如使用scapy那么方便。

dpkt是一个python模块,可以对简单的数据包创建/解析,以及基本TCP / IP协议的解析,速度很快。

dpkt 手册

https://dpkt.readthedocs.io/en/latest/

dpkt 下载

https://pypi.org/project/dpkt/

看官方手册发现DPKT是读取每个pcap包里的内容,用isinstance判断是不是有IP的包,再判断是属于哪个协议,对应的协议已经封装好API如果发现可以匹配某个协议API就输出来相关值。

想要扩展这个源码还需要去学习一下协议相关的字段含义。

API调用:

https://dpkt.readthedocs.io/en/latest/api/api_auto.html#module-dpkt.qq

在手册中找到了在Github中部分API的示例代码,具备参考价值。

https://github.com/jeffsilverm/dpkt_doc

手册例子

以下代码是手册中的例子,通过查询发现inet_pton无法直接使用,按照网络上的解决方法修改了一下。

打印数据包

使用DPKT读取pcap文件并打印出数据包的内容。打印出以太网帧和IP数据包中的字段。

python2测试代码:

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os def mac_addr(address):
"""Convert a MAC address to a readable/printable string Args:
address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
Returns:
str: Printable/readable MAC address
"""
return ':'.join('%02x' % compat_ord(b) for b in address) class sockaddr(ctypes.Structure):
_fields_ = [("sa_family", ctypes.c_short),
("__pad1", ctypes.c_ushort),
("ipv4_addr", ctypes.c_byte * 4),
("ipv6_addr", ctypes.c_byte * 16),
("__pad2", ctypes.c_ulong)] if hasattr(ctypes, 'windll'):
WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
def not_windows():
raise SystemError(
"Invalid platform. ctypes.windll must be available."
)
WSAStringToAddressA = not_windows
WSAAddressToStringA = not_windows def inet_pton(address_family, ip_string):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr)) if WSAStringToAddressA(
ip_string,
address_family,
None,
ctypes.byref(addr),
ctypes.byref(addr_size)
) != 0:
raise socket.error(ctypes.FormatError()) if address_family == socket.AF_INET:
return ctypes.string_at(addr.ipv4_addr, 4)
if address_family == socket.AF_INET6:
return ctypes.string_at(addr.ipv6_addr, 16) raise socket.error('unknown address family') def inet_ntop(address_family, packed_ip):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
ip_string = ctypes.create_string_buffer(128)
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) if address_family == socket.AF_INET:
if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
elif address_family == socket.AF_INET6:
if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
else:
raise socket.error('unknown address family') if WSAAddressToStringA(
ctypes.byref(addr),
addr_size,
None,
ip_string,
ctypes.byref(ip_string_size)
) != 0:
raise socket.error(ctypes.FormatError()) return ip_string[:ip_string_size.value - 1] # Adding our two functions to the socket library
if os.name == 'nt':
socket.inet_pton = inet_pton
socket.inet_ntop = inet_ntop def inet_to_str(inet):
return socket.inet_ntop(socket.AF_INET, inet) def print_packets(pcap):
"""Print out information about each packet in a pcap Args:
pcap: dpkt pcap reader object (dpkt.pcap.Reader)
"""
# packet num count
r_num = 0
# For each packet in the pcap process the contents
for timestamp, buf in pcap:
r_num=r_num+1
print ('packet num count :' , r_num )
# Print out the timestamp in UTC
print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp))) # Unpack the Ethernet frame (mac src/dst, ethertype)
eth = dpkt.ethernet.Ethernet(buf)
print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type) # Make sure the Ethernet data contains an IP packet
if not isinstance(eth.data, dpkt.ip.IP):
print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
continue # Now unpack the data within the Ethernet frame (the IP packet)
# Pulling out src, dst, length, fragment info, TTL, and Protocol
ip = eth.data # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
more_fragments = bool(ip.off & dpkt.ip.IP_MF)
fragment_offset = ip.off & dpkt.ip.IP_OFFMASK # Print out the info
print('IP: %s -> %s (len=%d ttl=%d DF=%d MF=%d offset=%d)\n' % \
(inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset)) def test():
"""Open up a test pcap file and print out the packets"""
with open('pcap222.pcap', 'rb') as f:
pcap = dpkt.pcap.Reader(f)
print_packets(pcap) if __name__ == '__main__':
test()

输出:

('packet num count :', 4474)
('Timestamp: ', '2017-08-01 03:55:03.314832')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 211.90.25.31 (len=52 ttl=64 DF=1 MF=0 offset=0) ('packet num count :', 4475)
('Timestamp: ', '2017-08-01 03:55:03.485679')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 180.97.33.12 (len=114 ttl=64 DF=0 MF=0 offset=0) ('packet num count :', 4476)
('Timestamp: ', '2017-08-01 03:55:03.486141')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 119.75.222.122 (len=52 ttl=64 DF=1 MF=0 offset=0)

打印ICMP

检查ICMP数据包并显示ICMP内容。

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os def mac_addr(address):
"""Convert a MAC address to a readable/printable string Args:
address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
Returns:
str: Printable/readable MAC address
"""
return ':'.join('%02x' % compat_ord(b) for b in address) class sockaddr(ctypes.Structure):
_fields_ = [("sa_family", ctypes.c_short),
("__pad1", ctypes.c_ushort),
("ipv4_addr", ctypes.c_byte * 4),
("ipv6_addr", ctypes.c_byte * 16),
("__pad2", ctypes.c_ulong)] if hasattr(ctypes, 'windll'):
WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
def not_windows():
raise SystemError(
"Invalid platform. ctypes.windll must be available."
)
WSAStringToAddressA = not_windows
WSAAddressToStringA = not_windows def inet_pton(address_family, ip_string):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr)) if WSAStringToAddressA(
ip_string,
address_family,
None,
ctypes.byref(addr),
ctypes.byref(addr_size)
) != 0:
raise socket.error(ctypes.FormatError()) if address_family == socket.AF_INET:
return ctypes.string_at(addr.ipv4_addr, 4)
if address_family == socket.AF_INET6:
return ctypes.string_at(addr.ipv6_addr, 16) raise socket.error('unknown address family') def inet_ntop(address_family, packed_ip):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
ip_string = ctypes.create_string_buffer(128)
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) if address_family == socket.AF_INET:
if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
elif address_family == socket.AF_INET6:
if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
else:
raise socket.error('unknown address family') if WSAAddressToStringA(
ctypes.byref(addr),
addr_size,
None,
ip_string,
ctypes.byref(ip_string_size)
) != 0:
raise socket.error(ctypes.FormatError()) return ip_string[:ip_string_size.value - 1] # Adding our two functions to the socket library
if os.name == 'nt':
socket.inet_pton = inet_pton
socket.inet_ntop = inet_ntop def inet_to_str(inet):
return socket.inet_ntop(socket.AF_INET, inet) def print_icmp(pcap):
"""Print out information about each packet in a pcap Args:
pcap: dpkt pcap reader object (dpkt.pcap.Reader)
"""
# packet num count
r_num = 0
# For each packet in the pcap process the contents
for timestamp, buf in pcap:
r_num=r_num+1
print ('packet num count :' , r_num )
# Unpack the Ethernet frame (mac src/dst, ethertype)
eth = dpkt.ethernet.Ethernet(buf) # Make sure the Ethernet data contains an IP packet
if not isinstance(eth.data, dpkt.ip.IP):
print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
continue # Now grab the data within the Ethernet frame (the IP packet)
ip = eth.data # Now check if this is an ICMP packet
if isinstance(ip.data, dpkt.icmp.ICMP):
icmp = ip.data # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
more_fragments = bool(ip.off & dpkt.ip.IP_MF)
fragment_offset = ip.off & dpkt.ip.IP_OFFMASK # Print out the info
print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
print( 'Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
print( 'IP: %s -> %s (len=%d ttl=%d DF=%d MF=%d offset=%d)' % \
(inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
print('ICMP: type:%d code:%d checksum:%d data: %s\n' % (icmp.type, icmp.code, icmp.sum, repr(icmp.data))) def test():
"""Open up a test pcap file and print out the packets"""
with open('pcap222.pcap', 'rb') as f:
pcap = dpkt.pcap.Reader(f)
print_icmp(pcap) if __name__ == '__main__':
test()

输出:

('packet num count :', 377)
('Timestamp: ', '2017-08-01 03:45:56.403640')
('Ethernet Frame: ', 'ec:88:8f:86:14:5c', '9c:5c:8e:76:bf:24', 2048)
IP: 202.118.168.73 -> 192.168.1.103 (len=56 ttl=253 DF=0 MF=0 offset=0)
ICMP: type:3 code:13 checksum:52074 data: Unreach(data=IP(len=28, id=2556, off=16384, ttl=61, p=6, sum=36831, src='\xc0\xa8\x01g', dst='\xcal\x17q', opts='', data='n\xb1\x00P\x85)=]'))

打印HTTP请求

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os def mac_addr(address):
"""Convert a MAC address to a readable/printable string Args:
address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
Returns:
str: Printable/readable MAC address
"""
return ':'.join('%02x' % compat_ord(b) for b in address) class sockaddr(ctypes.Structure):
_fields_ = [("sa_family", ctypes.c_short),
("__pad1", ctypes.c_ushort),
("ipv4_addr", ctypes.c_byte * 4),
("ipv6_addr", ctypes.c_byte * 16),
("__pad2", ctypes.c_ulong)] if hasattr(ctypes, 'windll'):
WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
def not_windows():
raise SystemError(
"Invalid platform. ctypes.windll must be available."
)
WSAStringToAddressA = not_windows
WSAAddressToStringA = not_windows def inet_pton(address_family, ip_string):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr)) if WSAStringToAddressA(
ip_string,
address_family,
None,
ctypes.byref(addr),
ctypes.byref(addr_size)
) != 0:
raise socket.error(ctypes.FormatError()) if address_family == socket.AF_INET:
return ctypes.string_at(addr.ipv4_addr, 4)
if address_family == socket.AF_INET6:
return ctypes.string_at(addr.ipv6_addr, 16) raise socket.error('unknown address family') def inet_ntop(address_family, packed_ip):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
ip_string = ctypes.create_string_buffer(128)
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) if address_family == socket.AF_INET:
if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
elif address_family == socket.AF_INET6:
if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
else:
raise socket.error('unknown address family') if WSAAddressToStringA(
ctypes.byref(addr),
addr_size,
None,
ip_string,
ctypes.byref(ip_string_size)
) != 0:
raise socket.error(ctypes.FormatError()) return ip_string[:ip_string_size.value - 1] # Adding our two functions to the socket library
if os.name == 'nt':
socket.inet_pton = inet_pton
socket.inet_ntop = inet_ntop def inet_to_str(inet):
return socket.inet_ntop(socket.AF_INET, inet) def print_http_requests(pcap):
"""Print out information about each packet in a pcap Args:
pcap: dpkt pcap reader object (dpkt.pcap.Reader)
"""
# packet num count
r_num = 0
# For each packet in the pcap process the contents
for timestamp, buf in pcap:
r_num=r_num+1
print ('packet num count :' , r_num )
# Unpack the Ethernet frame (mac src/dst, ethertype)
eth = dpkt.ethernet.Ethernet(buf) # Make sure the Ethernet data contains an IP packet
if not isinstance(eth.data, dpkt.ip.IP):
print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
continue # Now grab the data within the Ethernet frame (the IP packet)
ip = eth.data # Check for TCP in the transport layer
if isinstance(ip.data, dpkt.tcp.TCP): # Set the TCP data
tcp = ip.data # Now see if we can parse the contents as a HTTP request
try:
request = dpkt.http.Request(tcp.data)
except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
continue # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
more_fragments = bool(ip.off & dpkt.ip.IP_MF)
fragment_offset = ip.off & dpkt.ip.IP_OFFMASK # Print out the info
print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
print('IP: %s -> %s (len=%d ttl=%d DF=%d MF=%d offset=%d)' %
(inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
print('HTTP request: %s\n' % repr(request)) # Check for Header spanning acrossed TCP segments
if not tcp.data.endswith(b'\r\n'):
print('\nHEADER TRUNCATED! Reassemble TCP segments!\n') def test():
"""Open up a test pcap file and print out the packets"""
with open('pcap222.pcap', 'rb') as f:
pcap = dpkt.pcap.Reader(f)
print_http_requests(pcap) if __name__ == '__main__':
test()

输出:

Timestamp:  2004-05-13 10:17:08.222534
Ethernet Frame: 00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 65.208.228.223 (len=519 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/download.html', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'www.ethereal.com', 'referer': 'http://www.ethereal.com/development.html'}, version='1.1', data='', method='GET') Timestamp: 2004-05-13 10:17:10.295515
Ethernet Frame: 00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 216.239.59.99 (len=761 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/pagead/ads?client=ca-pub-2309191948673629&random=1084443430285&lmt=1082467020&format=468x60_as&output=html&url=http%3A%2F%2Fwww.ethereal.com%2Fdownload.html&color_bg=FFFFFF&color_text=333333&color_link=000000&color_url=666633&color_border=666633', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'pagead2.googlesyndication.com', 'referer': 'http://www.ethereal.com/download.html'}, version='1.1', data='', method='GET') ...

打印出以太网IP

594 MB的pcap解析速度是127秒。

# coding=utf-8
import dpkt
import socket
import time
import ctypes
import os
import datetime # 测试dpkt获取IP运行时间
# 使用dpkt获取时间戳、源IP、目的IP class sockaddr(ctypes.Structure):
_fields_ = [("sa_family", ctypes.c_short),
("__pad1", ctypes.c_ushort),
("ipv4_addr", ctypes.c_byte * 4),
("ipv6_addr", ctypes.c_byte * 16),
("__pad2", ctypes.c_ulong)] if hasattr(ctypes, 'windll'):
WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
def not_windows():
raise SystemError(
"Invalid platform. ctypes.windll must be available."
)
WSAStringToAddressA = not_windows
WSAAddressToStringA = not_windows def inet_pton(address_family, ip_string):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr)) if WSAStringToAddressA(
ip_string,
address_family,
None,
ctypes.byref(addr),
ctypes.byref(addr_size)
) != 0:
raise socket.error(ctypes.FormatError()) if address_family == socket.AF_INET:
return ctypes.string_at(addr.ipv4_addr, 4)
if address_family == socket.AF_INET6:
return ctypes.string_at(addr.ipv6_addr, 16) raise socket.error('unknown address family') def inet_ntop(address_family, packed_ip):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
ip_string = ctypes.create_string_buffer(128)
ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string)) if address_family == socket.AF_INET:
if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
elif address_family == socket.AF_INET6:
if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
raise socket.error('packed IP wrong length for inet_ntoa')
ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
else:
raise socket.error('unknown address family') if WSAAddressToStringA(
ctypes.byref(addr),
addr_size,
None,
ip_string,
ctypes.byref(ip_string_size)
) != 0:
raise socket.error(ctypes.FormatError()) return ip_string[:ip_string_size.value - 1] # Adding our two functions to the socket library
if os.name == 'nt':
socket.inet_pton = inet_pton
socket.inet_ntop = inet_ntop def inet_to_str(inet):
return socket.inet_ntop(socket.AF_INET, inet) def getip(pcap): Num = 0
for timestamp, buf in pcap:
eth = dpkt.ethernet.Ethernet(buf) # 对没有IP段的包过滤掉
if eth.type != dpkt.ethernet.ETH_TYPE_IP:
continue ip = eth.data
ip_src = inet_to_str(ip.src)
ip_dst = inet_to_str(ip.dst)
# 打印时间戳,源->目标 #print(ts + " " + ip_src + "-->" + ip_dst)
Num= Num+1
print ('{0}\ttime:{1}\tsrc:{2}-->dst:{3} '.format(Num,timestamp,ip_src ,ip_dst))
if eth.data.__class__.__name__ == 'IP': ip = '%d.%d.%d.%d' % tuple(map(ord, list(eth.data.dst))) if eth.data.data.__class__.__name__ == 'TCP': if eth.data.data.dport == 80:
print eth.data.data.data # http 请求的数据 if __name__ == '__main__':
starttime = datetime.datetime.now()
f = open('pcap222.pcap', 'rb') # 要以rb方式打开,用r方式打开会报错
pcap = dpkt.pcap.Reader(f)
getip(pcap)
endtime = datetime.datetime.now()
print ('time : {0} seconds '.format((endtime - starttime).seconds))

输出:

1290064	time:1501562988.75	src:113.142.85.151-->dst:192.168.1.103
1290065 time:1501562988.75 src:192.168.1.103-->dst:113.142.85.151 1290066 time:1501562988.75 src:192.168.1.103-->dst:113.142.85.151 1290067 time:1501562988.75 src:113.142.85.151-->dst:192.168.1.103
1290068 time:1501562988.75 src:192.168.1.103-->dst:113.142.85.151 1290069 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151 1290070 time:1501562988.76 src:122.228.91.14-->dst:192.168.1.103
1290071 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151 1290072 time:1501562988.76 src:113.142.85.151-->dst:192.168.1.103
1290073 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151 1290074 time:1501562988.76 src:192.168.1.103-->dst:113.142.85.151
GET / HTTP/1.1
Accept: application/x-ms-application, image/jpeg, application/xaml+xml, image/gif, image/pjpeg, application/x-ms-xbap, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, */*
Accept-Language: zh-cn
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Mac_PowerPC; en) Opera 9.24
Referer: -
Connection: Keep-Alive
Host: win7.shangshai-qibao.cn

0x7、参考

SMTP协议分析 https://yq.aliyun.com/wenji/262429

scapy 解析pcap文件总结 https://www.cnblogs.com/14061216chen/p/8093441.html

python之字符串格式化(format) https://www.cnblogs.com/benric/p/4965224.html

Python解析Pcap包类源码学习的更多相关文章

  1. 【Java】【常用类】Object 基类 源码学习

    源码总览: 有好些都是native本地方法,背后是C++写的 没有关于构造器的描述,默认编译器提供的无参构造 https://blog.csdn.net/dmw412724/article/detai ...

  2. 【Java】【常用类】 Arrays工具类 源码学习

    虽然在数组的随笔中有说过,但实际上应该仔细深入一下源码进行分析 源码没有想象中的高大上,代码终究还是写给人看的,可读性大于执行性 最小阵列排序:1 乘 2的13次方 =  8192 学识浅薄,暂时还不 ...

  3. Java并发包源码学习系列:挂起与唤醒线程LockSupport工具类

    目录 LockSupport概述 park与unpark相关方法 中断演示 blocker的作用 测试无blocker 测试带blocker JDK提供的demo 总结 参考阅读 系列传送门: Jav ...

  4. Java并发包源码学习系列:JDK1.8的ConcurrentHashMap源码解析

    目录 为什么要使用ConcurrentHashMap? ConcurrentHashMap的结构特点 Java8之前 Java8之后 基本常量 重要成员变量 构造方法 tableSizeFor put ...

  5. Java并发包源码学习系列:阻塞队列实现之ArrayBlockingQueue源码解析

    目录 ArrayBlockingQueue概述 类图结构及重要字段 构造器 出队和入队操作 入队enqueue 出队dequeue 阻塞式操作 E take() 阻塞式获取 void put(E e) ...

  6. Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析

    目录 LinkedBlockingQueue概述 类图结构及重要字段 构造器 出队和入队操作 入队enqueue 出队dequeue 阻塞式操作 E take() 阻塞式获取 void put(E e ...

  7. Java并发包源码学习系列:阻塞队列实现之PriorityBlockingQueue源码解析

    目录 PriorityBlockingQueue概述 类图结构及重要字段 什么是二叉堆 堆的基本操作 向上调整void up(int u) 向下调整void down(int u) 构造器 扩容方法t ...

  8. Java并发包源码学习系列:阻塞队列实现之DelayQueue源码解析

    目录 DelayQueue概述 类图及重要字段 Delayed接口 Delayed元素案例 构造器 put take first = null 有什么用 总结 参考阅读 系列传送门: Java并发包源 ...

  9. Java并发包源码学习系列:阻塞队列实现之SynchronousQueue源码解析

    目录 SynchronousQueue概述 使用案例 类图结构 put与take方法 void put(E e) E take() Transfer 公平模式TransferQueue QNode t ...

随机推荐

  1. AspectJ获取方法注解的信息

    在使用Aspectj获取方法注解信息的时候,可以使用下面的代码片段: /** * Get value of annotated method parameter */ private <T ex ...

  2. PHP文件操作 读取与写入

    基本知识: PHP文件系统是基于Unix系统的 文件数据基本类型:二进制数据.文本数据 文件输入流:数据从源文件到内存的流动 文件输出流:数据从内存保存到文件的流动 文件操作函数: >>& ...

  3. POJ 2421 Constructing Roads (最小生成树)

    Constructing Roads 题目链接: http://acm.hust.edu.cn/vjudge/contest/124434#problem/D Description There ar ...

  4. printf参数的问题

    根据前面的某一篇的文章,可以清楚的看到:对于每一个函数,通过这个函数的[ebp+x]就可以直接访问到它调用的时候传进来的形参的值,通过[ebp-x]就可以直接访问它的局部变量. 所以printf这个函 ...

  5. WeMall的Android app商城中的wemall doraemon代码

    WeMall-Android 包含SMSSDK/WeMall-Client/social_sdk_library_project三个项目以及Api目录下的client.php/update.xml接口 ...

  6. IOS的控制器

    控制器简单来说,就是用来做界面跳转的,类似于 Android 的Intent 1.创建一个控制器 控制器的常见的创建方式 )通过storyboard创建 //直接创建 NJViewController ...

  7. Selenium-WebDriver框架常用基本操作

    1.基础元素定位的八种方法 WebDriver driver = new ChromeDriver(); WebElement element = new WebElement(); 1.1 By.i ...

  8. Java中CountDownLatch和CyclicBarrier的使用和比较

    CountDownLatch和CyclicBarrier同为Java1.5开始引入的,应用于多线程编程中的一种工具,二者用途十分相近,十分容易混淆. CountDownLatch CountDownL ...

  9. python信息收集之子域名

    python信息收集之子域名 主要是以下3种思路: 字典爆破 搜索引擎 第三方网站 0x00 背景知识 list Python内置的一种数据类型是列表:list是一种有序的集合. >>&g ...

  10. SQL记录-解锁和dbms_job操作

    创建JOB create or replace procedure proc_auto_exec_job as begin declare job number; BEGIN dbms_job.sub ...