python编写api调用ceph对象网

发布时间:2019-09-17 07:48:15编辑:auto阅读(1689)

    #_*_coding:utf-8_*_
    #yum install python-boto
    import boto
    import boto.s3.connection
    #pip install filechunkio
    from filechunkio import  FileChunkIO
    import math
    import  threading
    import os
    import Queue
    class Chunk(object):
        num = 0
        offset = 0
        len = 0
        def __init__(self,n,o,l):
            self.num=n
            self.offset=o
            self.length=l
    
    
    
    class CONNECTION(object):
        def __init__(self,access_key,secret_key,ip,port,is_secure=False,chrunksize=8<<20): #chunksize最小8M否则上传过程会报错
            self.conn=boto.connect_s3(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            host=ip,port=port,
            is_secure=is_secure,
            calling_format=boto.s3.connection.OrdinaryCallingFormat()
            )
            self.chrunksize=chrunksize
            self.port=port
    
        #查询
        def list_all(self):
            all_buckets=self.conn.get_all_buckets()
            for bucket in all_buckets:
                print u'容器名: %s' %(bucket.name)
                for key in bucket.list():
                    print ' '*5,"%-20s%-20s%-20s%-40s%-20s" %(key.mode,key.owner.id,key.size,key.last_modified.split('.')[0],key.name)
    
        def list_single(self,bucket_name):
            try:
                single_bucket = self.conn.get_bucket(bucket_name)
            except Exception as e:
                print 'bucket %s is not exist' %bucket_name
                return
            print u'容器名: %s' % (single_bucket.name)
            for key in single_bucket.list():
                print ' ' * 5, "%-20s%-20s%-20s%-40s%-20s" % (key.mode, key.owner.id, key.size, key.last_modified.split('.')[0], key.name)
    
        #普通小文件下载:文件大小<=8M
        def dowload_file(self,filepath,key_name,bucket_name):
            all_bucket_name_list = [i.name for i in self.conn.get_all_buckets()]
            if bucket_name not in all_bucket_name_list:
                print 'Bucket %s is not exist,please try again' % (bucket_name)
                return
            else:
                bucket = self.conn.get_bucket(bucket_name)
    
            all_key_name_list = [i.name for i in bucket.get_all_keys()]
            if key_name not in all_key_name_list:
                print 'File %s is not exist,please try again' % (key_name)
                return
            else:
                key = bucket.get_key(key_name)
    
            if not os.path.exists(os.path.dirname(filepath)):
                print 'Filepath %s is not exists, sure to create and try again' % (filepath)
                return
    
            if os.path.exists(filepath):
                while True:
                    d_tag = raw_input('File %s already exists, sure you want to cover (Y/N)?' % (key_name)).strip()
                    if d_tag not in ['Y', 'N'] or len(d_tag) == 0:
                        continue
                    elif d_tag == 'Y':
                        os.remove(filepath)
                        break
                    elif d_tag == 'N':
                        return
            os.mknod(filepath)
            try:
                key.get_contents_to_filename(filepath)
            except Exception:
                pass
    
        # 普通小文件上传:文件大小<=8M
        def upload_file(self,filepath,key_name,bucket_name):
            try:
                bucket = self.conn.get_bucket(bucket_name)
            except Exception as e:
                print 'bucket %s is not exist' % bucket_name
                tag = raw_input('Do you want to create the bucket %s: (Y/N)?' % bucket_name).strip()
                while tag not in ['Y', 'N']:
                    tag = raw_input('Please input (Y/N)').strip()
                if tag == 'N':
                    return
                elif tag == 'Y':
                    self.conn.create_bucket(bucket_name)
                    bucket = self.conn.get_bucket(bucket_name)
            all_key_name_list = [i.name for i in bucket.get_all_keys()]
            if key_name in all_key_name_list:
                while True:
                    f_tag = raw_input(u'File already exists, sure you want to cover (Y/N)?: ').strip()
                    if f_tag not in ['Y', 'N'] or len(f_tag) == 0:
                        continue
                    elif f_tag == 'Y':
                        break
                    elif f_tag == 'N':
                        return
            key=bucket.new_key(key_name)
            if not os.path.exists(filepath):
                print 'File %s does not exist, please make sure you want to upload file path and try again' %(key_name)
                return
            try:
                f=file(filepath,'rb')
                data=f.read()
                key.set_contents_from_string(data)
            except Exception:
                pass
    
        def delete_file(self,key_name,bucket_name):
            all_bucket_name_list = [i.name for i in self.conn.get_all_buckets()]
            if bucket_name not in all_bucket_name_list:
                print 'Bucket %s is not exist,please try again' % (bucket_name)
                return
            else:
                bucket = self.conn.get_bucket(bucket_name)
    
            all_key_name_list = [i.name for i in bucket.get_all_keys()]
            if key_name not in all_key_name_list:
                print 'File %s is not exist,please try again' % (key_name)
                return
            else:
                key = bucket.get_key(key_name)
    
            try:
                bucket.delete_key(key.name)
            except Exception:
                pass
    
        def delete_bucket(self,bucket_name):
            all_bucket_name_list = [i.name for i in self.conn.get_all_buckets()]
            if bucket_name not in all_bucket_name_list:
                print 'Bucket %s is not exist,please try again' % (bucket_name)
                return
            else:
                bucket = self.conn.get_bucket(bucket_name)
    
            try:
                self.conn.delete_bucket(bucket.name)
            except Exception:
                pass
    
    
        #队列生成
        def init_queue(self,filesize,chunksize):   #8<<20 :8*2**20
            chunkcnt=int(math.ceil(filesize*1.0/chunksize))
            q=Queue.Queue(maxsize=chunkcnt)
            for i in range(0,chunkcnt):
                offset=chunksize*i
                length=min(chunksize,filesize-offset)
                c=Chunk(i+1,offset,length)
                q.put(c)
            return q
    
        #分片上传object
        def upload_trunk(self,filepath,mp,q,id):
            while not q.empty():
                chunk=q.get()
                fp=FileChunkIO(filepath,'r',offset=chunk.offset,bytes=chunk.length)
                mp.upload_part_from_file(fp,part_num=chunk.num)
                fp.close()
                q.task_done()
    
        #文件大小获取---->S3分片上传对象生成----->初始队列生成(--------------->文件切,生成切分对象)
        def upload_file_multipart(self,filepath,key_name,bucket_name,threadcnt=8):
            filesize=os.stat(filepath).st_size
            try:
                bucket=self.conn.get_bucket(bucket_name)
            except Exception as e:
                print 'bucket %s is not exist' % bucket_name
                tag=raw_input('Do you want to create the bucket %s: (Y/N)?' %bucket_name).strip()
                while tag not in ['Y','N']:
                    tag=raw_input('Please input (Y/N)').strip()
                if tag == 'N':
                    return
                elif tag == 'Y':
                    self.conn.create_bucket(bucket_name)
                    bucket = self.conn.get_bucket(bucket_name)
            all_key_name_list=[i.name for i in bucket.get_all_keys()]
            if key_name  in all_key_name_list:
                while True:
                    f_tag=raw_input(u'File already exists, sure you want to cover (Y/N)?: ').strip()
                    if f_tag not in ['Y','N'] or len(f_tag) == 0:
                        continue
                    elif f_tag == 'Y':
                        break
                    elif f_tag == 'N':
                        return
    
            mp=bucket.initiate_multipart_upload(key_name)
            q=self.init_queue(filesize,self.chrunksize)
            for i in range(0,threadcnt):
                t=threading.Thread(target=self.upload_trunk,args=(filepath,mp,q,i))
                t.setDaemon(True)
                t.start()
            q.join()
            mp.complete_upload()
    
        #文件分片下载
        def download_chrunk(self,filepath,key_name,bucket_name,q,id):
            while not q.empty():
                chrunk=q.get()
                offset=chrunk.offset
                length=chrunk.length
                bucket=self.conn.get_bucket(bucket_name)
                resp=bucket.connection.make_request('GET',bucket_name,key_name,headers={'Range':"bytes=%d-%d" %(offset,offset+length)})
                data=resp.read(length)
                fp=FileChunkIO(filepath,'r+',offset=chrunk.offset,bytes=chrunk.length)
                fp.write(data)
                fp.close()
                q.task_done()
    
        def download_file_multipart(self,filepath,key_name,bucket_name,threadcnt=8):
            all_bucket_name_list=[i.name for i in self.conn.get_all_buckets()]
            if bucket_name not in all_bucket_name_list:
                print 'Bucket %s is not exist,please try again' %(bucket_name)
                return
            else:
                bucket=self.conn.get_bucket(bucket_name)
    
            all_key_name_list = [i.name for i in bucket.get_all_keys()]
            if key_name not in all_key_name_list:
                print 'File %s is not exist,please try again' %(key_name)
                return
            else:
                key=bucket.get_key(key_name)
    
            if not os.path.exists(os.path.dirname(filepath)):
                print 'Filepath %s is not exists, sure to create and try again' % (filepath)
                return
    
            if os.path.exists(filepath):
                while True:
                    d_tag = raw_input('File %s already exists, sure you want to cover (Y/N)?' % (key_name)).strip()
                    if d_tag not in ['Y', 'N'] or len(d_tag) == 0:
                        continue
                    elif d_tag == 'Y':
                        os.remove(filepath)
                        break
                    elif d_tag == 'N':
                        return
            os.mknod(filepath)
            filesize=key.size
            q=self.init_queue(filesize,self.chrunksize)
            for i in range(0,threadcnt):
                t=threading.Thread(target=self.download_chrunk,args=(filepath,key_name,bucket_name,q,i))
                t.setDaemon(True)
                t.start()
            q.join()
    
        def generate_object_download_urls(self,key_name,bucket_name,valid_time=0):
            all_bucket_name_list = [i.name for i in self.conn.get_all_buckets()]
            if bucket_name not in all_bucket_name_list:
                print 'Bucket %s is not exist,please try again' % (bucket_name)
                return
            else:
                bucket = self.conn.get_bucket(bucket_name)
    
            all_key_name_list = [i.name for i in bucket.get_all_keys()]
            if key_name not in all_key_name_list:
                print 'File %s is not exist,please try again' % (key_name)
                return
            else:
                key = bucket.get_key(key_name)
    
            try:
                key.set_canned_acl('public-read')
                download_url = key.generate_url(valid_time, query_auth=False, force_http=True)
                if self.port != 80:
                    x1=download_url.split('/')[0:3]
                    x2=download_url.split('/')[3:]
                    s1=u'/'.join(x1)
                    s2=u'/'.join(x2)
    
                    s3=':%s/' %(str(self.port))
                    download_url=s1+s3+s2
                    print download_url
    
            except Exception:
                pass
    
    
    
    if __name__ == '__main__':
        #约定:
        #1:filepath指本地文件的路径(上传路径or下载路径),指的是绝对路径
        #2:bucket_name相当于文件在对象存储中的目录名或者索引名
        #3:key_name相当于文件在对象存储中对应的文件名或文件索引
    
        access_key = "65IY4EC1BSFYNH6SHWGW"
        secret_key = "viNfIftLHhrPt2MYK44DkWGvxZb82aYqLrCzGYLx"
        ip='172.16.201.36'
        port=8080
        conn=CONNECTION(access_key,secret_key,ip,port)
        #查看所有bucket以及其包含的文件
        #conn.list_all()
    
        #简单上传,用于文件大小<=8M
        # conn.upload_file('/etc/passwd','passwd','test_bucket01')
        #查看单一bucket下所包含的文件信息
        # conn.list_single('test_bucket01')
    
    
        #简单下载,用于文件大小<=8M
        # conn.dowload_file('/lhf_test/test01','passwd','test_bucket01')
        # conn.list_single('test_bucket01')
    
        #删除文件
        # conn.delete_file('passwd','test_bucket01')
        # conn.list_single('test_bucket01')
        #
        #删除bucket
        # conn.delete_bucket('test_bucket01')
        # conn.list_all()
    
        #切片上传(多线程),用于文件大小>8M,8M可修改,但不能小于8M,否则会报错切片太小
        # conn.upload_file_multipart('/etc/passwd','passwd_multi_upload','test_bucket01')
        # conn.list_single('test_bucket01')
    
        # 切片下载(多线程),用于文件大小>8M,8M可修改,但不能小于8M,否则会报错切片太小
        # conn.download_file_multipart('/lhf_test/passwd_multi_dowload','passwd_multi_upload','test_bucket01')
    
        #生成下载url
        #conn.generate_object_download_urls('passwd_multi_upload','test_bucket01')
        #conn.list_all()


关键字