将大熊猫数据框写入SQL Server数据库
问题描述:
我有74个相对较大的熊猫数据框(大约34,600行和8列),我试图尽可能快地插入到SQL Server数据库中。在做了一些研究之后,我了解到这个功能对于插入SQL Server数据库的这种大型数据库并不好,这是我采用的最初方法(非常慢 - 应用程序完成大约需要一个小时,大约4分钟。使用MySQL数据库时)将大熊猫数据框写入SQL Server数据库
This article,和许多其他StackOverflow的帖子一直在指着我在正确的方向有帮助的,但是我已经打了一个路障:
我尝试使用SQLAlchemy的核心,而不是ORM的原因在上面的链接中解释。所以,我的数据帧转换为字典,使用pandas.to_dict
,然后做一个和insert()
:
self._session_factory.engine.execute(
TimeSeriesResultValues.__table__.insert(),
data)
# 'data' is a list of dictionaries.
的问题是,插入没有得到任何价值 - 他们似乎是一堆空括号和我的得到这个错误:
(pyodbc.IntegretyError) ('23000', "[23000] [FreeTDS][SQL Server]Cannot
insert the value NULL into the column...
我在传递的字典列表中有值,所以我不明白为什么值没有显示出来。
编辑:
这里是我要去关的例子:
def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name": 'NAME ' + str(i)} for i in range(n)]
)
print("SQLAlchemy Core: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
答
我有一个坏消息给你,其实SQLAlchemy的未实现的SQL Server批量导入,这是实际上只是做与to_sql
正在做的相同的缓慢的单个INSERT语句。我会说你最好的选择是尝试使用bcp
命令行工具编写脚本。下面是我在过去使用的脚本,但不保证:
from subprocess import check_output, call
import pandas as pd
import numpy as np
import os
pad = 0.1
tablename = 'sandbox.max.pybcp_test'
overwrite=True
raise_exception = True
server = 'P01'
trusted_connection= True
username=None
password=None
delimiter='|'
df = pd.read_csv('D:/inputdata.csv', encoding='latin', error_bad_lines=False)
def get_column_def_sql(col):
if col.dtype == object:
width = col.str.len().max() * (1+pad)
return '[{}] varchar({})'.format(col.name, int(width))
elif np.issubdtype(col.dtype, float):
return'[{}] float'.format(col.name)
elif np.issubdtype(col.dtype, int):
return '[{}] int'.format(col.name)
else:
if raise_exception:
raise NotImplementedError('data type {} not implemented'.format(col.dtype))
else:
print('Warning: cast column {} as varchar; data type {} not implemented'.format(col, col.dtype))
width = col.str.len().max() * (1+pad)
return '[{}] varchar({})'.format(col.name, int(width))
def create_table(df, tablename, server, trusted_connection, username, password, pad):
if trusted_connection:
login_string = '-E'
else:
login_string = '-U {} -P {}'.format(username, password)
col_defs = []
for col in df:
col_defs += [get_column_def_sql(df[col])]
query_string = 'CREATE TABLE {}\n({})\nGO\nQUIT'.format(tablename, ',\n'.join(col_defs))
if overwrite == True:
query_string = "IF OBJECT_ID('{}', 'U') IS NOT NULL DROP TABLE {};".format(tablename, tablename) + query_string
query_file = 'c:\\pybcp_tempqueryfile.sql'
with open (query_file,'w') as f:
f.write(query_string)
if trusted_connection:
login_string = '-E'
else:
login_string = '-U {} -P {}'.format(username, password)
o = call('sqlcmd -S {} {} -i {}'.format(server, login_string, query_file), shell=True)
if o != 0:
raise BaseException("Failed to create table")
# o = call('del {}'.format(query_file), shell=True)
def call_bcp(df, tablename):
if trusted_connection:
login_string = '-T'
else:
login_string = '-U {} -P {}'.format(username, password)
temp_file = 'c:\\pybcp_tempqueryfile.csv'
#remove the delimiter and change the encoding of the data frame to latin so sql server can read it
df.loc[:,df.dtypes == object] = df.loc[:,df.dtypes == object].apply(lambda col: col.str.replace(delimiter,'').str.encode('latin'))
df.to_csv(temp_file, index = False, sep = '|', errors='ignore')
o = call('bcp sandbox.max.pybcp_test2 in c:\pybcp_tempqueryfile.csv -S "localhost" -T -t^| -r\n -c')
*约4分钟,同时使用mysql数据库* ...这样的'to_sql()'是一个可行的解决方案时,只是连接MSSQL比MySQL更慢?你正在使用哪种ODBC API?数据库服务器是本地还是远程?考虑临时表导入,然后迁移到最终表。 – Parfait
@Parfait:使用'''to_sql()''可以在MySQL中产生可以接受的性能,但不会产生MSSQL。我正在使用pyodbc。数据库是远程的,因此写入CSV文件然后通过原始的sql代码进行批量插入在这种情况下也不会真正起作用。此外,用户需要批量管理权限才能这样做,这对于此应用程序的用户来说并不总是可能的。 – denvaar
考虑绕过odbc驱动程序并严格使用Python API - [pmyssl](http://www.pymssql.org/en/latest/)和MySQL ODBC API? pymysql?两者中的表结构和数据类型相同?相同数量的记录?真的调查这一点。两者都是高级企业RDMS,不应该执行那么广的范围(4分钟比~60分钟)。 – Parfait