专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

postgresql的copy_from批量快速导入数据

postgresql提供了copy函数,方便批量导入数据。

注意:sql字符串拼接时 表名 专用函数,在字符串两边添加 ” 双引号 ,否则 英文中文混合表名出错

def str_tablenm(text,is_return_null=False):
    '''
    sql字符串拼接时 表名 专用函数,在字符串两边添加 " 双引号 ,否则 英文中文混合表名出错
    '''
    if not text is None and text!='':
        return '"'+str(text)+'"'
    elif not is_return_null:
        return '""'
    else:
        return 'null'

copy_from的参数说明:copy_from(file, table, sep=’\t’, null=’\N’, size=8192, columns=None)#colunms为元组数据

python写法如下:

#encoding=utf-8
import psycopg2
import StringIO
if __name__=='__main__':
    s = ""
    for i in range(0,1000000):
        s += str(i)+"\taaa\t13434\t1\t2013-01-11\t1\t2\n"
    conn = psycopg2.connect(host='localhost',user="test",password="********",database="test")
    cur = conn.cursor()
    cur.copy_from(StringIO.StringIO(s),'tb_user',columns=('id','userame','passwd','roleid','lasttime','failnum','info'))
    conn.commit()
    cur.close()
    conn.close()
    print('done')

#链接:blog.csdn.net/rongyongfei… #=================================

#1、查询数据库中数据表是否存在,不存在则创建

bottle上传excel并导入数据库

#==============================上传excel并导入数据库
@post('/api/xlsfiles/')
@exception_handling
def excel_upload():
    data={"state":0,"data":[],"msg":'','rows_count':0}
    '''
    upload = request.files.getall('upfile')
    for meta in upload:
        buf = meta.file.read()
        print ('image:',str(buf))
    '''
    excel_obj=request.files.getall('upfile')
    #excel_obj=request.files.get('upfile')

    try:
        # 组合成服务器端存储绝对路径
        '''
        #先保存再读取,注意后面的os.remove(file_path)
        file_path=upfile_path + '/'+ datetime_helper.to_number('%Y%m%d')+excel_obj[0].filename
        print(file_path)
        # 保存文件
        excel_obj[0].save(file_path, overwrite=True)
        #ecding=getFileChar(excel_obj[0].file)
        excel_data=pd.read_excel(file_path)#,encoding=ecding
        '''

        #ecding=getFileChar(excel_obj[0].file)
        excel_data=pd.read_excel(excel_obj[0].file)#不保存直接读取,此方式不能使用getFileChar,encoding=ecding
    except Exception as e:
        print('excel_upload',str(e))
        data["msg"]="读文件出错"
        #return JsonResponse(data)
        return json.dumps(data, cls=json_helper.CJsonEncoder)
    #excel_data["data_time"]=excel_data["data_time"].map(lambda x: str(x).split(" ")[0])
    col_n = ['号码','项目编号']
    pds_tbl = pd.DataFrame(excel_data,columns = col_n)
    #print(pds_tbl)
    #mysql_engine =create_engine('mysql+pymysql://root:jjjinl@localhost:3306/administrationsystem')
    try:

        _phcodes = base_logic_codes.Codes_Logic()
        _phcodes.execute('truncate table tmp_tbl_phonecodes')
        #excel_data.tosql('score',con=mysql_engine,if_exists='append',index=false)
        columns=('telcode','projectid')
        write_to_table(pds_tbl,'tmp_tbl_phonecodes',columns,'append')
        # 实例化phcodes表操作类CurrFlowLogic

        strsql='select f_add_codes()'#执行存储过程,防止重复导入
        s=_phcodes.execute(strsql)
        data["msg"]="上传保存成功"
        data["state"]=1
        data['rows_count']=s[0].get('f_add_codes')
    except Exception as e:
        print('pd2pgsql',str(e))
        data["msg"]="上传保存失败"

    #return JsonResponse(data)
    #os.remove(file_path)
    return json.dumps(data, cls=json_helper.CJsonEncoder)

  • 表不存在可以直接创建 sqlalchemy连接
def write_to_table(df, table_name,cols_name, if_exists='append'):#表不存在可以直接创建 sqlalchemy连接
    '''
    pwd='qais@123'
    conn_str="postgresql://qais:%s@localhost/db_qais"%urllib.parse.quote_plus(pwd)#?charset=utf8
    '''
    dbcfg=db_config.DB
    db_host=dbcfg.get('host')
    db_database=dbcfg.get('database')
    db_uname=dbcfg.get('user')
    db_upwd=dbcfg.get('password')

    conn_str="postgresql://%s:%s@%s/%s"%(db_uname,urllib.parse.quote_plus(db_upwd),db_host,db_database)#?charset=utf8

    db_engine = create_engine(conn_str,encoding='utf-8',echo=False)#

    string_data_io = io.StringIO()
    df.to_csv(string_data_io, sep='|', index=False, header=False)#header行 不当作内容写入,否则需启用readline()存入数据库
    pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine)
    table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
                               index=False, if_exists=if_exists,schema = 'public')#goods_code
    table.create()
    string_data_io.seek(0)
    #string_data_io.readline()  # remove header 如上面header=True,则启用本行
    with db_engine.connect() as connection:
        with connection.connection.cursor() as cursor:
            #copy_cmd = "COPY public.%s (%s) FROM STDIN HEADER DELIMITER '|' CSV" %(table_name,','.join(k for k in cols_name))#goods_code
            #cursor.copy_expert(copy_cmd, string_data_io)
            #或者使用如下一行
            cursor.copy_from(string_data_io, table_name, null='', sep='|',columns=cols_name)
        connection.connection.commit()

def getFileChar(filename):
    f=open(filename,'rb')
    data=f.read(200)
    f.close()
    #print(chardet.detect(data))
    #print(chardet.detect(data).get('encoding'))
    return chardet.detect(data).get('encoding')

文章永久链接:https://tech.souyunku.com/42131

未经允许不得转载:搜云库技术团队 » postgresql的copy_from批量快速导入数据

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们