Python 代码实现高性能分布式云服务器性能监测系统
数据收集模块(Agent)
在每个服务器节点上运行,收集性能数据。
import psutil
import time
import socket
import json
class PerformanceAgent:
def __init__(self, server_ip, server_port):
self.server_ip = server_ip
self.server_port = server_port
def collect_data(self):
data = {
"cpu_usage": psutil.cpu_percent(interval=1),
"memory_usage": psutil.virtual_memory().percent,
"disk_io": psutil.disk_io_counters()._asdict(),
"network_io": psutil.net_io_counters()._asdict(),
"timestamp": time.time()
}
return data
def send_data(self, data):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.sendto(json.dumps(data).encode(), (self.server_ip, self.server_port))
def run(self):
while True:
data = self.collect_data()
self.send_data(data)
time.sleep(5) # 每5秒收集一次数据
if __name__ == "__main__":
agent = PerformanceAgent(server_ip="192.168.1.100", server_port=9999)
agent.run()
数据传输模块
负责将数据从各个Agent传输到中央服务器。
import socket
import json
class DataReceiver:
def __init__(self, host, port):
self.host = host
self.port = port
def start(self):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.bind((self.host, self.port))
while True:
data, addr = sock.recvfrom(65535)
self.process_data(json.loads(data.decode()))
def process_data(self, data):
# 此处可将数据存入数据库或其他存储系统
print("Received data:", data)
if __name__ == "__main__":
receiver = DataReceiver(host="0.0.0.0", port=9999)
receiver.start()
数据存储模块
在中央服务器上存储和管理性能数据。
from pymongo import MongoClient
class DataStorage:
def __init__(self, db_host, db_port):
self.client = MongoClient(db_host, db_port)
self.db = self.client['performance_monitoring']
self.collection = self.db['performance_data']
def store_data(self, data):
self.collection.insert_one(data)
if __name__ == "__main__":
storage = DataStorage(db_host="localhost", db_port=27017)
sample_data = {
"cpu_usage": 50.5,
"memory_usage": 70.2,
"disk_io": {"read_count": 100, "write_count": 50},
"network_io": {"bytes_sent": 1024, "bytes_recv": 2048},
"timestamp": time.time()
}
storage.store_data(sample_data)
数据分析和报警模块
分析性能数据并生成报警。
class DataAnalyzer:
def __init__(self, db_host, db_port):
self.client = MongoClient(db_host, db_port)
self.db = self.client['performance_monitoring']
self.collection = self.db['performance_data']
def analyze_data(self):
cursor = self.collection.find().sort("timestamp", -1).limit(1)
for data in cursor:
if data["cpu_usage"] > 80:
self.send_alert(f"High CPU usage detected: {data['cpu_usage']}%")
if data["memory_usage"] > 90:
self.send_alert(f"High Memory usage detected: {data['memory_usage']}%")
def send_alert(self, message):
print("ALERT:", message)
if __name__ == "__main__":
analyzer = DataAnalyzer(db_host="localhost", db_port=27017)
analyzer.analyze_data()
数据展示模块(Dashboard)
以可视化的方式展示性能数据和报警信息。
from flask import Flask, render_template, jsonify
from pymongo import MongoClient
app = Flask(__name__)
client = MongoClient("localhost", 27017)
db = client['performance_monitoring']
collection = db['performance_data']
@app.route('/')
def index():
return render_template('index.html')
@app.route('/data')
def get_data():
cursor = collection.find().sort("timestamp", -1).limit(10)
data = []
for doc in cursor:
data.append({
"cpu_usage": doc["cpu_usage"],
"memory_usage": doc["memory_usage"],
"timestamp": doc["timestamp"]
})
return jsonify(data)
if __name__ == "__main__":
app.run(debug=True)
C++ 代码实现高性能分布式云服务器性能监测系统
数据收集模块(Agent)
在每个服务器节点上运行,收集性能数据。
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <sys/sysinfo.h>
#include <fstream>
#include <json/json.h>
#include <arpa/inet.h>
#include <sys/socket.h>
class PerformanceAgent {
public:
PerformanceAgent(const std::string& server_ip, int server_port)
: server_ip(server_ip), server_port(server_port) {}
void collect_data() {
struct sysinfo sys_info;
sysinfo(&sys_info);
Json::Value data;
data["cpu_usage"] = get_cpu_usage();
data["memory_usage"] = (sys_info.totalram - sys_info.freeram) * 100 / sys_info.totalram;
data["timestamp"] = std::time(nullptr);
send_data(data);
}
void run() {
while (true) {
collect_data();
std::this_thread::sleep_for(std::chrono::seconds(5));
}
}
private:
std::string server_ip;
int server_port;
float get_cpu_usage() {
std::ifstream file("/proc/stat");
std::string line;
std::getline(file, line);
unsigned long long user, nice, system, idle;
std::sscanf(line.c_str(), "cpu %llu %llu %llu %llu", &user, &nice, &system, &idle);
static unsigned long long prev_user, prev_nice, prev_system, prev_idle;
unsigned long long diff_idle = idle - prev_idle;
unsigned long long diff_total = (user + nice + system + idle) - (prev_user + prev_nice + prev_system + prev_idle);
prev_user = user;
prev_nice = nice;
prev_system = system;
prev_idle = idle;
return (diff_total - diff_idle) * 100.0 / diff_total;
}
void send_data(const Json::Value& data) {
int sock = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
inet_pton(AF_INET, server_ip.c_str(), &server_addr.sin_addr);
std::string data_str = Json::writeString(Json::StreamWriterBuilder(), data);
sendto(sock, data_str.c_str(), data_str.size(), 0, (struct sockaddr*)&server_addr, sizeof(server_addr));
close(sock);
}
};
int main() {
PerformanceAgent agent("192.168.1.100", 9999);
agent.run();
return 0;
}
数据传输模块
负责将数据从各个Agent传输到中央服务器。
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <json/json.h>
class DataReceiver {
public:
DataReceiver(int port) : port(port) {}
void start() {
int sock = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in server_addr, client_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = INADDR_ANY;
bind(sock, (struct sockaddr*)&server_addr, sizeof(server_addr));
char buffer[65536];
while (true) {
socklen_t client_len = sizeof(client_addr);
ssize_t len = recvfrom(sock, buffer, sizeof(buffer) - 1, 0, (struct sockaddr*)&client_addr, &client_len);
buffer[len] = '\0';
process_data(buffer);
}
close(sock);
}
private:
int port;
void process_data(const std::string& data_str) {
Json::CharReaderBuilder reader;
Json::Value data;
std::string errs;
std::istringstream s(data_str);
std::string doc;
while (std::getline(s, doc)) {
if (!Json::parseFromStream(reader, s, &data, &errs)) {
std::cerr << "Failed to parse data: " << errs << std::endl;
continue;
}
std::cout << "Received data: " << data.toStyledString() << std::endl;
// 这里可以将数据存入数据库或其他存储系统
}
}
};
int main() {
DataReceiver receiver(9999);
receiver.start();
return 0;
}
数据存储模块
在中央服务器上存储和管理性能数据。
#include <iostream>
#include <sqlite3.h>
#include <string>
#include <json/json.h>
class DataStorage {
public:
DataStorage(const std::string& db_name) {
sqlite3_open(db_name.c_str(), &db);
create_table();
}
~DataStorage() {
sqlite3_close(db);
}
void store_data(const Json::Value& data) {
std::string sql = "INSERT INTO performance_data (cpu_usage, memory_usage, timestamp) VALUES (?, ?, ?);";
sqlite3_stmt* stmt;
sqlite3_prepare_v2(db, sql.c_str(), -1, &stmt, 0);
sqlite3_bind_double(stmt, 1, data["cpu_usage"].asFloat());
sqlite3_bind_double(stmt, 2, data["memory_usage"].asFloat());
sqlite3_bind_int64(stmt, 3, data["timestamp"].asInt64());
sqlite3_step(stmt);
sqlite3_finalize(stmt);
}
private:
sqlite3* db;
void create_table() {
std::string sql = "CREATE TABLE IF NOT EXISTS performance_data ("
"id INTEGER PRIMARY KEY AUTOINCREMENT, "
"cpu_usage REAL, "
"memory_usage REAL, "
"timestamp INTEGER);";
char* errMsg = nullptr;
sqlite3_exec(db, sql.c_str(), 0, 0, &errMsg);
if (errMsg) {
std::cerr << "Failed to create table: " << errMsg << std::endl;
sqlite3_free(errMsg);
}
}
};
int main() {
DataStorage storage("performance_data.db");
Json::Value sample_data;
sample_data["cpu_usage"] = 50.5;
sample_data["memory_usage"] = 70.2;
sample_data["timestamp"] = std::time(nullptr);
storage.store_data(sample_data);
return 0;
}
数据分析和报警模块
分析性能数据并生成报警。
#include <iostream>
#include <sqlite3.h>
#include <string>
class DataAnalyzer {
public:
DataAnalyzer(const std::string& db_name) {
sqlite3_open(db_name.c_str(), &db);
}
~DataAnalyzer() {
sqlite3_close(db);
}
void analyze_data() {
std::string sql = "SELECT cpu_usage, memory_usage FROM performance_data ORDER BY timestamp DESC LIMIT 1;";
sqlite3_stmt* stmt;
sqlite3_prepare_v2(db, sql.c_str(), -1, &stmt, 0);
if (sqlite3_step(stmt) == SQLITE_ROW) {
float cpu_usage = sqlite3_column_double(stmt, 0);
float memory_usage = sqlite3_column_double(stmt, 1);
if (cpu_usage > 80) {
send_alert("High CPU usage detected: " + std::to_string(cpu_usage) + "%");
}
if (memory_usage > 90) {
send_alert("High Memory usage detected: " + std::to_string(memory_usage) + "%");
}
}
sqlite3_finalize(stmt);
}
private:
sqlite3* db;
void send_alert(const std::string& message) {
std::cout << "ALERT: " << message << std::endl;
}
};
int main() {
DataAnalyzer analyzer("performance_data.db");
analyzer.analyze_data();
return 0;
}
数据展示模块(Dashboard)
以可视化的方式展示性能数据和报警信息。
#include <cpprest/http_listener.h>
#include <cpprest/json.h>
#include <sqlite3.h>
#include <iostream>
using namespace web;
using namespace web::http;
using namespace web::http::experimental::listener;
class PerformanceDashboard {
public:
PerformanceDashboard(const std::string& url, const std::string& db_name)
: listener(url), db_name(db_name
标签:__,self,db,server,高性能,usage,服务器,data,分布式
From: https://blog.51cto.com/u_14882565/10890410