Python接口测试之对MySQL的操作(六)
本文章主要来说python对mysql数据库的基本操作,当然,前提是已经搭建了python环境和搭建了Mysql
数据库的环境,python操作mysql数据库提供了MySQLdb库,下载的地址为:
https://pypi.python.org/pypi/MySQL-python/1.2.4
见官方下载的截图:
下载文件后,直接进行安装,安装的方式这里不在介绍,如有不明白,可以bing下,安装完成后,在python的命令行环境下
看是否可以导入MySQLdb,如果可以导入并且无任何的错误提示,表示已经安装成功了,见截图:
已经很成功的安装了python操作mysql的数据库,在这里,我们详细的介绍对python对mysql的增加,删除,修改
和查询的基本操作,这里使用的数据库名称是“day2017”,我们对数据库的操作,首先是创建数据库,然后是在数据库中
创建表,在这里,表的名称为:userInfo,见创建好的表字段信息:
OK,创建好数据库以及创建好了数据库中的表以后,下来开始操作数据库,操作数据库的第一步当然是连接数据库,然后是
创建游标,接下来是对数据库的各种操作,这里我们先来操作Insert数据的操作,见实现的代码:
#!/usr/bin/env python #coding:utf-8 import MySQLdb def insert_One(): '''插入一条数据''' try: conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='server',db='day2017') except: print u'连接mysql数据库失败' else: cur = conn.cursor() sql = 'INSERT INTO userInfo VALUES (%s,%s,%s,%s)' params = (1, 'admin', 'admin', '[email protected]',) cur.execute(sql, params) conn.commit() finally: cur.close() conn.close() if __name__=='__main__': insert_One()
查看数据库,可以看到,数据已经插入到数据库中,见查询的结果:
在上面的案例中,只是插入了单条数据,实际上,某些时候,会插入多条数据,也就是批量插入,批量插入实现的代码为:
def insert_Many(): '''批量插入数据''' try: conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='server',db='day2017') except: print u'连接mysql数据库失败' else: cur = conn.cursor() sql = 'INSERT INTO userInfo VALUES (%s,%s,%s,%s)' params = [ (2,'wuya','admin','[email protected]'), (3,'weke','admin','[email protected]') ] cur.executemany(sql, params) conn.commit() finally: cur.close() conn.close() if __name__=='__main__': insert_Many()
接下来,我们来查看数据库的查询,数据查询分为二种,一种是查询的结果是一条语句,使用的是fetchone()方法,另外一种是查询的数据
结果是多条,使用的方法是fetchmany(),我们分别来看这二个方法的使用,我们先来看单条数据的查询,见实现的代码:
import MySQLdb def select_one(): '''单条数据的查询''' try: conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='server',db='day2017') except: print u'连接mysql数据库失败' else: cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) sql = 'select * from userInfo where id=%s' params = (1,) data=cur.execute(sql, params) print cur.fetchone() conn.commit() finally: cur.close() conn.close() if __name__=='__main__': select_one()
多条数据的查询,见实现的代码:
import MySQLdb def select_Many(): '''多条数据的查询''' try: conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='server',db='day2017') except: print u'连接mysql数据库失败' else: cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) sql = 'select * from userInfo' ret=cur.execute(sql) data=cur.fetchall() for item in data: print item finally: cur.close() conn.close() if __name__=='__main__': select_Many()
下面我们来看更新语句的测试,见实现的代码:
import MySQLdb def update_Test(): '''更新语句测试''' try: conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='server',db='day2017') except: print u'连接mysql数据库失败' else: cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) sql = 'update userInfo set username=%s where id=%s' params=('system',1,) ret=cur.execute(sql,params) conn.commit() finally: cur.close() conn.close() if __name__=='__main__': update_Test()
最后一步,也就是删除数据了,直接看如下的实现代码:
import MySQLdb def delete_Test(): '''删除语句测试''' try: conn=MySQLdb.connect(host='127.0.0.1',user='root',passwd='server',db='day2017') except: print u'连接mysql数据库失败' else: cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) sql = 'delete from userInfo where id=%s' params=(3,) ret=cur.execute(sql,params) conn.commit() finally: cur.close() conn.close() if __name__=='__main__': delete_Test()
事实上,对于如上操作数据库的方式,有很多的代码是可以重够的,比如连接数据库的方式,另外,我们可以把操作数据库的
方式写在一个类里面,在业务调用的时候直接调用我们的数据库方法进行操作,见下面操作mysql数据库的方法,见源码:
#!/usr/bin/env python #coding:utf-8 import MySQLdb class MySQLHelper(object): def __init__(self): pass def get_one(self,sql,params): conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='server',db='day2017') cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) retCount = cur.execute(sql,params) data = cur.fetchone() cur.close() conn.close() return data def get_many(self,sql,params): conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='server',db='day2017') cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) retCount = cur.execute(sql,params) data = cur.fetchall() cur.close() conn.close() return data def insert_one(self,sql,params): conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='server', db='day2017') cur = conn.cursor() cur.execute(sql, params) conn.commit() cur.close() return u'插入数据库成功' def insert_many(self,sql,params): conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='server', db='day2017') cur = conn.cursor() cur.executemany(sql, params) conn.commit() cur.close() return u'批量插入数据库成功' def update_one(self,sql,params): conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='server', db='day2017') cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) ret = cur.execute(sql, params) conn.commit() cur.close() conn.close() return u'更新数据库成功' def delete_one(self,sql,params): conn = MySQLdb.connect(host='127.0.0.1', user='root', passwd='server', db='day2017') cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) ret = cur.execute(sql, params) conn.commit() cur.close() conn.close() return u'删除数据库成功'
把连接数据库部分进行重构,放到一个config.py的文件中,这样我们连接数据库的方法就只需要在config.py文件维护了,而不
需要在如上代码中每个都得看的修改,这实在是很糟糕,见重构后的config.py文件源码:
#!/usr/bin/env python #coding:utf-8 conn_dict=dict(host='127.0.0.1', user='root', passwd='server', db='day2017')
见重构后操作mysql的数据库方法,见源码:
#!/usr/bin/env python #coding:utf-8 import MySQLdb import config class MySQLHelper(object): def __init__(self): self.conn=config.conn_dict def get_one(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) retCount = cur.execute(sql,params) data = cur.fetchone() cur.close() conn.close() return data def get_many(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) retCount = cur.execute(sql,params) data = cur.fetchall() cur.close() conn.close() return data def insert_one(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor() cur.execute(sql, params) conn.commit() cur.close() return u'插入数据库成功' def insert_many(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor() cur.executemany(sql, params) conn.commit() cur.close() return u'批量插入数据库成功' def update_one(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) ret = cur.execute(sql, params) conn.commit() cur.close() conn.close() return u'更新数据库成功' def delete_one(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) ret = cur.execute(sql, params) conn.commit() cur.close() conn.close() return u'删除数据库成功'
写数据库的操作方法,是为了进行对业务的操作,要不仅仅写这些没什么实际的意义,如我们实现输入用户名和密码,在
在数据库中验证,如果用户名和密码都是admin,那么通过,如果有其中一个不是admin,就提示用户,请提示用户用户名
或者密码错误,下面来实现这样的一个过程,见实现的源码:
#!/usr/bin/env python #coding:utf-8 import MySQLdb import config class MySQLHelper(object): def __init__(self): self.conn=config.conn_dict def get_one(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) retCount = cur.execute(sql,params) data = cur.fetchone() cur.close() conn.close() return data def get_many(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) retCount = cur.execute(sql,params) data = cur.fetchall() cur.close() conn.close() return data def insert_one(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor() cur.execute(sql, params) conn.commit() cur.close() return u'插入数据库成功' def insert_many(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor() cur.executemany(sql, params) conn.commit() cur.close() return u'批量插入数据库成功' def update_one(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) ret = cur.execute(sql, params) conn.commit() cur.close() conn.close() return u'更新数据库成功' def delete_one(self,sql,params): conn = MySQLdb.connect(**self.conn) cur = conn.cursor(cursorclass=MySQLdb.cursors.DictCursor) ret = cur.execute(sql, params) conn.commit() cur.close() conn.close() return u'删除数据库成功' class CheckUserInfo(object): def __init__(self): self.__helper=MySQLHelper() def checkValid(self,username,password): sql='select * from userInfo where username=%s and password=%s' params=(username,password) return self.__helper.get_one(sql,params) def info(): username=raw_input(u'请输入你的用户名:\n') password=raw_input(u'请输入你的密码:\n') userInfo=CheckUserInfo() result=userInfo.checkValid(username,password) if not result: print u'用户名或者密码错误,请联系管理员' else: print u'恭喜您,输入正确!' if __name__=='__main__': info()