Easticsearch 数据迁移至in

发布时间:2019-08-31 09:47:05编辑:auto阅读(1775)

    Easticsearch 数据迁移至influxdb python


    需求:将Easticsearch部分数据迁移至influxdb中。


    见过从mysql,influxdb迁移至Easticsearch中的,没见过从Easticsearch迁移至influxdb中,迁移的数据是一些实时性的流量数据,influxdb时序性数据库对这类数据的支撑比较客观。


    解决方案:大批量从Easticsearch取数据,两种方案。1.from...size    2.scroll (类似于数据库的游标)  脚本采用第二种scroll方案对Easticsearch 查询取数据。循环通过scrool_id进行查询并写入influxdb中。


    #!/usr/bin/env python
    #coding=utf-8
    
    import sys
    import json
    import datetime
    import elasticsearch
    from influxdb import InfluxDBClient
    
    #连接Easticsearch
    class ES(object):
        @classmethod
        def connect_host(cls):
            url = "http://192.168.121.33:9202/"
            es = elasticsearch.Elasticsearch(url,timeout=120)
            return es
    es = ES.connect_host()
    
    #连接influxdb
    client = InfluxDBClient(host="192.168.121.33", port="8086", username='admin', password='admin', database='esl')
    client.create_database('esl')
    
    #DSL查询语法
    data = {
        "query": { "match_all" : {}},
        "size": 100
    }
    
    # 设置要过滤返回的字段值,要什么字段。
        'hits.hits._source.resource_id',
        'hits.hits._source.timestamp',
        'hits.hits._source.counter_volume',
        'hits.hits._source.@timestamp',
    ]
    
    # 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用
    res = es.search(
        index='pipefilter_meters*',
        doc_type ='canaledge.flow.bytes',
        body=data,
        search_type="scan",
        scroll="10m"
    )
    scroll_id = res['_scroll_id']
    
    response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,)
    scroll_id = response['_scroll_id']   #获取第二次scroll_id
    hits = response['hits']['hits'] 
    in_data = []
    
    while len(hits) > 0:
        for i in hits:
            res_id = i['_source']['resource_id']
            r_id, r_type = res_id.split(':')
            datas = {
                "measurement": "es_net",
                "tags": {
                     "resource_id": r_id,
                     "type": r_type
                 },
                "time": i['_source']['timestamp'],
                "fields": {
                    "counter_volume": i['_source']['counter_volume']
                }
            }
            in_data.append(datas)
        #循环写入influxdb
        client.write_points(in_data)
        in_data = []   #每次循环完重新定义列表为空
    
        data = {
            "query": { "match_all" : {}},
            "size": 100
        }
        ## 设置要过滤返回的字段值,要什么字段。
            '_scroll_id',
            'hits.hits._source.resource_id',
            'hits.hits._source.timestamp',
            'hits.hits._source.counter_volume',
            'hits.hits._source.@timestamp',
        ]
    
        ## 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用
        response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,)
        #调试
        #if not response.get('hits'):
        #    print response
        #    sys.exit(1)
        #else:
        
        hits = response['hits']['hits']
        scroll_id = response["_scroll_id"] #获取第三次scroll_id


关键字