首页 > 其他分享 >cloudstack的重新封装--api调用

cloudstack的重新封装--api调用

时间:2023-06-15 18:33:05浏览次数:60  
标签:username LOG api -- self args url cloudstack response

使用python将cloudstack的多个功能进行重新封装形成api调用。




# coding=utf-8
# !/usr/bin/env python

import base64
import hmac
import json
import requests
import sys
import time
import urllib
import re
import hashlib
import logging

LOG = logging.getLogger(__name__)
verbose = 1  # CLOUDSTACK_API_VERBOSE


FOREVER_BLOCK_LIST = ["deleteuser", "deleteproject"]
COMMAND_BLACK_LIST = ["destroyvirtualmachine", "deletevolume", "disassociateipaddress", "disassociateipv6address"]
FUNC_WHITE_LIST = ["rollback_resource", "roll_back_wo", "clean_resource_task"]
SCRIPT_WHITE_LIST = ["cleanres"]

CLOUDSTACK_API_DEBUGGING = True
RIGHT_RESP = ["restorevmresponse", "addiptovmnicresponse", "restorevmresponse", "cloudhighwayresponse",
              "listloadbalancerssresponse", "listpublicipaddressesresponse", 'clouddirectconnectresponse',
              'CloudDirectConnectResponse', "queryDisklimitresponse", "confineiprateresponse"]

class SignedAPICall(object):
    def __init__(self, api_url, apiKey="", secret=""):
        self.api_url = api_url
        self.apiKey = apiKey
        self.secret = secret

    def request(self, args):
        if "sessionkey" not in args:
            args['apiKey'] = self.apiKey
        self.params = []
        self.real_params = []
        self._sort_request(args)
        self._create_signature()
        if "sessionkey" not in args:
            self._build_post_request_with_signature()
        self._build_post_request()

    def serialize_request(self, key, value):
        if isinstance(value, str):
            return (key + '=' + urllib.quote_plus(value).replace('+', '%20'),
                    urllib.quote_plus(key) + '=' + urllib.quote_plus(value))
        elif isinstance(value, bool):
            return (key + '=' + urllib.quote_plus(str(value).lower()).replace('+', '%20'),
                    urllib.quote_plus(key) + '=' + urllib.quote_plus(str(value).lower()))
        elif isinstance(value, dict):
            sub_params = tuple(self.serialize_request(key + '.' + k, value[k]) for k in sorted(value.keys()))
            return ('&'.join(t[0] for t in sub_params), '&'.join(t[1] for t in sub_params))
        elif hasattr(value, '__iter__'):
            v = tuple(value)
            # Need to be sorted
            indices = dict(('[{0}]'.format(i), i) for i in range(0, len(v)))
            indices_keys = sorted(indices.keys())
            sub_params = tuple(self.serialize_request(key + k, v[indices[k]]) for k in indices_keys)
            return ('&'.join(t[0] for t in sub_params), '&'.join(t[1] for t in sub_params))
        else:
            try:
                return (key + '=' + urllib.quote_plus(str(value)).replace('+', '%20'),
                        urllib.quote_plus(key) + '=' + urllib.quote_plus(str(value)))
            except:
                return (key + '=' + urllib.quote_plus(value.encode('utf-8')).replace('+', '%20'),
                        urllib.quote_plus(key) + '=' + urllib.quote_plus(value.encode('utf-8')))

    def _sort_request(self, args):
        keys = sorted(args.keys())

        for key in keys:
            sub_params = self.serialize_request(key, args[key])
            self.params.append(sub_params[0])
            self.real_params.append(sub_params[1])

    def _create_signature(self):
        self.query = '&'.join(self.params)
        digest = hmac.new(
            self.secret,
            msg=self.query.lower(),
            digestmod=hashlib.sha1).digest()

        self.signature = base64.b64encode(digest)
        # Use well-formed query string instead of the mistaken one
        self.query = '&'.join(self.real_params)

    def _build_post_request(self):
        self.value = self.api_url + '?' + self.query

    def _build_post_request_with_signature(self):
        self.query += '&signature=' + urllib.quote_plus(self.signature)
        self.value = self.api_url + '?' + self.query


class CloudStack(SignedAPICall):
    # api_coverage = {}
    api_time = {}
    jobtrace = {}

    def __init__(self, api_url, apiKey="", secret="", username=None, password=None, keyword='', domainid="",
                 os_id="", need_page_response=False):
        LOG.info("*************os_id:{}".format(os_id))
        super(CloudStack, self).__init__(api_url, apiKey, secret)
        self.username = username
        self.password = password
        self.keyword = keyword
        self.domainid = domainid
        self.login_session = None
        self.need_page_response = need_page_response
        self.apiKey = apiKey
        self.secret = secret
        self.os_id = os_id

    def __getattr__(self, name):
        frame_l = 1
        f_back_func_list = []
        f_back_script_list = []
        block = True
        LOG.info("frame_func_list:{}".format(f_back_func_list))
        LOG.info("frame_file_list:{}".format(f_back_script_list))
        for fn_ in FUNC_WHITE_LIST:
            if any([(fn_ in i) for i in f_back_func_list]):
                block = False
                break
        for sc_ in SCRIPT_WHITE_LIST:
            if any([(sc_ in i) for i in f_back_script_list]):
                block = False
                break
        if not block:
            LOG.info("unblock frame, {}, {}".format(f_back_func_list, f_back_script_list))

        if name.lower() in FOREVER_BLOCK_LIST or (name.lower() in COMMAND_BLACK_LIST) and block:
            e_msg = "blocked command:{}".format(name.lower())
            LOG.info(e_msg)
            raise Exception(e_msg)

        def handlerFunction(*args, **kwargs):
            LOG.info('jgycs_base123,args:{},kwargs:{}, name:{}'.format(args, kwargs, name))
            if args:
                self.username = args[0].get("username")
            if name != 'queryAsyncJobResult' and verbose and name != 'command':
                LOG.info('Call API ' + name)
                LOG.info('Parameters:')
            if kwargs or not args:
                if name != 'queryAsyncJobResult' and verbose:
                    LOG.info(kwargs)
                return self._make_request(name, kwargs)
            if name != 'queryAsyncJobResult' and verbose and name != 'command':
                LOG.info(args[0])
            if name == 'command':
                LOG.info('Call API ' + args[1])
                LOG.info('Parameters:')
                LOG.info(args[0])
                # {"username": "admin", "password": "password"}
                self.password = args[0].get("password")
                return self._make_request(args[1], args[0])
            return self._make_request(name, args[0])

        return handlerFunction

    def _get_md5_key(self, str):
        m2 = hashlib.md5()
        m2.update(str)
        key = m2.hexdigest()
        return key

    def login_ct(self, **kwargs):
        username = kwargs.get("username")
        password = kwargs.get("password")
        domain = kwargs.get("domain")
        url = kwargs.get("url")
        session = requests.Session()
        login_args = {"command": "login", "username": username, "password": password, "domain": domain,
                      "response": "json"}
        LOG.info("login args:{}".format(str(login_args)))
        login_ret = session.get(url, params=login_args, verify=False)
        self.login_session = session
        self.username = username
        self.password = password
        LOG.info("login ret:{}".format(login_ret.content))
        ret = json.loads(login_ret.content)['loginresponse']
        return ret

    def _http_get(self, url):
        if url.find("command=login") != -1:
            resp = None
            session = None
            for i in range(5):
                LOG.info("get sk {}".format(i))
                resp, session, sessionkey = self.login(self.api_url, self.username, self.password)
                if sessionkey:
                    break
                time.sleep(0.5)
            if not session:
                raise Exception("get login session timeout")
            self.login_session = session
            LOG.info("====login session " + str(id(self.session)))
            return resp.text
        elif url.find("apiKey") != -1:
            # response = urllib.urlopen(url)
            # return response.read() # 因没有超时时间,有时会延时15分钟,放弃使用urllib
            try:
                response = requests.get(url, timeout=60)
            except Exception as e:
                LOG.exception("requests get failed, retry! this error: {}".format(e))
                response = requests.get(url, timeout=60)
            return response.content
        else:
            session = self.login_session
            if session:
                resp = session.get(url)
                LOG.info("====get session %s, url is[%s]" % (str(id(session)), url))
                result = resp.text
                if url.find("command=logout") != -1:
                    self.session = None
                    self.sessionkey = None
                return result
            else:
                LOG.info("session invalid")
                return '{"msg": "session invalid"}'

    def _make_request(self, command, args):
        LOG.info(" CloudStack base command is %s args is %s" % (command, args))
        if not CloudStack.api_time.has_key(command):
            CloudStack.api_time[command] = []
        args['response'] = 'json'
        args['command'] = command
        # keyword用于模糊查询
        if self.keyword:
            args['keyword'] = self.keyword

        LOG.info("domain {},username {},and command {}".format(self.domainid, self.username, command))
        # todo 新加接口需显示全部就跳过
        # if "offering" not in command.lower() and "zone" not in command.lower() and "configurations" not in command.lower() and "storagepools" not in command.lower():
        if not any([i in command.lower() for i in ["offering", "zone", "configurations", "storagepools"]]):
            if self.domainid and self.username:
                if "list" in command.lower():
                    args["domainid"] = self.domainid
                    args["account"] = self.username
                if command.lower() in ["deployvirtualmachine", "createvolume", "createvpc", "createnetworkacl",
                                       "createnetworkacllist", "createnetwork", "createvmsnapshot",
                                       "associateipaddress",
                                       "createloadbalancerrule"]:
                    args["domainid"] = self.domainid
                    args["account"] = self.username

        if self.apiKey and self.secret:
            if 'sessionkey' in args:
                args.pop('sessionkey')

        self.request(args)
        try:
            data = self._http_get(self.value)
        except Exception as e:
            LOG.exception("get http response failed, raise, error is %s" % e)
            # key = command.lower() + "response"
            # # try:
            # #     error = "{\"uuidList\":[],\"errorcode\":401,\"errortext\": \"unable to verify user credentials and/or request signature\",error:%s}" % e
            # # except:
            # #     error = "{\"uuidList\":[],\"errorcode\":401,\"errortext\": \"unable to verify user credentials and/or request signature\"}"
            # # data = "{\"%s\" : %s}" % (key, error)
            # try:
            #     error = {"uuidList":[],"errorcode":401,"errortext": "unable to verify user credentials and/or request signature","error":str(e)}
            # except:
            #     error = {"uuidList":[],"errorcode":401,"errortext": "unable to verify user credentials and/or request signature"}
            raise RuntimeError(str(self.value) + "error: {}".format(e))

        try:
            LOG.info(u"command is %s,response is [%s]" % (command, unicode(data)))
        except:
            try:
                LOG.info("command is %s,response is [%s]" % (command, str(data)))
            except:
                pass
        try:
            decoded = json.loads(data)
        except:
            print data
            raise RuntimeError(data)

        LOG.info("need page response:{}".format(self.need_page_response))
        if self.need_page_response:
            LOG.info("need page response 2")
            return decoded

        propertyResponse = command.lower() + 'response'
        response = None
        # 对不规则返回筛选,使正常返回
        if decoded.keys()[0] in RIGHT_RESP:
            value = decoded[decoded.keys()[0]]
            if "errortext" in value:
                raise RuntimeError("ERROR: " + value["errortext"])
            else:
                response = value
        elif propertyResponse not in decoded:  # 错误信息提取
            if 'errorresponse' in decoded:
                raise RuntimeError("ERROR: " + decoded['errorresponse']['errortext'])
            else:
                raise RuntimeError("ERROR: Unable to parse the response, response is [%s]" % decoded)
        response = decoded[propertyResponse] if not response else response
        if "errortext" in response:
            raise RuntimeError("ERROR: " + response["errortext"])
        result = re.compile(r"^list(\w+)s").match(command.lower())

        if result is not None:
            result_type = result.group(1)

            if result_type in response:
                return response[result_type]
            else:
                # sometimes, the 's' is kept, as in :
                # { "listasyncjobsresponse" : { "asyncjobs" : [ ... ] } }
                result_type += 's'
                if type in response:
                    return response[result_type]

        return response

    def login(self, url, username, password, domain="/", verifysslcert=False):
        """
        Login and obtain a session to be used for subsequent API calls
        Wrong username/password leads to HTTP error code 531
        """
        args = {}

        args["command"] = 'login'
        args["username"] = username
        args["password"] = password
        args["domain"] = domain
        args["response"] = "json"

        sessionkey = ''
        session = requests.Session()

        try:
            resp = session.get(url, params=args, verify=verifysslcert, timeout=60)
        except requests.exceptions.ConnectionError, e:
            LOG.exception("Connection refused by server: %s" % e)
            return None, None, None

        if resp.status_code == 200:
            sessionkey = resp.json()['loginresponse']['sessionkey']
        elif resp.status_code == 405:
            LOG.exception("Method not allowed, unauthorized access on URL: %s" % url)
            session = None
            sessionkey = None
        elif resp.status_code == 531:
            LOG.exception("Error authenticating at %s using username: %s"
                          ", password: %s, domain: %s" % (url, username, password, domain))
            session = None
            sessionkey = None
        else:
            resp.raise_for_status()

        return resp, session, sessionkey

    def writeError(self, msg):
        sys.stderr.write(msg)
        sys.stderr.write("\n")
        sys.stderr.flush()


def create_key_by_rc(rc):
    """
    :param rc: username password url
    :return:
    """
    api_url = rc["auth_url"]
    username = rc["username"]
    password = rc["password"]
    apiKey = rc.get('apiKey')
    secret = rc.get('secret')
    domainid = rc.get('domainid')
    os_id = rc.get("os_id")
    if not apiKey or not secret:
        conn = CloudStack(api_url, os_id=os_id)
        domain = "/"
        try:
            resp = conn.login_ct( **{"url": api_url, "command": "login", "username": username, "password": password, "domain": domain})
        except KeyError:
            raise Exception("login failed and retry failed")
        LOG.info("vvvvv resp:{}".format(resp))
        sessionkey = resp["sessionkey"]
        userid = resp["userid"]
        LOG.info("login success,user name is [%s],user id is [%s]" % (username, userid))
        return conn, sessionkey, userid
    else:
        if domainid:
            conn = CloudStack(api_url, apiKey=apiKey, secret=secret, domainid=domainid, username=username, os_id=os_id)
        else:
            conn = CloudStack(api_url, apiKey=apiKey, secret=secret, os_id=os_id)
        return conn, None, None


def queryAsyncJobResult(jobid, universal_obj):
    '''
     universal_obj can be CSimage CloudStackNetwork
     conn=universal_obj.conn
     sessionkey=universal_obj.sessionkey
     response = queryAsyncJobResult(jobid, sessionkey, conn)
    :return: jobid is required
    '''
    LOG.info("Begin to queryAsyncJobResult")
    try:
        if hasattr(universal_obj, "conn"):
            conn = universal_obj.conn
        else:
            conn = universal_obj.handler
        if not conn:
            conn = universal_obj.admin_conn
        sessionkey = universal_obj.sessionkey
        if sessionkey:
            attrs = {"sessionkey": sessionkey, "jobid": jobid}
        else:
            attrs = {"jobid": jobid}
        resp = conn.queryAsyncJobResult(**attrs)
        LOG.info(u"queryAsyncJobResult successfully response is [%s]" % unicode(resp)[:8000])
        return resp
    except Exception as ex:
        LOG.exception(u"queryAsyncJobResult failed , excetption [%s]" % ex)
        raise RuntimeError(ex)


if __name__ == '__main__':
    api_url = "http://192.168.5.200:8080/client/api"
    # username = "admin"
    # password = "password"
    # handler = CloudStack(api_url)
    # resp = handler.command({"username": username, "password": password}, "login")
    # print resp
    #
    # sessionkey = resp.get("sessionkey")
    # handler2 = CloudStack(api_url)
    # response = handler.listUsers(sessionkey=sessionkey)
    # print response
    api_key = "vk………………eSA"
    secret = "2Q………………ojY-A"
    admin_conn = CloudStack(api_url, apiKey=api_key, secret=secret)
    list_regions = admin_conn.listRegions()
    print list_regions



执行结果里包含 api和返回结果。

cloudstack的重新封装--api调用_cloud stack的调用


标签:username,LOG,api,--,self,args,url,cloudstack,response
From: https://blog.51cto.com/chier11/6494032

相关文章

  • 使用 Spring 2.5 注释驱动的 IoC 功能
    http://www.ibm.com/developerworks/cn/java/j-lo-spring25-ioc/概述注释配置相对于XML配置具有很多的优势:它可以充分利用Java的反射机制获取类结构信息,这些信息可以有效减少配置的工作。如使用JPA注释配置ORM映射时,我们就不需要指定PO的属性名、类型等信息,如果关系表字......
  • 免杀工具 -- SysWhispers3WinHttp
    0x01工具介绍SysWhispers3WinHttp基于SysWhispers3增添WinHttp分离加载,可免杀360核晶与Defender等杀软。0x02安装与使用1.使用msfvenom生成Shellcode(或使用CobaltStrike生成Stageless之Shellcode)msfvenom-pwindows/x64/meterpreter_reverse_tcplhost=192.168.1.110lport=44......
  • Microsoft 365 :小议Private Channels 和Shared Channels
    Blog链接:https://blog.51cto.com/13969817大概5,6年前,Microsoft在发布Teams的时候,仅有StandardChannel和PrivateChannel,其中StandardChannel面向所有用户开放的标准频道;而PrivateChannel是一个私有频道,只适用于Teams中特定的用户。现在Microsoft发布的SharedChannels是Teams中......
  • 学习Spring必学的Java基础知识----反射
    引述要学习Spring框架的技术内幕,必须事先掌握一些基本的Java知识,正所谓“登高必自卑,涉远必自迩”。以下几项Java知识和Spring框架息息相关,不可不学(我将通过一个系列分别介绍这些Java基础知识,希望对大家有所帮助。):[1]Java反射知识-->SpringIoC[2]属性编辑器,即PropertyEditor......
  • 小鹿线Web前端怎么样?
    现在web前端开发开发技术在不断地迭代更新,有很多从事前端开发的程序员在技术上会遇到瓶颈,这个时候小伙伴就应该通过不断的学习开发技术知识,来提升自身的开发技术水平,那小伙伴应该怎么来学习呢?1.梳理清楚知识体系框架学习前端开发技术,不管是入门还是进阶,一定都要有知识体系建设......
  • Linux 下分卷压缩,windows 解压
    1.Linux分卷压缩tarczf-/mnt/import2023/|split-b3G-/mnt/hq2023.tar.gz压缩后形成的文件: 2.将这些文件复制到windows,然后运行命令copy/bhq2023.tar.*hq2023.tar.gz 3.完成后,用winrar打开hq2023.tar.gz即可看到完成文件夹目录 ......
  • Dos命令
    打开CMD的方式开始->系统->命令提示符Win键+R输入cmd打开控制台(推荐使用)在任意文件夹下面,按住shift+鼠标右键,在此处打开命令行窗口资源管理器的地址栏前面加上cmd路径管理员方式运行:选择以管理员方式运行常用的Dos命令#盘符切换D:#查看当前目录下的所有文件dir#切......
  • SpringBoot整合Redis
    第一步:导入坐标 第二步:在application.yml中进行相关配置第三步:使用对应的API对操作接口进行操作 操作 key-value格式的 操作 hash格式的  ......
  • idea 配置 javaweb项目
    对于老的web项目,不是springboot,需要的配置一、git clone 项目 二、配置 projectstructure 这几个都要配置,当然有的项目点开默认有的话修改一下就行,没有就 的手动添加 project 这几个都填好填对 没有classes 就创建一个 modules libbraries 把WEB-INF......
  • .Net6基础配置
    NET6App介绍.NET6的CoreApp框架,用来学习.NET6的一些变动和新特性,使用EFCore,等一系列组件的运用.。软件架构分为模型层,服务层,接口层来做测试使用0.如何使用IConfiguration、Environment直接在builder后的主机中使用。builder.Configuration;builder.Environment1.如何使......