一、python jenkins库 api简介
1.1、连接Jenkins服务器
server = jenkins.Jenkins(url, username=None, password=None, timeout=30)
- url : 指的是所连接的Jenkins的网络地
- username/password : username/password 默认值是None, 在内网中不传参数也是可以连接成功的,但应该只有读的权限,没有操作权限。所以一般需要用户名和密码。
1.3、账户信息
- 获取连接账户的相关信息, 有关私密信息就不打印出来了
server.get_whoami()
- 获取连接Jenkins的版本信息
server.get_version()
'2.176.3'
1.4、获取job相关信息
- 所谓job的话,指的是在Jenkins里面创建的一系列项目
# 获取所有的job信息 server.get_jobs() # 获取job为name的基本信息 server.get_job_info(name) #获取job名为name的job的最后次构建号, 不会计算排队中的构建, #例如正在构建的number是3, 在排队的有4和5, 那么最后次构建号是3 server.get_job_info(name)['lastBuild']['number'] # 正则匹配到相应job的基本信息 server.get_job_info_regex(pattern) #判断job是否存在 server.job_exists(name) #获取所有job的数量 server.jobs_count()
1.5、获取build相关信息
- 所谓build的话,指的是在Job里面的一次构建
# 获取某次构建信息,举例178, 如果178不存在或者正在排队中, 则会抛出异常 # JenkinsException: job[name] number[178] does not exist server.get_build_info(name, 178) # result指构建运行的状态, 构建如果是运行完成返回结果FAILURE/ABORTED/SUCCESS/UNSTABLE等 # 构建如果是正在运行返回结果 None server.get_build_info(name, 178)['result']
1.6、操作
-
需要明确该账户在Jenkins中有对应的操作权限,否则在脚本中是无法做相应操作的
-
获取所有排队的build
server.get_queue_info()
获取所有正在运行的build
server.get_running_builds() server.stop_build(name, number) # 停止正在运行的构建
- 停止掉queue排队中的构建, 直接抛出异常NotFoundException: Requested item could not be found
- 停止掉running运行中的构建, 无返回值, 无报错,在Jenkins页面上显示该构建被XXX 账户停掉了
- 停止掉stopped已经停止的构建, 无返回值, 无报错,Jenkins页面无变化
server.cancel_queue(id) # 配合server.get_queue_info()使用,得到排队构建的id, 可以取消该构建
- server.get_queue_info() 获取某个排队构建的id, 如123.
- 如果该构建排队中直接server.cancel_queue(123), 无返回值, 无报错,Jenkins页面上排队的构建直接被撤销,看不出原来有这么一个排队的构建
- 如果等排队的构建到运行状态, 使用server.cancel_queue(123), 无返回值, 无报错, 该构建依旧在运行中
- 如果等排队的构建到完成状态, 使用server.cancel_queue(123), 无返回值, 无报错, 该构建依旧为完成结束状态
二、_get_view_jobs方法
from flask import Flask from flask import request, jsonify, abort import jenkins import json class Jenkins_Api: def __init__(self, username='账号', password='密码'): self._url = 'https://jenkins-rd.Jenkins Daemon.com/' self._username = username self._password = password def get_server(self): server = jenkins.Jenkins(self._url, username=self._username, password=self._password) return server def get_version(self): get_version = self.get_server().get_version() print(get_version) return get_version # .get_version() def get_jobs(self): get_jobs = self.get_server().get_jobs() print(get_jobs) return get_jobs def get_views(self): get_views = self.get_server().get_views() print(get_views) return get_views def get_all_jobs(self): get_all_jobs = self.get_server().get_all_jobs() print(get_all_jobs) return get_all_jobs # _get_view_jobs(name='Prod') def get_views_job(self, name): get_views_job = self.get_server()._get_view_jobs(name=name) # print(get_views_job) job_list = [] for job in get_views_job: jobs = job['name'][:-10] job_list.append({"application":jobs,}) # print(job['name'].strip('Prod-CD-job')) # print(job_list) response = { "message": "", "code": 0, "data": job_list, "result": "true", "request_id": "975184016b50440a8c195040fc1443a5" } print(response) return response app = Flask(__name__) @app.route('/bk_itms/prod/jobs', methods=['GET', 'POST']) def webhook(): if request.method == 'GET': # logging.debug('Get Method not implemented') payload = JK.get_views_job(name="RC") return jsonify(payload), 200 elif request.method == 'POST': return jsonify({'status': 'success'}), 200 else: abort(400) if __name__ == '__main__': JK = Jenkins_Api() app.run(host='0.0.0.0', port='10010') # print(jenkins.__dict__)
向往的地方很远,喜欢的东西很贵,这就是我努力的目标。
参考:https://www.cnblogs.com/dai-zhe/p/14827941.html
标签:jobs,name,get,self,视图,server,job From: https://www.cnblogs.com/pingguomang/p/17597060.html