Sample Python Flask server that issues tokens based on the OTT mentioned in an earlier post:
# coding: utf-8
from datetime import datetime, timedelta
from flask import Flask
from flask import session, request
from flask import render_template, redirect, jsonify
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import gen_salt
from flask_oauthlib.provider import OAuth2Provider
from tokenprovider import TokenProvider
from json import dumps
import random
import string
# Application bootstrap: the Flask app, its session-signing key, the database
# handle and the OAuth2 provider used by the views below.
app = Flask(__name__, template_folder='templates')
# NOTE(review): debug mode is switched on unconditionally; confirm this file
# is only ever run in development.
app.debug = True
# The secret key signs the session cookie, so it is drawn from the operating
# system's CSPRNG (random.SystemRandom) rather than the default Mersenne
# Twister generator, whose output is predictable.
# The key is regenerated on every import, so existing session cookies stop
# validating each time the process restarts.
_secure_random = random.SystemRandom()
app.secret_key = ''.join(
    _secure_random.choice(string.digits + string.ascii_uppercase)
    for _ in range(32)
)
# NOTE(review): the database credentials are hard-coded in the URI below;
# consider loading them from the environment instead.
app.config.update({
    'SQLALCHEMY_DATABASE_URI': 'mysql://root:thepassword@127.0.0.1/token',
})
db = SQLAlchemy(app)
oauth = OAuth2Provider(app)
class User(db.Model):
    """Database model for an account known to the token server."""

    # Surrogate primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Login name; at most 40 characters and unique across all users.
    username = db.Column(db.String(40), unique=True)
    # Per-user salt; at most 4 characters.
    salt = db.Column(db.String(4))
class Token(db.Model):
    """Database model for an access token issued to a user."""

    # Surrogate primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Owning user: foreign key into the ``user`` table plus the ORM
    # relationship that loads the matching ``User`` object.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    user = db.relationship('User')
    # Token type; currently only "bearer" is supported.
    token_type = db.Column(db.String(40))
    # The token value itself; at most 6 characters.
    access_token = db.Column(db.String(6))
@app.route('/', methods=('GET', 'POST'))
def home():
    """Log a user in on POST, or greet the current user on GET.

    POST: reads ``username`` from the request's query arguments, creates the
    matching ``User`` (with a fresh 4-character salt) if none exists, stores
    the user's id in the Flask session and redirects to ``/``.

    GET: looks up the user whose id is stored in the session, if any, and
    returns a JSON welcome message with HTTP status 200.
    """
    if request.method == 'POST':
        # NOTE(review): the username is read from the query string
        # (request.args) even though this is the POST branch - confirm that
        # clients do not send it in the form body instead.
        username = request.args.get('username', '')
        user = User.query.filter_by(username=username).first()
        if not user:
            # Draw the salt from the OS CSPRNG rather than the predictable
            # default generator.
            rng = random.SystemRandom()
            alphabet = string.digits + string.ascii_uppercase
            salt = ''.join(rng.choice(alphabet) for _ in range(4))
            user = User(username=username, salt=salt)
            db.session.add(user)
            db.session.commit()
        session['id'] = user.id
        # Function-call form works on both Python 2 and Python 3; the
        # original ``print str(session)`` statement is a SyntaxError on 3.
        print(str(session))
        return redirect('/')

    user = None
    if 'id' in session:
        user = User.query.get(session['id'])
    return jsonify({'msg': 'welcome,' + str(user), 'success': 'success'}), 200
# Registered on ``oauth``, the OAuth2Provider instance created during app
# setup; the original decorator referenced ``ott``, which is never defined in
# this file and raised NameError at import time.
@oauth.tokengetter
def check_token(access_token=None, refresh_token=None):
    """Return the stored ``Token`` matching ``access_token``, or ``None``.

    :param access_token: token string presented by the client.
    :param refresh_token: accepted so the provider can call this getter with
        either keyword; the ``Token`` model stores no refresh tokens, so a
        refresh-token lookup always yields ``None``.
    :returns: the first matching ``Token`` row, or ``None`` when no access
        token was given or nothing matches.
    """
    if access_token:
        return Token.query.filter_by(access_token=access_token).first()
    return None
# The view function is named ``list_sessions`` so that it no longer rebinds
# the module-level name ``session`` (Flask's session proxy, which ``home``
# subscripts). ``endpoint='session'`` keeps the endpoint name that the
# original function name produced.
# ``methods`` must be a sequence of strings: the original ``('GET')`` is just
# the string 'GET', hence the trailing comma here.
@app.route('/session/', methods=('GET',), endpoint='session')
def list_sessions():
    """Return the objects held by the SQLAlchemy session as JSON.

    Every object yielded by iterating ``db.session`` is converted with
    ``str()``, because model instances are not JSON-serializable. The list is
    JSON-encoded and returned under the ``sessions`` key with HTTP status 200.
    """
    # presumably iterating db.session yields the instances it currently
    # tracks - confirm against the SQLAlchemy version in use.
    tracked = [str(obj) for obj in db.session]
    # ``dumps`` is the name this file imports (``from json import dumps``);
    # the original ``json.dumps`` raised NameError.
    return jsonify({'sessions': dumps(tracked)}), 200
# Registered on ``oauth``, the OAuth2Provider instance created during app
# setup; the original decorator referenced ``ott``, which is never defined in
# this file and raised NameError at import time.
@oauth.tokensetter
def save_token(token, request, *args, **kwargs):
    """Replace the user's stored tokens with one freshly issued bearer token.

    :param token: token data supplied by the provider. It is not used here;
        the stored value comes from ``TokenProvider.set_token()`` instead.
    :param request: request object supplied by the provider; the username is
        read from ``request.args``.
    :returns: the newly persisted ``Token``.
    :raises ValueError: if no ``User`` matches the supplied username.
    """
    # NOTE(review): assumes the provider's request object exposes ``.args``
    # the way a Flask request does - confirm.
    username = request.args.get('username', '')
    user = User.query.filter_by(username=username).first()
    if user is None:
        # The original dereferenced ``.id`` on None here and failed with an
        # opaque AttributeError.
        raise ValueError('cannot save token: unknown user %r' % (username,))

    # Make sure that every user has only one token. ``Session.delete`` takes
    # a single instance, so delete the rows one by one rather than passing
    # the whole list.
    for stale in Token.query.filter_by(user_id=user.id).all():
        db.session.delete(stale)

    # presumably set_token() returns the new token string - confirm against
    # the tokenprovider module.
    provider = TokenProvider()
    tok = Token(
        # The relationship needs a User instance, not the username string;
        # assigning it also populates ``user_id`` when the row is flushed.
        user=user,
        token_type='bearer',
        access_token=provider.set_token(),
    )
    # The model defines no ``save()`` method; adding to the session and
    # committing is what persists the row.
    db.session.add(tok)
    db.session.commit()
    return tok
if __name__ == '__main__':
    # Script entry point: create any missing tables for the declared models,
    # then start Flask's built-in server on port 8787 with debugging enabled.
    db.create_all()
    app.run(port=8787, debug=True)
Reply Delete
Great Article
Final Year Projects in Python
Python Training in Chennai
Final Year Project Centers in Chennai
Python Training in Chennai