From abaeb4f6c850e68c736eabeb723f7534f7a27913 Mon Sep 17 00:00:00 2001
From: Jonathan Klimt <jonathan.klimt@eonerc.rwth-aachen.de>
Date: Mon, 7 Apr 2025 23:22:01 +0200
Subject: [PATCH] Refactored server_manager into separate file

Move ServerManager and its TypedDict definitions out of app.py into the
new server_manager.py module. ServerManager now receives the SocketIO
instance as a constructor argument instead of using the module-level
global from app.py. The instance is stored before the status update
thread is started, because that thread emits on it.

---
 app.py            | 243 ++--------------------------------------------
 auth.py           |   3 +-
 server_manager.py | 233 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 244 insertions(+), 235 deletions(-)
 create mode 100644 server_manager.py

diff --git a/app.py b/app.py
index 54779cd..5bb67c4 100644
--- a/app.py
+++ b/app.py
@@ -18,19 +18,17 @@ from flask_login import (  # type: ignore
 from flask_limiter import Limiter
 from flask_limiter.util import get_remote_address
 from flask.typing import ResponseReturnValue
-from pyghmi.ipmi import command  # type: ignore
 import json
-import time
-import threading
 import logging
 import socket
 import sys
 import argparse
-from typing import Dict, List, Optional, Tuple, TypedDict
-from auth import LDAPAuth, User, LDAPConfig
+from typing import Optional
+from auth import LDAPAuth, User
 import os
 from datetime import timedelta
 from typing import Any
+from server_manager import ServerManager, Config
 
 # Configure logging to show all levels
 logging.basicConfig(
@@ -47,49 +45,6 @@ logging.getLogger("werkzeug").setLevel(logging.WARNING)
 socket.setdefaulttimeout(5)  # 5 seconds timeout
 
 
-class ServerConfig(TypedDict):
-    name: str
-    ipmi_ip: str
-    ipmi_user: str
-    ipmi_pass: str
-    locked: bool
-
-
-class ServerStatus(TypedDict):
-    name: str
-    status: str
-    error: Optional[str]
-    ipmi_ip: str
-    power_consumption: Optional[float]
-    locked: bool
-
-
-class GroupConfig(TypedDict):
-    name: str
-    servers: List[ServerConfig]
-
-
-class Config(TypedDict):
-    ldap: LDAPConfig
-    groups: List[GroupConfig]
-
-
-class ServerData(TypedDict):
-    config: ServerConfig
-    connection: Optional[command.Command]
-    last_error: Optional[str]
-
-
-class ServerGroup(TypedDict):
-    name: str
-    servers: List[ServerConfig]
-
-
-class GroupStatus(TypedDict):
-    name: str
-    servers: List[ServerStatus]
-
-
 app = Flask(__name__)
 app.secret_key = os.urandom(24)
 app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(
@@ -123,191 +78,6 @@ limiter = Limiter(
 )
 
 
-@login_manager.user_loader
-def load_user(user_id: str) -> Optional[User]:
-    return User(user_id)
-
-
-class ServerManager:
-    def __init__(self, config: Config) -> None:
-        self.servers: Dict[str, ServerData] = {}
-        self.status_cache: Dict[str, ServerStatus] = {}
-        self.groups: List[ServerGroup] = []
-        self.load_config(config)
-        self.initialize_connections()
-        self._start_status_update_thread()
-
-    def load_config(self, config: Config) -> None:
-        try:
-            self.groups = config["groups"]
-            ipmi_ips = set()
-            for group in self.groups:
-                for server in group["servers"]:
-                    if server["ipmi_ip"] in ipmi_ips:
-                        raise ValueError(
-                            f"Duplicate IPMI IP address found in config: {server['ipmi_ip']}"
-                        )
-                    ipmi_ips.add(server["ipmi_ip"])
-                    self.servers[f"{group['name']}/{server['name']}"] = {
-                        "config": server,
-                        "connection": None,
-                        "last_error": None,
-                    }
-        except Exception as e:
-            logger.error(f"Failed to load server configuration: {str(e)}")
-            self.servers = {}
-            self.groups = []
-            raise  # Re-raise the exception to prevent the application from starting with invalid configuration
-
-    def initialize_connections(self) -> None:
-        for server_name, server_data in self.servers.items():
-            try:
-                logger.debug(f"Initializing connection to {server_name}")
-                server_data["connection"] = command.Command(
-                    bmc=server_data["config"]["ipmi_ip"],
-                    userid=server_data["config"]["ipmi_user"],
-                    password=server_data["config"]["ipmi_pass"],
-                    keepalive=True,
-                )
-                logger.debug(f"Successfully initialized connection to {server_name}")
-            except Exception as e:
-                logger.error(
-                    f"Failed to initialize connection to {server_name}: {str(e)}"
-                )
-                server_data["last_error"] = str(e)
-
-    def _create_status_response(
-        self,
-        group_name: str,
-        server_name: str,
-        status: str,
-        error: Optional[str] = None,
-        power_consumption: Optional[float] = None,
-    ) -> ServerStatus:
-        """Create a standardized status response for a server."""
-        server: ServerData = self.servers[f"{group_name}/{server_name}"]
-        return {
-            "name": server_name,
-            "status": status,
-            "error": error,
-            "ipmi_ip": server["config"]["ipmi_ip"],
-            "power_consumption": power_consumption,
-            "locked": server["config"].get("locked", False),
-        }
-
-    def _update_server_status(self, group_name: str, server_name: str) -> None:
-        cache_key = f"{group_name}/{server_name}"
-        if cache_key not in self.servers:
-            return
-
-        server: ServerData = self.servers[cache_key]
-        if not server["connection"]:
-            self.status_cache[cache_key] = self._create_status_response(
-                group_name,
-                server_name,
-                "unknown",
-                server["last_error"] or "No connection established",
-            )
-            return
-
-        try:
-            status = server["connection"].get_power()
-            server["last_error"] = None
-
-            # Get power consumption
-            power_consumption: Optional[float] = None
-            try:
-                power_consumption = server["connection"].get_system_power_watts()
-                logger.debug(
-                    f"Power consumption for {server_name}: {power_consumption}W"
-                )
-            except Exception as e:
-                logger.debug(
-                    f"Could not get power consumption for {server_name}: {str(e)}"
-                )
-
-            self.status_cache[cache_key] = self._create_status_response(
-                group_name,
-                server_name,
-                "on" if status["powerstate"] == "on" else "off",
-                power_consumption=power_consumption,
-            )
-        except Exception as e:
-            logger.error(f"Failed to get power status for {server_name}: {str(e)}")
-            server["last_error"] = str(e)
-            self.status_cache[cache_key] = self._create_status_response(
-                group_name, server_name, "unknown", str(e)
-            )
-
-    def _status_update_loop(self) -> None:
-        while True:
-            try:
-                for group in self.groups:
-                    for server in group["servers"]:
-                        self._update_server_status(group["name"], server["name"])
-                socketio.emit(
-                    "server_status_update", {"groups": self.get_all_statuses()}
-                )
-            except Exception as e:
-                logger.error(f"Error in status update thread: {str(e)}")
-            time.sleep(5)
-
-    def _start_status_update_thread(self) -> None:
-        thread = threading.Thread(target=self._status_update_loop, daemon=True)
-        thread.start()
-
-    def toggle_power(
-        self, group_name: str, server_name: str
-    ) -> Tuple[Optional[bool], Optional[str]]:
-        cache_key = f"{group_name}/{server_name}"
-        if cache_key not in self.servers:
-            logger.error(
-                f"Power toggle failed: Server {server_name} not found in group {group_name}"
-            )
-            return None, "Server not found"
-
-        server = self.servers[cache_key]
-        if server["config"].get("locked", False):
-            logger.info(f"Power toggle blocked: Server {server_name} is locked")
-            return None, "Server is locked and cannot be controlled via this tool"
-
-        if not server["connection"]:
-            logger.error(
-                f"Power toggle failed: No connection established for server {server_name}"
-            )
-            return None, "No connection established"
-
-        try:
-            current_status = server["connection"].get_power()
-            if current_status["powerstate"] == "on":
-                logger.info(f"Initiating power off for server {server_name}")
-                server["connection"].set_power("off")
-            else:
-                logger.info(f"Initiating power on for server {server_name}")
-                server["connection"].set_power("on")
-            return True, None
-        except Exception as e:
-            logger.error(
-                f"Failed to initiate power toggle for server {server_name}: {str(e)}"
-            )
-            return None, str(e)
-
-    def get_all_statuses(self) -> List[GroupStatus]:
-        """Get the status of all servers grouped by their groups."""
-        group_statuses: List[GroupStatus] = []
-
-        for group in self.groups:
-            group_servers: List[ServerStatus] = []
-            for server in group["servers"]:
-                cache_key = f"{group['name']}/{server['name']}"
-                if cache_key in self.status_cache:
-                    group_servers.append(self.status_cache[cache_key])
-
-            group_statuses.append({"name": group["name"], "servers": group_servers})
-
-        return group_statuses
-
-
 config_path = "config.json"
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="IPMI Server Management")
@@ -321,7 +91,12 @@ if __name__ == "__main__":
 with open(config_path, "r") as f:
     config = json.load(f)
 
-server_manager = ServerManager(config)
+server_manager = ServerManager(socketio, config)
+
+
+@login_manager.user_loader
+def load_user(user_id: str) -> Optional[User]:
+    return User(user_id)
 
 
 @app.route("/login", methods=["GET", "POST"])
diff --git a/auth.py b/auth.py
index a435e9c..ad4834a 100644
--- a/auth.py
+++ b/auth.py
@@ -13,13 +13,14 @@ class LDAPConfig(TypedDict):
     user_password: str
     admin_group: str
 
+
 class User(UserMixin):
     def __init__(self, username: str):
         self.id = username
 
 
 class LDAPAuth:
-    def __init__(self, config: 'LDAPConfig'):
+    def __init__(self, config: "LDAPConfig"):
         self.server = config["server"]
         self.base_dn = config["base_dn"]
         self.user_dn = config["user_dn"]
diff --git a/server_manager.py b/server_manager.py
new file mode 100644
index 0000000..1f60a24
--- /dev/null
+++ b/server_manager.py
@@ -0,0 +1,233 @@
+from typing import TypedDict, List, Optional, Dict, Tuple
+from auth import LDAPConfig
+from pyghmi.ipmi import command  # type: ignore
+import time
+import threading
+import logging
+from flask_socketio import SocketIO
+
+logger = logging.getLogger(__name__)
+
+
+class ServerConfig(TypedDict):
+    name: str
+    ipmi_ip: str
+    ipmi_user: str
+    ipmi_pass: str
+    locked: bool
+
+
+class ServerStatus(TypedDict):
+    name: str
+    status: str
+    error: Optional[str]
+    ipmi_ip: str
+    power_consumption: Optional[float]
+    locked: bool
+
+
+class GroupConfig(TypedDict):
+    name: str
+    servers: List[ServerConfig]
+
+
+class Config(TypedDict):
+    ldap: LDAPConfig
+    groups: List[GroupConfig]
+
+
+class ServerData(TypedDict):
+    config: ServerConfig
+    connection: Optional[command.Command]
+    last_error: Optional[str]
+
+
+class ServerGroup(TypedDict):
+    name: str
+    servers: List[ServerConfig]
+
+
+class GroupStatus(TypedDict):
+    name: str
+    servers: List[ServerStatus]
+
+
+class ServerManager:
+    def __init__(self, socketio: SocketIO, config: Config) -> None:
+        self.socketio = socketio  # set before the thread that emits on it starts
+        self.servers: Dict[str, ServerData] = {}
+        self.status_cache: Dict[str, ServerStatus] = {}
+        self.groups: List[ServerGroup] = []
+        self.load_config(config)
+        self.initialize_connections()
+        self._start_status_update_thread()
+
+    def load_config(self, config: Config) -> None:
+        try:
+            self.groups = config["groups"]
+            ipmi_ips = set()
+            for group in self.groups:
+                for server in group["servers"]:
+                    if server["ipmi_ip"] in ipmi_ips:
+                        raise ValueError(
+                            f"Duplicate IPMI IP address found in config: {server['ipmi_ip']}"
+                        )
+                    ipmi_ips.add(server["ipmi_ip"])
+                    self.servers[f"{group['name']}/{server['name']}"] = {
+                        "config": server,
+                        "connection": None,
+                        "last_error": None,
+                    }
+        except Exception as e:
+            logger.error(f"Failed to load server configuration: {str(e)}")
+            self.servers = {}
+            self.groups = []
+            raise  # Re-raise the exception to prevent the application from starting with invalid configuration
+
+    def initialize_connections(self) -> None:
+        for server_name, server_data in self.servers.items():
+            try:
+                logger.debug(f"Initializing connection to {server_name}")
+                server_data["connection"] = command.Command(
+                    bmc=server_data["config"]["ipmi_ip"],
+                    userid=server_data["config"]["ipmi_user"],
+                    password=server_data["config"]["ipmi_pass"],
+                    keepalive=True,
+                )
+                logger.debug(f"Successfully initialized connection to {server_name}")
+            except Exception as e:
+                logger.error(
+                    f"Failed to initialize connection to {server_name}: {str(e)}"
+                )
+                server_data["last_error"] = str(e)
+
+    def _create_status_response(
+        self,
+        group_name: str,
+        server_name: str,
+        status: str,
+        error: Optional[str] = None,
+        power_consumption: Optional[float] = None,
+    ) -> ServerStatus:
+        """Create a standardized status response for a server."""
+        server: ServerData = self.servers[f"{group_name}/{server_name}"]
+        return {
+            "name": server_name,
+            "status": status,
+            "error": error,
+            "ipmi_ip": server["config"]["ipmi_ip"],
+            "power_consumption": power_consumption,
+            "locked": server["config"].get("locked", False),
+        }
+
+    def _update_server_status(self, group_name: str, server_name: str) -> None:
+        cache_key = f"{group_name}/{server_name}"
+        if cache_key not in self.servers:
+            return
+
+        server: ServerData = self.servers[cache_key]
+        if not server["connection"]:
+            self.status_cache[cache_key] = self._create_status_response(
+                group_name,
+                server_name,
+                "unknown",
+                server["last_error"] or "No connection established",
+            )
+            return
+
+        try:
+            status = server["connection"].get_power()
+            server["last_error"] = None
+
+            # Get power consumption
+            power_consumption: Optional[float] = None
+            try:
+                power_consumption = server["connection"].get_system_power_watts()
+                logger.debug(
+                    f"Power consumption for {server_name}: {power_consumption}W"
+                )
+            except Exception as e:
+                logger.debug(
+                    f"Could not get power consumption for {server_name}: {str(e)}"
+                )
+
+            self.status_cache[cache_key] = self._create_status_response(
+                group_name,
+                server_name,
+                "on" if status["powerstate"] == "on" else "off",
+                power_consumption=power_consumption,
+            )
+        except Exception as e:
+            logger.error(f"Failed to get power status for {server_name}: {str(e)}")
+            server["last_error"] = str(e)
+            self.status_cache[cache_key] = self._create_status_response(
+                group_name, server_name, "unknown", str(e)
+            )
+
+    def _status_update_loop(self) -> None:
+        while True:
+            try:
+                for group in self.groups:
+                    for server in group["servers"]:
+                        self._update_server_status(group["name"], server["name"])
+                self.socketio.emit(
+                    "server_status_update", {"groups": self.get_all_statuses()}
+                )
+            except Exception as e:
+                logger.error(f"Error in status update thread: {str(e)}")
+            time.sleep(5)
+
+    def _start_status_update_thread(self) -> None:
+        thread = threading.Thread(target=self._status_update_loop, daemon=True)
+        thread.start()
+
+    def toggle_power(
+        self, group_name: str, server_name: str
+    ) -> Tuple[Optional[bool], Optional[str]]:
+        cache_key = f"{group_name}/{server_name}"
+        if cache_key not in self.servers:
+            logger.error(
+                f"Power toggle failed: Server {server_name} not found in group {group_name}"
+            )
+            return None, "Server not found"
+
+        server = self.servers[cache_key]
+        if server["config"].get("locked", False):
+            logger.info(f"Power toggle blocked: Server {server_name} is locked")
+            return None, "Server is locked and cannot be controlled via this tool"
+
+        if not server["connection"]:
+            logger.error(
+                f"Power toggle failed: No connection established for server {server_name}"
+            )
+            return None, "No connection established"
+
+        try:
+            current_status = server["connection"].get_power()
+            if current_status["powerstate"] == "on":
+                logger.info(f"Initiating power off for server {server_name}")
+                server["connection"].set_power("off")
+            else:
+                logger.info(f"Initiating power on for server {server_name}")
+                server["connection"].set_power("on")
+            return True, None
+        except Exception as e:
+            logger.error(
+                f"Failed to initiate power toggle for server {server_name}: {str(e)}"
+            )
+            return None, str(e)
+
+    def get_all_statuses(self) -> List[GroupStatus]:
+        """Get the status of all servers grouped by their groups."""
+        group_statuses: List[GroupStatus] = []
+
+        for group in self.groups:
+            group_servers: List[ServerStatus] = []
+            for server in group["servers"]:
+                cache_key = f"{group['name']}/{server['name']}"
+                if cache_key in self.status_cache:
+                    group_servers.append(self.status_cache[cache_key])
+
+            group_statuses.append({"name": group["name"], "servers": group_servers})
+
+        return group_statuses
-- 
GitLab