# 百万行Excel数据全自动导入MYSQL之Python — fully automated import of million-row Excel/CSV data into MySQL with Python

import pandas as pd
import pymysql
import pypinyin
import re
import time
import os

start_time_count = time.time()
# Source data file (Excel or CSV); the MySQL table name is derived from it.
filename = '****.xlsx'
host = "你的数据ip"
user = "数据库用户"
password = "数据库密码"
database = "数据库名"
# Directory containing this script; the data file is expected beside it.
folder_path = os.path.dirname(__file__)
fiel_path = os.path.join(folder_path, filename)  # NOTE(review): name is a typo ("file_path") — kept for compatibility with later references
# Split off only the FINAL extension.  The original used split('.'), which
# truncated names like "sales.2023.xlsx" at the first dot and made the
# extension check downstream look at the wrong element.
list_table = filename.rsplit('.', 1)
# Dots are invalid in an unquoted MySQL identifier; replace any leftovers.
table_name = str(list_table[0]).replace('.', '_')
# Convert Chinese text to an underscore-joined pinyin string.
# Compiled once at module level instead of per call.  The original pattern
# "[^\u4e00-\u9fa5^a-z^A-Z^0-9]" contained literal '^' characters inside the
# class, which wrongly preserved '^' in the input; this class matches every
# character that is NOT a CJK ideograph, ASCII letter, or digit.
_NON_WORD_RE = re.compile(r"[^\u4e00-\u9fa5a-zA-Z0-9]")


def pinyin(word):
    """Return *word* converted to pinyin, syllables joined with '_'.

    Characters other than Chinese, ASCII letters, and digits are stripped
    before conversion.  Uses pypinyin NORMAL style (no tone marks).
    """
    word = _NON_WORD_RE.sub('', word)
    # pypinyin.pinyin returns a list of one-element lists, one per syllable.
    syllables = (''.join(part) for part in
                 pypinyin.pinyin(word, style=pypinyin.NORMAL))
    return '_'.join(syllables)


# --- MySQL setup -------------------------------------------------------
# Open the connection and create the cursor used by every later statement.
db = pymysql.connect(
    host=host,
    user=user,
    password=password,
    database=database,
)
cursor = db.cursor()

# Round-trip a trivial query to confirm the connection actually works.
cursor.execute("SELECT VERSION()")
data = cursor.fetchone()

print('数据库连接成功')
print("Database version : %s " % data)

print('正在打开文件……')
# Choose the pandas reader from the file extension.  The check is
# case-insensitive and looks at the LAST dot-separated component, so
# "DATA.CSV" or "a.b.csv" are read as CSV instead of crashing inside
# read_excel (the original compared list_table[1] == "csv" exactly).
if list_table[-1].lower() == "csv":
    df = pd.read_csv(fiel_path)  # CSV: single table
else:
    df = pd.read_excel(fiel_path)  # Excel: first sheet only by default

end_time_read_excel = time.time()
print('打开文件耗时:'+str(end_time_read_excel-start_time_count))
print('文件打开成功……')
# Map every spreadsheet header to a synthetic column name c1..cN.  Real
# headers may contain characters MySQL rejects, so positional names are
# used instead of the header text itself.
print('把表头转换为字段名C1_')
list_col_name = []            # bare column names, used by the INSERT
list_col_name_sql_creat = []  # "name TYPE" fragments, used by CREATE TABLE
for position, _header in enumerate(df.columns.tolist(), start=1):
    col = "c" + str(position)
    list_col_name.append(col)
    list_col_name_sql_creat.append(col + " VARCHAR(255) ")
print("有%s列" % str(len(list_col_name)))
# Append an auto-increment surrogate key as the last column.
list_col_name.append('id')
list_col_name_sql_creat.append(
    'id INT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)')

# Build and run CREATE TABLE, then assemble the parameterized INSERT.
# The table name comes straight from the file name, so it is backtick-quoted
# to survive reserved words and characters MySQL rejects in an unquoted
# identifier.  Row VALUES use %s placeholders, so cell data is always bound
# safely by pymysql, never concatenated into the SQL text.
print('拼接字段名')
sql_list_col_name = ",".join(list_col_name_sql_creat)
print('创建表格')
sql_create_table = ('CREATE TABLE IF NOT EXISTS `' + table_name + '`('
                    + sql_list_col_name + ')')
cursor.execute(sql_create_table)
print(sql_create_table)

print("--------------正在组装sql-------------------------")
# One %s per column (the original built "%s,%s,...," by string multiplication
# and sliced off the trailing comma; join is clearer and equivalent).
placeholders = ",".join(["%s"] * len(list_col_name))
cols = ','.join(list_col_name)
insert_sql = ("insert into `" + table_name + "` (" + cols + ") values ("
              + placeholders + ")")


def sql_exec_manys(data_list):
    """Bulk-insert *data_list* (a list of row tuples) via insert_sql.

    Commits on success; on any failure rolls back the batch and prints
    diagnostics instead of aborting the whole import run.
    """
    try:
        cursor.executemany(insert_sql, data_list)
        db.commit()
    except Exception as e:
        db.rollback()
        print("--------------insert_sql-----error--------------------")
        print(insert_sql)
        print("--------------insert_data_list----error---------------")
        # Print the first row of the FAILING batch.  The original printed the
        # module-level insert_data_list, which need not match the argument,
        # and would raise IndexError on an empty batch.
        if data_list:
            print(data_list[0])
        print(e)


print("--------------正在新增id列-------------------------")
start_time_ram = time.time()
# Placeholder id column; MySQL's AUTO_INCREMENT supplies the real value.
df['id'] = ''
# Remove duplicate rows.  (The original comment said "delete empty rows",
# but drop_duplicates only removes exact duplicates.)
df.drop_duplicates(inplace=True)
end_time_ram = time.time()
# Replace NaN with '' so every cell binds cleanly as a string parameter.
df.fillna('', inplace=True)
insert_data_list = [tuple(row) for row in df.values]
print("替换与tuple耗时:%s" % str(time.time()-end_time_ram))
print("--------------正在写入MYSQL数据库------------------------")
len_list = len(insert_data_list)
count_c = len_list   # total row count, reported in the final summary
deal_c = 300000      # batch size: 300k rows per executemany call
start_time_insert = time.time()

# Drain the row list in batches of deal_c; whatever remains after the loop
# (always <= deal_c rows) is flushed with one final call.  The original used
# while/else — since the loop body never breaks, the else branch is
# equivalent to straight-line code after the loop.
while len_list > deal_c:
    batch = insert_data_list[0:deal_c]
    sql_exec_manys(batch)
    print("已导入%s条数据" % deal_c)
    # Free consumed rows so peak memory stays bounded.
    del insert_data_list[0:deal_c]
    len_list -= deal_c
sql_exec_manys(insert_data_list)
print("已导入%s条数据" % len_list)

# Final timing report, then release the connection.
end_time_mysql = time.time()
end_time_count = time.time()  # overall finish timestamp

print('数据已经全部导入完成')
print(f'共计导入:{count_c} 条数据')
print(f'打开文件耗时:{end_time_read_excel - start_time_count}')
print(f"写入内存消耗时间:{end_time_ram - start_time_ram}")
print(f"操作临时文件耗时:{end_time_mysql - start_time_ram}")
print(f"insert into总时间:{end_time_count - start_time_insert}")
print(f"运行总时间:{end_time_count - start_time_count}")
db.close()