Flask 实现Token认证机制
在Flask框架中,实现Token认证机制并不是一件复杂的事情。除了使用官方提供的flask_httpauth
模块或者第三方模块flask-jwt
,我们还可以考虑自己实现一个简易版的Token认证工具。自定义Token认证机制的本质是生成一个令牌(Token),并在用户每次请求时验证这个令牌的有效性。
整个过程可以分为以下几个步骤:
- 用户登录时生成Token,并将Token与用户关联存储在服务器端。
- 用户在请求时携带Token。
- 服务器在收到请求后,验证Token的有效性。
- 如果Token有效,允许用户访问相应资源;否则,拒绝访问。
这种自定义的Token认证机制相对简单,适用于一些小型应用或者对于Token认证机制有特殊需求的场景。搭建这样一个简易的认证系统有助于理解Token认证的基本原理,并可以根据实际需求进行灵活的定制。
创建表结构
通过表结构的创建,建立用户认证和会话管理表。UserAuthDB
表存储了用户的账号密码信息,而SessionAuthDB
表则存储了用户登录后生成的Token
信息,包括用户名、Token本身以及Token的过期时间。这为后续实现用户注册、登录以及Token认证等功能提供了数据库支持。
UserAuthDB表:
- 用途:存储用户账号密码信息。
- 字段:
id
: 主键,自增,唯一标识每个用户。username
: 用户名,非空,唯一,用于登录时识别用户。password
: 密码,非空,用于验证用户身份。
SessionAuthDB表:
- 用途:存储登录成功后用户的Token信息。
- 字段:
id
: 主键,自增,唯一标识每个登录会话。username
: 用户名,非空,唯一,关联到UserAuthDB
表的用户名。token
: 用户登录后生成的Token,非空,唯一,用于身份验证。invalid_date
: Token的过期时间,用于判断Token是否过期。
代码通过Flask路由/create
实现了数据库表结构的创建,主要包括两张表,分别是UserAuthDB
和SessionAuthDB
。
@app.route("/create",methods=["GET"])
def create():
conn = sqlite3.connect("./database.db")
cursor = conn.cursor()
create_auth = "create table UserAuthDB(" \
"id INTEGER primary key AUTOINCREMENT not null unique," \
"username varchar(64) not null unique," \
"password varchar(64) not null" \
")"
cursor.execute(create_auth)
create_session = "create table SessionAuthDB(" \
"id INTEGER primary key AUTOINCREMENT not null unique," \
"username varchar(64) not null unique," \
"token varchar(128) not null unique," \
"invalid_date int not null" \
")"
cursor.execute(create_session)
conn.commit()
cursor.close()
conn.close()
return "create success"
验证函数
该验证函数用于保证传入的用户名和密码满足一定的安全性和格式要求。通过对长度和字符内容的检查,确保了传入的参数不会导致潜在的安全问题。这样的验证机制在用户注册、登录等场景中可以有效地防止一些常见的安全漏洞。
参数验证:
- 接受不定数量的参数
*kwargs
,可传入多个参数。 - 对于每个传入的参数,首先验证其长度是否在合法范围内(小于128个字符且不为空)。
字符串处理:
- 将参数转换为小写形式,然后去除两侧空格,并移除所有空格。
字符内容验证:
- 遍历处理后的字符串,检查其中的字符是否仅包含大写字母、小写字母和数字。如果出现其他字符,则认为非法。
返回结果:
- 如果所有参数验证通过,即长度合法且字符内容符合要求,则返回
True
,表示参数合法。 - 如果有任何一个参数不合法,则返回
False
,表示参数存在非法字符或超出长度限制。
代码定义了一个名为CheckParameters
的验证函数,该函数用于验证传入的参数是否合法。主要验证的对象是用户名和密码,具体概述如下:
def CheckParameters(*kwargs):
for item in range(len(kwargs)):
# 先验证长度
if len(kwargs[item]) >= 128 or len(kwargs[item]) == 0:
return False
# 先小写,然后去掉两侧空格,去掉所有空格
local_string = kwargs[item].lower().strip().replace(" ","")
# 判断是否只包含 大写 小写 数字
for kw in local_string:
if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:
return False
return True
登录认证函数
该函数实现了用户登录认证的核心逻辑。首先对输入的用户名和密码进行验证,然后检查用户是否存在以及是否已经有生成的Token。如果用户存在但Token不存在,生成一个新的Token并存入数据库,最终返回生成的Token。
路由定义:
- 使用
@app.route("/login", methods=["POST"])
定义了一个POST请求的路由,用于处理用户登录请求。
参数获取:
- 通过
request.form.to_dict()
获取POST请求中的参数,包括用户名(username)和密码(password)。
参数验证:
- 调用之前定义的
CheckParameters
函数对获取的用户名和密码进行合法性验证,确保其符合安全性和格式要求。
用户存在性验证:
- 调用
RunSqlite
函数查询UserAuthDB
表,验证用户名和密码是否匹配。如果存在匹配的用户,则继续执行下一步。
生成Token:
- 查询
SessionAuthDB
表,检查是否存在该用户的Token记录。如果存在,则直接返回该Token。 - 如果不存在Token记录,则生成一个32位的随机Token,并设置过期时间为当前时间戳加上360秒(6分钟)。
Token写入数据库:
- 将生成的Token和过期时间写入
SessionAuthDB
表。
返回结果:
- 返回生成的Token,作为登录成功的标识。
@app.route("/login",methods=["POST"])
def login():
if request.method == "POST":
# 获取参数信息
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 2:
username = obtain_dict["username"]
password = obtain_dict["password"]
# 验证是否合法
is_true = CheckParameters(username,password)
if is_true == True:
# 查询是否存在该用户
select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
if select[0][0] == username and select[0][1] == password:
# 查询Session列表是否存在
select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
if select_session != []:
ref = {"message": ""}
ref["message"] = select_session[0][0]
return json.dumps(ref, ensure_ascii=False)
# Session不存在则需要重新生成
else:
# 生成并写入token和过期时间戳
token = ''.join(random.sample(string.ascii_letters + string.digits, 32))
# 设置360秒周期,过期时间
time_stamp = int(time.time()) + 360
insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
if insert == True:
ref = {"message": ""}
ref["message"] = token
return json.dumps(ref, ensure_ascii=False)
else:
return json.dumps("{'message': '用户名或密码错误'}", ensure_ascii=False)
else:
return json.dumps("{'message': '输入参数不可用'}", ensure_ascii=False)
return json.dumps("{'message': '未知错误'}", ensure_ascii=False)
登录认证装饰器
检查用户登录状态Token是否过期的装饰器,装饰器用于装饰某一些函数,当主调函数被调用时,会优先执行装饰器内的代码,执行后根据装饰器执行结果返回或退出,装饰器分为两种模式,一种是FBV模式,另一种是CBV模式。
FBV(Function-Based Views)和CBV(Class-Based Views)是两种不同的视图设计模式,用于处理Web框架中的请求和生成响应。这两种模式在Django框架中被广泛使用。
FBV(Function-Based Views)
- 定义: FBV是指使用普通的Python函数来处理请求和生成响应的视图设计模式。
- 特点:
- 每个视图对应一个函数,函数接收请求作为参数,返回响应。
- 简单,易于理解和使用。
- 视图的逻辑和处理集中在一个函数中。
示例:
def my_view(request):
# 处理逻辑
return HttpResponse("Hello, World!")
CBV(Class-Based Views)
- 定义: CBV是指使用基于类的Python类来处理请求和生成响应的视图设计模式。
- 特点:
- 视图是类,每个类中可以包含多个方法来处理不同HTTP方法(GET、POST等)的请求。
- 提供了更多的代码组织和复用的可能性,可以使用类的继承、Mixin等方式。
- 更灵活,适用于复杂的业务逻辑和共享逻辑。
示例:
class MyView(View):
def get(self, request):
# 处理 GET 请求的逻辑
return HttpResponse("Hello, World!")
def post(self, request):
# 处理 POST 请求的逻辑
return HttpResponse("Received a POST request")
FBV与CBV区别
- 结构差异: FBV使用函数,逻辑较为集中;CBV使用类,允许通过类的继承和Mixin等方式更好地组织代码。
- 代码复用: CBV更容易实现代码复用,可以通过继承和Mixin在不同的类之间共享逻辑;而FBV需要显式地将共享逻辑提取为函数。
- 装饰器: 在FBV中,使用装饰器来添加额外的功能;而在CBV中,通过类的继承和Mixin来实现相似的功能。
- 可读性: 对于简单的视图逻辑,FBV可能更直观易懂;对于较为复杂的业务逻辑,CBV提供了更好的组织和扩展性。
在Flask中,两种设计模式都可以使用,开发者可以根据项目的需求和个人喜好选择使用FBV或CBV。
基于FBV的装饰器设置使用时,需要注意装饰器嵌入的位置,装饰器需要在请求进入路由之前,即在请求未走原逻辑代码的时候介入,对原业务逻辑进行业务拓展。
from flask import Flask, request,render_template
from functools import wraps
app = Flask(__name__)
def login(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("登录请求: {}".format(request.url))
value = request.form.get("value")
if value == "lyshark":
# 调用原函数,并返回
function_ptr = func(*args, **kwargs)
return function_ptr
else:
return "登录失败"
return wrapper
@app.route('/', methods=['GET', 'POST'])
@login
def index():
if request.method == "POST":
value = request.form.get("value")
return "index"
if __name__ == '__main__':
app.run()
而基于CBV的装饰器设置,使用就显得更加细分化,可以定制管理专属功能,在外部定义装饰器可以全局使用,内部定义可以针对特定路由函数特殊处理。
from flask import Flask, request,render_template,views
from functools import wraps
app = Flask(__name__)
# 装饰器
def login(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("登录请求: {}".format(request.url))
value = request.form.get("value")
if value == "lyshark":
# 调用原函数,并返回
function_ptr = func(*args, **kwargs)
return function_ptr
else:
return "登录失败"
return wrapper
# 类视图
class index(views.MethodView):
@login
def get(self):
return request.args
@login
def post(self):
return "success"
# 增加路由
app.add_url_rule(rule='/', view_func=index.as_view('index'))
if __name__ == '__main__':
app.run()
此处为了实现起来更简单一些此处直接使用FBV模式,我们实现的login_check
装饰器通过FVB模式构建,代码中取得用户的Token以及用户名对用户身份进行验证。
def login_check(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("处理登录逻辑部分: {}".format(request.url))
# 得到token 验证是否登陆了,且token没有过期
local_timestamp = int(time.time())
get_token = request.headers.get("token")
# 验证传入参数是否合法
if CheckParameters(get_token) == True:
select = RunSqlite("database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")
print(select)
# 判断是否存在记录,如果存在,在判断时间戳是否合理
if select != []:
# 如果当前时间与数据库比对,大于说明过期了需要删除原来的,让用户重新登录
if local_timestamp >= int(select[0][1]):
print("时间戳过期了")
# 删除原来的Token
delete = RunSqlite("database.db","SessionAuthDB","delete",f"token='{get_token}'","none")
if delete == True:
return json.dumps("{'token': 'Token 已过期,请重新登录获取'}", ensure_ascii=False)
else:
return json.dumps("{'token': '数据库删除异常,请联系开发者'}", ensure_ascii=False)
else:
# 验证Token是否一致
if select[0][0] == get_token:
print("Token验证正常,继续执行function_ptr指向代码.")
# 返回到原函数
return func(*args, **kwargs)
else:
print("Token验证错误 {}".format(select))
return json.dumps("{'token': 'Token 传入错误'}", ensure_ascii=False)
# 装饰器调用原函数
# function_ptr = func(*args, **kwargs)
return json.dumps("{'token': 'Token 验证失败'}", ensure_ascii=False)
return wrapper
调用演示
主调用函数则是具体的功能实现可以自定义扩展,当用户访问该路由时会优先调用login_check
装饰器来验证用户携带Token的合法性,如果合法则会通过return func(*args, **kwargs)
返回执行主调函数,否则直接返回验证失败的消息。
# 获取参数函数
@app.route("/GetPage", methods=["POST"])
@login_check
def GetPage():
if request.method == "POST":
# 获取参数信息
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 1:
pagename = obtain_dict["pagename"]
print("查询名称: {}".format(obtain_dict["pagename"]))
# 相应头的完整写法
req = Response(response="ok", status=200, mimetype="application/json")
req.headers["Content-Type"] = "text/json; charset=utf-8"
req.headers["Server"] = "LyShark Server 1.0"
req.data = json.dumps("{'message': 'hello world'}")
return req
else:
return json.dumps("{'message': '传入参数错误,请携带正确参数请求'}", ensure_ascii=False)
return json.dumps("{'token': '未知错误'}", ensure_ascii=False)
# 用户注册函数
@app.route("/register", methods=["POST"])
def Register():
if request.method == "POST":
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 2:
print("用户名: {} 密码: {}".format(obtain_dict["username"], obtain_dict["password"]))
reg_username = obtain_dict["username"]
reg_password = obtain_dict["password"]
# 验证是否合法
if CheckParameters(reg_username, reg_password) == False:
return json.dumps("{'message': '传入用户名密码不合法'}", ensure_ascii=False)
# 查询用户是否存在
select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")
if select != []:
return json.dumps("{'message': '用户名已被注册'}", ensure_ascii=False)
else:
insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")
if insert == True:
return json.dumps("{'message': '注册成功'}", ensure_ascii=False)
else:
return json.dumps("{'message': '注册失败'}", ensure_ascii=False)
else:
return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
return json.dumps("{'message': '未知错误'}", ensure_ascii=False)
# 密码修改函数
@app.route("/modify", methods=["POST"])
@login_check
def modify():
if request.method == "POST":
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 1:
mdf_password = obtain_dict["password"]
get_token = request.headers.get("token")
print("获取token: {} 修改后密码: {}".format(get_token,mdf_password))
# 验证是否合法
if CheckParameters(get_token, mdf_password) == False:
return json.dumps("{'message': '传入密码不合法'}", ensure_ascii=False)
# 先得到token对应用户名
select = RunSqlite("database.db","SessionAuthDB","select","username",f"token='{get_token}'")
if select != []:
# 接着直接修改密码即可
modify_username = str(select[0][0])
print("得到的用户名: {}".format(modify_username))
update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")
if update == True:
# 删除原来的token,让用户重新获取
delete = RunSqlite("database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")
print("删除token状态: {}".format(delete))
return json.dumps("{'message': '修改成功,请重新登录获取Token'}", ensure_ascii=False)
else:
return json.dumps("{'message': '修改失败'}", ensure_ascii=False)
else:
return json.dumps("{'message': '不存在该Token,无法修改密码'}", ensure_ascii=False)
else:
return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
return json.dumps("{'message': '未知错误'}", ensure_ascii=False)
FBV模式下的完整代码,以下是对代码的概述:
主要功能:
- 数据库操作: 封装了对 SQLite 数据库的基本增删改查操作(
RunSqlite
函数)。 - 用户认证: 提供了用户登录、注册和密码修改的功能。使用了 Token 机制进行登录认证,并通过装饰器
login_check
来验证 Token 的有效性。 - 创建数据库表: 提供了一个用于初始化数据库表结构的接口
/create
。 - 获取页面信息: 通过
/GetPage
接口,使用了login_check
装饰器来验证用户登录状态,仅对已登录用户提供页面信息。
主要路由
/create
:创建数据库表结构。/login
:用户登录接口,返回用户的 Token。/GetPage
:获取页面信息,需要用户登录并携带有效 Token。/register
:用户注册接口。/modify
:修改用户密码接口,需要用户登录并携带有效 Token。
代码结构
- 数据库操作:
- 提供了对 SQLite 数据库的基本操作,包括插入、更新、查询和删除。
- 用户认证:
- 使用了装饰器
login_check
对需要登录的路由进行认证。 - 提供了用户登录、注册和密码修改的路由。
- 使用了装饰器
- 创建数据库表:
- 提供了一个用于初始化数据库表结构的路由。
- 获取页面信息:
- 提供了一个用于获取页面信息的路由,需要用户登录并携带有效 Token。
from flask import Flask,render_template,request,Response,redirect,jsonify
from functools import wraps
import json,sqlite3,random,string,time
app = Flask(__name__)
# 增删改查简单封装
def RunSqlite(db,table,action,field,value):
connect = sqlite3.connect(db)
cursor = connect.cursor()
# 执行插入动作
if action == "insert":
insert = f"insert into {table}({field}) values({value});"
if insert == None or len(insert) == 0:
return False
try:
cursor.execute(insert)
except Exception:
return False
# 执行更新操作
elif action == "update":
update = f"update {table} set {value} where {field};"
if update == None or len(update) == 0:
return False
try:
cursor.execute(update)
except Exception:
return False
# 执行查询操作
elif action == "select":
# 查询条件是否为空
if value == "none":
select = f"select {field} from {table};"
else:
select = f"select {field} from {table} where {value};"
try:
ref = cursor.execute(select)
ref_data = ref.fetchall()
connect.commit()
connect.close()
return ref_data
except Exception:
return False
# 执行删除操作
elif action == "delete":
delete = f"delete from {table} where {field};"
if delete == None or len(delete) == 0:
return False
try:
cursor.execute(delete)
except Exception:
return False
try:
connect.commit()
connect.close()
return True
except Exception:
return False
@app.route("/create",methods=["GET"])
def create():
conn = sqlite3.connect("./database.db")
cursor = conn.cursor()
create_auth = "create table UserAuthDB(" \
"id INTEGER primary key AUTOINCREMENT not null unique," \
"username varchar(64) not null unique," \
"password varchar(64) not null" \
")"
cursor.execute(create_auth)
create_session = "create table SessionAuthDB(" \
"id INTEGER primary key AUTOINCREMENT not null unique," \
"username varchar(64) not null unique," \
"token varchar(128) not null unique," \
"invalid_date int not null" \
")"
cursor.execute(create_session)
conn.commit()
cursor.close()
conn.close()
return "create success"
# 验证用户名密码是否合法
def CheckParameters(*kwargs):
for item in range(len(kwargs)):
# 先验证长度
if len(kwargs[item]) >= 256 or len(kwargs[item]) == 0:
return False
# 先小写,然后去掉两侧空格,去掉所有空格
local_string = kwargs[item].lower().strip().replace(" ","")
# 判断是否只包含 大写 小写 数字
for kw in local_string:
if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:
return False
return True
# 登录认证模块
@app.route("/login",methods=["POST"])
def login():
if request.method == "POST":
# 获取参数信息
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 2:
username = obtain_dict["username"]
password = obtain_dict["password"]
# 验证是否合法
is_true = CheckParameters(username,password)
if is_true == True:
# 查询是否存在该用户
select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
if select[0][0] == username and select[0][1] == password:
# 查询Session列表是否存在
select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
if select_session != []:
ref = {"message": ""}
ref["message"] = select_session[0][0]
return json.dumps(ref, ensure_ascii=False)
# Session不存在则需要重新生成
else:
# 生成并写入token和过期时间戳
token = ''.join(random.sample(string.ascii_letters + string.digits, 32))
# 设置360秒周期,过期时间
time_stamp = int(time.time()) + 360
insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
if insert == True:
ref = {"message": ""}
ref["message"] = token
return json.dumps(ref, ensure_ascii=False)
else:
return json.dumps("{'message': '用户名或密码错误'}", ensure_ascii=False)
else:
return json.dumps("{'message': '输入参数不可用'}", ensure_ascii=False)
return json.dumps("{'message': '未知错误'}", ensure_ascii=False)
# 检查登录状态 token是否过期的装饰器
def login_check(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("处理登录逻辑部分: {}".format(request.url))
# 得到token 验证是否登陆了,且token没有过期
local_timestamp = int(time.time())
get_token = request.headers.get("token")
# 验证传入参数是否合法
if CheckParameters(get_token) == True:
select = RunSqlite("./database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")
print(select)
# 判断是否存在记录,如果存在,在判断时间戳是否合理
if select != []:
# 如果当前时间与数据库比对,大于说明过期了需要删除原来的,让用户重新登录
if local_timestamp >= int(select[0][1]):
print("时间戳过期了")
# 删除原来的Token
delete = RunSqlite("./database.db","SessionAuthDB","delete",f"token='{get_token}'","none")
if delete == True:
return json.dumps("{'token': 'Token 已过期,请重新登录获取'}", ensure_ascii=False)
else:
return json.dumps("{'token': '数据库删除异常,请联系开发者'}", ensure_ascii=False)
else:
# 验证Token是否一致
if select[0][0] == get_token:
print("Token验证正常,继续执行function_ptr指向代码.")
# 返回到原函数
return func(*args, **kwargs)
else:
print("Token验证错误 {}".format(select))
return json.dumps("{'token': 'Token 传入错误'}", ensure_ascii=False)
# 装饰器调用原函数
# function_ptr = func(*args, **kwargs)
return json.dumps("{'token': 'Token 验证失败'}", ensure_ascii=False)
return wrapper
# 获取参数函数
@app.route("/GetPage", methods=["POST"])
@login_check
def GetPage():
if request.method == "POST":
# 获取参数信息
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 1:
pagename = obtain_dict["pagename"]
print("查询名称: {}".format(obtain_dict["pagename"]))
# 相应头的完整写法
req = Response(response="ok", status=200, mimetype="application/json")
req.headers["Content-Type"] = "text/json; charset=utf-8"
req.headers["Server"] = "LyShark Server 1.0"
req.data = json.dumps("{'message': 'hello world'}")
return req
else:
return json.dumps("{'message': '传入参数错误,请携带正确参数请求'}", ensure_ascii=False)
return json.dumps("{'token': '未知错误'}", ensure_ascii=False)
# 用户注册函数
@app.route("/register", methods=["POST"])
def Register():
if request.method == "POST":
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 2:
print("用户名: {} 密码: {}".format(obtain_dict["username"], obtain_dict["password"]))
reg_username = obtain_dict["username"]
reg_password = obtain_dict["password"]
# 验证是否合法
if CheckParameters(reg_username, reg_password) == False:
return json.dumps("{'message': '传入用户名密码不合法'}", ensure_ascii=False)
# 查询用户是否存在
select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")
if select != []:
return json.dumps("{'message': '用户名已被注册'}", ensure_ascii=False)
else:
insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")
if insert == True:
return json.dumps("{'message': '注册成功'}", ensure_ascii=False)
else:
return json.dumps("{'message': '注册失败'}", ensure_ascii=False)
else:
return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
return json.dumps("{'message': '未知错误'}", ensure_ascii=False)
# 密码修改函数
@app.route("/modify", methods=["POST"])
@login_check
def modify():
if request.method == "POST":
obtain_dict = request.form.to_dict()
if len(obtain_dict) != 0 and len(obtain_dict) == 1:
mdf_password = obtain_dict["password"]
get_token = request.headers.get("token")
print("获取token: {} 修改后密码: {}".format(get_token,mdf_password))
# 验证是否合法
if CheckParameters(get_token, mdf_password) == False:
return json.dumps("{'message': '传入密码不合法'}", ensure_ascii=False)
# 先得到token对应用户名
select = RunSqlite("./database.db","SessionAuthDB","select","username",f"token='{get_token}'")
if select != []:
# 接着直接修改密码即可
modify_username = str(select[0][0])
print("得到的用户名: {}".format(modify_username))
update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")
if update == True:
# 删除原来的token,让用户重新获取
delete = RunSqlite("./database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")
print("删除token状态: {}".format(delete))
return json.dumps("{'message': '修改成功,请重新登录获取Token'}", ensure_ascii=False)
else:
return json.dumps("{'message': '修改失败'}", ensure_ascii=False)
else:
return json.dumps("{'message': '不存在该Token,无法修改密码'}", ensure_ascii=False)
else:
return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
return json.dumps("{'message': '未知错误'}", ensure_ascii=False)
if __name__ == '__main__':
app.run(debug=True)
首先需要在Web页面访问http://127.0.0.1/create
路径实现对数据库的初始化,并打开Postman
工具,通过传入参数来使用这个案例。
相关文章:

elasticsearc DSL查询文档
文章浏览阅读68次。精确查询常见的有哪些?term查询:根据词条精确匹配,一般搜索keyword类型、数值类型、布尔类型、日期类型字段range查询:根据数值范围查询,可以是数值、日期的范围。

什么是分布式锁?Redis实现分布式锁详解
文章浏览阅读151次,点赞4次,收藏3次。在分布式系统中,涉及多个主机访问同一块资源,此时就需要锁来做互斥控制,避免出现类似线程安全问题。而Java中的synchronized只是对当前进程中的线程有效,多个主机实际上是多个进程,那么它就无能为力了,此时就需要分布式锁。

MySQL运行在docker容器中会损失多少性能
前言 自从使用docker以来,就经常听说MySQL数据库最好别运行在容器中,性能会损失很多。一些之前没使用过容器的同事,对数据库运行在容器中也是忌讳莫深,甚至只要数据库跑在容器中出现性能问题时,首先就把问题推到容器上。 那么到底会损失多少,性能损失会很多吗? 为此我装了两个MySQL,版本都是8.

【Docker】Docker与Kubernetes:区别与优势对比
一种革新性的容器技术一、Docker与Kubernetes简介二、架构和部署模型1. Docker 部署模型2. 构建 Docker 镜像3. 运行容器4. 编排工具三、可移植性和可扩展性1. 可移植性(Portability):2. 可扩展性(Scalability):四、管理和编排能力五、生态系统和社区支持

微信小程序完整实现微信支付功能(SpringBoot和小程序)
然后到提供前端调用支付路由的类,WechatController类,注意我这里路由拼接的有/wechat/pay/notify,这个要和之前配置yml文件的支付回调函数一样,要不然不行。不久前给公司实现支付功能,折腾了一阵子,终于实现了,微信支付对于小白来说真的很困难,特别是没有接触过企业级别开发的大学生更不用说,因此尝试写一篇我如何从小白实现微信小程序支付功能的吧,使用的后端是SpringBoot。效果如下,这里因为我的手机不能截图支付页面,所以用的开发者工具支付的效果,都是一样的。4.前端(小程序端)

IT行业哪个方向比较好就业?
文章浏览阅读12次。在IT行业中,就业前景好的方向有很多,以下是一些比较热门的:

通过内网穿透本地MariaDB数据库,实现在公网环境下使用navicat图形化工具
文章浏览阅读113次,点赞50次,收藏40次。cpolar安装成功后,双击打开cpolar【或者在浏览器上访问本地9200端口 127.0.0.1:9200】,使用cpolar邮箱账号登录 web UI管理界面,如果还没有注册cpolar账号的话,点击免费注册,会跳到cpolar官网注册一个账号就可以了.在浏览器上访问9200端口,http://127.0.0.1:9200/,登录cpolar web ui管理界面,点击左侧仪表盘的隧道管理——隧道列表,找到mariaDB隧道,点击右侧的编辑。修改隧道信息,将保留成功的固定tcp地址配置到隧道中。

MySQL数据库索引以及使用唯一索引实现幂等性
一次和多次请求某一个资源对于资源本身应该具有同样的结果任意多次执行对资源本身所产生的影响均与一次执行的影响相同。

PVE 下虚拟机 Ubuntu 无法进入恢复模式的解决方案——提取原有系统文件
问题说明 某天重启虚拟机 Ubuntu,发现虚拟机只有容器IP,桥接的接口在虚拟机显示状态为 DOWN: 想重启进入恢复模式,却发现恢复模式一直花屏,无法使用: 没有办法了,只能想办法提取原有系统内原有文件。 解决方案 定位虚拟机编号: 找到虚拟机主硬盘: SSH 登录宿主机,执行以下命令 ls -

Nginx 核心配置文 nginx.conf介绍
文章浏览阅读38次。我们都知道浏览器中可以显示的内容有HTML、XML、GIF等种类繁多的文件、媒体等资源,浏览器为了区分这些资源,就需要使用MIME Type。所以说MIME Type是网络资源的媒体类型。Nginx作为web服务器,也需要能够识别前端请求的资源类型。在Nginx的配置文件中,默认有两行配置:用来配置Nginx响应前端请求默认的MIME类型。语法默认值位置在default_type之前还有一句。

如何快速本地搭建悟空CRM结合内网穿透工具高效远程办公
如何快速本地搭建悟空CRM结合内网穿透工具高效远程办公。

Kafka 集群如何实现数据同步?
哈喽大家好,我是咸鱼 最近这段时间比较忙,将近一周没更新文章,再不更新我那为数不多的粉丝量就要库库往下掉了 T﹏T 刚好最近在学 Kafka,于是决定写篇跟 Kafka 相关的文章(文中有不对的地方欢迎大家指出) 考虑到有些小伙伴可能是第一次接触 Kafka ,所以先简单介绍一下什么是 Kafka

使用 Hexo 搭建个人博客并部署到云服务器
目录1 整体流程2. 本地环境准备2.1 安装 Node.js 和 Git2.2 安装 Hexo3. 服务端环境准备3.1 Nginx 环境配置3.1.1 安装 Nginx3.1.2 更改 Nginx 配置文件3.2 Node.js 环境配置3.3 Git 环境配置3.3.1 安装 Git3.3.2

过滤器模式 rust和java的实现
文章浏览阅读301次。我们将创建一个 Person 对象、Criteria 接口和实现了该接口的实体类,来过滤 Person 对象的列表。我们制作一个Person实体类,Criteria为标准条件,CriteriaMale等为实现的具体判断器,是需要为person类使用meetCriteria方法便可以进行不同条件的判断。我们制作一个Person实体类,Criteria为标准条件,CriteriaMale等为实现的具体判断器,是需要为person类使用meetCriteria方法便可以进行不同条件的判断。

python最流行的适合计算积分和微分方程的库
SciPy有一个子模块scipy.integrate,包含多种数值积分方法,如牛顿哥特法(quad)、梯形法(trapz)、辛普森法(simps)等

c语言如何生成随机数以及设置随机数的范围
这篇文章介绍c语言如何生成随机数以及设置随机数的范围。本文主要介绍了rand函数、srand函数、以及time函数和时间戳的概念和如何控制随机数的范围。下一篇文章将介绍利用随机数和循环来写一个猜数字游戏。

Java虚拟机的垃圾回收机制
在Java语言中不再被任何引用所指向的对象被称为垃圾,这也是非常容易理解的,因为我们在对对象进行创建时,就需要给出一个对象引用变量用于指向我们在堆内存中创建的对象,所以如果一个对象没有被任何引用变量所指向的话,那么我们也就获取不到对象,也就没有办法对对象进行操作,这个对象自然就成了垃圾。引用计数算法判断对象是否为垃圾时比较方便快捷只是根据计数器中的值进行判断,但是会在每个对象的对象头中添加属性占用额外的空间,并且引用变量每一次指向对象都需要对对象的属性进行更新操作,占用大量的时间。为什么需要回收垃圾?

require()、import、import()有哪些区别?
require()、import、import()是我们常用的引入模块的三种方式,代码中几乎处处用到。如果对它们存在模糊,就会在工作过程中不断产生困惑,更无法做到对它们的使用挥洒自如。今天我们来一起捋一下,它们之间有哪些区别? 一、前世今生 学一个东西,先弄清楚它为什么会出现、它的发展历史、它是做什

如何保护电动汽车充电站免受网络攻击
文章浏览阅读228次。从电动汽车、传感器、充电站和支持基础设施的设计阶段开始就强调安全性作为首要任务。

Vite4+Typescript+Vue3+Pinia 从零搭建(3) - vite配置
项目代码同步至码云 weiz-vue3-template 关于vite的详细配置可查看 vite官方文档,本文简单介绍vite的常用配置。 初始内容 项目初建后,vite.config.ts 的默认内容如下: import { defineConfig } from 'vite' i

原型模式 rust和java的实现
文章浏览阅读218次。意图:用原型实例指定创建对象的种类,并且通过拷贝这些原型创建新的对象。主要解决:在运行期建立和删除原型。何时使用: 1、当一个系统应该独立于它的产品创建,构成和表示时。2、当要实例化的类是在运行时刻指定时,例如,通过动态装载。3、为了避免创建一个与产品类层次平行的工厂类层次时。4、当一个类的实例只能有几个不同状态组合中的一种时。建立相应数目的原型并克隆它们可能比每次用合适的状态手工实例化该类更方便一些。如何解决:利用已有的一个原型对象,快速地生成和原型对象一样的实例。关键代码。

c语言const修饰变量与assert断言详解
const修饰变量与assert断言详解,const修饰变量。作用:const用于修饰变量使其不能再被修改。

Linux如何修改主机名(hostname)(亲测可用)
文章浏览阅读1k次。要想在虚拟机的 Linux 系统内部改变主机名(hostname),需要通过系统的配置来修改。文件,将其中引用旧主机名的条目更新为新主机名。文件,并将里面的内容替换为新主机名。但是大多数情况可能无需更改,除非在。文件里做了什么硬编码骚操作🤣。替换为想要设置的新主机名。或者使用文本编辑器手动编辑。需要重新设置主机名。

Linux下内网穿透实现云原生观测分析工具的远程访问
夜莺监控是一款开源云原生观测分析工具,采用 All-in-One 的设计理念,集数据采集、可视化、监控告警、数据分析于一体,与云原生生态紧密集成,提供开箱即用的企业级监控分析和告警能力。夜莺于 2020 年 3 月 20 日,在 github 上发布 v1 版本,已累计迭代 100 多个版本。本地部署后,为解决无法远程访问的难题,今天我们介绍如何实现让本地nightingale 结合cpolar 内网穿透工具实现 远程也可以访问,提高运维效率.

Java Lambda 表达式笔记
文章浏览阅读71次。Lambda 表达式,也可称为闭包.Lambda 允许把函数作为一个方法的参数(函数作为参数传递进方法中)。

JavaScript中如何终止forEach循环?
在JavaScript中,forEach方法是用于遍历数组的,通常没有直接终止循环的机制。然而,我们可以使用一些技巧来模拟终止forEach循环。以下是几种常见的方法

Docker本地部署Drupal并实现公网访问
文章浏览阅读498次,点赞35次,收藏33次。Dupal是一个强大的CMS,适用于各种不同的网站项目,从小型个人博客到大型企业级门户网站。它的学习曲线可能相对较陡,但一旦熟悉了它的工作方式,用户就能够充分利用其功能和灵活性。在本文中,我们将介绍如何使用Docker快速部署Drupal,并且结合cpolar内网穿透工具实现公网远程访问首先,您需要在您的机器上安装Docker,并且启动,可以按照Docker官方文档中的说明进行安装。

如何使用 RestTemplate 进行 Spring Boot 微服务通信示例
概述 下面我们将学习如何创建多个 Spring boot 微服务以及如何使用 RestTemplate 类在多个微服务之间进行同步通信。 微服务通信有两种风格: 同步通讯 异步通信 同步通讯 在同步通信的情况下,客户端发送请求并等待服务的响应。这里重要的一点是协议(HTTP/HTTPS)是同步的,客

win10 通过wmic命令行设置系统环境变量
而通过编程修改系统环境变量,需要调用注册表API或调用wmi API接口,都有些过于麻烦。此时,如果通过system函数,直接调用批处理文件,则只需要一行代码。批处理中,分别给出了创建环境变量,修改环境变量,删除环境变量的demo。可以根据需要调整批处理文件。在系统维护或编写程序过程中,经常需要对系统环境变量进行设置、修改、删除炒作。注:修改系统环境变量,需要有管理员权限。

c#操作mongodb数据库工具类
新建c#项目,在nuget中引入MongoDB.Driver驱动,然后新建一个MongoDBToolHelper类,将下面代码复制进去 using MongoDB.Bson; using MongoDB.Bson.Serialization; using MongoDB.Driver; using