import os
|
import psycopg2
|
from psycopg2 import OperationalError
|
from dotenv import load_dotenv
|
|
# Load environment variables (via python-dotenv) before the DB_* settings
# are read with os.getenv in DatabaseConnection.__init__.
|
load_dotenv()
|
|
class DatabaseConnection:
    """Manage a single psycopg2 connection configured from the environment.

    Settings are read from the ``DB_HOST``, ``DB_PORT``, ``DB_NAME``,
    ``DB_USER`` and ``DB_PASSWORD`` environment variables when the instance
    is created. The connection itself is opened on demand by ``connect()``
    or ``get_connection()``.
    """

    def __init__(self):
        """Read the connection settings from the environment; do not connect."""
        self.host = os.getenv('DB_HOST')
        self.port = os.getenv('DB_PORT')
        self.database = os.getenv('DB_NAME')
        self.user = os.getenv('DB_USER')
        self.password = os.getenv('DB_PASSWORD')
        # Live psycopg2 connection, or None while disconnected.
        self.connection = None

    @staticmethod
    def _close_quietly(connection):
        """Close a half-initialised connection, ignoring errors from close()."""
        if connection is None:
            return
        try:
            connection.close()
        except Exception:
            # Best-effort cleanup: the original failure is what the caller
            # needs to see, not a secondary error raised while closing.
            pass

    def connect(self):
        """Open a connection and set its session time zone to UTC.

        ``self.connection`` is only assigned once the session is fully set
        up, so a failed attempt never leaves a broken connection cached.

        Returns:
            bool: True on success. False when psycopg2 raises
            ``OperationalError`` (the error is printed).

        Raises:
            Exception: any non-``OperationalError`` failure is re-raised
            after the partially opened connection has been closed.
        """
        connection = None
        try:
            connection = psycopg2.connect(
                host=self.host,
                port=self.port,
                database=self.database,
                user=self.user,
                password=self.password,
            )
            with connection.cursor() as cursor:
                cursor.execute("SET timezone = 'UTC'")
            # psycopg2 wraps the SET in an implicit transaction. Commit it so
            # the setting survives a later rollback() by the caller and the
            # session is not left idle inside an open transaction.
            connection.commit()
        except OperationalError as e:
            self._close_quietly(connection)
            print(f"数据库连接失败: {e}")
            return False
        except Exception:
            self._close_quietly(connection)
            raise

        self.connection = connection
        return True

    def disconnect(self):
        """Close the current connection, if any, and forget it."""
        # Clear the attribute first so the stale handle is dropped even when
        # close() raises.
        connection, self.connection = self.connection, None
        if connection:
            connection.close()

    def get_connection(self):
        """Return a usable connection, (re)connecting when necessary.

        Returns:
            The psycopg2 connection, or None when connecting failed.
        """
        # psycopg2 sets ``closed`` to a non-zero value once the connection
        # has been closed or is known to be broken; such a handle is useless
        # to callers, so drop it and open a fresh one.
        if self.connection and self.connection.closed:
            self.connection = None

        if not self.connection and not self.connect():
            return None
        return self.connection

    def is_connected(self):
        """Report whether the connection is open and answers a trivial query.

        NOTE: unless the connection is in autocommit mode, the probe query
        starts a transaction if none is active; it is deliberately not rolled
        back here so that a caller's in-progress transaction is never
        discarded.

        Returns:
            bool: True when ``SELECT 1`` succeeds, False otherwise.
        """
        if not self.connection:
            return False

        try:
            if self.connection.closed:
                return False
            # Run a trivial query to verify the server is still reachable.
            with self.connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                cursor.fetchone()
        except Exception:
            # Any driver or usage error means the connection is unusable.
            # KeyboardInterrupt and SystemExit are intentionally not caught.
            return False
        return True