Flask
1. flask的使用
1.1 安装
pip3 install flask
1.2 使用
- 新建py文件
- 从flask模块导入Flask
- 创建flask实例
- 启动flask(和执行py文件一样,右击run)
from flask import Flask
app = Flask(__name__)
app.run()
- 但是没有路由....
- 接下来就引出我们的基础操作了~~~
2. flask的三件套
- 与django一样,也有三件套
- 不过还有两个小儿子
2.1 HttpResponse
- 此
HttpResponse
与django就不一样了,不需要自己导入,自己返回一个字符串就是HttpResponse
from flask import Flask
app = Flask(__name__)
# 这里的路由就是flask的实例通过装饰器的方式给某个函数加上路由
@app.route('/index')
def index():
# 没错,这就是HttpResponse,返回一个自定义的字符串
return '这是index页面'
app.run()
2.2 render_template
- 与django的一样,也是返回一个页面,也需要自己导入
- 也需要自己创建一个文件夹
templates
from flask import Flask, render_template # 导入模块
app = Flask(__name__)
@app.route('/index')
def index():
return render_template('index.html')
app.run()
2.3 redirect
- 作用与django一致,使用方法也是一致
from flask import Flask, redirect
app = Flask(__name__)
# 对了,路由和函数名字可以不一样,只是为了方便区分
@app.route('/index')
def index():
return redirect('/test')
@app.route('/test')
def test():
return '这是通过redirect跳转过来的'
app.run()
2.4 两个小儿子
2.4.1 send_file()
- 看方法名就知道返回一个文件哈哈哈
from flask import send_file
@app.route('/img')
def img():
return send_file('1.jpg') # 接收该文件的路径
if __name__ == '__main__':
app.run(debug=True)
2.4.2 jsonify()
- 返回json格式的数据
from flask import jsonify
@app.route('/json_data')
def json_data():
return jsonify({'name': 'www', 'age': 20})
if __name__ == '__main__':
app.run(debug=True)
3. request
-
我们了解了三件套之后是不是也对前端传过来的东西如何接收产生疑惑?
-
同样的的,flask也有一个request,我们可以通过request来访问各种属性
-
但是,这个request与django有点不一样,它也是通过导入来使用
-
from flask import request request.method 请求方式 request.form 存储的是所有FormData中的所有数据 request.args 存储的是所有URL中的所有数据 request.json Content-Type: application/json 存放在request.json中 request.data 当Content-Type无法被解析时,存放原始数据 request.url 请求地址 request.path url路由地址 request.host 主机地址 request.host_url 将主机地址转换为httpurl
-
那么,这就说明,request是一个公共变量...那么flask如何区分呢 --> 这个我们到说请求上下文讲
-
先看看request有哪些属性
from flask import Flask, redirect, request, render_template
app = Flask(__name__)
@app.route('/login', methods=('GET', 'POST')) # 指定允许的请求方式,默认为GET
def login():
if request.method == 'GET':
# args接收路由参数 ?id=1&name=www --> {'id': '1', 'name': 'www'}
print('args--->', request.args.to_dict())
return render_template('login.html')
else:
# path接收我们的访问路由 path---> /login
print('path--->', request.path)
# url返回全路径url---> http://127.0.0.1:5000/login?id=1&name=www
print('url--->', request.url)
# values接收我们的路由参数和post提交过来的数据--> {'username': '111', 'pwd': '222', 'id': '1', 'name': 'www'}
print('values--->', request.values.to_dict())
# form存储我们post提交过来的数据-->{'username': '111', 'pwd': '222'}
print('form--->', request.form.to_dict())
# host返回我们的主机和端口 --> 127.0.0.1:5000
print('host--->', request.host)
# host_url--> http://127.0.0.1:5000/
print('host_url--->', request.host_url)
msg = '登录成功'
return render_template('login.html', msg=msg)
if __name__ == '__main__':
app.run(debug=True) # 可以不用重启项目
4.jinja2语法
- 符合python用法!!! 所以可以使用点python的方法
- 也可以看一下django的模板那里,异曲同工之妙
4.1 返回字典
from flask import Flask, render_template
app = Flask(__name__)
@app.route('/detail')
def detail():
STUDENT = {'name': 'Old', 'age': 38, 'gender': '中'}
STUDENT_DICT = {
1: {'name': 'Old', 'age': 38, 'gender': '中'},
2: {'name': 'Boy', 'age': 3, 'gender': '男'},
3: {'name': 'EDU', 'age': 84, 'gender': '女'},
}
return render_template('datali.html',info=STUDENT)
if __name__ == '__main__':
app.run(debug=True)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<table border="1">
<thead>
<tr>
<th>name</th>
<th>age</th>
<th>gender</th>
</tr>
</thead>
<tbody>
<tr>
<td>{{ info.name }}</td>
<td>{{ info.age }}</td>
<td>{{ info.gender }}</td>
{# {% for key, value in info.items() %}#}
{# <td>{{ value }}</td> #}
{#{% endfor %}#}
</tr>
</tbody>
</table>
</body>
</html>
-
字典在模板也可以使用keys()、values()、items()取值
4.2 列表嵌套字典
......
STUDENT_LIST = [
{'name': 'Old', 'age': 38, 'gender': '中'},
{'name': 'Boy', 'age': 73, 'gender': '男'},
{'name': 'EDU', 'age': 84, 'gender': '女'}
]
......
return render_template('datali.html',info=STUDENT_LIST)
{% for student in info %}
<tr>
<td>{{ student.name }}</td>
<td>{{ student.get('age') }}</td>
<td>{{ student['gender'] }}</td>
</tr>
{% endfor %}
4.3 字典嵌套字典
......
STUDENT_DICT = {
1: {'name': 'Old', 'age': 38, 'gender': '中'},
2: {'name': 'Boy', 'age': 3, 'gender': '男'},
3: {'name': 'EDU', 'age': 84, 'gender': '女'},
}
......
return render_template('datali.html',info=STUDENT_DICT)
{% for student in info %}
<tr>
<td>{{ info[student].get('name') }}</td>
<td>{{ info[student]['age'] }}</td>
<td>{{ info.get(student)['gender'] }}</td>
</tr>
{% endfor %}
4.4 block块
- 与django一样
4.5 include、extend
- 也与django一样
4.6 宏定义
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>Welcome OldboyEDU</h1>
{% macro type_text(name,type) %}
<input type="{{ type }}" name="{{ name }}" value="{{ name }}">
{% endmacro %}
<h2>在下方是使用宏来生成input标签</h2>
{{ type_text("one","text") }}
{{ type_text("two","text") }}
</body>
</html>
- 宏定义一般情况下很少应用到,但是要知道有这么个概念
5.Session
5.1 session的登录
- 先写一个基于session的登录
- session也是通过导入
from flask import Flask, render_template, session, request, redirect
app = Flask(__name__)
@app.route('/login', methods=('GET', 'POST'))
def login():
if request.method == 'POST':
session['user'] = request.form.get('username')
return redirect('/detail')
return render_template('login.html')
@app.route('/detail')
def detail():
return render_template('detail.html')
if __name__ == '__main__':
app.run(debug=True)
-
secret_key是用来加密字符串的
-
当设置secret_key之后可以正常访问
app.secret_key = xxxx
5.2 使用装饰器登录的问题
- 我们知道,一些视图是需要用户登录后才能访问的
- 所以我们以前会使用装饰器装饰需要登录后才能访问的视图函数
from flask import Flask, render_template, session, redirect, request
app = Flask(__name__)
app.secret_key = xxxx
def verify(func):
def inner(*args, **kwargs):
if not session.get('user'):
return redirect('/login')
ret = func(*args, **kwargs)
return ret
return inner
@app.route('/login', methods=('GET', 'POST'))
def login():
if request.method == 'POST':
session['user'] = request.form.get('username')
return redirect('/detail')
return render_template('login.html')
@app.route('/index')
@verify
def index():
return render_template('index.html')
@app.route('/detail')
@verify
def detail():
return render_template('detail.html')
if __name__ == '__main__':
app.run(debug=True)
- 启动项目时出现的问题:
5.3 解决方法
-
指定endpoint
from flask import Flask, render_template, session, redirect, request app = Flask(__name__) def verify(func): def inner(*args, **kwargs): if not session.get('user'): return redirect('/login') ret = func(*args, **kwargs) return ret return inner @app.route('/login', methods=('GET', 'POST')) def login(): if request.method == 'POST': session['user'] = request.form.get('username') return redirect('/detail') return render_template('login.html') @app.route('/index', endpoint='index') @verify def index(): return render_template('index.html') @app.route('/detail', endpoint='detail') @verify def detail(): return render_template('detail.html') if __name__ == '__main__': app.run(debug=True)
6.flask中的路由
6.1 endpoint
-
login = decorator(login)
-
f = login
-
继续向下执行
-
下面还需要用到这个方法,别忘了!!!
-
也就是说,如果没有指定endpoint的值就会默认为视图函数名
-
如果我们添加了装饰器,那么两个被装饰的视图函数的endpoint就一样了(名字一样,但指定的对象不一样)
-
下图还是从add_url_rule看,在这个函数最下面
- 所以我们需要在使用装饰器装饰是要指定endpoint
6.2 url_for
- 可以通过endpoint的值来反向解析url
from flask import Flask, render_template, session, redirect, request, url_for
app = Flask(__name__)
app.secret_key = 'xxxx'
def verify(func):
def inner(*args, **kwargs):
if not session.get('user'):
return redirect('/login')
ret = func(*args, **kwargs)
return ret
return inner
@app.route('/login', methods=('GET', 'POST'))
def login():
if request.method == 'POST':
session['user'] = request.form.get('username')
return redirect('/detail')
return render_template('login.html')
@app.route('/index', endpoint='这是index的endpoint')
@verify # index = verify(index) --> inner = decorate(inner) endpoint = inner
def index():
print(url_for('这是index的endpoint')) # --> /index
return render_template('index.html')
@app.route('/detail', endpoint='这是detail的endpoint')
@verify # detail = verify(detail) --> inner = decorate(inner) endpoint = inner
def detail():
print(url_for('这是detail的endpoint')) # --> /detail
return render_template('detail.html')
if __name__ == '__main__':
app.run(debug=True)
6.3 methods
- 用来指定允许的请求方式
- 为一个可以迭代的对象,元素为字符串
@app.route('/login', methods=('GET', 'POST',....))
def login():
if request.method == 'POST':
return redirect('/detail')
return render_template('login.html')
6.4 默认参数
# 当发送请求时,默认会发送一个参数
# 该参数必须被接收,否则会报错
@app.route('/login',defaults={'nid': 2, 'name': 'xxx'})
def login(nid, name):
print(nid)
print(name)
return 'ok'
if __name__ == '__main__':
app.run(debug=True)
6.5 strict_slashes
-
路由是否按照严格模式
@app.route('/index', strict_slashes=False) # 默认为True def index(): return 'ok' if __name__ == '__main__': app.run(debug=True)
6.6 redirect_to
- 永久跳转,状态码301
- 视图函数都不执行,直接进行路由跳转
@app.route('/index', redirect_to='/detail')
def index():
print(111)
return 'ok'
@app.route('/detail')
def detail():
return 'detail'
if __name__ == '__main__':
app.run(debug=True)
6.7 动态路由参数
- 可选参数类型
- int、string
- 如果没有指定参数类型默认为string
@app.route('/index<int:x>/<string:s1>')
def index(x, s1):
print(x)
print(s1)
return 'index'
if __name__ == '__main__':
app.run(debug=True)
7.flask实例化配置
-
static_url_path:
- 主要感觉这个有点难理解,所以写下这个,耐心点,往下看
from flask import Flask, render_template app = Flask(__name__, static_url_path='/222') @app.route('/index') def index(): return render_template('index.html') if __name__ == '__main__': app.run(debug=True)
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body>
-
往下看
-
原理:
from flask import Flask, render_template app = Flask(__name__, static_url_path='/222') # 默认为None @app.route('/index') def index(): return render_template('index.html') @app.route('/222/<filename>') def get_static(filename): # flask -> static_folder -> static return send_file('static/' + filename) if __name__ == '__main__': app.run(debug=True)
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <img src="/222/111.jpg" alt=""> <!--- 通过/222访问路由 访问到该静态文件的地址 ----> </body> </html>
8.app对象配置
- 参考博客
- 配置信息寻找:
Flask-->self.config-->self.make_config-->self.default_config
9. 蓝图
- 你可以理解为一个小的模块
- 这个模块存放着一部分的业务逻辑代码
- 或者理解为一个不可以run的app实例
9.1 基本的使用
- setting.py (文件名自定义)
# 1.导入蓝图
from flask import Blueprint, render_template
# 2.实例化一个蓝图,顺便指定模板的文件夹 --> temp
# 第一个参数用来区分蓝图,第二个参数:蓝图也需要知道在哪定义的,所以使用__name__
blue = Blueprint('map', __name__, template_folder='temp')
blue1 = Blueprint('map1', __name__, template_folder='temp')
# 3.注册路由
@blue.route('/bluemap')
def bluemap():
# return '这是Blueprint'
return render_template('detail.html')
@blue1.route('/bluemap1')
def bluemap():
# return '这是Blueprint'
return render_template('detil.html')
- 是不是和app很像?只不过不用run,也run不了哈哈哈
- 如果蓝图的模板中有和app模板一样的,则会被app中的模板覆盖
- app.py
from flask import Flask, render_template
# 导入蓝图所在的文件夹或实例
from blueprint import setting
app = Flask(__name__)
# 注册蓝图实例
app.register_blueprint(setting.blue)
app.register_blueprint(setting.blue1)
@app.route('/detail')
def detail():
return render_template('detail.html')
9.2 url_prefix
-
前缀
-
访问该路由需要带上前缀
blue = Blueprint('map', __name__, template_folder='temp', url_prefix='/map')
10. 特殊装饰器
10.1 @app.template_global()
-
模板全局可使用
-
app.py
......
@app.template_global()
def sum_a_b(a, b):
return a + b
......
- xxx.html
.....
{{ sum_a_b(6,3) }}
.....
- 之后通过路由访问该页面时该处为9
10.2 @app.template_filter()
- 过滤器
- app.py
......
@app.template_filter()
def mulit_a_b(a, b):
return a * b
......
-
xxx.html
{{ 9|mulit_a_b(2) }} <!-- 也可以通过使用全局装饰器的结果作为一个参数 --> {{ sum_a_b(6,3)|mulit_a_b(2) }}
-
之后模板将返回过滤器计算的值
10.3 @app.before_request
- 请求视图函数之前
- 类似于我们的django中间件 --> process_request(...)
@app.before_request
def bf1():
print('bf1')
return None
@app.before_request
def bf2():
print('bf2')
return None
@app.before_request
def bf3():
print('bf3')
return None
@app.route('/')
def index():
return 'ok'
if __name__ == '__main__':
app.run(debug=True)
- 执行顺序:
bf1 --> bf2 --> bf3
10.4 @app.after_request
- 结束视图函数之后,返回客户端之前
@app.before_request
def bf1():
print('bf1')
# return '我是bf1返回的响应'
return None
@app.before_request
def bf2():
print('bf2')
return None
@app.before_request
def bf3():
print('bf3')
return None
# res为返回的响应,如django一样,需要将响应返回
@app.after_request
def af1(res):
print('af1')
return res
@app.after_request
def af2(res):
print('af2')
return res
@app.after_request
def af3(res):
print('af3')
return res
@app.route('/')
def index():
return 'ok'
if __name__ == '__main__':
app.run(debug=True)
- 正常:be1 - be2 - be3 - af3 - af2 - af1
- 异常(bf1时返回了响应,自定义的响应):be1 - af3 - af2 - af1
10.5 @app.errorhandler(状态码)
- 根据错误状态码来执行相应的操作
@app.errorhandler(404)
def error404(args): # args存储错误信息
print(args)
return "您访问的页面不存在或者走丢了,,,,,,%s" % (args)
11. CBV
- 如django一样,有了FBV自然也有CBV
# 1. 从flask中导入views
from flask import Flask, views, url_for
# 2. 创建一个flask实例
cbv = Flask(__name__)
# 3.定义一个类,该类继承于 views.MethodView
class RegView(views.MethodView):
# 4.写请求方法(我们类里面写了什么请求方法就说明允许什么请求方法,不需要重新指定)
def get(self):
print(url_for(reg))
return '这是CBV的get请求'
def post(self):
return '这是CBV的post请求'
# 5.有了视图函数但是我们发现没有路由,怎么办呢?
# 6.注册路由(我们当时看源码可以知道需要执行此方法)
# flask实例.add_url_rule(自定义路由, endpoint=None, view_func=指定视图类.as_view(name=自定义字符串)),
# 你可以理解为该自定义字符串为endpoint的值,然后可以通过url_for(自定义字符串)反向解析到路由。不建议再次指定endpoint(None)
cbv.add_url_rule('/reg', view_func=RegView.as_view('reg'))
if __name__ == '__main__':
cbv.run(debug=True)
12. flash
- 可以认为是一个闪存
- 拿出来就没有了
- 还需要设置secret_key,否则会报错
# 1. 从flask中导入views
from flask import Flask, views, url_for, flash, get_flashed_messages
# 2. 创建一个flask实例
cbv = Flask(__name__)
cbv.secret_key = 'afasfa'
@cbv.before_request
def b1():
flash('666') # 将数据存储
flash('777', 'tag') # 将数据存储,tag为数据的标签
class RegView(views.MethodView):
def get(self):
print(get_flashed_messages()) # 从flash拿出所有数据
print(get_flashed_messages(category_filter=['tag'])) # 通过标签从flash拿出数据
return '这是CBV的get请求'
cbv.add_url_rule('/reg', view_func=RegView.as_view('reg'))
if __name__ == '__main__':
cbv.run(debug=True)
- 因为这里我是将存储数据的操作放在
before_request
,所以每一次发送请求都会存储一次数据 - 然后再get请求取出数据
13. Flask-Session
- 我们知道,flask是将session存储在客户端的,这就会有点不安全
- 并且要是别人一不小心知道了你的
secret_key
,那就更不好了 - 所以,这里我们引出另一种存储和加密session的方式
- 这次的存储是使用数据库存储,使用uuid方式加密
13.1 基本使用
-
先下载相关模块:
pip3 install Flask-Session
-
基本使用
- 使用该flask-session还是需要session,总得设置session值吧哈哈
# 1.从flask_session导入Session from flask_session import Session from flask import Flask, views, session, request, render_template # 2.导入需要存储session的数据库对象 from redis import Redis # 3.创建flask实例 cbv = Flask(__name__) # 4.进行相关的配置 cbv.config['SESSION_TYPE'] = 'redis' cbv.config['SESSION_REDIS'] = Redis(host='127.0.0.1', port=6379, db=7) # 4.使用新的Session覆盖之前session的存储和加密方法(先这样理解吧....) Session(cbv) class Login(views.MethodView): def get(self): return render_template('login.html') def post(self): if request.form.get('username'): # 5.设置session session['user'] = '666' return '我有session了' cbv.add_url_rule('/login', view_func=Login.as_view('login')) if __name__ == '__main__': cbv.run(debug=True)
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <form action="" method="post"> 用户名:<input type="text" name="username"> <br> 密码:<input type="password" name="pwd"> <br> <button>提交</button> </form> </body> </html>
13.2 源码解析
-
下面参考博客吧
-
我不知道这两个方法啥时候执行的...
-
只能说你通过这个大概可以知道这个session是怎么存储和加密的,不至于那么糊涂
-
就是不知道这方法啥时候执行的,session的值不知道啥时候存储的....
14.WTForms
- 相当于django的Form
14.1 Form的创建
-
安装模块:
pip3 insatll WTForms
-
创建
from wtforms.fields import core, simple from wtforms import Form, validators, widgets class RegForm(Form): user = simple.StringField(label='用户名', validators=[validators.DataRequired(message='不能为空'), # 校验字段是否为空 validators.length(max=5, min=3, message='长度不能小于3位,不能大于5位'),], # 给生成的标签添加属性,相当于django中Form的attr render_kw={"class": "my_username"}, ) pwd = simple.PasswordField(label='密码', validators=[validators.length(max=5, min=3, message='长度不能小于3位,不能大于5位'), validators.DataRequired(message='不能为空'), # 将数据进行正则匹配,只能为数字 validators.Regexp(regex='\d+', message='密码要为数字')]) repwd = simple.PasswordField(label='重新输入密码', validators=[validators.EqualTo(fieldname='pwd', message='两次密码不一致')]) gender = core.RadioField( label="性别", coerce=str, choices=( ('1', "女"), ('2', "男"), ('q', "男") ), default='q' ) hobby = core.SelectMultipleField( label="爱好", coerce=int, choices=( (1, "小姐姐"), (2, "小萝莉"), (3, "小哥哥"), (4, "小正太"), (5, "阿姨"), (6, "大叔"), ), default=(1, 2, 5) ) submit = simple.SubmitField( label="提交" )
-
from wtforms.fields import core, simple
,core与simple中有我们需要的字段类型 -
validators
校验器,将数据进行校验 -
render_kw
给标签添加相应的属性,与django中Form的widget中attr一样
14.2 基于Flask的使用
- 与django中Form使用方法一致
from flask import Flask, views, render_template
cbv = Flask(__name__)
class RegView(views.MethodView):
def get(self):
regform = RegForm()
return render_template('reg.html', reg=regform)
def post(self):
data = RegForm(request.form)
if data.validate():
return '这是CBV的post请求'
return render_template('reg.html', reg=data)
cbv.add_url_rule('/reg', view_func=RegView.as_view('reg'))
if __name__ == '__main__':
cbv.run(debug=True)
-
reg.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <form action="" method="post"> <p>{{ reg.user.label }}{{ reg.user }}</p> <p>{{ reg.user.errors.0 }}</p> <p>{{ reg.pwd.label }}{{ reg.pwd }}</p> <p>{{ reg.pwd.errors.0 }}</p> <p>{{ reg.repwd.label }}{{ reg.repwd }}</p> <p>{{ reg.repwd.errors.0 }}</p> <div> {{ reg.gender.label }} {{ reg.gender }} </div> <div> {{ reg.hobby.label }} {{ reg.hobby}} </div> <p><input type="submit" value="注册"></p> </form> </body> </html>
15.DBUtils
- 原生sql语句很重要,要熟悉sql语句!!!
- 因为我们有时候用到数据库时动不动创建链接,然后断开链接,这就会消耗性能
- 所以这里引出DBUtils,它可以创建数据库链接池
- 之后断开数据库链接是假的,只是将可使用的链接还给了链接池,这就可以避免必要的断开数据库链接,提高性能
- 不过这里还是看博客吧.....
- 博客地址
16. websocket
socket我们熟悉吧,那么加了web是干嘛的呢?
web + socket: 在web端可以进行用来进行收发消息,那么这个东西如何去运用呢?
- 下载模块:
pip3 install gevent-websocket
16.1 基本使用
-
服务端
from flask import Flask, request from geventwebsocket.websocket import WebSocket # 这个作智能提示用 from geventwebsocket.handler import WebSocketHandler from gevent.pywsgi import WSGIServer import json app = Flask(__name__) @app.route('/ws') def ws(): # 获取请求原始信息 print(request.environ) # 从请求原始信息中获取客户端的链接 user_socket = request.environ.get('wsgi.websocket') # type:WebSocket while True: # 接收客户端传过来的信息 msg = user_socket.receive() print(msg) # 向客户端发送信息 user_socket.send(json.dumps({'id': msg})) if __name__ == '__main__': # app.run() # 指定运行的ip和port,app实例和handler_class http_server = WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) # 启动项目 http_server.serve_forever()
-
客户端
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> </body> <script type="text/javascript"> // 链接服务端 var ws = new WebSocket('ws://127.0.0.1:5000/ws'); // 当客户端接收到信息时会触发该方法,data存储着接收到的信息 // 接收到的信息一定要是字符串类型 ws.onmessage = function (data) { let msg = JSON.parse(data.data); console.log(msg) } </script> </html>
-
客户端接收到信息会触发
onmessage
方法
16.2 建立群聊
-
服务端
from flask import Flask, request, render_template from geventwebsocket.websocket import WebSocket from geventwebsocket.handler import WebSocketHandler from gevent.pywsgi import WSGIServer app = Flask(__name__) # 创建一个存储客户端链接的列表 user_list = [] @app.route('/ws') def ws(): # print(request.environ) # 获取客户端的链接 user_socket = request.environ.get('wsgi.websocket') # type:WebSocket # 将客户端的链接存入列表 user_list.append(user_socket) print(len(user_list), user_list) while True: """ 1.这里有一个小bug,如果我一直刷新页面呢?想一想 2.是不是也会创建一个socket客户端?但列表确实也将链接存入了,并且还报错了,下面有报错图 3.那怎么办捏?下面就有解决的方法 """ # msg = user_socket.receive() # print(msg) # for userscoket in user_list: # if userscoket != user_socket: # userscoket.send(msg) # 数据将发往前端 ws.html --------分割线,下面的可行---------------- # 我们try一下,如果报错那直接将socket删掉 try: msg = user_socket.receive() print(msg) # 循环客户端列表,用于该客户端在群聊发送的数据该客户端自己在前端页面看不到 for userscoket in user_list: if userscoket != user_socket: userscoket.send(msg) # 数据将发往前端 ws.html except: user_list.remove(user_socket) # 访问该路由访问群聊页面 @app.route('/chat') def chat(): return render_template('ws.html') if __name__ == '__main__': # app.run() http_server = WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) http_server.serve_forever()
-
客户端
chat.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <div id="d2"> <input type="text" id="d1"> <button id="b1" onclick="send_msg()">发送</button> </div> </body> <script type="text/javascript"> // 链接服务器 var ws = new WebSocket('ws://127.0.0.1:5000/ws'); // 监听获取数据 ws.onmessage = function (data) { let msg = data.data; console.log(msg); // 将获取到的数据通过p标签展示在页面上 var ptag = document.createElement('p'); ptag.innerText = msg; document.getElementById('d2').appendChild(ptag) }; // 发送数据 function send_msg() { var msg = document.getElementById('d1').value; console.log(msg); ws.send(msg); // 将input框的内容清空 document.getElementById('d1').value='' } </script> </html>
16.3 建立单聊
-
指定发送方和接收方
-
服务端
from flask import Flask, render_template, request from geventwebsocket.handler import WebSocketHandler from geventwebsocket.websocket import WebSocket from gevent.pywsgi import WSGIServer import json app = Flask(__name__) # 建立一个字典,通过key的方式获取到要发送方和接收方 user_dict = {} # 访问该路由访问单聊页面 @app.route('/single') def single(): return render_template('single.html') @app.route('/ws/<username>') # 通过路由传参的方式获取发送方 def ws(username): user_socket = request.environ.get('wsgi.websocket') # type:WebSocket # 通过字典的方式存储socket {nickname: socket1,.....} user_dict[username] = user_socket while True: # 从发送方获取到接收方的nickname和要发送的数据 msg = user_socket.receive() msg = json.loads(msg) to_user = msg.get('nick') content = msg.get('msg') # 获取接收方的socket链接 userscoket = user_dict.get(to_user) # 构造数据 recv_obj = {'from_user': username, 'content': content} # 通过接收方的链接发送数据给前端 userscoket.send(json.dumps(recv_obj)) if __name__ == '__main__': ws_socket = WSGIServer(('0.0.0.0', 5000), app, handler_class=WebSocketHandler) ws_socket.serve_forever()
-
客户端
single.html
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <div> <p><input type="text" id="username"> {# 发送方链接服务器 #} <button id="b1" onclick="link()">链接服务</button> </p> {# 指定接收方 #} <p>向谁发送:<input type="text" id="nickname"></p> <p id="p1"><input type="text" id="msg"> <button onclick="send_info()">发送</button> </p> </div> <script type="text/javascript"> var ws = null; function link() { var username = document.getElementById('username').value; ws = new WebSocket('ws://127.0.0.1:5000/ws/' + username); ws.onmessage = function (data) { var recv_info = JSON.parse(data.data); var ptag = document.createElement('p'); ptag.innerText = recv_info.from_user + ': ' + recv_info.content; document.getElementById('p1').appendChild(ptag) } } function send_info() { var nick = document.getElementById('nickname').value; var msg = document.getElementById('msg').value; var send_obj = {nick:nick,msg:msg}; console.log(send_obj); ws.send(JSON.stringify(send_obj)) } </script> </body> </html>
16.4 websocket握手原理
import socket, base64, hashlib
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 9527))
sock.listen(5)
# 获取客户端socket对象
conn, address = sock.accept()
# 获取客户端的【握手】信息
# 当获取到一个websocket客户端链接时,会接收到请求信息
# 该请求原始信息中含有Sec-WebSocket-Key,通过这个建立握手
data = conn.recv(1024)
print(data)
# 请求信息
"""
b'GET / HTTP/1.1\r\n
Host: 127.0.0.1:9527\r\n
Connection: Upgrade\r\n
Pragma: no-cache\r\n
Cache-Control: no-cache\r\n
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36\r\n
Upgrade: websocket\r\n
Origin: http://localhost:63342\r\n
Sec-WebSocket-Version: 13\r\n
Accept-Encoding: gzip, deflate, br\r\n
Accept-Language: zh-CN,zh;q=0.9\r\n
Sec-WebSocket-Key: 7TFBOAd/FKrR2tj2IofnBg==\r\n
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits\r\n\r\n'
"""
# magic string为:258EAFA5-E914-47DA-95CA-C5AB0DC85B11, 固定值
magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
# 抽取出Sec-WebSocket-Key的值
def get_headers(data):
header_dict = {}
header_str = data.decode("utf8")
for i in header_str.split("\r\n"):
if str(i).startswith("Sec-WebSocket-Key"):
header_dict["Sec-WebSocket-Key"] = i.split(":")[1].strip() # 默认去掉首尾空格或空字符
return header_dict # {Sec-WebSocket-Key : 3vbd1/UIZdjSZJ+LmF9+Wg==}
# def get_header(data):
# """
# 将请求头格式化成字典
# :param data:
# :return:
# """
# header_dict = {}
# data = str(data, encoding='utf-8')
#
# header, body = data.split('\r\n\r\n', 1)
# header_list = header.split('\r\n')
# for i in range(0, len(header_list)):
# if i == 0:
# if len(header_list[i].split(' ')) == 3:
# header_dict['method'], header_dict['url'], header_dict['protocol'] = header_list[i].split(' ')
# else:
# k, v = header_list[i].split(':', 1)
# header_dict[k] = v.strip()
# return header_dict
headers = get_headers(data) # 提取请求头信息
# 对请求头中的sec-websocket-key进行加密
response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \
"Upgrade:websocket\r\n" \
"Connection: Upgrade\r\n" \
"Sec-WebSocket-Accept: %s\r\n" \
"WebSocket-Location: ws://127.0.0.1:9527\r\n\r\n"
# 可以说这就是websocket的握手原理
value = headers['Sec-WebSocket-Key'] + magic_string
# value = 3vbd1/UIZdjSZJ+LmF9+Wg==258EAFA5-E914-47DA-95CA-C5AB0DC85B11
ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())
response_str = response_tpl % (ac.decode('utf-8'))
# 响应【握手】信息
conn.send(response_str.encode("utf8"))
while True:
msg = conn.recv(8096)
print(msg)
- 数据被加密了,那么数据是又如何被解密的呢?
16.5 数据解密
# b'\x81 \x8bn \x8eP\x1d>\xe2<\x1a{\xf9?\x077\xea'
hashstr = b'\x81\x8bu[\x8eP\x1d>\xe2<\x1a{\xf9?\x077\xea'
# 将第二个字节也就是 \x8bu[ 第9-16位 进行与127进行位运算
# 计算payload的长度,然后进行if判断进行解密
payload = hashstr[1] & 127
print(payload)
if payload == 127:
extend_payload_len = hashstr[2:10]
mask = hashstr[10:14] # 钥匙
decoded = hashstr[14:] # 数据
# 当位运算结果等于127时,则第3-10个字节为数据长度
# 第11-14字节为mask 解密所需字符串
# 则数据为第15字节至结尾
if payload == 126:
extend_payload_len = hashstr[2:4]
mask = hashstr[4:8]
decoded = hashstr[8:]
# 当位运算结果等于126时,则第3-4个字节为数据长度
# 第5-8字节为mask 解密所需字符串
# 则数据为第9字节至结尾
# b'\x81\x8bn\xf0\x89\xae\x06\x95\xe5\xc2\x01\xd0\xfe\xc1\x1c\x9c\xed'
if payload <= 125:
extend_payload_len = None
mask = hashstr[2:6] # \xf0\x89\xae\x06
decoded = hashstr[6:] # \x95\xe5\xc2\x01\xd0\xfe\xc1\x1c\x9c\xed
# 当位运算结果小于等于125时,则这个数字就是数据的长度
# 第3-6字节为mask 解密所需字符串
# 则数据为第7字节至结尾
str_byte = bytearray() #[b"",b"",b""]
for i in range(len(decoded)):
byte = decoded[i] ^ mask[i % 4]
str_byte.append(byte)
print(str_byte.decode("utf8"))
16.6 数据加密
-
加密就是明文加密.....
-
看结果吧
import struct msg_bytes = "hello world".encode("utf8") token = b"\x81" length = len(msg_bytes) if length < 126: token += struct.pack("B", length) elif length == 126: token += struct.pack("!BH", 126, length) else: token += struct.pack("!BQ", 127, length) msg = token + msg_bytes print(msg)