快速导航
flask 作为后端数据库操作是必要的,现在记录一下一些flask数据库的相关操作,
我将使用三种方法操作数据库
暂时使用较简单的sqlite作为例子
相关环境的安装
建议使用ve虚拟环境
1sudo pacman -S sqlite # archlinux
2sudo pip install virtualenv
3# 在vertualenv环境下执行
4pip install Flask-SQLAlchemy Jinja2 SQLAlchemy
最好是多看文档
1.使用sqlite3模块API
参考资料
这是最简单的方法,不仅适用于flask,python的其他方面也一样适用,如爬虫之类
连接数据库
1#!/usr/bin/env python
2# -*- coding=UTF-8 -*-
3import sqlite3
4database = /path/test.db #数据库文件路径
5test = sqlite.connect('database') #连接数据库,如果数据库文件不存在则创建
6print('connect database successfully')
7test.close() #关闭数据库连接
如果将数据库名改为**:memory:**,则在内存中打开数据库而不是磁盘
创建表
1database = /path/test.db
2test = sqlite.connect('database')
3test.execute('''CREATE TABLE BOOKS
4 (ID INT PRIMARY KEY NOT NULL,
5 TYPE TEXT NOT NULL,
6 NAME TEXT NOT NULL,
7 CONTENT TEXT);''')
8print("Table created successfully")
9test.close()
插入数据
1database = /path/test.db
2test = sqlite.connect('database')
3test.execute("INSERT INTO BOOKS (ID,TYPE,NAME,CONTENT) \
4 VALUES (1, 'hello', 'world', 'helloworld')");
5test.execute("INSERT INTO BOOKS (ID,TYPE,NAME,CONTENT) \
6 VALUES (2, 'goodbye', 'world', 'goodbyeworld')");
7test.commit() #要使数据保存,必须提交
8print("Records commited successfully")
9test.close()
查询数据
1database = /path/test.db
2test = sqlite.connect('database')
3cursor = test.execute("SELECT ID,TYPE,NAME,CONTENT from BOOKS")
4for row in cursor:
5 print("ID =%d "%(row[0]))
6 print("TYPE =%s "%(row[1]))
7 print("NAME =%s "%(row[2]))
8 print("CONTENT =%s "%(row[3]))
9test.close()
更新数据
1database = /path/test.db
2test = sqlite.connect('database')
3test.execute("UPDATE BOOKS SET CONTENT = 'hello' WHERE ID=2")
4test.commit
5test.close()
删除数据
1database = /path/test.db
2test = sqlite.connect('database')
3test.execute("DELETE FROM BOOKS WHERE ID=2")
4test.commit
5test.close()
由于数据库文件我已经在外部使用第一种方法创建,所以第二种方法我直接打开
2.使用文档上所说的方法
1import sqlite3
2from flask import g
3
4DATABASE = '/path/to/database.db'
5
6def connect_db():
7 return sqlite3.connect(DATABASE)
8
9@app.before_request #使用app_request装饰器打开数据库
10def before_request():
11 g.db = connect_db()
12
13@app.teardown_request #使用app_request装饰器关闭数据库
14def teardown_request(exception):
15 if hasattr(g, 'db'):
16 g.db.close()
17
18def query_db(query, args=(), one=False): #数据库简化查询
19 cur = g.db.execute(query, args)
20 rv = [dict((cur.description[idx][0], value)
21 for idx, value in enumerate(row)) for row in cur.fetchall()]
22 return (rv[0] if rv else None) if one else rv
需要使用时(主要是查询)
1for book in query_db('select * from BOOKS'):
2 print book['NAME'], 'has the id', book['ID']
3#由于flask一般不使用print,可以这样
4book = query_db('select * from BOOKS')
5在模板中
6{{ book.ID }}或着{{ book['ID'] }}
如果只希望得到一个单独的结果
1book = query_db('select * from BOOKS where NAME = ?',
2 [the_bookname], one=True)
3if book is None:
4 print 'No such user'
5else:
6 print the_bookname, 'has the id', book['ID']
创建,更新,插入,删除数据请使用第一个方法
初始化数据库模型
1from contextlib import closing
2
3def init_db():
4 with closing(connect_db()) as db:
5 with app.open_resource('schema.sql') as f:
6 db.cursor().executescript(f.read())
7 db.commit()
3.使用Flask-SQLAlchemy扩展 (这应该是最推荐的方法)
一个最小应用
1from flask import Flask
2from flask.ext.sqlalchemy import SQLAlchemy
3
4app = Flask(__name__)
5app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
6# sqlite打开的格式是sql:///三个"/",接着是数据库文件的**绝对路径**
7db = SQLAlchemy(app)
8
9class User(db.Model):
10 id = db.Column(db.Integer, primary_key=True)
11 username = db.Column(db.String(80), unique=True)
12 email = db.Column(db.String(120), unique=True)
13
14 def __init__(self, username, email):
15 self.username = username
16 self.email = email
17
18 def __repr__(self):
19 return '<User %r>' % self.username
使用
假若上面代码保存为test.py
打开python shell
1>>> from test import db,User
2>>> db.create_all() #创建表
3>>> admin = User('admin', '[email protected]') #创建数据
4# 这时数据还未真正写入数据库,需要提交
5>>> db.session.add(admin)
6>>> db.session.commit() #这时数据已经写入数据库中
7# 简单数据查询
8>>> users = User.query.all()
9>>> print(users)
10[<User u'admin'>]
11>>> admin = User.query.filter_by(username='admin').first()
12>>> print(admin)
13<User u'admin'>
如果想要简单的查看数据,推荐firefox的一个sqlite插件 sqlite manager
配置
1SQLALCHEMY_DATABASE_URI #用于连接的数据库
2SQLALCHEMY_BINDS #连接多个数据库
3# 比如
4SQLALCHEMY_BINDS = {
5 'users': 'mysqldb://localhost/users',
6 'appmeta': 'sqlite:////path/to/appmeta.db'
7}
8# 创建删除表
9>>> db.create_all(bind=['users'])
10>>> db.create_all(bind='appmeta')
11# 引用绑定,使用 __bind_key__
12class User(db.Model):
13 __bind_key__ = 'users'
14 id = db.Column(db.Integer, primary_key=True)
15 username = db.Column(db.String(80), unique=True)
选择,插入,删除
插入
1>>> from test import User
2>>> me = User('admin', '[email protected]')
3>>> db.session.add(me)
4>>> db.session.commit()
删除
1>>> db.session.delete(me)
2>>> db.session.commit()
查询
首先插入如下数据
|id |username | email | |:-------:|:-------:|:---------------:| |1 |admin |[email protected]| |2 |peter |[email protected]| |3 |guest |[email protected]| 通过用户名查询用户:
1>>> admin = User.query.filter_by(username='admin').first()
2>>> print(admin.id)
31
4>>> print(admin.email)
5u'[email protected]'
查找不存在的用户名:
1>>> missing = User.query.filter_by(username='missing').first()
2>>> missing is None
3True
使用更复杂的表达式查询一些用户:
1>>> User.query.filter(User.email.endswith('@example.com')).all()
2[<User u'admin'>, <User u'guest'>]
按某种规则对用户排序:
1>>> User.query.order_by(User.username)
2[<User u'admin'>, <User u'guest'>, <User u'peter'>]
限制返回用户的数量:
1>>> User.query.limit(1).all()
2[<User u'admin'>]
用主键查询用户:
1>>> User.query.get(1)
2<User u'admin'>
在视图中使用
使用 get_or_404() 来代替 get(),使用 first_or_404() 来代替 first()。 这样会抛出一个 404 错误,而不是返回 None:
1@app.route('/user/<username>')
2def show_user(username):
3 user = User.query.filter_by(username=username).first_or_404()
4 return render_template('show_user.html', user=user)
主要就是这样,最好看完整的文档
具体例子可以查看GitHub
知识共享署名-非商业性使用-相同方式共享4.0国际许可协议