1、单独定义一个连接池文件: dbpools.py
#!/usr/bin/env python
# coding=utf-8
import pymssql
import psycopg2
from DBUtils.PooledDB import PooledDB,SharedDBConnection
import time
from io import StringIO
import datetime
import config
from config import db_config
db_Pools =None
class DBPool_MS(object):
def __init__(self):
self.msdbcfg=db_config.MSDB
self.pgdbcfg=db_config.DB
self.pool_ms=PooledDB(pymssql, mincached=1, blocking=True,**self.msdbcfg)
self.pool_pg=PooledDB(psycopg2, mincached=1, blocking=True,**self.pgdbcfg)
#print('db_pool get')
def getConnect(self,db):
if db.get('host')==self.msdbcfg.get('host') and db.get('database')==self.msdbcfg.get('database'):
return self.pool_ms.connection()
if db.get('host')==self.pgdbcfg.get('host') and db.get('database')==self.pgdbcfg.get('database'):
return self.pool_pg.connection()
return None
if __name__=='__main__':
db=db_config.MSDB
#getConnect(db)
2、定义连接信息: db_config.py
### 数据库链接参数 ###
DB = {
'host': '192.168.1.20', # 数据库链接地址'127.0.0.1'
'port': 5432, # 数据库端口
'database': 'simple_db', # 数据库名称
'user': 'pg', # 数据库账号
'password': 'pgtest2017' # 数据库登录密码
}
MSDB = {
'server': '192.168.1.21', # 数据库链接地址'127.0.0.1'host
'port': 1433, # 数据库端口
'database': 'test_db', # 数据库名称
'user': 'msdbuser', # 数据库账号
'password': 'mstest2017' # 数据库登录密码
}
# 是否输出执行的Sql语句到日志中
IS_OUTPUT_SQL = False
3、调用:
from config import db_config
import datetime
import dbpools#使用全局变量,不能使用from引入
class PgHelper(object):
def __init__(self, db, is_output_sql):
self.connect = None
self.cursor = None
if dbpools.db_Pools is None:
dbpools.db_Pools=dbpools.DBPool_MS()
if db==None:
self.dbcfg=db_config.DB
else:
self.dbcfg=db
try:
self.connect = dbpools.db_Pools.getConnect(self.dbcfg)# globalvars.getConnect(self.dbcfg)
self.cursor = self.connect.cursor()
except Exception as e:
log.error('数据库连接错误:%s'%e)
raise
def open_conn(self):
return self.connect
def close_conn(self):
"""关闭postgresql数据库链接"""
# 关闭游标
try:
if self.cursor:
self.cursor.close()
except Exception:
pass
# 关闭数据库链接
try:
if self.connect:
self.connect.close()
except Exception:
pass
def __enter__(self):
"""初始化数据库链接"""
self.open_conn()
return self
def __exit__(self, type, value, trace):
"""关闭postgresql数据库链接"""
self.close_conn()
def rollback(self):
"""回滚操作"""
try:
# 异常时,进行回滚操作
if self.connect:
self.connect.rollback()
except Exception as e:
log_helper.error('回滚操作失败:' + str(e.args))
def commit(self):
"""提交事务"""
try:
if self.connect:
self.connect.commit()
self.close_conn()
except Exception as e:
log_helper.error('提交事务失败:' + str(e.args))