flask 作为后端数据库操作是必要的,现在记录一下一些flask数据库的相关操作,
我将使用三种方法操作数据库
暂时使用较简单的sqlite作为例子
相关环境的安装
建议使用ve虚拟环境
sudo pacman -S sqlite # archlinux sudo pip install virtualenv # 在vertualenv环境下执行 pip install Flask-SQLAlchemy Jinja2 SQLAlchemy
最好是多看文档
1.使用sqlite3模块API
参考资料
这是最简单的方法,不仅适用于flask,python的其他方面也一样适用,如爬虫之类
连接数据库
#!/usr/bin/env python # -*- coding=UTF-8 -*- import sqlite3 database = /path/test.db #数据库文件路径 test = sqlite.connect('database') #连接数据库,如果数据库文件不存在则创建 print('connect database successfully') test.close() #关闭数据库连接
如果将数据库名改为:memory:,则在内存中打开数据库而不是磁盘
创建表
database = /path/test.db test = sqlite.connect('database') test.execute('''CREATE TABLE BOOKS (ID INT PRIMARY KEY NOT NULL, TYPE TEXT NOT NULL, NAME TEXT NOT NULL, CONTENT TEXT);''') print("Table created successfully") test.close()
插入数据
database = /path/test.db test = sqlite.connect('database') test.execute("INSERT INTO BOOKS (ID,TYPE,NAME,CONTENT) \ VALUES (1, 'hello', 'world', 'helloworld')"); test.execute("INSERT INTO BOOKS (ID,TYPE,NAME,CONTENT) \ VALUES (2, 'goodbye', 'world', 'goodbyeworld')"); test.commit() #要使数据保存,必须提交 print("Records commited successfully") test.close()
查询数据
database = /path/test.db test = sqlite.connect('database') cursor = test.execute("SELECT ID,TYPE,NAME,CONTENT from BOOKS") for row in cursor: print("ID =%d "%(row[0])) print("TYPE =%s "%(row[1])) print("NAME =%s "%(row[2])) print("CONTENT =%s "%(row[3])) test.close()
更新数据
database = /path/test.db test = sqlite.connect('database') test.execute("UPDATE BOOKS SET CONTENT = 'hello' WHERE ID=2") test.commit test.close()
删除数据
database = /path/test.db test = sqlite.connect('database') test.execute("DELETE FROM BOOKS WHERE ID=2") test.commit test.close()
由于数据库文件我已经在外部使用第一种方法创建,所以第二种方法我直接打开
2.使用文档上所说的方法
import sqlite3 from flask import g DATABASE = '/path/to/database.db' def connect_db(): return sqlite3.connect(DATABASE) @app.before_request #使用app_request装饰器打开数据库 def before_request(): g.db = connect_db() @app.teardown_request #使用app_request装饰器关闭数据库 def teardown_request(exception): if hasattr(g, 'db'): g.db.close() def query_db(query, args=(), one=False): #数据库简化查询 cur = g.db.execute(query, args) rv = [dict((cur.description[idx][0], value) for idx, value in enumerate(row)) for row in cur.fetchall()] return (rv[0] if rv else None) if one else rv
需要使用时(主要是查询)
for book in query_db('select * from BOOKS'): print book['NAME'], 'has the id', book['ID'] #由于flask一般不使用print,可以这样 book = query_db('select * from BOOKS') 在模板中 {{ book.ID }}或着{{ book['ID'] }}
如果只希望得到一个单独的结果
book = query_db('select * from BOOKS where NAME = ?', [the_bookname], one=True) if book is None: print 'No such user' else: print the_bookname, 'has the id', book['ID']
创建,更新,插入,删除数据请使用第一个方法
初始化数据库模型
from contextlib import closing def init_db(): with closing(connect_db()) as db: with app.open_resource('schema.sql') as f: db.cursor().executescript(f.read()) db.commit()
3.使用Flask-SQLAlchemy扩展 (这应该是最推荐的方法)
一个最小应用
from flask import Flask from flask.ext.sqlalchemy import SQLAlchemy app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db' # sqlite打开的格式是sql:///三个"/",接着是数据库文件的**绝对路径** db = SQLAlchemy(app) class User(db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True) email = db.Column(db.String(120), unique=True) def __init__(self, username, email): self.username = username self.email = email def __repr__(self): return '<User %r>' % self.username
使用
假若上面代码保存为test.py
打开python shell
>>> from test import db,User >>> db.create_all() #创建表 >>> admin = User('admin', '[email protected]') #创建数据 # 这时数据还未真正写入数据库,需要提交 >>> db.session.add(admin) >>> db.session.commit() #这时数据已经写入数据库中 # 简单数据查询 >>> users = User.query.all() >>> print(users) [<User u'admin'>] >>> admin = User.query.filter_by(username='admin').first() >>> print(admin) <User u'admin'>
如果想要简单的查看数据,推荐firefox的一个sqlite插件 sqlite manager
配置
SQLALCHEMY_DATABASE_URI #用于连接的数据库 SQLALCHEMY_BINDS #连接多个数据库 # 比如 SQLALCHEMY_BINDS = { 'users': 'mysqldb://localhost/users', 'appmeta': 'sqlite:////path/to/appmeta.db' } # 创建删除表 >>> db.create_all(bind=['users']) >>> db.create_all(bind='appmeta') # 引用绑定,使用 __bind_key__ class User(db.Model): __bind_key__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True)
选择,插入,删除
插入
>>> from test import User >>> me = User('admin', '[email protected]') >>> db.session.add(me) >>> db.session.commit()
删除
>>> db.session.delete(me) >>> db.session.commit()
查询
首先插入如下数据
id | username | |
---|---|---|
1 | admin | [email protected] |
2 | peter | [email protected] |
3 | guest | [email protected] |
通过用户名查询用户:
>>> admin = User.query.filter_by(username='admin').first() >>> print(admin.id) 1 >>> print(admin.email) u'[email protected]'
查找不存在的用户名:
>>> missing = User.query.filter_by(username='missing').first() >>> missing is None True
使用更复杂的表达式查询一些用户:
>>> User.query.filter(User.email.endswith('@example.com')).all() [<User u'admin'>, <User u'guest'>]
按某种规则对用户排序:
>>> User.query.order_by(User.username) [<User u'admin'>, <User u'guest'>, <User u'peter'>]
限制返回用户的数量:
>>> User.query.limit(1).all() [<User u'admin'>]
用主键查询用户:
>>> User.query.get(1) <User u'admin'>
在视图中使用
使用 get_or_404() 来代替 get(),使用 first_or_404() 来代替 first()。 这样会抛出一个 404 错误,而不是返回 None:
@app.route('/user/<username>') def show_user(username): user = User.query.filter_by(username=username).first_or_404() return render_template('show_user.html', user=user)
主要就是这样,最好看完整的文档
具体例子可以查看GitHub