首页 > 其他分享 >用Flask实现钉钉免登录

用Flask实现钉钉免登录

时间:2023-10-29 16:31:46浏览次数:33  
标签:return 登录 form Flask user 钉钉免 import login data

需求背景

运维平台开发完成后,由运维执行业务变更操作,但是有时候研发那边比较急,而运维也有不在电脑前的时候,这样的话就比较麻烦了,所以想做成钉钉免登陆的方式,当工单任务下发的时候,直接通过手机钉钉登陆到执行页面点击操作,然后再根据项目分执行权限给研发,这样的话就省心多了

前提

1,需要一台外网测试机(阿里云服务器)

2,钉钉上注册一个组织,然后登录钉钉管理后台,创建部门,添加成员,角色信息

浏览器输入https://oa.dingtalk.com/register_new.htm?showmenu=false#/

用Flask实现钉钉免登录_User

用Flask实现钉钉免登录_User_02


https://oa.dingtalk.com/contacts.htm#/contacts?deptManage

用Flask实现钉钉免登录_User_03

用Flask实现钉钉免登录_钉钉免登陆_04


3,登陆钉钉开发者后台,点击应用开发,选择企业内部应用,然后选择小程序,然后单机目标应用详情页面在基础信息页面可以查看到应用的SuiteKey/SuiteSecret(第三方企业应用)或AppKey/AppSecret(企业内部应用)

open-dev.dingtalk.com/fe/app#/

用Flask实现钉钉免登录_钉钉免登陆_05

创建应用后,点击应用进入应用详情页面

用Flask实现钉钉免登录_flask_06

用Flask实现钉钉免登录_User_07

然后在权限管理这里开通获取用户个人信息, 查询个人授权记录,通讯录部门信息读权限,成员信息读权限,通讯录部门成员读权限 这些权限开通

用Flask实现钉钉免登录_flask_08


3,钉应用上配置登陆与分享回调地址

用Flask实现钉钉免登录_钉钉免登陆_09


参考文档

钉钉内免登第三方网站 - 钉钉开放平台 (dingtalk.com)


flask测试钉免密登陆系统项目结构

用Flask实现钉钉免登录_User_10


代码详情

app.py

# -*- coding:UTF-8 -*-
import base64
import hmac
import json
import time
from hashlib import sha256
from urllib.parse import unquote, quote

import click
import requests
from flask import Flask, url_for, render_template, redirect, flash, request, make_response, jsonify
import os
from actlog import HaLog
from flask_login import LoginManager, login_user, login_required, logout_user

from Ding_Dev_Tools.DDtoken import get_token
from Ding_Dev_Tools.DDuser import DingUser
from db import db
from form import LoginForm, RegistrationForm
from models import User
from flask_login import current_user
from Ding_Dev_Tools.DDinfo import *

# 获取当前项目的绝对路径
basedir = os.path.abspath(os.path.dirname(__file__))
# 当前项目服务部署的机器ip server或域名server地址
Ding_Login_Server = "http://xxx.xxx.xxx."
# 初始化logger
logger = HaLog("running.log")


def init_app():
    ha_app = Flask(__name__)
    # 载入配置项
    ha_app.config['DEBUG'] = True
    # 尤其在涉及(flask-WTF)登陆页面提交敏感信息时,一定要设置密钥
    ha_app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or 'westos'
    # 数据库引擎配置,用sqlite测试
    ha_app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'data-dev.sqlite')
    # 是否自动提交
    ha_app.config['SQLALCHEMY_COMMIT_ON_TEARDOM'] = True
    # 是否追踪修改,从Flask-SQLALchemy文档中查看
    ha_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
    # 初始化绑定DB 与 Flask
    db.init_app(ha_app)
    return ha_app


app = init_app()
login_manager = LoginManager(app)
# session_protection 属性提供不同的安全等级防止用户会话遭篡改。
login_manager.session_protection = 'basic'
# login_view 属性设置登录页面的端点。
login_manager.login_view = 'login'  # required_login 校验不通过,默认跳转


@login_manager.user_loader
def load_user(user_id):
    """
    加载用户的回调函数接收以unicode字符串形式表示的用户标识符,如果能找到用户,这个函数必须返回用户对象,否则应该返回None
    :param user_id:
    :return:
    """
    return User.query.get(int(user_id))


def get_login(FULL_URL, form):
    """
    :param FULL_URL:
    :param form:
    :return:
    """
    # 如果useragent是钉钉的话直接登录
    user_agent = request.headers.get('User-Agent')
    if "AliApp(DingTalk" in user_agent:
        return dinglogin(FULL_URL)
    if current_user.is_authenticated:  # 已登录状态
        logger.info(f"get_login {current_user.username}用户已登录")
        # noinspection PyBroadException
        try:
            rurl = FULL_URL.split("next=")[1].strip("&")  # 截取字符串 获取next 清除结尾的&
        except:
            rurl = None
        if rurl is not None and rurl != '':  # redirectUrl 参数存在切不为空
            # --start-- 生成response
            r = make_response('', 302)
            r.headers["Location"] = rurl
            # --end-- 生成response
            return r
        else:
            return "欢迎来到主页"
    else:  # 未登录状态
        if FULL_URL is not None and FULL_URL != '':  # redirectUrl 参数存在且不为空
            return render_template('login.html', form=form, redirectUrl=FULL_URL)
        else:
            return render_template('login.html', form=form, redirectUrl=None)


def post_login(form):
    """
    :param form:
    :return:
    """
    FULL_URL = unquote(request.url)
    logger.info("post_login FULL_URL: {}".format(FULL_URL))
    # noinspection PyBroadException
    try:
        rurl = FULL_URL.split("redirectUrl=")[1].strip("&")  # 截取字符串 获取redirectUrl 清除结尾的&
    except:
        rurl = None
    logger.info("post_login rurl: {}".format(rurl))
    if form.validate_on_submit():
        # 判断用户是否存在且用户密码是否正确
        user = User.query.filter_by(email=form.email.data).first()
        if user is None:
            logger.info("post_login 没有这个用户,需要跳转注册页面")
            flash("没有该用户,请先注册用户", category='error')
            return redirect(url_for('register'))
        if user.verity_password(form.password.data):
            login_user(user)  # 登录操作
            # 利用login_user函数将登录信息存入session中
            logger.info(f"post_login {user.username} 通过pc login登录成功")
            flash('用户%s登录成功' % user.username, category='success')
            if rurl is not None and rurl != '':  # redirectUrl 参数存在切不为空
                # --start-- 生成response
                r = make_response('', 302)
                r.headers["Location"] = rurl
                # --end-- 生成response
                return r
            else:
                return "欢迎来到主页"
        else:
            logger.info(f"post_login {user.username} 通过pc login登录失败")
            flash('用户%s登录失败' % form.email.data, category='error')  # 这里用form.email.data是因为登录信息不正确,所以需要从data中获取
            # @login_required 会自动添加一个next parma,如果有这个param 则给redirectUrl加上一个next参数,然后把next挂在上面
            if rurl:
                return redirect(url_for('login', next=rurl))
            else:
                return redirect(url_for('login'))


# 验证钉钉免登陆
# 具体流程:
# 1 根据来访请求,判断来源UA是否为钉钉客户端
# 2 如果是,交由dinglogin 函数处理,
#
# 构建跳转链接 形如
# https://oapi.dingtalk.com/connect/oauth2/sns_authorize?
# appid=APPID&response_type=code&scope=snsapi_auth&state=STATE&redirect_uri=REDIRECT_URI
# 请求钉钉 其中rediect_uri 为 Ding_Login_Server
#
# 3 钉钉携带追加临时授权码code及state两个参数 请求redirct_uri 即/dingauth 。 这里需要考虑访问权限相关问题。负载均衡上需要加一定的白名单。
# 4 根据 授权码code 生成 个人免登场景签名
# 5 根据 个人免登场景签名 获取用户信息
# 6 根据unionid获取userid。
# 7 根据userid获取用户详情。
# 验证钉钉免登陆


def dinglogin(rurl):
    appId = AppKey
    if rurl:
        rediect_url_raw = f'{Ding_Login_Server}/dingauth?redirectUrl=' + rurl
    else:
        rediect_url_raw = f'{Ding_Login_Server}/dingauth'
    rediect_uri = quote(rediect_url_raw)
    dingjump_uri = 'https://oapi.dingtalk.com/connect/oauth2/sns_authorize?' \
                   'appid={}&response_type=code&scope=snsapi_auth&redirect_uri={}' \
        .format(appId, rediect_uri)
    logger.info('检测到钉钉客户端UA dinglogin 跳转 url: {}'.format(dingjump_uri))
    return redirect(dingjump_uri)


# 用于接收处理钉钉免登陆回调
@app.route('/dingauth', methods=["GET", "POST"])
def dingauth():
    FULL_URL = unquote(request.url)
    try:
        REDIRECT_URL = FULL_URL.split("redirectUrl=")[1].strip("&")  # 截取字符串 获取redirectUrl 清除结尾的&
    except:
        REDIRECT_URL = None
    logger.info('钉钉回调重定向完整url:{}'.format(FULL_URL))
    logger.info('用户原始跳转REDIRECT_URL:{}'.format(REDIRECT_URL))
    appId = AppKey.encode('utf-8')
    appSecret = AppSecret.encode('utf-8')
    if request.method == 'GET':
        code = request.args.get('code', None)
        if not code:
            return make_response(jsonify({"success": False, "msg": "缺少合适的参数"}))
        # start 通过code 获取用户信息
        # 计算签名
        timestamp = str(int(time.time() * 1000)).encode('utf-8')
        signature = quote(base64.b64encode(hmac.new(appSecret, timestamp, digestmod=sha256).digest()))
        logger.info("生成钉钉验证签名:".format(signature))
        # 构建数据
        data = {'tmp_auth_code': code}  # post 数据
        json_data = json.dumps(data)  # 字典 to json str
        # 获取用户unionid信息
        unionid_url = 'https://oapi.dingtalk.com/sns/getuserinfo_bycode?' \
                      + 'signature=' + signature \
                      + '×tamp=' + timestamp.decode('utf-8') \
                      + '&accessKey=' + appId.decode('utf-8')

        r_unionid = requests.post(unionid_url, data=json_data).json()
        logger.info("向钉钉请求用户信息,"+str(r_unionid))
        logger.info("通过sns想钉钉请求用户信息返回结果:" + str(r_unionid))
        if r_unionid['errcode'] != 0:
            logger.error("钉钉数据异常,"+str(unionid_url))
            return make_response(jsonify({"success": False, "msg": "钉钉信息获取失败"}))
        # 获取钉钉的 unionid
        # return r_unionid['user_info']
        unionid = r_unionid['user_info']['unionid']

        # 通过unionid 获取钉钉id

        dingtoken = get_token()
        if dingtoken["success"]:
            data = {"unionid": unionid}
            params = {"access_token": dingtoken['msg']}
            r_dingdingID = requests.post('https://oapi.dingtalk.com/topapi/user/getbyunionid',
                                         params=params, data=data).json()
        else:
            logger.error("获取钉钉token失败")
            return make_response(jsonify({"success": False, "msg": "钉钉token失败"}))

        if r_dingdingID['errcode'] == 0:
            logger.info("获取DDID:" + str(r_dingdingID))
            dingding_id = r_dingdingID['result']['userid']
        else:
            logger.error("获取钉钉id失败")
            return make_response(jsonify({"success": False, "msg": "钉钉id失败"}))

        # 通过钉钉ID获取详细信息
        dinguserinfo = DingUser.get_userinfo(dingtoken['msg'], dingding_id)
        logger.info("钉钉请求用户信息:" + str(dinguserinfo))
        # end 通过code 获取钉钉信息
        if dinguserinfo['success']:
            username = dinguserinfo['msg']['name']
            # 判断username信息和数据库人员信息进行核对,核对成功才能进行登录操作
            user = User.query.filter_by(username=username).first()
            if user:
                logger.info('获取用户名:{} ding_login继续登录'.format(username))
                login_user(user)  # 登录操作
                # 利用login_user函数将登录信息存入session中
                logger.info(f"{user.username} 通过ding_login登录成功")
                flash('用户%s登录成功' % user.username, category='success')
                if REDIRECT_URL is not None and REDIRECT_URL != '':  # redirectUrl 参数存在切不为空
                    # --start-- 生成response
                    r = make_response('', 302)
                    r.headers["Location"] = REDIRECT_URL
                    # --end-- 生成response
                    return r
                else:
                    return "欢迎来到主页"
            else:
                logger.error('{} 钉钉 和本地人员数据无法适配'.format(username))
                return make_response(jsonify({"success": False, "msg": '{} 钉钉 和本地人员数据无法适配'.format(username)}))
        else:
            logger.error('钉钉用户信息获取失败')
            return make_response(jsonify({"success": False, "msg": "钉钉用户信息获取失败"}))


@app.route('/login', methods=["GET", "POST"], strict_slashes=False)
def login():
    FULL_URL = unquote(request.url)
    form = LoginForm()
    if request.method == "POST":
        return post_login(form)
    elif request.method == "GET":
        return get_login(FULL_URL, form)
    else:
        logger.error("不支持的请求方式")
        return make_response(jsonify({"success": False, "msg": "不支持的请求方式"}))


@app.route('/register', methods=['GET', 'POST'], strict_slashes=False)  # strict_slashes的意思是是否要求百分百符合前面的路由规则
def register():
    """
        register:
          GET:获取注册页面
          POST:获取注册页面提交的数据信息
            1)判断是否为POST方法提交数据,并且数据是否通过表单验证
            2)如果通过验证,将表单提交的数据存储到数据库中;注册成功,跳转到登陆页面
               获取表单提交的数据有两种方式:
                  i、form.data  是一个字典,通过key值获取
                  ii、form.email.data   form.username.data
        """
    form = RegistrationForm()
    if form.validate_on_submit():
        user = User()
        user.username = form.username.data
        user.password = form.password.data
        user.email = form.email.data
        try:
            local_object = db.session.merge(user)
            db.session.add(local_object)
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            click.echo('register user failed: {reason}'.format(reason=str(e)))
        flash(u'用户%s注册成功' % user.username, category='success')
        return redirect(url_for('login'))

    return render_template('register.html', form=form)


@app.route('/logout')
@login_required
# 此装饰器是用来判断用户是否已经登录
# 当一个函数被多个装饰器装饰时,执行的顺序时从上往下执行,被装饰的顺序是从下往上装饰
# 所以这里是先进入logout路由,然后判断是否已经登录,如果已经登录则执行注销
def logout():
    logout_user()
    return redirect(url_for('login'))


# login_required如果不在登录状态的用户访问这个路由,Flask-Login会拦截请求,把请求发往登录页面(跳转login由login_manager.login_view控制)
@app.route('/wf/demo1/1')
@login_required
def opt_wf():
    return {"success": True, "msg": f"当前用户:{current_user.username} 正在操作工单任务"}


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000, debug=True)


db.py

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


models.py

import datetime

from werkzeug.security import generate_password_hash, check_password_hash

from db import db
from flask_login import UserMixin


class User(UserMixin, db.Model):
    """
        用户信息
        """
    __tablename__ = 'users'  # 自定义数据表的表名
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(100), unique=True, nullable=False)
    password_hash = db.Column(db.String(200), nullable=True)
    email = db.Column(db.String(50))
    create_date = db.Column(db.DATETIME(), default=datetime.datetime.now())

    # ........
    # 密码不可读
    @property
    def password(self):
        raise AttributeError('password is not a readable attribute')

    @password.setter
    def password(self, password):
        # generate_password_hash(password,method=pbkd2:sha1,salt_length=8):密码加密的散列值,为密码进行哈希加密
        self.password_hash = generate_password_hash(password)

    def verity_password(self, password):
        # check_password_hash(hash,password):密码散列值和用户输入的密码是否一致
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return '<User % r>' % self.username

form.py

from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField, ValidationError
from wtforms.validators import DataRequired, Length, Email, Regexp, EqualTo
from models import User


class RegistrationForm(FlaskForm):  # 还有一种写法class RegistrationForm (Form):
    email = StringField('电子邮箱', validators=[  # email=StringField(
        DataRequired(), Length(1, 64), Email()],  # label='电子邮箱'
                        # 给前端的标签添加下面的属性                            #                   validators=[
                        render_kw={  # validators.data_required(message=u"邮箱不能为空"),
                            'class': 'layui-input',  # validators.length(min=1,max=64),
                            'placeholder': '电子邮箱'  # validators.email],
                        })
    username = StringField('用户名', validators=[
        DataRequired(), Length(1, 64), Regexp('^\w*$', message='用户名只能由字母数字或者下划线组成')],
                           # 给前端的标签添加下面的属性
                           render_kw={
                               'class': 'layui-input',
                               'placeholder': '用户名'
                           })
    password = PasswordField('密码', validators=[
        DataRequired(), EqualTo('repassword', message='密码不一致')],
                             # 给前端的标签添加下面的属性
                             render_kw={
                                 'class': 'layui-input',
                                 'placeholder': '密码'
                             })
    repassword = PasswordField('确认密码', validators=[
        DataRequired()],
                               # 给前端的标签添加下面的属性
                               render_kw={
                                   'class': 'layui-input',
                                   'placeholder': '确认密码'
                               })
    submit = SubmitField('注册')

    # 两个自动验证的函数,已validate_开头且跟着字段名的方法,这个方法和常规的验证函数一起调用
    def validate_email(self, field):
        # field是email表单对象,filed.data是email表单里提交的数据信息
        if User.query.filter_by(email=field.data).first():
            raise ValidationError("邮箱地址%s已经注册" % (field.data))

    def validate_username(self, field):
        if User.query.filter_by(username=field.data).first():
            raise ValueError('用户名%s已经注册' % (field.data))


class LoginForm(FlaskForm):
    """用户登录表单"""
    email = StringField('电子邮箱', validators=[
        DataRequired(), Length(1, 64), Email()],
                        # 给前端得标签添加下面的属性
                        render_kw={
                            'class': 'layui-input',
                            'placeholder': '电子邮箱'
                        })
    password = PasswordField('密码', validators=[
        DataRequired()],
                             # 给前端得标签添加下面得属性信息
                             render_kw={
                                 'class': 'layui-input',
                                 'placeholder': '密码'
                             })
    submit = SubmitField('登录')

actlog.py

import os
import datetime
import sys
basedir = os.path.abspath(os.path.dirname(__file__))  # 获取当前项目的绝对路径


class HaLog:
    def __init__(self, file_name, level='info', log_path=os.path.join(basedir, "log")):
        self.log_file = os.path.join(log_path, file_name)
        self.log_path = log_path
        level_dic = ["info", "warning", "error", "cri"]
        if level in level_dic:
            self.level = level
        else:
            self.level = "info"

    def info(self, msg):
        level = 'info'
        try:
            raise Exception
        except:
            f = sys.exc_info()[2].tb_frame.f_back
        filename = f.f_code.co_filename
        fun_co_name = f.f_code.co_name
        f_lineno = f.f_lineno
        try:
            os.mkdir(self.log_path)
        except:
            pass

        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        data = "{} - {} {} {} {} : {} \n".format(now, level,filename, fun_co_name, f_lineno, msg)
        with open(self.log_file, mode='a', encoding='utf-8') as f:
            f.write(data)

    def warning(self, msg):
        level = 'warning'
        try:
            raise Exception
        except:
            f = sys.exc_info()[2].tb_frame.f_back
        filename = f.f_code.co_filename
        fun_co_name = f.f_code.co_name
        f_lineno = f.f_lineno
        try:
            os.mkdir(self.log_path)
        except:
            pass

        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        data = "{} - {} {} {} {} : {} \n".format(now, level, filename, fun_co_name, f_lineno, msg)
        with open(self.log_file, mode='a', encoding='utf-8') as f:
            f.write(data)

    def error(self, msg):
        level = 'error'
        try:
            raise Exception
        except:
            f = sys.exc_info()[2].tb_frame.f_back
        filename = f.f_code.co_filename
        fun_co_name = f.f_code.co_name
        f_lineno = f.f_lineno
        try:
            os.mkdir(self.log_path)
        except:
            pass

        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        data = "{} - {} {} {} {} : {} \n".format(now, level, filename, fun_co_name, f_lineno, msg)
        with open(self.log_file, mode='a', encoding='utf-8') as f:
            f.write(data)

    def debug(self, msg):
        level = 'debug'
        try:
            raise Exception
        except:
            f = sys.exc_info()[2].tb_frame.f_back
        filename = f.f_code.co_filename
        fun_co_name = f.f_code.co_name
        f_lineno = f.f_lineno
        try:
            os.mkdir(self.log_path)
        except:
            pass

        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        data = "{} - {} {} {} {} : {} \n".format(now, level, filename, fun_co_name, f_lineno, msg)
        with open(self.log_file, mode='a', encoding='utf-8') as f:
            f.write(data)

    def write(self, msg, level=None):
        try:
            raise Exception
        except:
            f = sys.exc_info()[2].tb_frame.f_back
        filename = f.f_code.co_filename
        fun_co_name = f.f_code.co_name
        f_lineno = f.f_lineno
        if level:
            level = level
        else:
            level = self.level
        try:
            os.mkdir(self.log_path)
        except:
            pass

        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        data = "{} - {} {} {} {} : {} \n".format(now, level,filename, fun_co_name, f_lineno, msg)
        with open(self.log_file, mode='a', encoding='utf-8') as f:
            f.write(data)

manager.py

from flask_migrate import Migrate
from app import app
from db import db

# 迁移命令管理与app,建立关系
# sqlite使用此参数render_as_batch=True使用batch操作替换普通操作,因为普通操作不支持表名,列名的改变!
migrate = Migrate(app, db, render_as_batch=True)

DDinfo.py

# 钉钉小程序基础信息
AgentId = 'xxxxxxxxx'
AppKey = 'xxxxxxxxx'
AppSecret = 'xxxxxx'
agentid = AgentId
appkey = AppKey
appsecret = AppSecret
api_prefix = "https://oapi.dingtalk.com"

DDtoken.py

import requests
from Ding_Dev_Tools.DDinfo import *


def get_token():
    """
    获取token
    :return:
    """

    api = "/gettoken"
    try:
        params = {"appkey": appkey,
                  "appsecret": appsecret
                  }
        r = requests.get(api_prefix + api, params=params)
        result = r.json()
        if result["errcode"] == 0 and result["errmsg"] == "ok":
            token = result["access_token"]
            return {'success': True, 'msg': token}
        else:
            return {'success': False, 'msg': "Requests Fail"}

    except Exception as e:
        print(e)
        return {'success': False, 'msg': e}

DDuser.py

from Ding_Dev_Tools.DDinfo import *
import requests
from actlog import HaLog

logger = HaLog("running.log")


class DingUser:
    @staticmethod
    def get_userinfo(dingtoken, userid):
        api = '/user/get'
        try:
            params = {"access_token": dingtoken, "userid": userid}
            r = requests.get(api_prefix + api, params=params)
            result = r.json()
            print(result)
            logger.info(str(result))
            if result["errcode"] == 0 and result["errmsg"] == "ok":
                name = result["name"]
                position = result['position']
                department = result['department']
                userid = result['userid']
                # jobnumber = result["jobnumber"]
                # hiredDate = result['hiredDate']
                return {'success': True, 'msg': {'name': name,
                                                 'position': position,
                                                 'userid': userid,
                                                 'department': department,
                                                 }
                        }
            else:
                return {'success': False, 'msg': str(result)}
        except Exception as e:
            print(e)
            return {'success': False, 'msg': e}

flash.html

<html>
<head>
    <link href="{{ url_for('static',filename='css/bootstrap.min.css') }}" ,rel="stylesheet">
    <link rel="stylesheet" href="{{ url_for('static',filename='css/font.css') }}">
    <link rel="stylesheet" href="{{ url_for('static',filename='css/xadmin.css') }}">
    <link rel="stylesheet" href="{{ url_for('static',filename='css/login.css') }}">
    <script type="text/javascript" src="{{ url_for('static',filename="js/jquery.min1.js") }}"></script>
    <script src="{{ url_for('static',filename='js/jquery.min2.js') }}"></script>
    <script src="{{ url_for('static',filename='js/bootstrap.min1.js') }}"></script>
    <script src="{{ url_for('static',filename='lib/layui/layui.js') }}"></script>
    <script src="{{ url_for('static',filename='js/html5.min.js') }}"></script>
    <script src="{{ url_for('static',filename='js/respind.min.js') }}"></script>
</head>
<body>
{% for message in get_flashed_messages(category_filter=['success']) %}
    <div class="alert alert-success alert-dismissible" role="alert">
        <button type="button" class="close" data-dismiss="alert" aria-label="Close">
            <span aria-hidden="true">×</span>
        </button>
        <strong>Success! </strong> {{ message }}
    </div>
{% endfor %}

{% for message in get_flashed_messages(category_filter=['error']) %}
    <div class="alert alert-danger alert-dismissible" role="alert">
        <button type="button" class="close" data-dismiss="alert" aria-label="Close">
            <span aria-hidden="true">×</span>
        </button>
        <strong>Warning! </strong> {{ message }}
    </div>
{% endfor %}
</body>
</html>

login.html

<html>
<head>
    <link href="{{ url_for('static',filename='css/bootstrap.min.css') }}" ,rel="stylesheet">
    <link rel="stylesheet" href="{{ url_for('static',filename='css/font.css') }}">
    <link rel="stylesheet" href="{{ url_for('static',filename='css/xadmin.css') }}">
    <link rel="stylesheet" href="{{ url_for('static',filename='css/login.css') }}">
    <script type="text/javascript" src="{{ url_for('static',filename="js/jquery.min1.js") }}"></script>
    <script src="{{ url_for('static',filename='js/jquery.min2.js') }}"></script>
    <script src="{{ url_for('static',filename='js/bootstrap.min1.js') }}"></script>
    <script src="{{ url_for('static',filename='lib/layui/layui.js') }}"></script>
    <script src="{{ url_for('static',filename='js/html5.min.js') }}"></script>
    <script src="{{ url_for('static',filename='js/respind.min.js') }}"></script>
</head>
<body>
<div class="login layui-anim layui-anim-up">
    <div class="title">任务清单系统登录</div>
    <div id="darkbannerwrap"></div>
    {% include 'flash.html' %}
    <form method="post" class="layui-form" action="{{ url_for('login', redirectUrl=redirectUrl) }}">
        {{ form.hidden_tag() }}
        {{ form.email() }}
        {# <input name="username" placeholder="用户名" type="text" lay- verify="required" class="layui-input">#}
        {# <p class="error">用户登录失败</p>#}
        <p class="error">{{ form.email.errors[0] }}</p>
        <hr class="hr15">
        {{ form.password() }}
        {# <input name="password" lay-verify="required" placeholder="密码" type="password" class="layui-input">#}
        {# <p class="error"> 密码失败</p>#}
        <p class="error"> {{ form.password.errors[0] }}</p>
        <hr class="hr15">
        {{ form.submit() }}
        {# <input value="登录" style="width:100%;" type="submit">#}
        <hr class="hr20">
    </form>
</div>
</body>
</html>

register.html

<html>
<head>
    <link href="{{ url_for('static',filename='css/bootstrap.min.css') }}" ,rel="stylesheet">
    <link rel="stylesheet" href="{{ url_for('static',filename='css/font.css') }}">
    <link rel="stylesheet" href="{{ url_for('static',filename='css/xadmin.css') }}">
    <link rel="stylesheet" href="{{ url_for('static',filename='css/login.css') }}">
    <script type="text/javascript" src="{{ url_for('static',filename="js/jquery.min1.js") }}"></script>
    <script src="{{ url_for('static',filename='js/jquery.min2.js') }}"></script>
    <script src="{{ url_for('static',filename='js/bootstrap.min1.js') }}"></script>
    <script src="{{ url_for('static',filename='lib/layui/layui.js') }}"></script>
    <script src="{{ url_for('static',filename='js/html5.min.js') }}"></script>
    <script src="{{ url_for('static',filename='js/respind.min.js') }}"></script>
</head>
<body>
<div class="login layui-anim layui-anim-up">
    <div class="title">任务清单系统注册</div>
    <div id="darkbannerwrap"></div>
    {% include 'flash.html' %}
    <form method="post" class="layui-form" action="{{ url_for('register') }}">
        {{ form.hidden_tag() }}
        {{ form.email() }}
        <p class="error">{{ form.email.errors[0] }}</p>
        <hr class="hr15">
        {{ form.username() }}
        <p class="error">{{ form.username.errors[0] }}</p>
        <hr class="hr15">
        {{ form.password() }}
        <p class="error"> {{ form.password.errors[0] }}</p>
        <hr class="hr15">
        {{ form.repassword() }}
        <p class="error"> {{ form.repassword.errors[0] }}</p>
        <hr class="hr15">
        {{ form.submit() }}
        <hr class="hr20">
    </form>
</div>
</body>
</html>

requirements.txt

alembic==1.7.7
certifi==2023.7.22
charset-normalizer==2.0.12
click==8.0.4
colorama==0.4.5
dataclasses==0.8
dnspython==2.2.1
email-validator==1.3.1
Flask==2.0.3
Flask-Login==0.5.0
Flask-Migrate==3.1.0
Flask-SQLAlchemy==2.5.1
Flask-WTF==1.0.1
greenlet==2.0.2
idna==3.4
importlib-metadata==4.8.3
importlib-resources==5.4.0
itsdangerous==2.0.1
Jinja2==3.0.3
Mako==1.1.6
MarkupSafe==2.0.1
requests==2.27.1
SQLAlchemy==1.4.49
typing_extensions==4.1.1
urllib3==1.26.18
Werkzeug==2.0.3
WTForms==3.0.0
zipp==3.6.0


将代码部署到公网服务器上

这个我们就简单部署下

yum install -y python3,nginx
yum install -y unzip
pip3 install virtualenv -i https://pypi.tuna.tsinghua.edu.cn/simple
unzip 钉钉免登陆demo.zip
cd 钉钉免登陆demo/
virtualenv venv
source venv/bin/activate
pip3 install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
export FLASK_APP="manager"
flask db init
flask db migrate
flask db upgrade
nohup python app.py &


nginx配置

server {
        listen       80;
        server_name  自己的服务器的公网ip;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;
        #access_log  "pipe:rollback logs/host.access_log interval=1d baknum=7 maxsize=2G"  main;

        location / {
           add_header Access-Control-Allow-Origin *;
        add_header Access-Control-Allow-Methods 'GET, POST, OPTIONS';
        add_header Access-Control-Allow-Headers 'DNT,X-Mx-ReqToken,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Authorization';
        proxy_pass http://127.0.0.1:5000/;
        }
}


测试效果

先注册用户

用Flask实现钉钉免登录_User_11

测试pc端访问效果

测试url http://xxxx.xxx.xxx.xx/wf/demo1/1

用Flask实现钉钉免登录_钉钉免登陆_12

用Flask实现钉钉免登录_钉钉免登陆_13


测试收集钉钉登陆

用Flask实现钉钉免登录_flask_14


标签:return,登录,form,Flask,user,钉钉免,import,login,data
From: https://blog.51cto.com/u_15703497/8080917

相关文章

  • VMware VCSA 5480 后台登录提示无法登陆问题解决
     通过控制台登入启用shell使用service-control--status--all查看applmgmt服务状态(显示已停止) 使用service-control--startapplmgmt启动服务 回车后会自动退出命令行模式 此时回到浏览器新建标签页重新登录5480端口成功    使用官网说明使用SingleS......
  • 多平台cookie登录工具,提供源码和思路
    下面是界面: 下面是程序集代码:.版本2.支持库ietb.支持库spec.支持库iext.支持库eAPI.程序集窗口程序集_启动窗口.程序集变量页面,谷歌页面.程序集变量ccookiesA,文本型,,"0".子程序_按钮1_被单击.局部变量是否成功,逻辑型.局部变量谷歌浏览器路径......
  • ubuntu配置SSH登录xsheel
    ubuntu配置SSH登录xsheel环境Ubuntu-22.04.2-desktop-amd64Ubuntu安装后自带SSH客户端,但是要想使用xsheel连接Ubuntu需要在Ubuntu中下载SSH-Server。步骤安装openssh-serversudoapt-getinstallopenssh-server#然后输入密码,填写Y然后,需要通过编辑/etc......
  • 服务器配置密钥登录
    背景服务器如果是密码登录很容易被人攻破,为了避免这种情况,可以采取密钥的登录方式并且把密码登录给关闭掉。参考文章服务器上生成密钥对将私钥给客户端#生成密钥对[root@czf~]#ssh-keygenGeneratingpublic/privatersakeypair.Enterfileinwhichtosavethekey......
  • Linux免密登录脚本
    首先安装sshpassyuminstall-ysshpassLinux免密登录脚本:#!/bin/bashexportIP="192.168.100.140192.168.100.141192.168.100.142"exportSSHPASS=086530forHOSTin$IP;dosshpass-essh-copy-id-oStrictHostKeyChecking=no$HOST scp/etc/hostsroot@$H......
  • node+mysql+express实现登录/注册/修改密码/删除用户 接口
    实现用户的注册、登录、修改密码、删除用户操作用到的数据库:nodecms;表:user目录结构:db目录下存放数据库操作语句:userSQL.js用户有关的操作语句router目录接口路由文件user.js用户接口路由connect.js数据库连接index.html前端测试页面index.js入口文件package.js......
  • 鸿蒙极速入门(四)-通过登录Demo了解ArkTS
    ArkTS是HarmonyOS优选的主力应用开发语言。ArkTS围绕应用开发在TypeScript(简称TS)生态基础上做了进一步扩展,继承了TS的所有特性,是TS的超集。ArkTS在TS的基础上主要扩展了如下能力:基本语法:ArkTS定义了声明式UI描述、自定义组件和动态扩展UI元素的能力,再配合ArkUI开发框架中的系统......
  • 登录页面--图片验证码
    登陆界面实现图片验证码功能 开始吧!!!如何生成图片呢安装pillow模块pipinstallpillow新建myproject/app01/utils/ttf目录,将字体放在其下新建myproject/app01/utils/code.py,编辑验证码生成函数fromPILimportImage,ImageDraw,ImageFilter,ImageFontimpor......
  • 直播系统源码,自动登录及记住密码实现
    直播系统源码,自动登录及记住密码实现分为两个activity,mainActivity是登录页面,homeActivity是登录成功页面。HomeActivity.java代码 publicclassHomeActivityextendsAppCompatActivity{@OverrideprotectedvoidonCreate(BundlesavedInstanceState){  super.onCrea......
  • Spring Boot整合OAuth2实现GitHub第三方登录
    GithubOAuth第三方登录示例1、第三方登录原理第三方登录的原理是借助OAuth授权来实现,首先用户先向客户端提供第三方网站的数据证明自己的身份获取授权码,然后客户端拿着授权码与授权服务器建立连接获得一个AccessToken,之后客户端就可以通过AccessToken来与资源服务器进行交互......