Flask实现后台RESTful web API
目前功能
- 实现了用户注册、登录、登出、获取用户信息、删除账户五个接口
- 登录后使用token保持会话,token有效期是5分钟,过期需重新登录
- 获取用户信息需要用户登录
- 删除账户需要密码验证
首先先说流程
先注册->然后登陆(同时生成token)->再访问其他接口都要带着token和用户名否则无法访问
moudles.py
class User(db.Model):
'''
用户对象
'''
# 表名,遵守复数形式的命名约定
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(255), unique=True, index=True)
email = db.Column(db.String(255), unique=True, index=True)
passwd_hash = db.Column(db.String(128))
register_time = db.Column(db.DateTime, default=datetime.utcnow)
last_login = db.Column(db.DateTime, default=datetime.utcnow)
注册的接口
@api.route('/register', methods=['POST'])
@required('username', 'email', 'passwd')
def register():
'''
用户注册接口
todo: 用户名支持中文,用户名和密码加正则匹配检验
'''
username = request.json.get('username')
email = request.json.get('email')
# 判断当前用户填写的注册邮箱和用户名是否已被注册
if user_by_username(username) is not None:
return jsonify(type='param error', msg='username is been used.')
if user_by_email(email) is not None:
return jsonify(type='param error', msg='email is been used.')
# 创建新用户并写入数据库
uid = create_user(username, email, request.json.get('passwd'))
if uid == 0:
return jsonify(type='db error', msg='register fail')
return jsonify(type='OK', msg='register seccess')
—>create_user()方法里
def create_user(username, email, passwd):
'''
根据信息创建用户,并将用户记录加入到数据库中
如果成功返回用户的主键id
如果出错将回滚并返回0
'''
user = User()
user.email = email
user.username = username
user.passwd = passwd # 利用了User类中的passwd属性的方法间接加密了用户密码
try:
db.session.add(user)
db.session.commit()
except Exception as e:
current_app.logger.error(e)
db.session.rollback()
return 0
return user.id
—>转到 user.passwd
@passwd.setter
def passwd(self, passwd):
self.passwd_hash = self._add_salt_encrypt(passwd)
—>self._add_salt_encrypt()
def _add_salt_encrypt(self, passwd):
'''
使用hashlib.sha256加盐加密,返回加密后字符串
64位的16进制字符串
'''
import hashlib
return hashlib.sha256(passwd.encode("utf-8") + self.username.encode("utf-8")).hexdigest()
到这里注册用户完成
接下来用户登录接口
@api.route('/login', methods=['POST'])
@required('username', 'passwd')
def login():
'''
登录接口
'''
username = request.json.get('username')
user = user_by_username(username)
if user is None:
return jsonify(type='param error', msg='username is not registered.')
# 验证登录密码
if user.verify_passwd(request.json.get('passwd')):
# 验证成功
# 更新登入时间
user.update_last_login()
# 生成加密token
token = user.generate_token()
print(token)
timestamp = user.last_login
# 将用户的token信息加入会话
session[username] = {'token': token, 'timestamp': timestamp}
# 返回登录成功信息
return jsonify(type='OK', msg='login OK', token=session[username]['token'])
return jsonify(type='param error', msg='username or passwd error.')
首先 —>user_by_username() #检验是否存在用户名
def user_by_username(username):
'''
根据username返回数据库中的用户对象
'''
user = User.query.filter_by(username=username).first()
if user is not None:
return user
return None
接着—>verify_passwd() # 检验密码是否正确
def verify_passwd(self, passwd):
'''
密码验证
'''
return self.passwd_hash == self._add_salt_encrypt(passwd)
更新登陆时间---->user.update_last_login()
def update_last_login(self):
'''
更新最近一次的登入时间
'''
self.last_login = datetime.utcnow()
生成token---->user.generate_token()
def generate_token(self):
'''
根据用户信息生成加密token,用于记录用户会话
'''
import hashlib
key = current_app.config['SECRET_KEY'] #这个必须有,否则会报错
return hashlib.sha224(key.encode("utf-8") + self.email.encode("utf-8") + str(self.last_login).encode("utf-8")).hexdigest()
然后更新时间、把token放入session中
登陆完成
接着访问其他接口都要带着token和用户名
访问登出接口
@api.route('/logout', methods=['POST'])
@login_check
def logout():
'''
登出接口
'''
username = request.json.get('username')
# 从session中删除相关用户的token
session.pop(username)
return jsonify(type='OK', msg='logout OK')
@login_check装饰器
def login_check(func):
@functools.wraps(func)
def wrapper(*args, **kw):
username = request.json.get('username')
token = request.headers.get('token')
# print(token)
# print(session.has_key(username))
if not token or not session[username]:
return jsonify(type='param error', msg='need to login')
item = session[username]
if token == item['token']:
if (datetime.utcnow() - item['timestamp']) <= timedelta(seconds=300):
return func(*args, **kw)
# 删除超时的token
session.pop(username)
return jsonify(type='param error', msg='login time out')
return jsonify(type='param error', msg='need to verify your passport')
return wrapper
这就很清晰了,回去token,同户名跟session的token里面做验证
超时token可以自己设置,点进去就可以设置时分秒和天数
有几处改了一下,首先加密的时候先转化为bytes格式、还有就是key = current_app.config[‘SECRET_KEY’] #这个必须有,否则会报错,在config文件里面设置