Actualizados scripts

This commit is contained in:
2026-03-31 13:49:42 +02:00
parent f9ff11543e
commit 7565c18a78
2 changed files with 580 additions and 207 deletions
+284 -79
View File
@@ -1,118 +1,317 @@
#!/usr/bin/env bash
set -euo pipefail
GREEN=$'\033[0;32m'
YELLOW=$'\033[1;33m'
RED=$'\033[0;31m'
NC=$'\033[0m'
USE_COLOR=0
if [ -t 1 ] && [ -z "${NO_COLOR:-}" ]; then
if [[ -t 1 && -z "${NO_COLOR:-}" ]]; then
USE_COLOR=1
fi
deps=(smartctl lsblk awk)
for dep in "${deps[@]}"; do
if ! command -v "$dep" >/dev/null 2>&1; then
echo "Falta dependencia: $dep" >&2
warn() {
printf 'Aviso: %s\n' "$*" >&2
}
die() {
printf 'Error: %s\n' "$*" >&2
exit 1
fi
}
supports_command() {
command -v "$1" >/dev/null 2>&1
}
check_deps() {
local deps=(smartctl lsblk awk grep sed head tr)
local dep
for dep in "${deps[@]}"; do
supports_command "$dep" || die "Falta dependencia: $dep"
done
}
init_sudo() {
if ! supports_command sudo; then
warn "sudo no está disponible; se intentará leer SMART sin elevación."
return
fi
if sudo -n true 2>/dev/null; then
SUDO="sudo -n"
SUDO_CMD=(sudo -n)
return
fi
if sudo -v; then
SUDO_CMD=(sudo)
return
fi
warn "sin sudo válido; algunos datos SMART podrían no estar disponibles."
}
smartctl_cmd() {
if ((${#SUDO_CMD[@]})); then
"${SUDO_CMD[@]}" smartctl "$@"
else
SUDO="sudo"
if ! $SUDO -v; then
SUDO=""
echo "Aviso: sin sudo, no se leeran datos SMART." >&2
smartctl "$@"
fi
}
list_disks() {
lsblk -dnpo PATH,TYPE | awk '$2 == "disk" { print $1 }'
}
format_size() {
local size_bytes=$1
local value unit
if [[ -z "$size_bytes" || ! "$size_bytes" =~ ^[0-9]+$ ]]; then
printf '—'
return
fi
header="Disco\tModelo\tSerial\tCapacidad\tTemp\tUptime(h)\tUptime\tMontaje\tSMART\tRealloc\tPending\tOffline"
rows=()
shopt -s nullglob
disks=(/dev/sd? /dev/nvme?n?)
shopt -u nullglob
if [ "${#disks[@]}" -eq 0 ]; then
echo "No se encontraron discos en /dev/sd? o /dev/nvme?n?" >&2
exit 1
fi
for disk in "${disks[@]}"; do
if [ -n "$SUDO" ]; then
model=$($SUDO smartctl -i "$disk" | awk -F: '/Device Model|Model Number/ {gsub(/^ +| +$/, "", $2); print $2; exit}')
serial=$($SUDO smartctl -i "$disk" | awk -F: '/Serial Number/ {gsub(/^ +| +$/, "", $2); print $2; exit}')
health=$($SUDO smartctl -H "$disk" | awk -F: '/SMART overall-health|SMART Health Status/ {gsub(/^ +| +$/, "", $2); print $2; exit}')
if ((size_bytes >= 1024**4)); then
value=$(awk -v s="$size_bytes" 'BEGIN { printf "%.2f", s/1024/1024/1024/1024 }')
unit="T"
else
model="—"
serial=""
health="—"
fi
[ -z "$model" ] && model="—"
[ -z "$serial" ] && serial="—"
[ -z "$health" ] && health="—"
size_bytes=$(lsblk -ndo SIZE -b "$disk")
if [ "$size_bytes" -ge 1000000000000 ]; then
size=$(awk -v s="$size_bytes" 'BEGIN {printf "%.2f", s/1024/1024/1024/1024}')
size="${size}T"
else
size=$(awk -v s="$size_bytes" 'BEGIN {printf "%.2f", s/1024/1024/1024}')
size="${size}G"
value=$(awk -v s="$size_bytes" 'BEGIN { printf "%.2f", s/1024/1024/1024 }')
unit="G"
fi
temp=$($SUDO smartctl -A "$disk" 2>/dev/null | awk '/Temperature_Celsius|Current_Temperature|Airflow_Temperature_Cel|Temperature/ {print $10; exit}')
[ -z "$temp" ] && temp="N/A"
printf '%s%s' "$value" "$unit"
}
if [[ "$temp" != "N/A" ]]; then
if [ "$temp" -lt 45 ]; then
parse_after_colon() {
local pattern=$1
local text=$2
awk -F: -v pattern="$pattern" '
$0 ~ pattern {
value = substr($0, index($0, ":") + 1)
gsub(/^[[:space:]]+|[[:space:]]+$/, "", value)
print value
exit
}
' <<< "$text"
}
parse_attr_value() {
local pattern=$1
local text=$2
local value
value=$(awk -v pattern="$pattern" '
$0 ~ pattern {
for (i = NF; i >= 1; i--) {
if ($i ~ /^[0-9]+$/) {
print $i
exit
}
}
}
' <<< "$text")
printf '%s' "${value:-}"
}
format_temp() {
local temp=$1
local colorized color
if [[ -z "$temp" || "$temp" == "N/A" || ! "$temp" =~ ^[0-9]+$ ]]; then
printf 'N/A'
return
fi
if ((temp < 45)); then
color=$GREEN
elif [ "$temp" -lt 51 ]; then
elif ((temp < 51)); then
color=$YELLOW
else
color=$RED
fi
if [ "$USE_COLOR" -eq 1 ]; then
temp_color="${color}${temp}°C${NC}"
if ((USE_COLOR)); then
colorized="${color}${temp}°C${NC}"
else
temp_color="${temp}°C"
fi
else
temp_color="N/A"
colorized="${temp}°C"
fi
uptime_raw=$($SUDO smartctl -A "$disk" 2>/dev/null | awk '/Power_On_Hours/ {print $10}' | grep -o '^[0-9]*')
if [ -z "$uptime_raw" ]; then
uptime_raw="—"
uptime_fancy="—"
else
years=$(( uptime_raw / 8760 ))
days=$(( (uptime_raw % 8760) / 24 ))
uptime_fancy="${years}y ${days}d"
printf '%s' "$colorized"
}
format_uptime() {
local hours=$1
local years days
if [[ -z "$hours" || ! "$hours" =~ ^[0-9]+$ ]]; then
printf '—'
return
fi
mountpoint=$(lsblk -ndo MOUNTPOINT "${disk}"* | grep -v "^$" | head -n 1)
[ -z "$mountpoint" ] && mountpoint="—"
years=$((hours / 8760))
days=$(((hours % 8760) / 24))
printf '%sy %sd' "$years" "$days"
}
reallocated=$($SUDO smartctl -A "$disk" 2>/dev/null | awk '/Reallocated_Sector_Ct|Reallocated_Event_Count/ {print $10; exit}')
pending=$($SUDO smartctl -A "$disk" 2>/dev/null | awk '/Current_Pending_Sector/ {print $10; exit}')
offline=$($SUDO smartctl -A "$disk" 2>/dev/null | awk '/Offline_Uncorrectable/ {print $10; exit}')
[ -z "$reallocated" ] && reallocated="—"
[ -z "$pending" ] && pending="—"
[ -z "$offline" ] && offline="—"
first_mountpoint() {
local disk=$1
local mountpoint
line=$(printf "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s" \
"$disk" "$model" "$serial" "$size" "$temp_color" "$uptime_raw" "$uptime_fancy" "$mountpoint" "$health" "$reallocated" "$pending" "$offline")
rows+=("$line")
mountpoint=$(lsblk -nrpo MOUNTPOINT "$disk" | awk 'NF { print; exit }')
printf '%s' "${mountpoint:-}"
}
normalize_health() {
local text=$1
done
case "$text" in
PASSED|OK)
printf 'OK'
;;
*)
printf '%s' "${text:-}"
;;
esac
}
disk_status() {
local health=$1
local temp=$2
local reallocated=$3
local pending=$4
local offline=$5
if [[ "$health" != "—" && "$health" != "OK" && "$health" != "PASSED" ]]; then
printf 'FAIL'
return
fi
if [[ "$pending" =~ ^[1-9][0-9]*$ || "$offline" =~ ^[1-9][0-9]*$ ]]; then
printf 'FAIL'
return
fi
if [[ "$reallocated" =~ ^[1-9][0-9]*$ ]]; then
printf 'WARN'
return
fi
if [[ "$temp" =~ ^[0-9]+$ ]]; then
if ((temp >= 55)); then
printf 'FAIL'
return
fi
if ((temp >= 45)); then
printf 'WARN'
return
fi
fi
printf 'OK'
}
parse_disk_data() {
local disk=$1
local smart_out=$2
local size_bytes model serial health temp uptime_raw uptime_fancy mountpoint
local reallocated pending offline critical_warning media_errors error_log_entries status
size_bytes=$(lsblk -dnbo SIZE "$disk" 2>/dev/null | head -n 1 || true)
model=$(parse_after_colon '^(Device Model|Model Number|Product):' "$smart_out")
serial=$(parse_after_colon '^Serial Number:' "$smart_out")
health=$(parse_after_colon '^(SMART overall-health self-assessment test result|SMART Health Status|SMART overall-health):' "$smart_out")
temp=$(parse_after_colon '^Temperature:' "$smart_out" | grep -o '[0-9]\+' | head -n 1 || true)
if [[ -z "$temp" ]]; then
temp=$(parse_attr_value 'Temperature_Celsius|Current_Temperature|Airflow_Temperature_Cel|Temperature_Internal' "$smart_out")
fi
uptime_raw=$(parse_after_colon '^Power On Hours:' "$smart_out" | grep -o '^[[:space:]]*[0-9]\+' | tr -d ' ' || true)
if [[ -z "$uptime_raw" ]]; then
uptime_raw=$(parse_attr_value 'Power_On_Hours|Power_On_Hours_and_Msec' "$smart_out")
fi
reallocated=$(parse_attr_value 'Reallocated_Sector_Ct|Reallocated_Event_Count' "$smart_out")
pending=$(parse_attr_value 'Current_Pending_Sector' "$smart_out")
offline=$(parse_attr_value 'Offline_Uncorrectable' "$smart_out")
critical_warning=$(parse_after_colon '^Critical Warning:' "$smart_out")
media_errors=$(parse_after_colon '^Media and Data Integrity Errors:' "$smart_out" | tr -dc '0-9' || true)
error_log_entries=$(parse_after_colon '^Error Information Log Entries:' "$smart_out" | tr -dc '0-9' || true)
if [[ -n "$critical_warning" && "$critical_warning" != "0x00" ]]; then
health="WARN ($critical_warning)"
fi
[[ -n "$media_errors" && "$media_errors" != "0" ]] && offline="$media_errors"
[[ -n "$error_log_entries" && "$error_log_entries" != "0" && -z "$pending" ]] && pending="$error_log_entries"
uptime_fancy=$(format_uptime "${uptime_raw:-}")
mountpoint=$(first_mountpoint "$disk")
status=$(disk_status "${health:-}" "${temp:-}" "${reallocated:-}" "${pending:-}" "${offline:-}")
printf "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n" \
"$disk" \
"${model:-}" \
"${serial:-}" \
"$(format_size "${size_bytes:-}")" \
"$(format_temp "${temp:-N/A}")" \
"${uptime_raw:-}" \
"$uptime_fancy" \
"$mountpoint" \
"$(normalize_health "${health:-}")" \
"${reallocated:-}" \
"${pending:-}" \
"${offline:-}" \
"$status"
}
print_table() {
local header
local -a rows=("$@")
header="Disco\tModelo\tSerial\tCapacidad\tTemp\tUptime(h)\tUptime\tMontaje\tSMART\tRealloc\tPending\tOffline\tEstado"
if supports_command column; then
{
echo -e "$header"
printf '%b\n' "$header"
printf '%s\n' "${rows[@]}"
} | column -t -s $'\t'
return
fi
warn "column no está disponible; se mostrará salida tabulada sin alinear."
printf '%b\n' "$header"
printf '%s\n' "${rows[@]}"
}
main() {
local -a disks=() rows=()
local disk smart_out size_bytes mountpoint
check_deps
init_sudo
mapfile -t disks < <(list_disks)
((${#disks[@]})) || die "No se encontraron discos con lsblk."
for disk in "${disks[@]}"; do
if smart_out=$(smartctl_cmd -a "$disk" 2>/dev/null); then
rows+=("$(parse_disk_data "$disk" "$smart_out")")
else
warn "no se pudo leer SMART de $disk; se mostrarán solo datos básicos."
size_bytes=$(lsblk -dnbo SIZE "$disk" 2>/dev/null | head -n 1 || true)
mountpoint=$(first_mountpoint "$disk")
rows+=("$(printf "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s" \
"$disk" "—" "—" "$(format_size "$size_bytes")" "N/A" "—" "—" "$mountpoint" "—" "—" "—" "—" "N/A")")
fi
done
print_table "${rows[@]}"
cat <<'EOF'
@@ -121,13 +320,19 @@ Leyenda:
Modelo : Modelo reportado por SMART.
Serial : Numero de serie del disco.
Capacidad : Tamano total del disco.
Temp : Temperatura actual (SMART).
Uptime(h) : Horas encendido (Power_On_Hours).
Temp : Temperatura actual.
Uptime(h) : Horas encendido.
Uptime : Antiguedad aproximada (años/dias).
Montaje : Punto de montaje detectado.
SMART : Estado general de salud.
Realloc : Sectores reasignados.
Pending : Sectores pendientes de reasignacion.
Offline : Sectores no corregibles offline.
Pending : Sectores pendientes o errores pendientes de revisar.
Offline : Sectores no corregibles o errores de integridad reportados.
Estado : Resumen rapido del riesgo (OK/WARN/FAIL).
N/A / — : Dato no disponible.
EOF
}
declare -a SUDO_CMD=()
main "$@"
+265 -97
View File
@@ -1,140 +1,308 @@
import subprocess
import platform
import sys
from colorama import init, Fore, Style
#!/usr/bin/env python3
import argparse
import json
import platform
import shutil
import subprocess
import sys
from typing import Dict, List, Optional, Tuple
from colorama import Fore, Style, init
# Inicialización de colorama para el uso de colores en la salida
init(autoreset=True)
def run_upsc_command(ups_name):
try:
result = subprocess.run(["upsc", f"{ups_name}@localhost:4500"], capture_output=True, text=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
print(f"{Fore.RED}Error running upsc command: {e}")
return None
except Exception as e:
print(f"{Fore.RED}Unexpected error: {e}")
return None
DEFAULT_UPS = "nutdev1"
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 4500
DEFAULT_TIMEOUT = 8
def parse_upsc_output(output):
parsed_output = {}
lines = output.split("\n")
for line in lines:
if ":" in line:
try:
key, value = line.split(": ", 1)
parsed_output[key] = value
except ValueError:
print(f"{Fore.RED}Error parsing line: {line}")
return parsed_output
def beautify_upsc_output(parsed_output):
print(f"{Fore.BLUE}UPS Information")
print("-" * 70)
units = {
UNITS = {
"battery.charge": "%",
"battery.voltage": "V",
"output.frequency": "Hz",
"input.voltage": "V",
"output.frequency": "Hz",
"output.voltage": "V",
"ups.load": "%",
}
for key, value in parsed_output.items():
pretty_key = key.replace('.', ' ').title()
pretty_value = f"{Fore.GREEN}{value} {units.get(key, '')}{Style.RESET_ALL}"
print(f"{Fore.YELLOW}{pretty_key:<40}{Fore.WHITE}: {pretty_value}")
DISPLAY_ORDER = [
"device.model",
"device.mfr",
"ups.status",
"battery.charge",
"battery.voltage",
"ups.load",
"input.voltage",
"output.voltage",
"output.frequency",
"battery.runtime",
"ups.realpower",
]
print("-" * 70)
RANGES = {
"battery.charge": {"warn_min": 50, "crit_min": 25},
"battery.voltage": {"warn_min": 20, "warn_max": 30, "crit_min": 18, "crit_max": 32},
"output.frequency": {"warn_min": 48, "warn_max": 52, "crit_min": 47, "crit_max": 53},
"input.voltage": {"warn_min": 210, "warn_max": 245, "crit_min": 200, "crit_max": 250},
"output.voltage": {"warn_min": 210, "warn_max": 245, "crit_min": 200, "crit_max": 250},
"ups.load": {"warn_max": 80, "crit_max": 95},
}
def check_values(parsed_output, normal_ranges):
print(f"\n{Fore.BLUE}Value Checks")
print("-" * 50)
EXIT_OK = 0
EXIT_WARN = 1
EXIT_CRIT = 2
EXIT_ERROR = 3
alerts = []
for key, (min_value, max_value) in normal_ranges.items():
def supports_color(no_color: bool) -> bool:
return not no_color and sys.stdout.isatty()
def colorize(text: str, color: str, enabled: bool) -> str:
if not enabled:
return text
return f"{color}{text}{Style.RESET_ALL}"
def pretty_key(key: str) -> str:
return key.replace(".", " ").title()
def endpoint_for(ups_name: str, host: str, port: int) -> str:
return f"{ups_name}@{host}:{port}"
def run_upsc_command(ups_name: str, host: str, port: int, timeout: int) -> str:
if shutil.which("upsc") is None:
raise RuntimeError("No se encontró el comando 'upsc'. Instala NUT client tools.")
endpoint = endpoint_for(ups_name, host, port)
try:
value = float(parsed_output.get(key, 0))
except ValueError:
print(f"{Fore.RED}⚠️ Error: {key.replace('.', ' ').title()} has a non-numeric value.")
result = subprocess.run(
["upsc", endpoint],
capture_output=True,
text=True,
check=True,
timeout=timeout,
)
except FileNotFoundError as exc:
raise RuntimeError("No se encontró el comando 'upsc'.") from exc
except subprocess.TimeoutExpired as exc:
raise RuntimeError(f"Timeout al consultar {endpoint} tras {timeout}s.") from exc
except subprocess.CalledProcessError as exc:
stderr = (exc.stderr or "").strip()
detail = f": {stderr}" if stderr else ""
raise RuntimeError(f"upsc devolvió error al consultar {endpoint}{detail}") from exc
return result.stdout
def parse_upsc_output(output: str) -> Dict[str, str]:
parsed_output: Dict[str, str] = {}
for raw_line in output.splitlines():
line = raw_line.strip()
if not line or ":" not in line:
continue
if min_value <= value <= max_value:
print(f"{Fore.GREEN}{key.replace('.', ' ').title()} is normal.")
else:
alerts.append(f"{key.replace('.', ' ').title()} is {value}, should be {min_value}-{max_value}")
print(f"{Fore.RED}⚠️ {key.replace('.', ' ').title()} is outside normal range ({min_value}-{max_value}).")
key, value = line.split(":", 1)
key = key.strip()
value = value.strip()
if key:
parsed_output[key] = value
print("-" * 50)
return parsed_output
if alerts:
print(f"{Fore.RED}Alerts: {', '.join(alerts)}")
def system_info():
print(f"\n{Fore.BLUE}System Information")
def parse_float(value: Optional[str]) -> Optional[float]:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def evaluate_metric(key: str, value: Optional[str]) -> Tuple[str, str]:
limits = RANGES.get(key)
if not limits:
return "UNKNOWN", "Sin umbral definido"
numeric_value = parse_float(value)
if numeric_value is None:
return "UNKNOWN", "Valor ausente o no numérico"
crit_min = limits.get("crit_min")
crit_max = limits.get("crit_max")
warn_min = limits.get("warn_min")
warn_max = limits.get("warn_max")
if crit_min is not None and numeric_value < crit_min:
return "CRIT", f"{numeric_value} por debajo de {crit_min}"
if crit_max is not None and numeric_value > crit_max:
return "CRIT", f"{numeric_value} por encima de {crit_max}"
if warn_min is not None and numeric_value < warn_min:
return "WARN", f"{numeric_value} por debajo de {warn_min}"
if warn_max is not None and numeric_value > warn_max:
return "WARN", f"{numeric_value} por encima de {warn_max}"
return "OK", "Dentro de rango"
def evaluate_status(parsed_output: Dict[str, str]) -> Tuple[str, List[Dict[str, str]]]:
checks: List[Dict[str, str]] = []
overall = "OK"
for key in RANGES:
state, detail = evaluate_metric(key, parsed_output.get(key))
checks.append(
{
"key": key,
"label": pretty_key(key),
"value": parsed_output.get(key, "—"),
"state": state,
"detail": detail,
}
)
if state == "CRIT":
overall = "CRIT"
elif state == "WARN" and overall != "CRIT":
overall = "WARN"
ups_status = parsed_output.get("ups.status", "").upper()
if "OB" in ups_status or "LB" in ups_status:
overall = "CRIT"
elif "RB" in ups_status and overall == "OK":
overall = "WARN"
return overall, checks
def display_value(key: str, value: str) -> str:
unit = UNITS.get(key, "")
return f"{value} {unit}".strip()
def print_ups_info(parsed_output: Dict[str, str], color_enabled: bool) -> None:
print(colorize("UPS Information", Fore.BLUE, color_enabled))
print("-" * 70)
system_info = {
printed = set()
ordered_keys = [key for key in DISPLAY_ORDER if key in parsed_output]
remaining_keys = sorted(key for key in parsed_output if key not in ordered_keys)
for key in ordered_keys + remaining_keys:
printed.add(key)
label = colorize(f"{pretty_key(key):<40}", Fore.YELLOW, color_enabled)
value = colorize(display_value(key, parsed_output[key]), Fore.GREEN, color_enabled)
print(f"{label}: {value}")
print("-" * 70)
def print_checks(checks: List[Dict[str, str]], overall: str, color_enabled: bool) -> None:
print()
print(colorize("Value Checks", Fore.BLUE, color_enabled))
print("-" * 70)
state_colors = {
"OK": Fore.GREEN,
"WARN": Fore.YELLOW,
"CRIT": Fore.RED,
"UNKNOWN": Fore.CYAN,
}
for check in checks:
label = f"{check['label']:<24}"
state = colorize(f"{check['state']:<7}", state_colors.get(check["state"], Fore.WHITE), color_enabled)
value = display_value(check["key"], check["value"])
print(f"{label} {state} {value} ({check['detail']})")
print("-" * 70)
overall_color = state_colors.get(overall, Fore.WHITE)
print(f"Overall Status: {colorize(overall, overall_color, color_enabled)}")
def system_info() -> Dict[str, str]:
return {
"Platform": platform.platform(),
"Hostname": platform.node(),
"Processor": platform.processor(),
"Python": platform.python_version(),
}
for key, value in system_info.items():
print(f"{Fore.YELLOW}{key:<40}{Fore.WHITE}: {Fore.GREEN}{value}{Style.RESET_ALL}")
def print_system_info(info: Dict[str, str], color_enabled: bool) -> None:
print()
print(colorize("System Information", Fore.BLUE, color_enabled))
print("-" * 70)
for key, value in info.items():
label = colorize(f"{key:<40}", Fore.YELLOW, color_enabled)
rendered = colorize(value, Fore.GREEN, color_enabled)
print(f"{label}: {rendered}")
print("-" * 70)
def show_help():
print(f"""{Fore.BLUE}{Style.BRIGHT}UPS Monitor Script Help{Style.RESET_ALL}
{Fore.GREEN}This script retrieves and displays information about an Uninterruptible Power Supply (UPS) and the system it's running on.{Style.RESET_ALL}
def exit_code_for(status: str) -> int:
if status == "CRIT":
return EXIT_CRIT
if status == "WARN":
return EXIT_WARN
return EXIT_OK
{Fore.BLUE}{Style.BRIGHT}UPS Information:{Style.RESET_ALL}
{Fore.YELLOW}{Style.DIM}- battery.charge{Style.RESET_ALL}: Battery charge level in percentage
{Fore.YELLOW}{Style.DIM}- battery.voltage{Style.RESET_ALL}: Battery voltage level in volts
{Fore.YELLOW}{Style.DIM}- output.frequency{Style.RESET_ALL}: Input frequency in Hertz
{Fore.YELLOW}{Style.DIM}- input.voltage{Style.RESET_ALL}: Input voltage in volts
{Fore.YELLOW}{Style.DIM}- output.voltage{Style.RESET_ALL}: Output voltage in volts
{Fore.YELLOW}{Style.DIM}- ups.load{Style.RESET_ALL}: Load level in percentage
{Fore.BLUE}{Style.BRIGHT}System Information:{Style.RESET_ALL}
{Fore.YELLOW}{Style.DIM}- Platform{Style.RESET_ALL}: Operating system platform
{Fore.YELLOW}{Style.DIM}- Hostname{Style.RESET_ALL}: Hostname of the system
{Fore.YELLOW}{Style.DIM}- Processor{Style.RESET_ALL}: Processor information""")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Consulta y valida el estado de un UPS vía NUT/upsc.")
parser.add_argument("--ups", default=DEFAULT_UPS, help=f"Nombre del UPS en NUT. Por defecto: {DEFAULT_UPS}")
parser.add_argument("--host", default=DEFAULT_HOST, help=f"Host de NUT. Por defecto: {DEFAULT_HOST}")
parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"Puerto de NUT. Por defecto: {DEFAULT_PORT}")
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help=f"Timeout para upsc en segundos. Por defecto: {DEFAULT_TIMEOUT}")
parser.add_argument("--json", action="store_true", help="Muestra la salida en JSON")
parser.add_argument("--no-color", action="store_true", help="Desactiva colores en la salida")
parser.add_argument("--no-system-info", action="store_true", help="No muestra información del sistema")
return parser
def main():
if "-h" in sys.argv or "--help" in sys.argv:
show_help()
return
ups_name = "nutdev1"
def main() -> int:
args = build_parser().parse_args()
color_enabled = supports_color(args.no_color)
upsc_output = run_upsc_command(ups_name)
try:
upsc_output = run_upsc_command(args.ups, args.host, args.port, args.timeout)
except RuntimeError as exc:
print(colorize(f"Error: {exc}", Fore.RED, color_enabled), file=sys.stderr)
return EXIT_ERROR
if upsc_output:
parsed_output = parse_upsc_output(upsc_output)
if not parsed_output:
print(colorize("Error: la salida de upsc no contiene datos parseables.", Fore.RED, color_enabled), file=sys.stderr)
return EXIT_ERROR
normal_ranges = {
"battery.charge": (50, 100),
"battery.voltage": (20, 30),
"output.frequency": (48, 52),
"input.voltage": (220, 240),
"output.voltage": (220, 240),
"ups.load": (0, 80),
overall, checks = evaluate_status(parsed_output)
info = system_info()
if args.json:
payload = {
"endpoint": endpoint_for(args.ups, args.host, args.port),
"status": overall,
"ups": parsed_output,
"checks": checks,
"system": None if args.no_system_info else info,
}
beautify_upsc_output(parsed_output)
check_values(parsed_output, normal_ranges)
system_info()
print(json.dumps(payload, indent=2, ensure_ascii=False))
else:
print(f"{Fore.RED}Couldn't retrieve UPS information.")
print_ups_info(parsed_output, color_enabled)
print_checks(checks, overall, color_enabled)
if not args.no_system_info:
print_system_info(info, color_enabled)
return exit_code_for(overall)
if __name__ == "__main__":
main()
sys.exit(main())