from sshtunnel import SSHTunnelForwarder
import pymysql


class MySqlSSH:
    """MySQL client that reaches the database through an SSH tunnel.

    Constructing an instance starts the tunnel, connects to MySQL through
    the tunnel's local endpoint and opens one cursor that all query methods
    share.  Call :meth:`close` (or use the instance as a context manager)
    to release the cursor, the connection and the tunnel.
    """

    def __init__(self):
        """Start the SSH tunnel and open the MySQL connection and cursor.

        Raises:
            Exception: whatever ``sshtunnel`` or ``pymysql`` raises while
                connecting.  Anything already opened is closed before the
                exception propagates, so a failed constructor does not leak
                the tunnel or the connection.
        """
        # NOTE(review): host and credentials are hard-coded placeholders;
        # consider loading them from configuration or the environment.
        self.server = SSHTunnelForwarder(
            ssh_address_or_host=('13.229.92.6', 22),  # SSH host
            ssh_username='lenox',  # SSH account
            ssh_password='......',  # SSH password
            remote_bind_address=('......', 3306)  # database host/port as seen from the SSH host
        )
        # Start the tunnel service.
        self.server.start()
        try:
            mysql_config = {
                'user': '......',
                'passwd': '......',
                # Connect to the tunnel's local end, not to the database host.
                'host': self.server.local_bind_host,
                'port': self.server.local_bind_port,
                'db': 'ph_data_bossjob'
            }
            # Connect to the database.
            self.mysql = pymysql.connect(**mysql_config)
            try:
                self.cursor = self.mysql.cursor()
            except Exception:
                # The connection is open but unusable without a cursor.
                self.mysql.close()
                raise
        except Exception:
            # Do not leave the tunnel running when the DB setup failed.
            self.server.close()
            raise

    def __enter__(self):
        """Return the instance so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close all resources on leaving the ``with`` block.

        Returns ``None`` so an exception raised inside the block propagates.
        """
        self.close()

    def fetch_one(self, sql, args=None):
        """Execute ``sql`` and return the result of ``cursor.fetchone()``.

        Args:
            sql: the statement to execute.
            args: optional parameters handed to ``cursor.execute`` so values
                are bound by the driver instead of being formatted into the
                SQL string.
        """
        self.cursor.execute(sql, args)
        return self.cursor.fetchone()

    def fetch_all(self, sql, args=None):
        """Execute ``sql`` and return the result of ``cursor.fetchall()``.

        Args:
            sql: the statement to execute.
            args: optional parameters handed to ``cursor.execute``.
        """
        self.cursor.execute(sql, args)
        return self.cursor.fetchall()

    def execute_sql(self, sql, args=None):
        """Execute ``sql``, commit, and report the affected row count.

        Args:
            sql: the statement to execute.
            args: optional parameters handed to ``cursor.execute``.

        Returns:
            A message string containing ``cursor.rowcount``.

        Raises:
            Exception: whatever the driver raises; the connection is rolled
                back before the exception propagates.
        """
        try:
            self.cursor.execute(sql, args)
            # Commit the change.
            self.mysql.commit()
        except Exception:
            # Do not leave a failed statement's transaction open.
            self.mysql.rollback()
            raise
        # Report the number of affected rows.
        return "受影响的行数:{}".format(self.cursor.rowcount)

    def close(self):
        """Close the cursor, the connection and the SSH tunnel, in that order.

        Each step runs even if an earlier one raises, so a failure while
        closing the cursor cannot leave the tunnel running.
        """
        try:
            self.cursor.close()  # close the cursor
        finally:
            try:
                self.mysql.close()
            finally:
                self.server.close()  # stop the tunnel service


if __name__ == '__main__':
    cc = None
    try:
        cc = MySqlSSH()
        qq = cc.fetch_all("select count(*) from users where date(created_at)='2024-01-18'")
        print(qq)
    except Exception as e:
        print(e)
        print("fail")
    finally:
        # cc is still None when the constructor itself failed.
        if cc is not None:
            cc.close()
# Tags: __, python, self, mysql, cursor, ssh, sql
# Source: https://www.cnblogs.com/kylin5201314/p/17973339