python3通过纯真IP数据库查询IP

发布时间:2019-05-30 21:09:49 | 编辑:auto | 阅读(2516)

    在网上看到别人写的 Python 2 代码,将其修改成了 Python 3 版本。

    把纯真IP数据库文件qqwry.dat放到czip.py同一目录下。

      1 #! /usr/bin/env python
      2 # -*- coding: utf-8 -*-
      3 # filename: czip.py
      4 
      5 
      6 import socket
      7 import struct
      8 
      9 
     10 class CzIp:
     11     def __init__(self, db_file='qqwry.dat'):
     12         self.f_db = open(db_file, "rb")
     13         bs = self.f_db.read(8)
     14         (self.first_index, self.last_index) = struct.unpack('II', bs)
     15         self.index_count = int((self.last_index - self.first_index) / 7 + 1)
     16         self.cur_start_ip = None
     17         self.cur_end_ip_offset = None
     18         self.cur_end_ip = None
     19         # print(self.get_version(), " 纪录总数: %d 条 "%(self.index_count))
     20 
     21     def get_version(self):
     22         '''
     23         获取版本信息,最后一条IP记录 255.255.255.0-255.255.255.255 是版本信息
     24         :return: str
     25         '''
     26         s = self.get_addr_by_ip(0xffffff00)
     27         return s
     28 
     29     def _get_area_addr(self, offset=0):
     30         if offset:
     31             self.f_db.seek(offset)
     32         bs = self.f_db.read(1)
     33         (byte,) = struct.unpack('B', bs)
     34         if byte == 0x01 or byte == 0x02:
     35             p = self.getLong3()
     36             if p:
     37                 return self.get_offset_string(p)
     38             else:
     39                 return ""
     40         else:
     41             self.f_db.seek(-1, 1)
     42             return self.get_offset_string(offset)
     43 
     44     def _get_addr(self, offset):
     45         '''
     46         获取offset处记录区地址信息(包含国家和地区)
     47         如果是中国ip,则是 "xx省xx市 xxxxx地区" 这样的形式
     48         (比如:"福建省 电信", "澳大利亚 墨尔本Goldenit有限公司")
     49         :param offset:
     50         :return:str
     51         '''
     52         self.f_db.seek(offset + 4)
     53         bs = self.f_db.read(1)
     54         (byte,) = struct.unpack('B', bs)
     55         if byte == 0x01:    # 重定向模式1
     56             country_offset = self.getLong3()
     57             self.f_db.seek(country_offset)
     58             bs = self.f_db.read(1)
     59             (b,) = struct.unpack('B', bs)
     60             if b == 0x02:   
     61                 country_addr = self.get_offset_string(self.getLong3())
     62                 self.f_db.seek(country_offset + 4)
     63             else:   
     64                 country_addr = self.get_offset_string(country_offset)
     65             area_addr = self._get_area_addr()
     66         elif byte == 0x02:  # 重定向模式2
     67             country_addr = self.get_offset_string(self.getLong3())
     68             area_addr = self._get_area_addr(offset + 8)
     69         else:   # 字符串模式
     70             country_addr = self.get_offset_string(offset + 4)
     71             area_addr = self._get_area_addr()
     72         return country_addr + " " + area_addr
     73 
     74     def dump(self, first, last):
     75         '''
     76         打印数据库中索引为first到索引为last(不包含last)的记录
     77         :param first:
     78         :param last:
     79         :return:
     80         '''
     81         if last > self.index_count:
     82             last = self.index_count
     83         for index in range(first, last):
     84             offset = self.first_index + index * 7
     85             self.f_db.seek(offset)
     86             buf = self.f_db.read(7)
     87             (ip, of1, of2) = struct.unpack("IHB", buf)
     88             address = self._get_addr(of1 + (of2 << 16))
     89             print("%d %s %s" % (index, self.ip2str(ip), address))
     90 
     91     def _set_ip_range(self, index):
     92         offset = self.first_index + index * 7
     93         self.f_db.seek(offset)
     94         buf = self.f_db.read(7)
     95         (self.cur_start_ip, of1, of2) = struct.unpack("IHB", buf)
     96         self.cur_end_ip_offset = of1 + (of2 << 16)
     97         self.f_db.seek(self.cur_end_ip_offset)
     98         buf = self.f_db.read(4)
     99         (self.cur_end_ip,) = struct.unpack("I", buf)
    100 
    101     def get_addr_by_ip(self, ip):
    102         '''
    103         通过ip查找其地址
    104         :param ip: (int or str)
    105         :return: str
    106         '''
    107         if type(ip) == str:
    108             ip = self.str2ip(ip)
    109         L = 0
    110         R = self.index_count - 1
    111         while L < R - 1:
    112             M = int((L + R) / 2)
    113             self._set_ip_range(M)
    114             if ip == self.cur_start_ip:
    115                 L = M
    116                 break
    117             if ip > self.cur_start_ip:
    118                 L = M
    119             else:
    120                 R = M
    121         self._set_ip_range(L)
    122         # version information, 255.255.255.X, urgy but useful
    123         if ip & 0xffffff00 == 0xffffff00:
    124             self._set_ip_range(R)
    125         if self.cur_start_ip <= ip <= self.cur_end_ip:
    126             address = self._get_addr(self.cur_end_ip_offset)
    127         else:
    128             address = "未找到该IP的地址"
    129         return address
    130 
    131     def get_ip_range(self, ip):
    132         '''
    133         返回ip所在记录的IP段
    134         :param ip: ip(str or int)
    135         :return: str
    136         '''
    137         if type(ip) == str:
    138             ip = self.str2ip(ip)
    139         self.get_addr_by_ip(ip)
    140         range = self.ip2str(self.cur_start_ip) + ' - ' \
    141                 + self.ip2str(self.cur_end_ip)
    142         return range
    143 
    144     def get_offset_string(self, offset=0):
    145         '''
    146         获取文件偏移处的字符串(以'\0'结尾)
    147         :param offset: 偏移
    148         :return: str
    149         '''
    150         if offset:
    151             self.f_db.seek(offset)
    152         bs = b''
    153         ch = self.f_db.read(1)
    154         (byte,) = struct.unpack('B', ch)
    155         while byte != 0:
    156             bs += ch
    157             ch = self.f_db.read(1)
    158             (byte,) = struct.unpack('B', ch)
    159         return bs.decode('gbk')
    160 
    161     def ip2str(self, ip):
    162         '''
    163         整数IP转化为IP字符串
    164         :param ip:
    165         :return:
    166         '''
    167         return str(ip >> 24) + '.' + str((ip >> 16) & 0xff) + '.' + str((ip >> 8) & 0xff) + '.' + str(ip & 0xff)
    168 
    169     def str2ip(self, s):
    170         '''
    171         IP字符串转换为整数IP
    172         :param s:
    173         :return:
    174         '''
    175         (ip,) = struct.unpack('I', socket.inet_aton(s))
    176         return ((ip >> 24) & 0xff) | ((ip & 0xff) << 24) | ((ip >> 8) & 0xff00) | ((ip & 0xff00) << 8)
    177 
    178     def getLong3(self, offset=0):
    179         '''
    180         3字节的数值
    181         :param offset:
    182         :return:
    183         '''
    184         if offset:
    185             self.f_db.seek(offset)
    186         bs = self.f_db.read(3)
    187         (a, b) = struct.unpack('HB', bs)
    188         return (b << 16) + a
    189 
    190 
    191 
    192 if __name__ == '__main__':
    193     cz = CzIp()
    194     print(cz.get_version())
    195     ip = '14.215.177.39'
    196     print(cz.get_ip_range(ip))
    197     print(cz.get_addr_by_ip(ip))

    运行结果:

     

关键字