前文使用flask-login实现了用户登入登出,在实际使用中过于简单,这里引入统一登录。关于统一登录可以看一下早前的这篇文档:一个简单的SSO统一登录设计
对于接入统一登录可以参考一下的时序图:
根据这个时序图,要接入已有的SSO,需要对现有系统进行一些修改:
- 未登录情况下,在login视图中添加从cookie或header中检测Authorization。并根据Authorization是否存在进行判断。
- 如果无Authorization,跳转至SSO进行登录
- 有Authorization,发送给SSO进行验证
- 已登录状态下,验证Authorization是否合法,如果不合法重新登陆。
相关代码如下
在本实例中
SSO的域名为sso.xxx.com,自身应用域名为t.xxx.com,因为在SSO中设定了cookie作用域为xxx.com。
其他代码逻辑,例如访问SSO的/attach/valid验证token,处理flask_login的next参数等均在时序图中有所描述。
使用到了redis存储flask-login中所使用的User对象,并在logout时从redis中移除该条数据,用以实现load_user返回None。
from flask import Flask, redirect, render_template, request, session, url_for
from flask_login import LoginManager, login_user, UserMixin, current_user, login_required, logout_user
import pickle
import requests
from redis import Redis
app = Flask(__name__, template_folder="templates")
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db' # 使用SQLite数据库,可以根据需要更改
db.init_app(app) # 初始化绑定DB 与 Flask
login_manager = LoginManager(app) # Setup a Flask-Login Manager
login_manager.login_view = "login" # required_login 校验不通过,默认跳转
# 用戶登錄Redis配置
user_redis_con = Redis(
host='127.0.0.1', # 服务器地址
port=6379, # 服务器端口
db=10
)
class User(UserMixin):
"""
flask-login 必要 User类
"""
def __init__(self, username):
self.username = username
def __repr__(self):
return self.username
def get_id(self):
return self.username
# 用户加载函数
# 使用redis存储User对象,如果Redis中没有,则认为未登录,返回None
@login_manager.user_loader
def load_user(id):
"""
flask-login必要,重要的回调函数
从redis中返回User类对象,或者None
:param id:
:return:
"""
redis_id = user_redis_con.get(id)
if redis_id is None:
return None
else:
try:
user = pickle.loads(redis_id) # 使用 pickle.loads 方法 将 存储在redis中的序列化对象读出来,还原成User对象
return user
except Exception as E:
return None
# 定义错误处理函数,捕获404错误
@app.errorhandler(404)
def page_not_found(error):
# 重定向到根路由
return redirect(url_for('index'))
# 定义根路由
@app.route('/')
@login_required
def index():
user = current_user
return '欢迎你, ' + user.username
@app.route('/login', methods=["GET", "POST"])
def login():
"""
接入统一登录
未登录状态的所有请求都会被重定向到此URL
如果请求参数中没有token,则会重定向到sso进行登录操作。
通过@login_required 撞过来的请求,会带着next参数,表明来源url。
如果有next参数则将来源URL作为n参数挂在redirectUrl中,用以回跳到登录前页面。(中间跳到了SSO,所以这段有点复杂)
如果请求参数中有token,则将token放入header Authorization中发送给sso进行验证会验证。
验证token通过后,直接登录。否则返回提示。
:return:
"""
token = request.cookies.get('Authorization', None)\
if request.cookies.get("Authorization", None) \
else request.headers.get("Authorization", None)
if token:
print("token " + token)
# token 参数存在,向SSO请求验证。
headers = {'Authorization': token}
attach_valid_url = "http://sso.xxx.com/attach_valid"
r = requests.get(attach_valid_url, headers=headers) # 将token放入header Authorization中发送给sso进行验证。
data = r.json()
if data["code"] == 1001:
username = data["result"]["username"]
user = User(username)
# 保存用户信息
user_dump = pickle.dumps(user) # 使用 pickle.dumps 方法 将 User 对象序列化存储在redis中
user_redis_con.set(username, user_dump)
# 登录操作
login_user(user)
# 处理登录路由
n = request.args.get('next')
print(n)
if n:
return redirect(n)
else:
return redirect(url_for('index'))
else:
n = request.args.get('next')
# @login_required 会自动添加一个next parma。如果有这个param 则给redirectUrl加上一个n参数,然后把n挂在上面。
if n:
return redirect("%s/login?redirectUrl=%s/login?next=%s" % (
'http://sso.xxx.com', 'http://t.xxx.com', n))
else:
return redirect("%s/login?redirectUrl=%s/login" % ('htto://sso.xxx.com', 'http://t.xxx.com'))
else:
print("无token")
n = request.args.get('next')
# @login_required 会自动添加一个next parma。如果有这个param 则给redirectUrl加上一个n参数,然后把n挂在上面。
if n:
return redirect("%s/login?redirectUrl=%s/login?next=%s" % (
'http://sso.xxx.com', 'http://t.xxx.com', n))
else:
print("无next")
return redirect("%s/login?redirectUrl=%s/login" % ('http://sso.xxx.com', 'http://t.xxx.com'))
@app.route('/logout')
@login_required
def logout():
"""
退出登录
执行logout_user()进行登出操作(flask-login)
并删除redis缓存中的User类对象信息
:return:
"""
username = current_user.username
# --start-- 本地系统logout
logout_user()
user_redis_con.delete(username)
# --end--
# # 重定向SSO进行logout操作
return redirect("%s/logout" % 'http://sso.xxx.com')
if __name__ == '__main__':
app.run(debug=True, port=80)