首页 > 编程语言 >写一个 python daemo 注册到nacos中

写一个 python daemo 注册到nacos中

时间:2024-07-22 16:10:05浏览次数:9  
标签:service python daemo nacos url token heartbeat response

 

"""
注册到nacos中的deamon
nacos:2.3.2(模式:standalone)
python:3.6.8

nohup python3 demon.py &
"""
import requests
import threading
import time

# Nacos 服务器地址和端口
nacos_url = "http://127.0.0.1:8848"

# Nacos 登录信息
username = "test"
password = "123456"

# 服务注册信息
service_name = "suyajun-service"
ip = "127.0.0.1"
port = 8080

# 登录 Nacos 获取 token
def login_to_nacos(url, username, password):
    login_url = f"{url}/nacos/v1/auth/login"
    login_data = {
        "username": username,
        "password": password
    }
    response = requests.post(login_url, data=login_data)
    if response.status_code == 200:
        return response.json().get("accessToken")
    else:
        raise Exception(f"Failed to login to Nacos: {response.text}")

# 注册服务到 Nacos
def register_service(url, token, service_name, ip, port):
    register_url = f"{url}/nacos/v1/ns/instance"
    register_data = {
        "serviceName": service_name,
        "ip": ip,
        "port": port,
        "namespaceId": "public",  # 这里使用 public 命名空间,如果有其他命名空间请替换
        "groupName": "DEFAULT_GROUP",  # 默认组名
        "weight": 1.0,
        "enable": "true",
        "healthy": "true",
        "metadata": "{}",
        "ephemeral": "true",
        "accessToken": token
    }
    response = requests.post(register_url, params=register_data)
    if response.status_code == 200:
        print("Service registered successfully")
    else:
        raise Exception(f"Failed to register service: {response.status_code} - {response.text}")

# 发送心跳信息到 Nacos
def send_heartbeat(url, token, service_name, ip, port):
    while True:
        heartbeat_url = f"{url}/nacos/v1/ns/instance/beat"
        heartbeat_data = {
            "serviceName": service_name,
            "ip": ip,
            "port": port,
            "namespaceId": "public",  # 这里使用 public 命名空间,如果有其他命名空间请替换
            "groupName": "DEFAULT_GROUP",  # 默认组名
            "ephemeral": "true",
            "accessToken": token
        }
        response = requests.put(heartbeat_url, params=heartbeat_data)
        if response.status_code == 200:
            print("Heartbeat sent successfully")
        else:
            print(f"Failed to send heartbeat: {response.status_code} - {response.text}")
        
        # 每 5 秒发送一次心跳
        time.sleep(5)

# 主函数
def main():
    try:
        # 登录获取 token
        token = login_to_nacos(nacos_url, username, password)
        print(f"Login successful, token: {token}")

        # 注册服务
        register_service(nacos_url, token, service_name, ip, port)

        # 启动心跳线程
        heartbeat_thread = threading.Thread(target=send_heartbeat, args=(nacos_url, token, service_name, ip, port))
        heartbeat_thread.daemon = True
        heartbeat_thread.start()

        # 保持主线程运行
        while True:
            time.sleep(1)
    except Exception as e:
        print(e)

if __name__ == "__main__":
    main()

 

标签:service,python,daemo,nacos,url,token,heartbeat,response
From: https://www.cnblogs.com/suyj/p/18316209

相关文章

  • Python、图形用户界面、ctk
    所以,我正在创建一个博客,现在,我在设置部分,我有一个带有按钮的滑动面板,我希望它转到一个新窗口,我将在其中创建新的小部件等...,我已经完成了这种登录和注册的事情,问题是现在我不能使用pack.forget(),它只是不起作用classSlidePanel(customtkinter.CTkFrame):def__init__(se......
  • 当我的代码损坏时,如何设置警报或蜂鸣声? (最好是Python)
    我正在为机器人运行一些代码,它将继续运行,直到我手动终止该进程。或者,如果代码意外遇到诸如SYntaxError或其他此类错误/异常之类的错误并崩溃。我想知道当我的代码崩溃时是否可以设置一些警报或蜂鸣声。我的目标就是将视线从屏幕上移开,仅在进程停止运行时才检查它。如果......
  • VScode连接虚拟机运行Python文件的方法
    声明:本文使用Linux发行版本为rocky_9.4目录1.在rocky_9.4最小安装的系统中,默认是没有tar工具的,因此,要先下载tar工具2.在安装好的vscode中下载ssh远程插件工具3.然后连接虚拟机4.查看python是否已经安装5.下载扩展插件6.新建.py文件测试1.在rocky_9.4最小安装......
  • 【介绍Python多进程】
    ......
  • 用python制作终端向上滑动的效果
    我正在开发一个项目,需要时需要过渡效果。我正在通过CRT终端模拟器“cool-retro-term”运行这个python项目。我想让它像老式CRT终端一样,屏幕平滑地向上滑动所有字符以呈现下一行或刷新屏幕。像这样:终端滑动效果不是单独打印的字符,而是屏幕的滚动。到目......
  • centos stream9(linux): 编译安装python 3.12.4
    一,官方下载地址:https://www.python.org/downloads/点击进入具体版本的下载页面,我们选择稳定版本,地址:https://www.python.org/downloads/release/python-3124/如图:复制得到下载链接:https://www.python.org/ftp/python/3.12.4/Python-3.12.4.tgz 二,下载:从命令行下载:......
  • 使用 beautifulsoup python 更改内部标签的文本
    我想更改使用Beautifulsoup获得的HTML中标签的内部文本。示例:<ahref="index.html"id="websiteName">Foo</a>变成:<ahref="index.html"id="websiteName">Bar</a>我已经设法通过其id获取标签:HTMLDocument.find(id......
  • Python - Adob​​e InDesign Javascript 脚本帮助从 Python 调用 JSX
    提前致谢。希望每个人都表现出色。我试图从python调用Adob​​eIndesignJSX文件,下面是示例代码:我想在Adob​​eINdesign2024或更高版本上运行它。我在PythonInDesign脚本编写上看到了一些示例:从预检中获取溢出文本框以自动调整大小作为参考,可能适用于Ado......
  • 为什么将小部件添加到滚动视图在 python kivy 中不起作用
    Python文件fromkivymd.appimportMDAppfromkivy.langimportBuilderfromkivy.uix.floatlayoutimportFloatLayoutfromkivy.core.windowimportWindowfromkivy.configimportConfigfromkivymd.uix.listimportOneLineListItem#UkuranwindowConfig.set(&......
  • Python 实现Excel和TXT文本格式之间的相互转换
    Excel是一种具有强大的数据处理和图表制作功能的电子表格文件,而TXT则是一种简单通用、易于编辑的纯文本文件。将Excel转换为TXT可以帮助我们将复杂的数据表格以文本的形式保存,方便其他程序读取和处理。而将TXT转换为Excel则可以将文本文件中的数据导入到Excel中进行进一步的分析和......