使用python将cloudstack的多个功能进行重新封装形成api调用。
# coding=utf-8
# !/usr/bin/env python
import base64
import hmac
import json
import requests
import sys
import time
import urllib
import re
import hashlib
import logging
LOG = logging.getLogger(__name__)
verbose = 1 # CLOUDSTACK_API_VERBOSE
FOREVER_BLOCK_LIST = ["deleteuser", "deleteproject"]
COMMAND_BLACK_LIST = ["destroyvirtualmachine", "deletevolume", "disassociateipaddress", "disassociateipv6address"]
FUNC_WHITE_LIST = ["rollback_resource", "roll_back_wo", "clean_resource_task"]
SCRIPT_WHITE_LIST = ["cleanres"]
CLOUDSTACK_API_DEBUGGING = True
RIGHT_RESP = ["restorevmresponse", "addiptovmnicresponse", "restorevmresponse", "cloudhighwayresponse",
"listloadbalancerssresponse", "listpublicipaddressesresponse", 'clouddirectconnectresponse',
'CloudDirectConnectResponse', "queryDisklimitresponse", "confineiprateresponse"]
class SignedAPICall(object):
def __init__(self, api_url, apiKey="", secret=""):
self.api_url = api_url
self.apiKey = apiKey
self.secret = secret
def request(self, args):
if "sessionkey" not in args:
args['apiKey'] = self.apiKey
self.params = []
self.real_params = []
self._sort_request(args)
self._create_signature()
if "sessionkey" not in args:
self._build_post_request_with_signature()
self._build_post_request()
def serialize_request(self, key, value):
if isinstance(value, str):
return (key + '=' + urllib.quote_plus(value).replace('+', '%20'),
urllib.quote_plus(key) + '=' + urllib.quote_plus(value))
elif isinstance(value, bool):
return (key + '=' + urllib.quote_plus(str(value).lower()).replace('+', '%20'),
urllib.quote_plus(key) + '=' + urllib.quote_plus(str(value).lower()))
elif isinstance(value, dict):
sub_params = tuple(self.serialize_request(key + '.' + k, value[k]) for k in sorted(value.keys()))
return ('&'.join(t[0] for t in sub_params), '&'.join(t[1] for t in sub_params))
elif hasattr(value, '__iter__'):
v = tuple(value)
# Need to be sorted
indices = dict(('[{0}]'.format(i), i) for i in range(0, len(v)))
indices_keys = sorted(indices.keys())
sub_params = tuple(self.serialize_request(key + k, v[indices[k]]) for k in indices_keys)
return ('&'.join(t[0] for t in sub_params), '&'.join(t[1] for t in sub_params))
else:
try:
return (key + '=' + urllib.quote_plus(str(value)).replace('+', '%20'),
urllib.quote_plus(key) + '=' + urllib.quote_plus(str(value)))
except:
return (key + '=' + urllib.quote_plus(value.encode('utf-8')).replace('+', '%20'),
urllib.quote_plus(key) + '=' + urllib.quote_plus(value.encode('utf-8')))
def _sort_request(self, args):
keys = sorted(args.keys())
for key in keys:
sub_params = self.serialize_request(key, args[key])
self.params.append(sub_params[0])
self.real_params.append(sub_params[1])
def _create_signature(self):
self.query = '&'.join(self.params)
digest = hmac.new(
self.secret,
msg=self.query.lower(),
digestmod=hashlib.sha1).digest()
self.signature = base64.b64encode(digest)
# Use well-formed query string instead of the mistaken one
self.query = '&'.join(self.real_params)
def _build_post_request(self):
self.value = self.api_url + '?' + self.query
def _build_post_request_with_signature(self):
self.query += '&signature=' + urllib.quote_plus(self.signature)
self.value = self.api_url + '?' + self.query
class CloudStack(SignedAPICall):
# api_coverage = {}
api_time = {}
jobtrace = {}
def __init__(self, api_url, apiKey="", secret="", username=None, password=None, keyword='', domainid="",
os_id="", need_page_response=False):
LOG.info("*************os_id:{}".format(os_id))
super(CloudStack, self).__init__(api_url, apiKey, secret)
self.username = username
self.password = password
self.keyword = keyword
self.domainid = domainid
self.login_session = None
self.need_page_response = need_page_response
self.apiKey = apiKey
self.secret = secret
self.os_id = os_id
def __getattr__(self, name):
frame_l = 1
f_back_func_list = []
f_back_script_list = []
block = True
LOG.info("frame_func_list:{}".format(f_back_func_list))
LOG.info("frame_file_list:{}".format(f_back_script_list))
for fn_ in FUNC_WHITE_LIST:
if any([(fn_ in i) for i in f_back_func_list]):
block = False
break
for sc_ in SCRIPT_WHITE_LIST:
if any([(sc_ in i) for i in f_back_script_list]):
block = False
break
if not block:
LOG.info("unblock frame, {}, {}".format(f_back_func_list, f_back_script_list))
if name.lower() in FOREVER_BLOCK_LIST or (name.lower() in COMMAND_BLACK_LIST) and block:
e_msg = "blocked command:{}".format(name.lower())
LOG.info(e_msg)
raise Exception(e_msg)
def handlerFunction(*args, **kwargs):
LOG.info('jgycs_base123,args:{},kwargs:{}, name:{}'.format(args, kwargs, name))
if args:
self.username = args[0].get("username")
if name != 'queryAsyncJobResult' and verbose and name != 'command':
LOG.info('Call API ' + name)
LOG.info('Parameters:')
if kwargs or not args:
if name != 'queryAsyncJobResult' and verbose:
LOG.info(kwargs)
return self._make_request(name, kwargs)
if name != 'queryAsyncJobResult' and verbose and name != 'command':
LOG.info(args[0])
if name == 'command':
LOG.info('Call API ' + args[1])
LOG.info('Parameters:')
LOG.info(args[0])
# {"username": "admin", "password": "password"}
self.password = args[0].get("password")
return self._make_request(args[1], args[0])
return self._make_request(name, args[0])
return handlerFunction
def _get_md5_key(self, str):
m2 = hashlib.md5()
m2.update(str)
key = m2.hexdigest()
return key
def login_ct(self, **kwargs):
username = kwargs.get("username")
password = kwargs.get("password")
domain = kwargs.get("domain")
url = kwargs.get("url")
session = requests.Session()
login_args = {"command": "login", "username": username, "password": password, "domain": domain,
"response": "json"}
LOG.info("login args:{}".format(str(login_args)))
login_ret = session.get(url, params=login_args, verify=False)
self.login_session = session
self.username = username
self.password = password
LOG.info("login ret:{}".format(login_ret.content))
ret = json.loads(login_ret.content)['loginresponse']
return ret
def _http_get(self, url):
if url.find("command=login") != -1:
resp = None
session = None
for i in range(5):
LOG.info("get sk {}".format(i))
resp, session, sessionkey = self.login(self.api_url, self.username, self.password)
if sessionkey:
break
time.sleep(0.5)
if not session:
raise Exception("get login session timeout")
self.login_session = session
LOG.info("====login session " + str(id(self.session)))
return resp.text
elif url.find("apiKey") != -1:
# response = urllib.urlopen(url)
# return response.read() # 因没有超时时间,有时会延时15分钟,放弃使用urllib
try:
response = requests.get(url, timeout=60)
except Exception as e:
LOG.exception("requests get failed, retry! this error: {}".format(e))
response = requests.get(url, timeout=60)
return response.content
else:
session = self.login_session
if session:
resp = session.get(url)
LOG.info("====get session %s, url is[%s]" % (str(id(session)), url))
result = resp.text
if url.find("command=logout") != -1:
self.session = None
self.sessionkey = None
return result
else:
LOG.info("session invalid")
return '{"msg": "session invalid"}'
def _make_request(self, command, args):
LOG.info(" CloudStack base command is %s args is %s" % (command, args))
if not CloudStack.api_time.has_key(command):
CloudStack.api_time[command] = []
args['response'] = 'json'
args['command'] = command
# keyword用于模糊查询
if self.keyword:
args['keyword'] = self.keyword
LOG.info("domain {},username {},and command {}".format(self.domainid, self.username, command))
# todo 新加接口需显示全部就跳过
# if "offering" not in command.lower() and "zone" not in command.lower() and "configurations" not in command.lower() and "storagepools" not in command.lower():
if not any([i in command.lower() for i in ["offering", "zone", "configurations", "storagepools"]]):
if self.domainid and self.username:
if "list" in command.lower():
args["domainid"] = self.domainid
args["account"] = self.username
if command.lower() in ["deployvirtualmachine", "createvolume", "createvpc", "createnetworkacl",
"createnetworkacllist", "createnetwork", "createvmsnapshot",
"associateipaddress",
"createloadbalancerrule"]:
args["domainid"] = self.domainid
args["account"] = self.username
if self.apiKey and self.secret:
if 'sessionkey' in args:
args.pop('sessionkey')
self.request(args)
try:
data = self._http_get(self.value)
except Exception as e:
LOG.exception("get http response failed, raise, error is %s" % e)
# key = command.lower() + "response"
# # try:
# # error = "{\"uuidList\":[],\"errorcode\":401,\"errortext\": \"unable to verify user credentials and/or request signature\",error:%s}" % e
# # except:
# # error = "{\"uuidList\":[],\"errorcode\":401,\"errortext\": \"unable to verify user credentials and/or request signature\"}"
# # data = "{\"%s\" : %s}" % (key, error)
# try:
# error = {"uuidList":[],"errorcode":401,"errortext": "unable to verify user credentials and/or request signature","error":str(e)}
# except:
# error = {"uuidList":[],"errorcode":401,"errortext": "unable to verify user credentials and/or request signature"}
raise RuntimeError(str(self.value) + "error: {}".format(e))
try:
LOG.info(u"command is %s,response is [%s]" % (command, unicode(data)))
except:
try:
LOG.info("command is %s,response is [%s]" % (command, str(data)))
except:
pass
try:
decoded = json.loads(data)
except:
print data
raise RuntimeError(data)
LOG.info("need page response:{}".format(self.need_page_response))
if self.need_page_response:
LOG.info("need page response 2")
return decoded
propertyResponse = command.lower() + 'response'
response = None
# 对不规则返回筛选,使正常返回
if decoded.keys()[0] in RIGHT_RESP:
value = decoded[decoded.keys()[0]]
if "errortext" in value:
raise RuntimeError("ERROR: " + value["errortext"])
else:
response = value
elif propertyResponse not in decoded: # 错误信息提取
if 'errorresponse' in decoded:
raise RuntimeError("ERROR: " + decoded['errorresponse']['errortext'])
else:
raise RuntimeError("ERROR: Unable to parse the response, response is [%s]" % decoded)
response = decoded[propertyResponse] if not response else response
if "errortext" in response:
raise RuntimeError("ERROR: " + response["errortext"])
result = re.compile(r"^list(\w+)s").match(command.lower())
if result is not None:
result_type = result.group(1)
if result_type in response:
return response[result_type]
else:
# sometimes, the 's' is kept, as in :
# { "listasyncjobsresponse" : { "asyncjobs" : [ ... ] } }
result_type += 's'
if type in response:
return response[result_type]
return response
def login(self, url, username, password, domain="/", verifysslcert=False):
"""
Login and obtain a session to be used for subsequent API calls
Wrong username/password leads to HTTP error code 531
"""
args = {}
args["command"] = 'login'
args["username"] = username
args["password"] = password
args["domain"] = domain
args["response"] = "json"
sessionkey = ''
session = requests.Session()
try:
resp = session.get(url, params=args, verify=verifysslcert, timeout=60)
except requests.exceptions.ConnectionError, e:
LOG.exception("Connection refused by server: %s" % e)
return None, None, None
if resp.status_code == 200:
sessionkey = resp.json()['loginresponse']['sessionkey']
elif resp.status_code == 405:
LOG.exception("Method not allowed, unauthorized access on URL: %s" % url)
session = None
sessionkey = None
elif resp.status_code == 531:
LOG.exception("Error authenticating at %s using username: %s"
", password: %s, domain: %s" % (url, username, password, domain))
session = None
sessionkey = None
else:
resp.raise_for_status()
return resp, session, sessionkey
def writeError(self, msg):
sys.stderr.write(msg)
sys.stderr.write("\n")
sys.stderr.flush()
def create_key_by_rc(rc):
"""
:param rc: username password url
:return:
"""
api_url = rc["auth_url"]
username = rc["username"]
password = rc["password"]
apiKey = rc.get('apiKey')
secret = rc.get('secret')
domainid = rc.get('domainid')
os_id = rc.get("os_id")
if not apiKey or not secret:
conn = CloudStack(api_url, os_id=os_id)
domain = "/"
try:
resp = conn.login_ct( **{"url": api_url, "command": "login", "username": username, "password": password, "domain": domain})
except KeyError:
raise Exception("login failed and retry failed")
LOG.info("vvvvv resp:{}".format(resp))
sessionkey = resp["sessionkey"]
userid = resp["userid"]
LOG.info("login success,user name is [%s],user id is [%s]" % (username, userid))
return conn, sessionkey, userid
else:
if domainid:
conn = CloudStack(api_url, apiKey=apiKey, secret=secret, domainid=domainid, username=username, os_id=os_id)
else:
conn = CloudStack(api_url, apiKey=apiKey, secret=secret, os_id=os_id)
return conn, None, None
def queryAsyncJobResult(jobid, universal_obj):
'''
universal_obj can be CSimage CloudStackNetwork
conn=universal_obj.conn
sessionkey=universal_obj.sessionkey
response = queryAsyncJobResult(jobid, sessionkey, conn)
:return: jobid is required
'''
LOG.info("Begin to queryAsyncJobResult")
try:
if hasattr(universal_obj, "conn"):
conn = universal_obj.conn
else:
conn = universal_obj.handler
if not conn:
conn = universal_obj.admin_conn
sessionkey = universal_obj.sessionkey
if sessionkey:
attrs = {"sessionkey": sessionkey, "jobid": jobid}
else:
attrs = {"jobid": jobid}
resp = conn.queryAsyncJobResult(**attrs)
LOG.info(u"queryAsyncJobResult successfully response is [%s]" % unicode(resp)[:8000])
return resp
except Exception as ex:
LOG.exception(u"queryAsyncJobResult failed , excetption [%s]" % ex)
raise RuntimeError(ex)
if __name__ == '__main__':
api_url = "http://192.168.5.200:8080/client/api"
# username = "admin"
# password = "password"
# handler = CloudStack(api_url)
# resp = handler.command({"username": username, "password": password}, "login")
# print resp
#
# sessionkey = resp.get("sessionkey")
# handler2 = CloudStack(api_url)
# response = handler.listUsers(sessionkey=sessionkey)
# print response
api_key = "vk………………eSA"
secret = "2Q………………ojY-A"
admin_conn = CloudStack(api_url, apiKey=api_key, secret=secret)
list_regions = admin_conn.listRegions()
print list_regions
执行结果里包含 api和返回结果。