Python 实现批量查询IP并解析为归

发布时间:2019-05-09 22:13:37编辑:auto阅读(2386)

    一、背景:
    最近工作中做了一个小功能,目的是为了分析注册用户区域分布和订单的区域分布情况。所以需要将其对应的IP信息解析为归属地,并同步每天同步更新。
    线上跑起来效率还是有优化的空间,优化的方向:在调用IP查询API过程可以调整为多线程并行解析IP。后续会更新这方便的调整。
    技术: Pyhton3
    postgreSQL
    env配置文件

    附加信息:iP地址查询(iP138官方企业版):https://market.aliyun.com/products/56928004/cmapi015606.html#sku=yuncode960600002
    .可提供免费的IP查询API.

    二、实现思路: 1、 读取数据库IP信息
    2、 调用第三方IP解析API进行解析
    3、 将解析归属地信息存入数据库
    三、几点说明: 1、环境信息等参数配置
    2、日志输出
    3、异常处理: 数据库连接异常
    请求连接查询IP的URL异常:HTTP ERROR 503
    4、json,字典,数组等类型数据输入输出
    5、分页查询并批量解析
    5.功能实现很简单,所以就没有做详细的介绍了。详情可直接看完整代码,有详细的备注。

    四、步骤简单介绍:
    针对实现思路的3个步骤写了3个函数,彼此调用执行。
    函数:
    def get_ip_info(table_name):
    def get_ip_area(table_name):
    def ip_write_db(table_name):
    调用:
    ip_write_db("h_user_stat")

    五、关键代码说明:
    语法:urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
     # 对从数据库表中出查询的IP进行解析
           querys = 'callback&datatype=jsonp&ip=' + get_ip
           bodys = {}
           url = host + path + '?' + querys
           request = urllib.request.Request(url)
           request.add_header('Authorization', 'APPCODE ' + appcode)
    
           # 连接url时可能会出现 ERROR: HTTP Error 503: Service Unavailable
           try: 
             response = urllib.request.urlopen(request)
           except Exception as e:
             logging.error(e) # 输出异常日志信息 
             time.sleep(5)
             response = urllib.request.urlopen(request)
           finally:
             content = response.read()
             ip_area = content.decode('utf8')
             ip_area = json.loads(ip_area)['data'] # json类型转字典类型并取'data'健值
             arr.append([get_ip, ip_area]) # 将结果集存于二元数组
    
    说明:从数据库分页查询固定数量的IP存入数组,并遍历该数组并将解析后的地区信息data健值存于二元数组中。
    
    

     

     
    六、Python代码实现如下:

      1 # 导入psycopg2包
      2 import psycopg2, time,datetime,sys
      3 import json
      4 import urllib, urllib.request
      5 import os
      6 import configparser
      7 import logging
      8                      # purpose: 连接数据库读取表IP信息
      9 def get_ip_info(table_name):
     10     # 全局变量作用局部作用域
     11     global pagesize # 每页查询数据条数
     12     global rows_count
     13 
     14     # 测试1
     15     starttime_1 = time.time()
     16     # 建立游标,用来执行数据库操作
     17     cur = conn.cursor()
     18     # 执行SQL命令
     19     cur.execute("SELECT remote_ip FROM (select remote_ip,min(created_at) from " + table_name + " group by remote_ip) h1 where remote_ip is not null and remote_ip <> '' and  not exists (select 1 from d_ip_area_mapping h2 where h1.remote_ip = h2.remote_ip) limit " + str(pagesize) + ";")
     20 
     21 
     22     # 获取结果集条数
     23     rows_count = cur.rowcount
     24    
     25     # print('解析用户IP的总数:' + str(rows_count))
     26 
     27      # 当有未解析的用户的IP,返回元组,否则退出程序
     28     if rows_count > 0:
     29       # 获取SELECT返回的元组
     30       rows =  cur.fetchall()        # all rows in table
     31 
     32       for row in rows:
     33           tuple = rows
     34 
     35       conn.commit()
     36       # 关闭游标
     37       cur.close()
     38 
     39     else:
     40         tuple = []
     41     logging.info('每页查询秒数:' + str(time.time() - starttime_1))
     42     return tuple
     43     # 调用解析函数
     44 
     45 
     46 def get_ip_area(table_name):
     47   # 内包含用户ID和IP的数组的元组
     48   tuple = get_ip_info(table_name)  
     49 
     50   # 测试2
     51   starttime_2 = time.time()
     52   host = 'http://ali.ip138.com'
     53   path = '/ip/'
     54   method = 'GET'
     55   appcode = '917058e6d7c84104b7cab9819de54b6e'
     56   arr = []
     57   for row in tuple:
     58 
     59        get_ip = row[0]
     60        #get_user = "".join(str(row))
     61        #get_user = row[0]
     62 
     63             # 对从数据库表中出查询的IP进行解析
     64        querys = 'callback&datatype=jsonp&ip=' + get_ip
     65        bodys = {}
     66        url = host + path + '?' + querys
     67        request = urllib.request.Request(url)
     68        request.add_header('Authorization', 'APPCODE ' + appcode)
     69 
     70        # 连接url时可能会出现 ERROR: HTTP Error 503: Service Unavailable
     71        try: 
     72          response = urllib.request.urlopen(request)
     73        except Exception as e:
     74          logging.error(e) # 输出异常日志信息 
     75          time.sleep(5)
     76          response = urllib.request.urlopen(request)
     77        finally:
     78          content = response.read()
     79          ip_area = content.decode('utf8')
     80          ip_area = json.loads(ip_area)['data'] # json类型转字典类型并取'data'健值
     81          arr.append([get_ip, ip_area]) # 将结果集存于二元数组
     82   logging.info('每页解析秒数:' + str(time.time() - starttime_2))
     83   return  arr
     84 
     85 
     86 def ip_write_db(table_name):
     87    
     88     write_ip = get_ip_area(table_name)  # 内包含用户ID和IP的数组的元组
     89 
     90 
     91     # 测试1
     92     starttime_3 = time.time()
     93 
     94      # 建立游标,用来执行数据库操作
     95     cur = conn.cursor()
     96     for row in write_ip:
     97         # get_user = row[0]  # 获取用户ID
     98         get_ip = row[0]  # 获取用户对应的IP
     99         country = row[1][0]  # 获取IP解析后的地区:国家
    100         province = row[1][1]  # 获取IP解析后的地区:省
    101         city = row[1][2]  # 获取IP解析后的地区:市
    102         isp = row[1][3]  # 获取IP解析后的服务提供商
    103 
    104         # 执行SQL命令
    105         sql = "insert into d_ip_area_mapping(remote_ip,country,province,city,isp,created_at,updated_at,job_id) values (%s,%s,%s,%s,%s,%s,%s,%s);"
    106         val = [get_ip, country, province, city, isp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
    107                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),time.strftime("%Y-%m-%d",time.localtime())]
    108         
    109         cur.execute(sql, val)
    110         conn.commit()    
    111     # 关闭游标
    112     cur.close()
    113     logging.info('每页插入秒数:' + str(time.time() - starttime_3))
    114 
    115 
    116 # 1.程序开始执行计时
    117 starttime = time.time()
    118 
    119 
    120      # 读取配置文件环境信息 
    121 
    122 # 项目路径
    123 rootDir = os.path.split(os.path.realpath(__file__))[0]
    124 
    125 
    126 ############################### config.env文件路径  #############################################################
    127 
    128 configFilePath = os.path.join(rootDir, 'db_udw.env')
    129 config = configparser.ConfigParser()
    130 config.read(configFilePath)
    131 
    132 # 读取数据库环境信息
    133 db_database = config.get('postgresql','database')
    134 db_user = config.get('postgresql','user')
    135 db_password = config.get('postgresql','password')
    136 db_host = config.get('postgresql','host')
    137 db_port = config.get('postgresql','port')
    138 
    139 # 读取输出日志路径
    140 log = config.get('log','log_path')
    141 
    142 # 每页查询数据条数
    143 pagesize = config.get('page','pagesize') 
    144 
    145 # 读取解析IP条数限制
    146 ip_num_limit = config.get('ip_num','ip_num_limit') 
    147 
    148 # 配置输出日志格式
    149 logging.basicConfig(level=logging.DEBUG,#控制台打印的日志级别
    150                       filename='{my_log_path}/ip_analyzer.log'.format(my_log_path=log),  # 指定日志文件及路径
    151                       filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志 #a是追加模式,默认如果不写的话,就是追加模式
    152                       format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'#日志格式
    153                       )
    154 
    155 ###############################   程序开始执行  #############################################################
    156 try:
    157 
    158   # 连接到一个给定的数据库
    159   conn = psycopg2.connect(database=db_database, user=db_user, password=db_password, host=db_host, port=db_port)
    160 except Exception as e:
    161   logging.error(e) # 输出连接异常日志信息
    162 
    163 # 返回查询行数 默认为0
    164 rows_count = 0
    165  # 用户表IP解析总数
    166 user_ip_num = 0 
    167  # 订单表IP解析总数
    168 order_ip_num = 0 
    169 
    170 
    171 
    172 try:
    173 
    174   # 解析用户表注册IP信息
    175   while user_ip_num <= eval(ip_num_limit):
    176      i = 1  # 循环次数
    177      ip_write_db("h_user_stat")
    178      user_ip_num = user_ip_num + rows_count*i
    179      i  = i + 1
    180      if rows_count == 0 :
    181          break
    182 
    183   # 解析订单表下单IP信息 
    184   while user_ip_num <= eval(ip_num_limit):
    185       # 解析用户表注册IP信息
    186       i = 1  # 循环次数
    187       ip_write_db("h_order")
    188       order_ip_num = order_ip_num + rows_count*i
    189       i = i + 1
    190       if rows_count == 0 :
    191          break
    192 except Exception as e:
    193   logging.error(e) # 输出异常日志信息
    194 finally:
    195   # 关闭数据库连接
    196   conn.close()
    197 
    198 # 2 程序结束执行计时
    199   endtime = time.time()
    200   
    201   # print('解析用户IP的总数:' + str(user_ip_num))
    202   # print('解析订单IP的总数:' + str(order_ip_num))
    203   # # 打印程序执行总耗时
    204   # print('解析总耗时秒数:' + str(endtime - starttime))
    205   logging.info('解析用户IP的总数:' + str(user_ip_num))
    206   logging.info('解析订单IP的总数:' + str(order_ip_num))
    207   logging.info('解析总耗时秒数:' + str(endtime - starttime))


     环境配置db_udw.envdb_udw.env 如下:

    # 数据库环境信息
    [postgresql]
    database = ahaschool_udw
    user = admin
    password = 123456
    host = 127.0.0.0
    port = 5432
    
    # 设置日志文件路径
    [log]
    log_path = /home/hjmrunning/bi_etl_product/scripts/log
    
    # 每页查询数据条数
    [page]
    pagesize = 1000  
    
    # IP解析条数限制
    [ip_num]
    ip_num_limit = 150000
    

    最后

        我接触Python时间也不是很久,实现方法可能会有疏漏。如果有什么疑问和见解,欢迎评论区交流。

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

     

关键字