首页 > 其他分享 >代码实现高性能分布式云服务器性能监测系统

代码实现高性能分布式云服务器性能监测系统

时间:2024-08-31 12:55:02浏览次数:6  
标签:__ self db server 高性能 usage 服务器 data 分布式

Python 代码实现高性能分布式云服务器性能监测系统

数据收集模块(Agent)

在每个服务器节点上运行,收集性能数据。

import psutil
import time
import socket
import json

class PerformanceAgent:
    def __init__(self, server_ip, server_port):
        self.server_ip = server_ip
        self.server_port = server_port
    
    def collect_data(self):
        data = {
            "cpu_usage": psutil.cpu_percent(interval=1),
            "memory_usage": psutil.virtual_memory().percent,
            "disk_io": psutil.disk_io_counters()._asdict(),
            "network_io": psutil.net_io_counters()._asdict(),
            "timestamp": time.time()
        }
        return data
    
    def send_data(self, data):
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.sendto(json.dumps(data).encode(), (self.server_ip, self.server_port))
    
    def run(self):
        while True:
            data = self.collect_data()
            self.send_data(data)
            time.sleep(5)  # 每5秒收集一次数据

if __name__ == "__main__":
    agent = PerformanceAgent(server_ip="192.168.1.100", server_port=9999)
    agent.run()

数据传输模块

负责将数据从各个Agent传输到中央服务器。

import socket
import json

class DataReceiver:
    def __init__(self, host, port):
        self.host = host
        self.port = port
    
    def start(self):
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.bind((self.host, self.port))
            while True:
                data, addr = sock.recvfrom(65535)
                self.process_data(json.loads(data.decode()))
    
    def process_data(self, data):
        # 此处可将数据存入数据库或其他存储系统
        print("Received data:", data)

if __name__ == "__main__":
    receiver = DataReceiver(host="0.0.0.0", port=9999)
    receiver.start()

数据存储模块

在中央服务器上存储和管理性能数据。

from pymongo import MongoClient

class DataStorage:
    def __init__(self, db_host, db_port):
        self.client = MongoClient(db_host, db_port)
        self.db = self.client['performance_monitoring']
        self.collection = self.db['performance_data']
    
    def store_data(self, data):
        self.collection.insert_one(data)

if __name__ == "__main__":
    storage = DataStorage(db_host="localhost", db_port=27017)
    sample_data = {
        "cpu_usage": 50.5,
        "memory_usage": 70.2,
        "disk_io": {"read_count": 100, "write_count": 50},
        "network_io": {"bytes_sent": 1024, "bytes_recv": 2048},
        "timestamp": time.time()
    }
    storage.store_data(sample_data)

数据分析和报警模块

分析性能数据并生成报警。

class DataAnalyzer:
    def __init__(self, db_host, db_port):
        self.client = MongoClient(db_host, db_port)
        self.db = self.client['performance_monitoring']
        self.collection = self.db['performance_data']
    
    def analyze_data(self):
        cursor = self.collection.find().sort("timestamp", -1).limit(1)
        for data in cursor:
            if data["cpu_usage"] > 80:
                self.send_alert(f"High CPU usage detected: {data['cpu_usage']}%")
            if data["memory_usage"] > 90:
                self.send_alert(f"High Memory usage detected: {data['memory_usage']}%")
    
    def send_alert(self, message):
        print("ALERT:", message)

if __name__ == "__main__":
    analyzer = DataAnalyzer(db_host="localhost", db_port=27017)
    analyzer.analyze_data()

数据展示模块(Dashboard)

以可视化的方式展示性能数据和报警信息。

from flask import Flask, render_template, jsonify
from pymongo import MongoClient

app = Flask(__name__)

client = MongoClient("localhost", 27017)
db = client['performance_monitoring']
collection = db['performance_data']

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/data')
def get_data():
    cursor = collection.find().sort("timestamp", -1).limit(10)
    data = []
    for doc in cursor:
        data.append({
            "cpu_usage": doc["cpu_usage"],
            "memory_usage": doc["memory_usage"],
            "timestamp": doc["timestamp"]
        })
    return jsonify(data)

if __name__ == "__main__":
    app.run(debug=True)

C++ 代码实现高性能分布式云服务器性能监测系统

数据收集模块(Agent)

在每个服务器节点上运行,收集性能数据。

#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <sys/sysinfo.h>
#include <fstream>
#include <json/json.h>
#include <arpa/inet.h>
#include <sys/socket.h>

class PerformanceAgent {
public:
    PerformanceAgent(const std::string& server_ip, int server_port)
        : server_ip(server_ip), server_port(server_port) {}

    void collect_data() {
        struct sysinfo sys_info;
        sysinfo(&sys_info);

        Json::Value data;
        data["cpu_usage"] = get_cpu_usage();
        data["memory_usage"] = (sys_info.totalram - sys_info.freeram) * 100 / sys_info.totalram;
        data["timestamp"] = std::time(nullptr);

        send_data(data);
    }

    void run() {
        while (true) {
            collect_data();
            std::this_thread::sleep_for(std::chrono::seconds(5));
        }
    }

private:
    std::string server_ip;
    int server_port;

    float get_cpu_usage() {
        std::ifstream file("/proc/stat");
        std::string line;
        std::getline(file, line);

        unsigned long long user, nice, system, idle;
        std::sscanf(line.c_str(), "cpu %llu %llu %llu %llu", &user, &nice, &system, &idle);
        
        static unsigned long long prev_user, prev_nice, prev_system, prev_idle;
        unsigned long long diff_idle = idle - prev_idle;
        unsigned long long diff_total = (user + nice + system + idle) - (prev_user + prev_nice + prev_system + prev_idle);

        prev_user = user;
        prev_nice = nice;
        prev_system = system;
        prev_idle = idle;

        return (diff_total - diff_idle) * 100.0 / diff_total;
    }

    void send_data(const Json::Value& data) {
        int sock = socket(AF_INET, SOCK_DGRAM, 0);
        struct sockaddr_in server_addr;
        server_addr.sin_family = AF_INET;
        server_addr.sin_port = htons(server_port);
        inet_pton(AF_INET, server_ip.c_str(), &server_addr.sin_addr);

        std::string data_str = Json::writeString(Json::StreamWriterBuilder(), data);
        sendto(sock, data_str.c_str(), data_str.size(), 0, (struct sockaddr*)&server_addr, sizeof(server_addr));
        close(sock);
    }
};

int main() {
    PerformanceAgent agent("192.168.1.100", 9999);
    agent.run();
    return 0;
}

数据传输模块

负责将数据从各个Agent传输到中央服务器。

#include <iostream>
#include <string>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <json/json.h>

class DataReceiver {
public:
    DataReceiver(int port) : port(port) {}

    void start() {
        int sock = socket(AF_INET, SOCK_DGRAM, 0);
        struct sockaddr_in server_addr, client_addr;
        server_addr.sin_family = AF_INET;
        server_addr.sin_port = htons(port);
        server_addr.sin_addr.s_addr = INADDR_ANY;

        bind(sock, (struct sockaddr*)&server_addr, sizeof(server_addr));

        char buffer[65536];
        while (true) {
            socklen_t client_len = sizeof(client_addr);
            ssize_t len = recvfrom(sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr*)&client_addr, &client_len);
            buffer[len] = '\0';

            process_data(buffer);
        }

        close(sock);
    }

private:
    int port;

    void process_data(const std::string& data_str) {
        Json::CharReaderBuilder reader;
        Json::Value data;
        std::string errs;

        std::istringstream s(data_str);
        std::string doc;
        while (std::getline(s, doc)) {
            if (!Json::parseFromStream(reader, s, &data, &errs)) {
                std::cerr << "Failed to parse data: " << errs << std::endl;
                continue;
            }

            std::cout << "Received data: " << data.toStyledString() << std::endl;
            // 这里可以将数据存入数据库或其他存储系统
        }
    }
};

int main() {
    DataReceiver receiver(9999);
    receiver.start();
    return 0;
}

数据存储模块

在中央服务器上存储和管理性能数据。

#include <iostream>
#include <sqlite3.h>
#include <string>
#include <json/json.h>

class DataStorage {
public:
    DataStorage(const std::string& db_name) {
        sqlite3_open(db_name.c_str(), &db);
        create_table();
    }

    ~DataStorage() {
        sqlite3_close(db);
    }

    void store_data(const Json::Value& data) {
        std::string sql = "INSERT INTO performance_data (cpu_usage, memory_usage, timestamp) VALUES (?, ?, ?);";
        sqlite3_stmt* stmt;
        sqlite3_prepare_v2(db, sql.c_str(), -1, &stmt, 0);
        sqlite3_bind_double(stmt, 1, data["cpu_usage"].asFloat());
        sqlite3_bind_double(stmt, 2, data["memory_usage"].asFloat());
        sqlite3_bind_int64(stmt, 3, data["timestamp"].asInt64());

        sqlite3_step(stmt);
        sqlite3_finalize(stmt);
    }

private:
    sqlite3* db;

    void create_table() {
        std::string sql = "CREATE TABLE IF NOT EXISTS performance_data ("
                          "id INTEGER PRIMARY KEY AUTOINCREMENT, "
                          "cpu_usage REAL, "
                          "memory_usage REAL, "
                          "timestamp INTEGER);";
        char* errMsg = nullptr;
        sqlite3_exec(db, sql.c_str(), 0, 0, &errMsg);
        if (errMsg) {
            std::cerr << "Failed to create table: " << errMsg << std::endl;
            sqlite3_free(errMsg);
        }
    }
};

int main() {
    DataStorage storage("performance_data.db");
    Json::Value sample_data;
    sample_data["cpu_usage"] = 50.5;
    sample_data["memory_usage"] = 70.2;
    sample_data["timestamp"] = std::time(nullptr);

    storage.store_data(sample_data);
    return 0;
}

数据分析和报警模块

分析性能数据并生成报警。

#include <iostream>
#include <sqlite3.h>
#include <string>

class DataAnalyzer {
public:
    DataAnalyzer(const std::string& db_name) {
        sqlite3_open(db_name.c_str(), &db);
    }

    ~DataAnalyzer() {
        sqlite3_close(db);
    }

    void analyze_data() {
        std::string sql = "SELECT cpu_usage, memory_usage FROM performance_data ORDER BY timestamp DESC LIMIT 1;";
        sqlite3_stmt* stmt;
        sqlite3_prepare_v2(db, sql.c_str(), -1, &stmt, 0);

        if (sqlite3_step(stmt) == SQLITE_ROW) {
            float cpu_usage = sqlite3_column_double(stmt, 0);
            float memory_usage = sqlite3_column_double(stmt, 1);

            if (cpu_usage > 80) {
                send_alert("High CPU usage detected: " + std::to_string(cpu_usage) + "%");
            }
            if (memory_usage > 90) {
                send_alert("High Memory usage detected: " + std::to_string(memory_usage) + "%");
            }
        }

        sqlite3_finalize(stmt);
    }

private:
    sqlite3* db;

    void send_alert(const std::string& message) {
        std::cout << "ALERT: " << message << std::endl;
    }
};

int main() {
    DataAnalyzer analyzer("performance_data.db");
    analyzer.analyze_data();
    return 0;
}

数据展示模块(Dashboard)

以可视化的方式展示性能数据和报警信息。

#include <cpprest/http_listener.h>
#include <cpprest/json.h>
#include <sqlite3.h>
#include <iostream>

using namespace web;
using namespace web::http;
using namespace web::http::experimental::listener;

class PerformanceDashboard {
public:
    PerformanceDashboard(const std::string& url, const std::string& db_name) 
        : listener(url), db_name(db_name

标签:__,self,db,server,高性能,usage,服务器,data,分布式
From: https://blog.51cto.com/u_14882565/10890410

相关文章

  • 【网络编程通关之路】 Tcp 基础回显服务器(Java实现)及保姆式知识原理详解 ! ! !
    本篇会加入个人的所谓鱼式疯言❤️❤️❤️鱼式疯言:❤️❤️❤️此疯言非彼疯言而是理解过并总结出来通俗易懂的大白话,小编会尽可能的在每个概念后插入鱼式疯言,帮助大家理解的.......
  • Broker服务器模块
    一.Broker模块介绍二.Broker模块具体实现1.类的成员变量与构造函数成员变量事件循环和TCP服务器:muduo::net::EventLoop_baseloop;muduo::net::TcpServer_server;这些是muduo库提供的核心组件,负责处理网络事件和管理TCP连接。消息分发和编码:muduo::net::Protobuf......
  • 基于live555开发的多线程RTSPServer轻量级流媒体服务器EasyRTSPServer开源代码及其调
    EasyRTSPServer参考live555testProg中的testOnDemandRTSPServer示例程序,将一个live555testOnDemandRTSPServer封装在一个类中,例如,我们称为ClassEasyRTSPServer,在EasyRTSPServer_Create接口调用时,我们新建一个EasyRTSPServer对象,再通过调用EasyRTSPServer_Startup接口,将EasyRTSP......
  • ScaleLLM: 高性能推理系统助力大型语言模型部署
    ScaleLLM:革新大型语言模型推理的新利器在人工智能快速发展的今天,大型语言模型(LLMs)已成为推动技术进步的重要力量。然而,这些庞大的模型在实际部署中常常面临效率和资源消耗的挑战。为了解决这一难题,ScaleLLM应运而生,为LLM推理带来了全新的可能性。什么是ScaleLLM?Scale......
  • 一个linux服务器安装多个java版本,如何选择指定的 java版本去执行
    linux中有时候可能你由于不同的项目需要使用不同版本的javajdk部署,你就需要在你的linux服务中安装很多个版本的javajdk,那么在linux中如何安装和使用不同版本的javajdk呢?1.安装第一个javajdk版本:到java官网下载一个javajdk版本,并解压,然后配置环境变量。javajdk地址:wge......
  • nginx服务器如何配置ssl证书演示
    nginx服务器如何配置ssl证书,配置代码如下:server{#listen80default_server;listen443;#listen[::]:80default_serveripv6only=on;server_name你的域名;indexindex.phpindex.htmlindex.htm;root/mnt/te......
  • Redisson与Redis分布式锁
    Redis分布式锁Redis分布式锁是一种在分布式系统中用于确保多个进程对共享资源互斥访问的机制。它通常通过Redis的原子指令来实现,比如使用SETNX(SetifNoteXists)指令来设置键,如果键不存在则操作成功,可以认为获取了锁;如果键已存在,则操作失败,表示锁被其他进程持有。但是,这种基本......
  • 分布式锁的实现:ZooKeeper 的解决方案
    在分布式系统中,不同的服务或进程需要访问共享资源时,常常需要一种机制来确保在同一时刻只有一个服务或进程能够访问资源。这种机制被称为分布式锁。ZooKeeper,一个为分布式应用提供一致性服务的开源协调服务,提供了一种实现分布式锁的有效方法。ZooKeeper分布式锁的原理ZooKeep......
  • 【GaussDB】分布式性能分析常用的SQL
    --查看连接数selectcoorname,usename,application_name,substr(query,1,50)asquery,statefrompgxc_stat_activitywherestate='active'andusename='xxx';selectcoorname,count()frompgxc_stat_activitywhereusename='cbsprd'groupb......
  • 服务器数据恢复—异常断电导致ESXI主机共享存储中raid6阵列崩溃的数据恢复案例
    服务器存储数据恢复环境:一台存储中有一组由12块SAS硬盘组建的raid6磁盘阵列,划分了1个卷,由数台VmwareESXI主机共享存储。卷中存放了大量的Windows系统虚拟机。这些虚拟机系统盘大小一致,数据盘大小不确定,数据盘都是精简模式。服务器存储故障:机房异常断电导致存储瘫痪,加电后存储依......