Python与ZooKeeper集群连接

发布时间:2019-09-09 08:43:12编辑:auto阅读(2189)

    由于项目的需要,需要学习Python客户端连接ZooKeeper集群,并实现创建临时节点、获得指定的路径下的信息、监听子节点变化的功能。

    环境配置

    ZooKeeper集群的安装可以参考http://blog.csdn.net/mrbcy/article/details/54767484

    使用下面的命令安装kazoo

    pip install kazoo
    

    基本使用

    这一部分可参考官方文档:http://kazoo.readthedocs.io/en/latest/basic_usage.htm

    监听子节点变化

    下面的代码实现了创建一个临时、顺序的节点,并且可以监听子节点的变化。

    #-*- coding: utf-8 -*-
    import time
    from kazoo.client import KazooClient
    from kazoo.recipe.watchers import ChildrenWatch
    
    
    
    
    class ValidatorDetector:
    
        def __init__(self):
            self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
            self.validator_children_watcher = ChildrenWatch(client=self.zk,path='/mproxy/validators',func=self.validator_watcher_fun)
            self.zk.start()
    
        def validator_watcher_fun(self,children):
            print "The children now are:", children
    
        def create_node(self):
            self.zk.create('/mproxy/validators/validator',b'validator_huabei_1',ephemeral=True,sequence=True,makepath=True)
    
        def __del__(self):
            self.zk.close()
    
    
    
    
    
    if __name__ == '__main__':
        detector = ValidatorDetector()
        detector.create_node()
        time.sleep(10)
    

    ZooKeeper原生提供了监听节点变化及值的变化的API。关于这一部分可以参考http://blog.csdn.net/mrbcy/article/details/54790758。但是这些API只能生效一次,一旦被触发过一次以后就不会再触发了,除非再次注册。而kazoo则在这个基础上封装了更上层的API,可以持续的触发。这就是上面的ChildrenWatch,除此之外kazoo还封装了一个DataWatch,用于监听数据的变化。下面我们也会用到。

    kazoo还实现了自动续订功能,使得在会话过期后我们不需要再次初始化ZooKeeper客户端(这里可以参考http://blog.csdn.net/mrbcy/article/details/55062713),也是非常方便的。

    注册验证器

    有了上面的知识就可以做一个注册类和一个监测类了。

    #-*- coding: utf-8 -*-
    import threading
    import time
    from kazoo.client import KazooClient
    from kazoo.protocol.states import KazooState
    
    class InfoKeeper(threading.Thread):
        def __init__(self,register):
            threading.Thread.__init__(self)
            self.register=register
    
        def run(self):
            time.sleep(0.25)
            if self.register.zk_node is None:
                print "create method has not been called"
                return
            check_result = self.register.zk.exists(self.register.validator_path)
            if check_result is None:
                # redo the regist
                print "redo the regist"
                self.register.regist()
            else:
                print "the path remain exists"
    
    class ValidatorRegister:
        def __init__(self):
            self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
            self.zk_node = None
            self.validator_path = '/mproxy/validators/'
            self.zk.add_listener(self.conn_state_watcher)
            self.zk.start()
    
    
        def __del__(self):
            self.zk.close()
    
        def regist(self):
            self.zk_node = self.zk.create(self.validator_path + 'validator',bytes('validator_huabei_1'),ephemeral=True,sequence=True,makepath=True)
    
        def close(self):
            self.zk.stop()
            self.zk.close()
    
        def conn_state_watcher(self, state):
            if state == KazooState.CONNECTED:
                print "Now connected"
    
                if self.zk_node is None:
                    print "create method has not been called"
                    return
                info_keeper = InfoKeeper(self)
                info_keeper.start()
            elif state == KazooState.LOST:
                print "Now lost"
            else:
                print "Now suspended"
    

    监测类:

    #-*- coding: utf-8 -*-
    import time
    from kazoo.client import KazooClient
    from kazoo.recipe.watchers import ChildrenWatch
    
    
    
    
    class ValidatorDetector:
    
        def __init__(self):
            self.validator_path = '/mproxy/validators/'
            self.zk = KazooClient(hosts='amaster:2181,anode1:2181,anode2:2181')
            self.validator_children_watcher = ChildrenWatch(client=self.zk,path=self.validator_path,func=self.validator_watcher_fun)
            self.zk.start()
    
        def validator_watcher_fun(self,children):
            for child in children:
                validator_name = self.zk.get(path=self.validator_path + str(child))
                print validator_name[0]
            print "The children now are:", children
    
    
        def __del__(self):
            self.zk.close()
    

    注册类这里稍微复杂了一点,做了一个在会话过期后重新注册的机制,如果会话过期,重新注册之前的注册信息。

    监听子节点值的变化

    嗯,这个需求仔细想过后可以通过监听子节点的变化来代替,所以暂时不实现了。

关键字