import psycopg2
# execute_batch lives in the ``extras`` submodule, which a bare
# ``import psycopg2`` does not load; import it explicitly.
import psycopg2.extras


class PostgreSQLDB:
    """Small convenience wrapper around a single psycopg2 connection.

    Write helpers (``insert_batch``, ``update``, ``delete``) commit on
    success and roll back on a ``psycopg2.Error``; they report the outcome
    as a boolean and print the error instead of raising it.

    NOTE: table names, column names and ``condition`` strings are
    interpolated into the SQL text verbatim. They must come from trusted
    code, never from user input. Pass user-supplied *values* through the
    ``params`` arguments so the driver binds them safely.
    """

    def __init__(self, host, port, user, password, database):
        """Store the connection settings; no connection is opened yet."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database
        # Set by connect(); stays None until a connection succeeds.
        self.connection = None

    def connect(self) -> bool:
        """Open the connection.

        Returns:
            True on success, False if psycopg2 reported an error (the error
            is printed).
        """
        try:
            self.connection = psycopg2.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            return True
        except psycopg2.Error as err:
            print(f"Error connecting to PostgreSQL: {err}")
            return False

    def close(self):
        """Close the connection if one was opened."""
        if self.connection:
            self.connection.close()

    def _execute_write(self, operation, error_prefix) -> bool:
        """Run ``operation(cursor)`` and commit, rolling back on failure.

        Args:
            operation: Callable that receives an open cursor and executes
                the statement(s).
            error_prefix: Text printed before the error message on failure.

        Returns:
            True if the operation was committed, False if a
            ``psycopg2.Error`` occurred and the transaction was rolled back.
        """
        # The cursor is created outside the try block so that a failure to
        # create it propagates to the caller, as it did before the refactor.
        with self.connection.cursor() as cursor:
            try:
                operation(cursor)
                self.connection.commit()
                return True
            except psycopg2.Error as err:
                self.connection.rollback()
                print(f"{error_prefix}: {err}")
                return False

    def insert_batch(self, table, columns, data_list) -> bool:
        """Insert many rows into ``table`` with one batched call.

        Args:
            table: Target table name (trusted; interpolated verbatim).
            columns: Column names, in the same order as each row's values.
            data_list: Iterable of row tuples, one value per column.

        Returns:
            True if all rows were committed, False otherwise.
        """
        placeholders = ', '.join(['%s'] * len(columns))
        column_names = ', '.join(columns)
        query = f"INSERT INTO {table} ({column_names}) VALUES ({placeholders})"
        return self._execute_write(
            lambda cursor: psycopg2.extras.execute_batch(cursor, query, data_list),
            "Error inserting batch data",
        )

    def update(self, table, set_values, condition, params=None) -> bool:
        """Update rows of ``table`` that match ``condition``.

        Args:
            table: Target table name (trusted; interpolated verbatim).
            set_values: Mapping of column name -> new value.
            condition: SQL text of the WHERE clause. It may contain ``%s``
                placeholders that are filled from ``params``. Because this
                statement is always executed with bound arguments, a
                literal percent sign must be written as ``%%``.
            params: Optional sequence of values for the placeholders in
                ``condition``.

        Returns:
            True if the update was committed, False otherwise.
        """
        set_statements = ', '.join([f"{key} = %s" for key in set_values])
        query = f"UPDATE {table} SET {set_statements} WHERE {condition}"
        # SET placeholders come first in the SQL text, so their values lead.
        arguments = tuple(set_values.values()) + tuple(params or ())
        return self._execute_write(
            lambda cursor: cursor.execute(query, arguments),
            "Error updating data",
        )

    def delete(self, table, condition, params=None) -> bool:
        """Delete rows of ``table`` that match ``condition``.

        Args:
            table: Target table name (trusted; interpolated verbatim).
            condition: SQL text of the WHERE clause; may contain ``%s``
                placeholders that are filled from ``params``.
            params: Optional sequence of values for the placeholders. When
                omitted, the statement is executed without argument binding.

        Returns:
            True if the delete was committed, False otherwise.
        """
        query = f"DELETE FROM {table} WHERE {condition}"
        return self._execute_write(
            lambda cursor: cursor.execute(query, params),
            "Error deleting data",
        )

    def query(self, query, params=None):
        """Execute ``query`` and return every result row.

        Args:
            query: SQL text; may contain ``%s`` placeholders that are filled
                from ``params``.
            params: Optional sequence of values for the placeholders.

        Returns:
            The list of rows from ``fetchall()``, or None if a
            ``psycopg2.Error`` occurred (the error is printed).
        """
        with self.connection.cursor() as cursor:
            try:
                cursor.execute(query, params)
                return cursor.fetchall()
            except psycopg2.Error as err:
                # A failed statement leaves the transaction aborted; without
                # a rollback every later statement on this connection would
                # be rejected.
                self.connection.rollback()
                print(f"Error querying data: {err}")
                return None


db = PostgreSQLDB('localhost', 5432, 'your_user', 'your_password', 'your_database')
if db.connect():
    try:
        # Batch-insert rows.
        columns = ['name', 'age']
        data_list = [('John', 25), ('Alice', 30)]
        if db.insert_batch('your_table', columns, data_list):
            print("Batch data inserted successfully")

        # Update rows; the value in the WHERE clause is bound, not interpolated.
        set_values = {'age': 35}
        condition = "name = %s"
        if db.update('your_table', set_values, condition, ('John',)):
            print("Data updated successfully")

        # Delete rows.
        condition = "age < %s"
        if db.delete('your_table', condition, (18,)):
            print("Data deleted successfully")

        # Query rows.
        query = "SELECT * FROM your_table"
        results = db.query(query)
        if results:
            for row in results:
                print(row)
    finally:
        # Release the connection even if one of the steps above raised.
        db.close()
# Tags: psycopg2, self, cursor, connection, pg, table, query
# From: https://www.cnblogs.com/tanaikang/p/18399362